Source code for scml.scml2020.world

"""Implements the world class for the SCML2020 world """

from __future__ import annotations

import copy
import itertools
import logging
import math
import random
import sys
from collections import Counter, defaultdict, namedtuple
from typing import Any, Callable, Collection, Iterable

import networkx as nx
import numpy as np
import pandas as pd
from matplotlib.axis import Axis

# from negmas import Adapter
from negmas import Action, Agent, Breach, Contract
from negmas.helpers import get_class, get_full_type_name, instantiate, unique_name
from negmas.situated import (
    DEFAULT_EDGE_TYPES,
    BreachProcessing,
    Operations,
    TimeInAgreementMixin,
    World,
)

from ..common import (
    distribute_quantities,
    fraction_cut,
    integer_cut,
    intin,
    make_array,
    realin,
)
from ..oneshot.agent import OneShotAgent
from .agent import OneShotAdapter, SCML2020Agent, _SystemAgent
from .awi import AWI
from .common import (
    COMPENSATION_ID,
    INFINITE_COST,
    NO_COMMAND,
    SYSTEM_BUYER_ID,
    SYSTEM_SELLER_ID,
    ContractInfo,
    ExogenousContract,
    FactoryProfile,
    Failure,
    FinancialReport,
    is_system_agent,
)
from .factory import Factory

__all__ = [
    "SCML2020World",
    "SCML2021World",
    "SCML2022World",
    "SCML2023World",
    "SCML2024World",
    "Failure",
    "AWI",
]

MAX_WORLD_GENERATION_TRIALS = 100
"""Maximum number of world generation trials for recoverable errors"""


def _realin(rng: tuple[float, float] | float) -> float:
    """
    Selects a random number within a range if given or the input if it was a float

    Args:
        rng: Range or single value

    Returns:

        the real within the given range
    """
    if not isinstance(rng, Iterable):
        return rng
    if abs(rng[1] - rng[0]) < 1e-8:
        return rng[0]
    return rng[0] + random.random() * (rng[1] - rng[0])


class RecoverableWorldGenerationException(Exception):
    """Thrown internally by world generation functions if they cannot
    generate the world but may be able to do so with the same inputs
    in another call"""

    pass


CompensationRecord = namedtuple(
    "CompensationRecord", ["product", "quantity", "money", "seller_bankrupt", "factory"]
)
"""A record of delayed compensation used when a factory goes bankrupt to keep
honoring its future contracts to the limit possible"""

ContractRecord = namedtuple(
    "ContractRecord",
    [
        "product",
        "signed",
        "time",
        "seller",
        "buyer",
        "quantity",
        "unit_price",
        "is_exogenous",
    ],
)
"""A record of a contract using during world generation"""


[docs] class SCML2020World(TimeInAgreementMixin, World): """A Supply Chain SCML2020World simulation as described for the SCML league of ANAC @ IJCAI 2020. Args: process_inputs: An n_processes vector specifying the number of inputs from each product needed to execute each process. process_outputs: An n_processes vector specifying the number of inputs from each product generated by executing each process. catalog_prices: An n_products vector (i.e. n_processes+1 vector) giving the catalog price of all products profiles: An n_agents list of `FactoryProfile` objects specifying the private profile of the factory associated with each agent. agent_types: An n_agents list of strings/ `SCML2020Agent` classes specifying the type of each agent agent_params: An n_agents dictionaries giving the parameters of each agent initial_balance: The initial balance in each agent's wallet. All agents will start with this same value. allow_selling_input: Allows agents to sell their input product(s) through negotiation allow_buying_output: Allows agents to buy their output product(s) through negotiation catalog_quantities: The quantities in the past for which catalog_prices are the average unit prices. This is used when updating the trading prices. If set to zero then the trading price will follow the market price and will not use the catalog_price (except for products that are never sold in the market for which the trading price will take the default value of the catalog price). If set to a large value (e.g. 10000), the price at which a product is sold will not affect the trading price spot_market_global_loss: Buying from the spot market will cost trading-price * (1+`spot_market_global_loss) and selling to it will cost trading-price / (1+ spot_market_global_loss) for agents with unit spot-market-loss-multiplier financial_report_period: The number of steps between financial reports. If < 1, it is a fraction of n_steps borrow_on_breach: If true, agents will be forced to borrow money on breach as much as possible to honor the contract interest_rate: The interest at which loans grow over time (it only affect a factory when its balance is negative) bankruptcy_limit: The maximum amount that be be borrowed (including interest). The balance of any factory cannot go lower than - borrow_limit or the agent will go bankrupt immediately liquidation_rate: The rate at which future contracts get liquidated when an agent gets bankrupt. It should be between zero and one. compensation_fraction: Fraction of a contract to be compensated (at most) if a partner goes bankrupt. Notice that this fraction is not guaranteed because the bankrupt agent may not have enough assets to pay all of its standing contracts to this level of compensation. In such cases, a smaller fraction will be used. compensate_immediately: If true, compensation will happen immediately when an agent goes bankrupt and in in money. This means that agents with contracts involving the bankrupt agent will just have these contracts be nullified and receive monetary compensation immediately . If false, compensation will not happen immediately but at the contract execution time. In this case, agents with contracts involving the bankrupt agent will be informed of the compensation fraction (instead of the compensation money) at the time of bankruptcy and will receive the compensation in kind (money if they are sellers and products if they are buyers) at the normal execution time of the contract. In the special case of no-compensation (i.e. `compensation_fraction` is zero or the bankrupt agent has no assets), the two options will behave similarity. compensate_before_past_debt: If true, then compensations will be paid before past debt is considered, otherwise, the money from liquidating bankrupt agents will first be used to pay past debt then whatever remains will be used for compensation. Notice that in all cases, the trigger of bankruptcy will be paid before compensation and past debts. exogenous_horizon: The horizon for revealing external contracts exogenous_force_max: If true, exogenous contracts are forced to be signed independent of the setting of `force_signing` production_no_borrow: If true, agents will not borrow if they fail to satisfy its production need to execute a scheduled production command production_no_bankruptcy: If true, agents will not go bankrupt because of an production related transaction. production_penalty: The penalty paid when buying from spot-market to satisfy production needs production_confirm: If true, the factory will confirm running processes at every time-step just before running them by calling `confirm_production` on the agent controlling it. compact: If True, no logs will be kept and the whole simulation will use a smaller memory footprint n_steps: Number of simulation steps (can be considered as days). time_limit: Total time allowed for the complete simulation in seconds. neg_n_steps: Number of negotiation steps allowed for all negotiations. neg_time_limit: Total time allowed for a complete negotiation in seconds. neg_step_time_limit: Total time allowed for a single step of a negotiation. in seconds. negotiation_speed: The number of negotiation steps that pass in every simulation step. If 0, negotiations will be guaranteed to finish within a single simulation step signing_delay: The number of simulation steps to pass between a contract is concluded and signed name: The name of the simulations **kwargs: Other parameters that are passed directly to `SCML2020World` constructor. """ def __init__( self, # SCML2020 specific parameters process_inputs: np.ndarray, process_outputs: np.ndarray, catalog_prices: np.ndarray, profiles: list[FactoryProfile], agent_types: list[type[SCML2020Agent]], agent_params: list[dict[str, Any]] | None = None, exogenous_contracts: Collection[ExogenousContract] = (), initial_balance: np.ndarray | tuple[int, int] | int = 1000, allow_buying_output=False, allow_selling_input=False, catalog_quantities: int | np.ndarray = 50, # breach processing parameters buy_missing_products=True, borrow_on_breach=True, bankruptcy_limit=0.0, liquidation_rate=1.0, spot_market_global_loss=0.30, interest_rate=0.05, financial_report_period: int = 5, # compensation parameters (for victims of bankrupt agents) compensation_fraction: float = 1.0, compensate_immediately=False, compensate_before_past_debt=True, # external contracts parameters exogenous_horizon: int | None = None, exogenous_force_max: bool = False, # production failure parameters production_confirm=False, production_buy_missing=False, production_no_borrow=True, production_no_bankruptcy=False, production_penalty=0.15, # General SCML2020World Parameters compact=False, no_logs=False, n_steps=1000, time_limit=60 * 90, # mechanism params neg_n_steps=20, neg_time_limit=2 * 60, neg_step_time_limit=60, negotiation_speed=21, negotiation_quota_per_step=None, negotiation_quota_per_simulation=float("inf"), n_concurrent_negs_between_partners=float("inf"), shuffle_negotiations=False, end_negotiation_on_refusal_to_propose=True, # trading price parameters trading_price_discount=0.9, # spot market parameters spot_discount=0.9, spot_multiplier=0.05, # simulation parameters signing_delay=0, force_signing=False, batch_signing=True, name: str = None, # public information publish_exogenous_summary=True, publish_trading_prices=True, # debugging parameters agent_name_reveals_position: bool = True, agent_name_reveals_type: bool = True, # evaluation paramters inventory_valuation_trading: float = 0.5, inventory_valuation_catalog: float = 0.0, **kwargs, ): if negotiation_quota_per_step is None: negotiation_quota_per_step = len(agent_types) * n_steps // 2 if exogenous_horizon is None: exogenous_horizon = n_steps self.publish_exogenous_summary = publish_exogenous_summary self.publish_trading_prices = publish_trading_prices self.allow_buying_output = allow_buying_output self.allow_selling_input = allow_selling_input self.exogenous_horizon = exogenous_horizon self.buy_missing_products = buy_missing_products self.production_buy_missing = production_buy_missing self.liquidation_rate = liquidation_rate self.trading_price_discount = trading_price_discount self.spot_discount = spot_discount self.spot_multiplier = spot_multiplier self.catalog_quantities = catalog_quantities self.inventory_valuation_trading = inventory_valuation_trading self.inventory_valuation_catalog = inventory_valuation_catalog self.n_concurrent_negs_between_partners = n_concurrent_negs_between_partners kwargs["log_to_file"] = not no_logs if compact: kwargs["event_file_name"] = None kwargs["event_types"] = [] kwargs["log_screen_level"] = logging.CRITICAL kwargs["log_file_level"] = logging.ERROR kwargs["log_negotiations"] = False kwargs["log_ufuns"] = False # kwargs["save_mechanism_state_in_contract"] = False kwargs["save_cancelled_contracts"] = False kwargs["save_resolved_breaches"] = False kwargs["save_negotiations"] = True else: kwargs["save_negotiations"] = True self.compact = compact if negotiation_speed == 0: negotiation_speed = neg_n_steps + 1 mechanisms = kwargs.pop("mechanisms", {}) super().__init__( bulletin_board=None, breach_processing=BreachProcessing.NONE, awi_type="scml.scml2020.AWI", shuffle_negotiations=shuffle_negotiations, mechanisms={ "negmas.sao.SAOMechanism": mechanisms.get( "negmas.sao.SAOMechanism", dict( end_on_no_response=end_negotiation_on_refusal_to_propose, dynamic_entry=False, max_wait=negotiation_quota_per_step, ), ) }, default_signing_delay=signing_delay, n_steps=n_steps, time_limit=time_limit, negotiation_speed=negotiation_speed, neg_n_steps=neg_n_steps, neg_time_limit=neg_time_limit, neg_step_time_limit=neg_step_time_limit, force_signing=force_signing, batch_signing=batch_signing, negotiation_quota_per_step=negotiation_quota_per_step, negotiation_quota_per_simulation=negotiation_quota_per_simulation, no_logs=no_logs, operations=( Operations.StatsUpdate, Operations.SimulationStep, Operations.Negotiations, Operations.ContractSigning, Operations.ContractExecution, Operations.AgentSteps, Operations.SimulationStep, Operations.StatsUpdate, ), name=name, **kwargs, ) self.bulletin_board.record( "settings", publish_trading_prices, "public_trading_prices" ) self.bulletin_board.record( "settings", publish_exogenous_summary, "public_exogenous_summary" ) self.bulletin_board.record( "settings", buy_missing_products, "buy_missing_products" ) self.bulletin_board.record( "settings", inventory_valuation_trading, "inventory_valuation_trading" ) self.bulletin_board.record( "settings", inventory_valuation_catalog, "inventory_valuation_catalog" ) self.bulletin_board.record( "settings", inventory_valuation_trading + inventory_valuation_catalog, "inventory_valuation", ) self.bulletin_board.record("settings", borrow_on_breach, "borrow_on_breach") self.bulletin_board.record("settings", bankruptcy_limit, "bankruptcy_limit") self.bulletin_board.record( "settings", spot_market_global_loss, "spot_market_global_loss" ) self.bulletin_board.record( "settings", financial_report_period, "financial_report_period" ) self.bulletin_board.record("settings", interest_rate, "interest_rate") self.bulletin_board.record("settings", exogenous_horizon, "exogenous_horizon") self.bulletin_board.record( "settings", compensation_fraction, "compensation_fraction" ) self.bulletin_board.record( "settings", compensate_immediately, "compensate_immediately" ) self.bulletin_board.record( "settings", compensate_before_past_debt, "compensate_before_past_debt" ) self.bulletin_board.record( "settings", exogenous_force_max, "exogenous_force_max" ) self.bulletin_board.record("settings", production_confirm, "production_confirm") self.bulletin_board.record( "settings", production_buy_missing, "production_buy_missing" ) self.bulletin_board.record( "settings", production_no_borrow, "production_no_borrow" ) self.bulletin_board.record( "settings", production_no_bankruptcy, "production_no_bankruptcy" ) self.bulletin_board.record("settings", production_penalty, "production_penalty") self.bulletin_board.record( "settings", len(exogenous_contracts) > 0, "has_exogenous_contracts" ) self.bulletin_board.record( "settings", n_concurrent_negs_between_partners, "n_concurrent_negs_between_partners", ) if self.info is None: self.info = {} self.info.update( shuffle_negotiations=shuffle_negotiations, end_negotiation_on_refusal_to_propose=end_negotiation_on_refusal_to_propose, process_inputs=process_inputs, process_outputs=process_outputs, catalog_prices=catalog_prices, agent_types_final=[get_full_type_name(_) for _ in agent_types], agent_params_final=agent_params, initial_balance_final=initial_balance, buy_missing_products=buy_missing_products, production_buy_missing=production_buy_missing, borrow_on_breach=borrow_on_breach, bankruptcy_limit=bankruptcy_limit, spot_market_global_loss=spot_market_global_loss, financial_report_period=financial_report_period, interest_rate=interest_rate, compensation_fraction=compensation_fraction, compensate_immediately=compensate_immediately, compensate_before_past_debt=compensate_before_past_debt, exogenous_force_max=exogenous_force_max, production_no_borrow=production_no_borrow, production_no_bankruptcy=production_no_bankruptcy, production_penalty=production_penalty, compact=compact, no_logs=no_logs, n_steps=n_steps, time_limit=time_limit, neg_n_steps=neg_n_steps, neg_time_limit=neg_time_limit, neg_step_time_limit=neg_step_time_limit, negotiation_speed=negotiation_speed, signing_delay=signing_delay, agent_name_reveals_position=agent_name_reveals_position, agent_name_reveals_type=agent_name_reveals_type, publish_exogenous_summary=publish_exogenous_summary, publish_trading_prices=publish_trading_prices, ) TimeInAgreementMixin.init(self, time_field="time") self.spot_market_global_loss = spot_market_global_loss self.bulletin_board.add_section("reports_time") self.bulletin_board.add_section("reports_agent") if self.publish_exogenous_summary: self.bulletin_board.add_section("exogenous_contracts_summary") if self.publish_trading_prices: self.bulletin_board.add_section("trading_prices") self.production_no_borrow = production_no_borrow self.production_no_bankruptcy = production_no_bankruptcy self.production_penalty = production_penalty self.compensation_fraction = compensation_fraction if not isinstance(agent_types, Iterable): agent_types = [agent_types] * len(profiles) assert len(profiles) == len(agent_types) self.profiles = profiles self.catalog_prices = catalog_prices self.process_inputs = process_inputs self.process_outputs = process_outputs self.n_products = len(catalog_prices) self.n_processes = len(process_inputs) self.borrow_on_breach = borrow_on_breach self.interest_rate = interest_rate self.exogenous_force_max = exogenous_force_max self.compensate_before_past_debt = compensate_before_past_debt self.confirm_production = production_confirm self.financial_reports_period = ( financial_report_period if financial_report_period >= 1 else int(0.5 + financial_report_period * n_steps) ) self.compensation_fraction = compensation_fraction self.compensate_immediately = compensate_immediately initial_balance = make_array(initial_balance, len(profiles), dtype=int) # assert all([_> 1e-5 for _ in initial_balance]), "Some initial balances are zero or negative {initial_balance}" agent_types = [get_class(_) for _ in agent_types] self.bankruptcy_limit = ( -bankruptcy_limit if isinstance(bankruptcy_limit, int) else -int(0.5 + bankruptcy_limit * initial_balance.mean()) ) assert self.n_products == self.n_processes + 1 n_agents = len(profiles) if agent_name_reveals_position or agent_name_reveals_type: default_names = [f"{_:02}" for _ in range(n_agents)] else: default_names = [unique_name("", add_time=False) for _ in range(n_agents)] if agent_name_reveals_type: for i, (at, ap) in enumerate(zip(agent_types, agent_params)): if issubclass(at, OneShotAdapter): s2 = ( get_class(ap["oneshot_type"]) ._type_name() .split(".")[-1] .replace("Agent", "") .replace("Adapter", "") .replace("OneShot", "") ) s2 += f'O({at._type_name().split(".")[-1].replace("Agent", "").replace("OneShot", "").replace("Adapter", "")})' else: s2 = at._type_name().split(".")[-1].replace("Agent", "") s = "".join([c for c in s2 if c.isupper()])[:3] try: if len(s) < 3: if len(s2) > 3: s = s2[:2] elif len(s2) >= 2: s = s2[0] + s2[1 : 1 + (3 - len(s))] + s2[1:] elif len(s2) > 0: s = s2[0] * 3 else: s = "Agt" except Exception: pass default_names[i] += f"{s}" agent_levels = [ int(np.nonzero(np.max(p.costs != INFINITE_COST, axis=0).flatten())[0]) for p in profiles ] if agent_name_reveals_position: for i, L_ in enumerate(agent_levels): default_names[i] += f"@{L_:01}" # for i, (t_, p_ ) in enumerate(zip(agent_types, agent_params)): # if issubclass(get_class(t_), Adapter): # p_["obj"] = get_class(p_["oneshot_type"])(**p_.get("oneshot_params", dict())) if agent_params is None: agent_params = [dict(name=name) for i, name in enumerate(default_names)] elif isinstance(agent_params, dict): a = copy.copy(agent_params) agent_params = [] for i, name in enumerate(default_names): b = copy.deepcopy(a) b["name"] = name agent_params.append(b) elif len(agent_params) == 1: a = copy.copy(agent_params[0]) agent_params = [] for i, _ in enumerate(default_names): b = copy.deepcopy(a) b["name"] = name agent_params.append(b) else: if agent_name_reveals_type or agent_name_reveals_position: for i, (ns, ps) in enumerate(zip(default_names, agent_params)): agent_params[i] = dict(**ps) agent_params[i]["name"] = ns n_processes = len(process_inputs) n_products = n_processes + 1 agent_types += [_SystemAgent, _SystemAgent] agent_params += [{"role": SYSTEM_SELLER_ID}, {"role": SYSTEM_BUYER_ID}] initial_balance = initial_balance.tolist() + [ sys.maxsize // 4, sys.maxsize // 4, ] profiles.append(FactoryProfile(INFINITE_COST * np.ones(n_processes, dtype=int))) profiles.append(FactoryProfile(INFINITE_COST * np.ones(n_processes, dtype=int))) agents = [] for i, (atype, aparams) in enumerate(zip(agent_types, agent_params)): # aparams = {k:v for k, v in aparams if k not in ("oneshot_params", "oneshot_type")} a = instantiate(atype, **aparams) a.id = a.name self.join(a, i) agents.append(a) self.agent_types = [_.type_name for _ in agents] self.agent_params = [ {k: v for k, v in _.items() if k not in ("name",)} for _ in agent_params ] agent_params = [ { k: v for k, v in _.items() if k not in ("name", "oneshot_type", "oneshot_params", "obj") } for _ in agent_params ] self.agent_unique_types = [ f"{t}{hash(str(p))}" if len(p) > 0 else t for t, p in zip(self.agent_types, agent_params) ] assert isinstance(initial_balance, Iterable) self.factories = [ Factory( world=self, profile=profile, initial_balance=initial_balance[i], inputs=process_inputs, outputs=process_outputs, agent_id=agents[i].id, catalog_prices=catalog_prices, compensate_before_past_debt=self.compensate_before_past_debt, buy_missing_products=self.buy_missing_products, production_buy_missing=self.production_buy_missing, production_penalty=self.production_penalty, production_no_borrow=self.production_no_borrow, production_no_bankruptcy=self.production_no_bankruptcy, confirm_production=self.confirm_production, initial_inventory=( None if i < len(profiles) - 2 else sys.maxsize // 4 * np.ones(n_products, dtype=int) ), ) for i, profile in enumerate(profiles) ] self.a2f = dict(zip((_.id for _ in agents), self.factories)) self.afp = list(zip(agents, self.factories, profiles)) self.f2i = self.a2i = dict(zip((_.id for _ in agents), range(n_agents))) self.i2a = agents self.i2f = self.factories self.breach_prob = dict(zip((_.id for _ in agents), itertools.repeat(0.0))) self._breach_level = dict(zip((_.id for _ in agents), itertools.repeat(0.0))) self.agent_n_contracts = dict(zip((_.id for _ in agents), itertools.repeat(0))) self.suppliers: list[list[str]] = [[] for _ in range(n_products)] self.consumers: list[list[str]] = [[] for _ in range(n_products)] self.agent_processes: dict[str, list[int]] = defaultdict(list) self.agent_inputs: dict[str, list[int]] = defaultdict(list) self.agent_outputs: dict[str, list[int]] = defaultdict(list) self.agent_consumers: dict[str, list[str]] = defaultdict(list) self.agent_suppliers: dict[str, list[str]] = defaultdict(list) self.consumers[n_products - 1].append(SYSTEM_BUYER_ID) self.agent_processes[SYSTEM_BUYER_ID] = [] self.agent_inputs[SYSTEM_BUYER_ID] = [n_products - 1] self.agent_outputs[SYSTEM_BUYER_ID] = [] self.suppliers[0].append(SYSTEM_SELLER_ID) self.agent_processes[SYSTEM_SELLER_ID] = [] self.agent_inputs[SYSTEM_SELLER_ID] = [] self.agent_outputs[SYSTEM_SELLER_ID] = [0] self.agent_profiles: dict[str, Any] = dict() self.initial_balances: dict[str, Any] = dict() for agent_id, profile in zip(self.agents.keys(), profiles): self.agent_profiles[agent_id] = profile for agent_id, ib in zip(self.agents.keys(), initial_balance): self.initial_balances[agent_id] = ib for p in range(n_processes): for agent_id, profile in zip(self.agents.keys(), profiles): self.agent_profiles[agent_id] = profile if is_system_agent(agent_id): continue if np.all(profile.costs[:, p] == INFINITE_COST): continue self.suppliers[p + 1].append(agent_id) self.consumers[p].append(agent_id) self.agent_processes[agent_id].append(p) self.agent_inputs[agent_id].append(p) self.agent_outputs[agent_id].append(p + 1) for p in range(n_products): for a in self.suppliers[p]: self.agent_consumers[a] = self.consumers[p] for a in self.consumers[p]: self.agent_suppliers[a] = self.suppliers[p] self.agent_processes = {k: np.array(v) for k, v in self.agent_processes.items()} self.agent_inputs = {k: np.array(v) for k, v in self.agent_inputs.items()} self.agent_outputs = {k: np.array(v) for k, v in self.agent_outputs.items()} assert all( len(v) == 1 or is_system_agent(self.agents[k].id) for k, v in self.agent_outputs.items() ), f"Not all agent outputs are singular:\n{self.agent_outputs}" assert all( len(v) == 1 or is_system_agent(self.agents[k].id) for k, v in self.agent_inputs.items() ), f"Not all agent inputs are singular:\n{self.agent_outputs}" assert all( is_system_agent(k) or self.agent_inputs[k][0] == self.agent_outputs[k] - 1 for k in self.agent_inputs.keys() ), f"Some agents have outputs != input+1\n{self.agent_outputs}\n{self.agent_inputs}" self._n_production_failures = 0 self.__n_nullified = 0 self.__n_bankrupt = 0 self.penalties = 0 # self.is_bankrupt: dict[str, bool] = dict( # zip(self.agents.keys(), itertools.repeat(False)) # ) self.compensation_balance = 0 self.compensation_records: dict[str, list[CompensationRecord]] = defaultdict( list ) self.exogenous_contracts: dict[int : list[Contract]] = defaultdict(list) for c in exogenous_contracts: seller_id = agents[c.seller].id if c.seller >= 0 else SYSTEM_SELLER_ID buyer_id = agents[c.buyer].id if c.buyer >= 0 else SYSTEM_BUYER_ID if not c.quantity: continue contract = Contract( agreement={ "time": c.time, "quantity": c.quantity, "unit_price": c.unit_price, }, partners=[buyer_id, seller_id], issues=[], signatures=dict(), signed_at=-1, to_be_signed_at=c.revelation_time, annotation={ "seller": seller_id, "buyer": buyer_id, "caller": ( SYSTEM_SELLER_ID if seller_id == SYSTEM_SELLER_ID else SYSTEM_BUYER_ID ), "is_buy": random.random() > 0.5, "product": c.product, }, ) self.exogenous_contracts[c.revelation_time].append(contract) self.compensation_factory = Factory( FactoryProfile(np.zeros((n_steps, n_processes), dtype=int)), initial_balance=0, inputs=self.process_inputs, outputs=self.process_outputs, world=self, agent_id=COMPENSATION_ID, agent_name=COMPENSATION_ID, catalog_prices=catalog_prices, compensate_before_past_debt=True, buy_missing_products=False, production_buy_missing=False, production_penalty=0.0, production_no_borrow=True, production_no_bankruptcy=True, confirm_production=False, ) # _real_prices are the prices at which products are actually traded (it can be nan if the product was # never traded at some step). _trading_prices are the effective prices taking catalog prices and past trade # into account # Note that we go to step n_steps not only to n_steps-1 which means that the value at s is the value BEFORE # step s which is what we really need usually. self._agent_output = np.zeros((n_agents, n_products), dtype=int) self._agent_input = np.zeros((n_agents, n_products), dtype=int) for i, profile in enumerate(profiles[:-2]): self._agent_output[i, profile.processes + 1] = 1 self._agent_input[i, profile.processes] = 1 self._traded_quantity = np.ones(n_products) * self.catalog_quantities self._real_price = np.nan * np.ones((n_products, n_steps + 1)) self._sold_quantity = np.zeros((n_products, n_steps + 1), dtype=int) # self._real_price[0, :] = self.catalog_prices[0] # self._real_price[-1, :] = self.catalog_prices[-1] self._real_price[:, 0] = self.catalog_prices self._trading_price = np.tile( self._real_price[:, 0].reshape((n_products, 1)), (1, n_steps + 1) ) self._betas = np.ones(n_steps + 1) self._betas[1] = self.trading_price_discount self._betas[1:] = np.cumprod(self._betas[1:]) self._betas_sum = self.catalog_quantities * np.ones((n_products, n_steps + 1)) self._spot_quantity = np.zeros((n_agents, n_steps), dtype=int) self._alphas = np.ones(n_steps + 1) self._alphas[1] = self.spot_discount self._alphas[1:] = np.cumprod(self._alphas[1:]) self._agent_spot_loss = self.spot_market_global_loss * np.ones( (n_agents, n_steps) ) self._agent_spot_quantity = np.zeros((n_agents, n_steps), dtype=int) self._registered_negs: dict[tuple[str], int] = Counter() if self.publish_trading_prices: self.bulletin_board.record("trading_prices", self._trading_price[:, 1]) self.exogenous_contracts_summary = None if self.publish_exogenous_summary: q = np.zeros((self.n_products, self.n_steps, 2)) for s in range(self.n_steps): for contract in self.exogenous_contracts[s]: product = contract.annotation["product"] quantity, unit_price, t = ( contract.agreement["quantity"], contract.agreement["unit_price"], contract.agreement["time"], ) q[product, t, 0] += quantity if quantity: q[product, t, 1] += quantity * unit_price self.exogenous_contracts_summary = q for s in range(self.n_steps): self.bulletin_board.record( "exogenous_contracts_summary", value=self.exogenous_contracts_summary, key=s, ) self.info.update( dict( agent_profiles={ k: dict( costs=v.costs.tolist(), n_lines=v.n_lines, input_product=int( v.input_products[0] if v.input_products is not None and len(v.input_products) else -1 ), output_product=int( v.output_products[0] if v.output_products is not None and len(v.output_products) else -1 ), ) for k, v in self.agent_profiles.items() } ) ) self.info.update(dict(agent_inputs=self.agent_inputs)) self.info.update(dict(agent_outputs=self.agent_outputs)) self.info.update(dict(agent_processes=self.agent_processes)) self.info.update(dict(agent_initial_balances=self.initial_balances)) @classmethod
[docs] def generate( cls, agent_types: list[type[SCML2020Agent] | str], agent_params: list[dict[str, Any]] | None = None, agent_processes: list[int] | None = None, n_steps: tuple[int, int] | int = (50, 200), n_processes: tuple[int, int] | int = (2, 4), n_lines: np.ndarray | tuple[int, int] | int = 10, n_agents_per_process: np.ndarray | tuple[int, int] | int = (2, 4), process_inputs: np.ndarray | tuple[int, int] | int = 1, process_outputs: np.ndarray | tuple[int, int] | int = 1, production_costs: np.ndarray | tuple[int, int] | int = (1, 4), profit_means: np.ndarray | tuple[float, float] | float = (0.15, 0.2), profit_stddevs: np.ndarray | tuple[float, float] | float = 0.001, max_productivity: np.ndarray | tuple[float, float] | float = 1.0, initial_balance: np.ndarray | tuple[int, int] | int | None = None, cost_increases_with_level=True, equal_exogenous_supply=False, equal_exogenous_sales=False, exogenous_supply_predictability: tuple[float, float] | float = (0.6, 0.9), exogenous_sales_predictability: tuple[float, float] | float = (0.6, 0.9), exogenous_control: tuple[float, float] | float = (0.2, 0.8), cash_availability: tuple[float, float] | float = (1.5, 2.5), force_signing=False, profit_basis=np.max, horizon: tuple[float, float] | float = (0.2, 0.5), inventory_valuation_trading: np.ndarray | tuple[float, float] | float = 0.5, inventory_valuation_catalog: np.ndarray | tuple[float, float] | float = 0.0, random_agent_types: bool = False, cost_relativity: float = 1.0, exogenous_generation_method="profitable", exogenous_supply_surplus: tuple[float, float] | float = 0.0, exogenous_sales_surplus: tuple[float, float] | float = 0.0, run_extra_checks: bool = True, **kwargs, ) -> dict[str, Any]: """ Generates the configuration for a world Args: agent_types: All agent types agent_params: Agent parameters used to initialize them n_steps: Number of simulation steps n_processes: Number of processes in the production chain n_lines: Number of lines per factory process_inputs: Number of input units per process process_outputs: Number of output units per process production_costs: Production cost per factory profit_means: Mean profitability per production level (i.e. process). profit_stddevs: Std. Dev. of the profitability of every level (i.e. process). inventory_valuation_catalog: The fraction of catalog price to value items at the end. inventory_valuation_trading: The fraction of trading price to value items at the end. max_productivity: Maximum possible productivity per level (i.e. process). initial_balance: The initial balance of all agents n_agents_per_process: Number of agents per process agent_processes: The process for each agent. If not `None` , it will override `n_agents_per_process` and must be a list/tuple of the same length as `agent_types` . Morevoer, `random_agent_types` must be False in this case cost_increases_with_level: If true, production cost will be higher for processes nearer to the final product. profit_basis: The statistic used when controlling catalog prices by profit arguments. It can be np.mean, np.median, np.min, np.max or any Callable[[list[float]], float] and is used to summarize production costs at every level. horizon: The horizon used for revealing external supply/sales as a fraction of n_steps equal_exogenous_supply: If true, external supply will be distributed equally among all agents in the first layer equal_exogenous_sales: If true, external sales will be distributed equally among all agents in the last layer exogenous_supply_predictability: How predictable are exogenous supplies of each agent over time. 1.0 means that every agent will have the same quantity for all of its contracts over time. 0.0 means quantities per agent are completely random exogenous_sales_predictability: How predictable are exogenous supplies of each agent over time. 1.0 means that every agent will have the same quantity for all of its contracts over time. 0.0 means quantities per agent are completely random cash_availability: The fraction of the total money needs of the agent to work at maximum capacity that is available as `initial_balance` . This is only effective if `initial_balance` is set to `None` . force_signing: Whether to force contract signatures (exogenous contracts are treated in the same way). exogenous_control: How much control does the agent have over exogenous contract signing. Only effective if force_signing is False and use_exogenous_contracts is True random_agent_types: If True, the final agent types used by the generato wil always be sampled from the given types. If False, this random sampling will only happin if len(agent_types) != n_agents. cost_relativity: The exponent of production cost used to distribute contracts during generation method: The method used for world generation. Available methods are "profitable" and "guaranteed_profit" exogenous_supply_surplus: The surpolus exogenous supply contract quantity to add to the system as a fraction of the a fraction of the contracts generated by the given method. exogenous_sales_surplus: The surpolus exogenous sales contract quantity to add to the system as a fraction of the a fraction of the contracts generated by the given method. run_extra_checks: If given, the world generation method will check whether the genrated world "makes sense" given its internal criteria. May slow down world generation **kwargs: Returns: world configuration as a dict[str, Any]. A world can be generated from this dict by calling SCML2020World(**d) Remarks: - There are two general ways to use this generator: 1. Pass `random_agent_types = True`, and pass `agent_types`, `agent_processes` to control placement of each agent in each level of the production graph. 2. Pass `random_agent_types = False` and pass `agent_types`, `n_agents_per_process` to make the system randomly place the specified number of agents in each production level - Most parameters (i.e. `process_inputs` , `process_outputs` , `n_agents_per_process` , `costs` ) can take a single value, a tuple of two values, or a list of values. If it has a single value, it is repeated for all processes/factories as appropriate. If it is a tuple of two numbers $(i, j)$, each process will take a number sampled from a uniform distribution supported on $[i, j]$ inclusive. If it is a list of values, of the length `n_processes` , it is used as it is otherwise, it is used to sample values for each process. """ if agent_processes is not None and random_agent_types: raise ValueError( "You cannot pass `agent_processes` and use `random_agent_types`. The first is only used when you want to fix the assignment of all agents to specific processes which is compatible with randomizing agnet types" ) if agent_processes is not None and len(agent_processes) != len(agent_types): raise ValueError( f"Length of `agent_processes` ({len(agent_processes)}) must equal the length of `agent_types` ({len(agent_types)})" ) runner = dict( profitable=cls.generate_profitable, guaranteed_profit=cls.generate_guaranteed_profit, ) info = dict( n_steps=n_steps, n_processes=n_processes, n_lines=n_lines, force_signing=force_signing, agent_processes=agent_processes, n_agents_per_process=n_agents_per_process, process_inputs=process_inputs, process_outputs=process_outputs, production_costs=production_costs, profit_means=profit_means, profit_stddevs=profit_stddevs, max_productivity=max_productivity, initial_balance=initial_balance, cost_increases_with_level=cost_increases_with_level, equal_exogenous_sales=equal_exogenous_sales, equal_exogenous_supply=equal_exogenous_supply, exogenous_supply_predictability=exogenous_supply_predictability, exogenous_sales_predictability=exogenous_sales_predictability, cash_availability=cash_availability, inventory_valuation_trading=inventory_valuation_trading, inventory_valuation_catalog=inventory_valuation_catalog, cost_relativity=cost_relativity, profit_basis=( "min" if profit_basis == np.min else ( "mean" if profit_basis == np.mean else ( "max" if profit_basis == np.max else "median" if profit_basis == np.median else "unknown" ) ) ), exogenous_supply_surplus=exogenous_supply_surplus, exogenous_sales_surplus=exogenous_sales_surplus, ) exogenous_supply_surplus = realin(exogenous_supply_surplus) exogenous_sales_surplus = realin(exogenous_sales_surplus) inventory_valuation_trading = realin(inventory_valuation_trading) inventory_valuation_catalog = realin(inventory_valuation_catalog) n_processes = intin(n_processes) n_steps = intin(n_steps) exogenous_sales_predictability = realin(exogenous_sales_predictability) exogenous_supply_predictability = realin(exogenous_supply_predictability) exogenous_control = realin(exogenous_control) np.errstate(divide="ignore") # n_startup = n_processes n_startup = n_processes if n_steps <= n_startup: raise ValueError( f"Cannot generate a world with n_steps <= n_processes: {n_steps} <= {n_startup}" ) horizon = max(1, min(n_steps, int(realin(horizon) * n_steps))) process_inputs = make_array(process_inputs, n_processes, dtype=int) process_outputs = make_array(process_outputs, n_processes, dtype=int) fixed_assignment = agent_processes is not None and not random_agent_types if agent_processes is not None: pcount = defaultdict(int) for i in agent_processes: pcount[i] += 1 pnums = list(pcount.keys()) assert ( min(pnums) == 0 and max(pnums) == len(pnums) - 1 ), f"`agent_processes` is invalid: {agent_processes} as it leads to the following `n_agents_per_process`: {dict(pcount)}" n_agents_per_process = np.asarray([pcount[i] for i in range(len(pnums))]) assert not any( _ <= 0 for _ in n_agents_per_process ), "We have some levels with no processes" else: n_agents_per_process = make_array( n_agents_per_process, n_processes, dtype=int ) # profit_means = make_array(profit_means, n_processes, dtype=float) # profit_stddevs = make_array(profit_stddevs, n_processes, dtype=float) max_productivity_process = make_array( max_productivity, n_processes * n_steps, dtype=float ).reshape((n_processes, n_steps)) n_agents = int(n_agents_per_process.sum()) assert n_agents >= n_processes profit_means_agent = make_array(profit_means, n_agents, dtype=float) profit_stddevs_agent = make_array(profit_stddevs, n_agents, dtype=float) max_productivity_agent = make_array(max_productivity, n_agents, dtype=float) n_processes + 1 production_costs = make_array(production_costs, n_agents, dtype=int) if initial_balance is not None: initial_balance = make_array(initial_balance, n_agents, dtype=int) if not isinstance(agent_types, Iterable): agent_types = [agent_types] * n_agents if agent_params is None: agent_params = dict() if isinstance(agent_params, dict): agent_params = [copy.copy(agent_params) for _ in range(n_agents)] else: assert len(agent_params) == 1 agent_params = [copy.copy(agent_params[0]) for _ in range(n_agents)] elif not fixed_assignment: if agent_params is None: agent_params = [dict() for _ in range(len(agent_types))] if isinstance(agent_params, dict): agent_params = [ copy.copy(agent_params) for _ in range(len(agent_types)) ] assert len(agent_types) == len(agent_params) tp = random.choices(list(range(len(agent_types))), k=n_agents) agent_types = [copy.copy(agent_types[_]) for _ in tp] agent_params = [copy.copy(agent_params[_]) for _ in tp] else: if agent_params is None: agent_params = [dict() for _ in range(len(agent_types))] if isinstance(agent_params, dict): agent_params = [ copy.copy(agent_params) for _ in range(len(agent_types)) ] agent_types = list(agent_types) agent_params = list(agent_params) assert len(agent_types) == len(agent_params) # find first and last agent for each process n_agents_cumsum = n_agents_per_process.cumsum().tolist() first_agent = [0] + n_agents_cumsum[:-1] last_agent = n_agents_cumsum[:-1] + [n_agents] # generate production costs making sure that every agent can do exactly one process process_of_agent = np.empty(n_agents, dtype=int) for i, (f, L_) in enumerate(zip(first_agent, last_agent)): process_of_agent[f:L_] = i if cost_increases_with_level: production_costs[f:L_] = np.round( production_costs[f:L_] * (i + 1) # math.sqrt(i + 1) ).astype(int) # costs is the same as production costs but repeated for all n_lines costs = INFINITE_COST * np.ones((n_agents, n_lines, n_processes), dtype=int) for p, (f, L_) in enumerate(zip(first_agent, last_agent)): costs[f:L_, :, p] = production_costs[f:L_].reshape((L_ - f), 1) # generate external contract amounts (controlled by productivity): generated_exogenous, trial = False, 0 while not generated_exogenous and trial < MAX_WORLD_GENERATION_TRIALS: # - generate total amount of input to the market (it will end up being an n_products list of n_steps vectors) first_quantities = np.round( n_lines * n_agents_per_process[0] * max_productivity_process[0, :] ).astype(int) first_quantities[-n_startup:] = 0 # - divide the quantity at every level between factories exogenous_supplies = distribute_quantities( equal_exogenous_supply, exogenous_supply_predictability, first_quantities, n_agents_per_process[0], n_steps, ) exogenous_supplies = np.asarray(exogenous_supplies) # remove extra quantities that cannot be produced and sold possible_production = n_lines * (n_steps - n_processes) extra_supply = exogenous_supplies.sum(axis=0) - possible_production for a in range(n_agents_per_process[0]): if extra_supply[a] <= 0: continue for s in range(n_steps - 1, -1, -1): to_use = min(extra_supply[a], exogenous_supplies[s, a]) if to_use <= 0: continue extra_supply[a] -= to_use exogenous_supplies[s, a] -= to_use if extra_supply[a] <= 0: break assert np.all(exogenous_supplies[-n_startup:] == 0) generated_exogenous = np.any(exogenous_supplies > 0) trial += 1 assert generated_exogenous, f"Cannot generate this world because we cannot generate any exogenous supply: n_steps: {n_steps}, n_processes: {n_processes}" params = dict( n_steps=n_steps, n_lines=n_lines, n_agents_per_process=n_agents_per_process, process_of_agent=process_of_agent, first_agent=first_agent, last_agent=last_agent, production_costs=production_costs, exogenous_control=exogenous_control, cash_availability=cash_availability, force_signing=force_signing, horizon=horizon, exogenous_supplies=exogenous_supplies, initial_balance=initial_balance, cost_relativity=cost_relativity, profit_basis=profit_basis, inventory_valuation_trading=inventory_valuation_trading, inventory_valuation_catalog=inventory_valuation_catalog, exogenous_sales_predictability=exogenous_sales_predictability, max_productivity_process=max_productivity_process, max_productivity_agent=max_productivity_agent, equal_exogenous_sales=equal_exogenous_sales, process_inputs=process_inputs, process_outputs=process_outputs, costs=costs, profit_stddevs_agent=profit_stddevs_agent, profit_means_agent=profit_means_agent, run_extra_checks=run_extra_checks, ) generated, n_trials = False, 0 while not generated: try: ( exogenous, catalog_prices, profiles, initial_balance, extra_info, ) = runner[exogenous_generation_method](**params) generated = True except RecoverableWorldGenerationException: n_trials += 1 if n_trials > MAX_WORLD_GENERATION_TRIALS: raise ValueError( f"Tried to generate the world {n_trials} times using {exogenous_generation_method} but failed" ) # add surplus exogneous contracts if needed for surplus, is_sale in [ (exogenous_supply_surplus, False), (exogenous_sales_surplus, True), ]: if surplus < 1e-6: continue process = n_processes - 1 if is_sale else 0 for a in range(first_agent[process], last_agent[process]): contracts = [ _ for _ in exogenous if (_.seller == a and is_sale) or (_.buyer == a and not is_sale) ] if len(contracts) < 1: continue q = int(round(sum(_.quantity for _ in contracts) * surplus)) if q < 1: continue unit_price = int(catalog_prices[process + 1 if is_sale else process]) unit_price = random.randint( int(unit_price * 0.8), int(unit_price * 1.2) ) for step, quantity in enumerate(integer_cut(q, n_steps, 0)): exogenous.append( ExogenousContract( product=process + 1 if is_sale else process, quantity=quantity, unit_price=unit_price, time=step, revelation_time=max(0, step - horizon), seller=a if is_sale else -1, buyer=a if not is_sale else -1, ) ) if run_extra_checks: for c in exogenous: assert c.revelation_time <= c.time if c.seller < 0: assert c.product == 0, f"Invalid Exogenous Contract (Seller): {c}" else: assert ( c.product == process_of_agent[c.seller] + 1 ), f"Invalid Contract (seller): {c}" if c.buyer < 0: assert ( c.product == n_processes ), f"Invalid Exogenous Contract (buyer): {c}" else: assert ( c.product == process_of_agent[c.buyer] ), f"Invalid Contract (Buyer): {c}" info = dict(**info, **extra_info) for i, (t, p) in enumerate(zip(agent_types, agent_params)): if t is not None and issubclass(get_class(t), OneShotAgent): agent_types[i] = get_full_type_name(OneShotAdapter) agent_params[i] = dict(oneshot_type=t, oneshot_params=p, obj=None) return dict( process_inputs=process_inputs, process_outputs=process_outputs, catalog_prices=catalog_prices, profiles=profiles, exogenous_contracts=exogenous, agent_types=agent_types, agent_params=agent_params, initial_balance=initial_balance, n_steps=n_steps, info=info, force_signing=force_signing, exogenous_horizon=horizon, inventory_valuation_trading=inventory_valuation_trading, inventory_valuation_catalog=inventory_valuation_catalog, **kwargs, )
@classmethod
[docs] def generate_guaranteed_profit( cls, n_steps: int, n_lines: int, n_agents_per_process: int, process_of_agent: list[int], first_agent: list[int], last_agent: list[int], production_costs: list[int], exogenous_control: float, cash_availability: float, force_signing: bool, horizon: int, exogenous_supplies: list[int], max_productivity_process: list[float], max_productivity_agent: list[float], equal_exogenous_sales: bool, process_inputs: list[int], process_outputs: list[int], exogenous_sales_predictability: float, costs: np.ndarray, profit_stddevs_agent=list[float], profit_means_agent=list[float], initial_balance: np.ndarray | tuple[int, int] | int | None = None, cost_relativity: float = 1.0, profit_basis=np.max, inventory_valuation_trading: float = 0.5, inventory_valuation_catalog: float = 0.0, run_extra_checks=True, ) -> tuple[ list[ExogenousContract], list[int], list[FactoryProfile], list[float], dict[str, Any], ]: """ Generates prices, contracts and profiles ensuring that all agents can profit and returning a set of explict contracts that can achieve this profit """ n_processes = len(first_agent) n_agents = len(process_of_agent) n_products = n_processes + 1 can_choose_initial_balance = initial_balance is None COST0 = 10 supplies = np.zeros((n_agents, n_steps), dtype=np.int64) revenue = np.zeros((n_agents, n_steps), dtype=np.int64) np.zeros((n_agents, n_steps), dtype=np.int64) sales = np.zeros((n_agents, n_steps), dtype=np.int64) total_costs = np.zeros((n_agents, n_steps), dtype=np.int64) active_lines = np.zeros((n_agents, n_steps), dtype=np.int64) supplies[first_agent[0] : last_agent[0], :] = exogenous_supplies.transpose() exogenous_prices = ( exogenous_supplies * (COST0 + 0.5 * np.random.randn(*exogenous_supplies.shape)) .round() .astype(int) ).transpose() total_costs[first_agent[0] : last_agent[0], :] = exogenous_prices agent_profits = ( np.random.randn(n_agents) * profit_stddevs_agent + profit_means_agent ) simulated_contracts: list[ContractRecord] = [] # signing step, delivery step, seller, buyer, quantity, unit price, is_exogenous # generate distribution variables that sum pc = 0.05 + (production_costs.max() - production_costs) / production_costs.max() normalized_production_cost = np.ones_like(pc, dtype=float) for p in range(0, n_processes): f, L_ = first_agent[p], last_agent[p] normalized_production_cost[f:L_] = np.float_power( production_costs[f:L_] / production_costs[f:L_].max(), cost_relativity ) def distribute_sales( active_lines: np.ndarray, sales: np.ndarray, supplies: np.ndarray, revenue: np.ndarray, total_costs: np.ndarray, production_costs: np.ndarray, agent_profits: np.ndarray, p: np.ndarray, first_seller: int, last_seller: int, first_buyer: int, last_buyer: int, horizon: int, n_lines: int, product: int, production_time: int = 1, final=False, ): simulated = [] n_agents, n_steps = supplies.shape if final: n_buyers = 1 else: n_buyers = last_buyer - first_buyer # distribute production: # the goal here is to produce everything we have as supplies # we cannot produce after the time when our output can propagate # to the end of the network and be sold to the BUYER agent production_limit = n_steps - (n_processes - product) # we cannot produce before we can get our first supplies. This is # a speedup no more production_start = product if run_extra_checks: assert supplies[first_seller:last_seller, :production_start].sum() == 0 for s in range(production_start, production_limit): n = production_limit - s not_produced = 0 for i in range(first_seller, last_seller): if supplies[i, s] < 1: continue to_produce, not_produced = supplies[i, s] + not_produced, 0 for k in range(s, production_limit): can_use = min( n_lines - active_lines[i, k], supplies[i, : k + 1].sum() - active_lines[i, : k + 1].sum(), to_produce, ) active_lines[i, k] += can_use to_produce -= can_use if to_produce <= 0: break else: not_produced = to_produce # distribute sales # I can always sell one step after final production distribution_limit = production_limit + production_time # production_start + production_time # this loop is for selling production so it goes on the production # rather than distribution limits for s in range(production_start, production_limit): n = production_limit - s # find the probability distribution with which to distribute # the products to the bueyrs. if n_buyers == 1: p1 = np.asarray([1.0]) else: # the probability of getting products go up with available # production capacity and with production_costs available = np.ones(n_buyers) active = active_lines[first_buyer:last_buyer, s:] if active.sum() > 0: active /= active.max() available = n_lines * n - active.sum(axis=1) if available.sum() > 0: available = available / available.sum() p1 = (production_limit - s) * p[ first_buyer:last_buyer ] + s * available if p1.sum() < 1e-5: p1 = np.ones_like(p1) / len(p1) else: p1 = p1 / p1.sum() for i in range(first_seller, last_seller): beg = min(production_time + s, distribution_limit) end = min(beg + 1, distribution_limit) n = end - beg if n <= 0: continue if active_lines[i, s] < 1: continue d = fraction_cut(active_lines[i, s], p1).reshape(n_buyers, n) unit_price = max( 1, int( np.ceil( ( total_costs[i, s] / active_lines[i, s] + production_costs[i] ) * (1 + agent_profits[i]) ) ), ) sales[i, beg:end] += d.sum(axis=0) revenue[i, beg:end] += unit_price * d.sum(axis=0) if not final: supplies[first_buyer:last_buyer, beg:end] += d total_costs[first_buyer:last_buyer, beg:end] += unit_price * d for j in range(n_buyers): for k in range(n): q = d[j, k] if q < 1: continue simulated.append( ContractRecord( product + 1, s if not final else k + beg - horizon, k + beg, i, j + first_buyer if not final else -1, q, unit_price, final, ) ) # signing step, delivery step, seller, buyer, quantity, unit price, is_exogenous return simulated # generate a good set of contracts that achieve the givne productivity for p in range(0, n_processes): first_seller, last_seller = first_agent[p], last_agent[p] if p < n_processes - 1: first_buyer, last_buyer = first_agent[p + 1], last_agent[p + 1] else: first_buyer, last_buyer = None, None simulated_contracts += distribute_sales( active_lines, sales, supplies, revenue, total_costs, production_costs, agent_profits, normalized_production_cost, first_seller, last_seller, first_buyer, last_buyer, horizon, n_lines, p, 1, final=first_buyer is None, ) assert sales.sum() <= supplies.sum() assert np.all(active_lines.cumsum(axis=1) >= sales.cumsum(axis=1)) assert np.all(active_lines.cumsum(axis=1) <= supplies.cumsum(axis=1)) assert np.all(active_lines <= n_lines) # sales_per_agent = sales.sum(axis=1) # supplies_per_agent = supplies.sum(axis=1) # # remove extra supplies in the first layer that are not needed # for a in range(n_agents_per_process[0]): # diff = supplies_per_agent[a] - sales_per_agent[a] # if diff <= 0: # continue # for s in range(n_steps - 1, -1, -1): # to_remove = min(diff, supplies[a, s]) # supplies[a, s] -= to_remove # diff -= to_remove # if diff <= 0: # break # assert ( # sales.sum() == supplies.sum() # ), f"{supplies.sum(axis=1)}\n{sales.sum(axis=1)}\nerror: {supplies.sum() - sales.sum()}" # add correct exogenous supply contracts exogenous_supplies = supplies[first_agent[0] : last_agent[0], :].transpose() exogenous_prices = total_costs[first_agent[0] : last_agent[0], :] for j in range(n_agents_per_process[0]): for k in range(n_steps): q = exogenous_supplies[k, j] if q < 1: continue simulated_contracts.append( ContractRecord( 0, k - horizon, k, -1, j, q, exogenous_prices[j, k] // q, True, ) ) exogenous_supplies = supplies[first_agent[0] : last_agent[0], :].transpose() # - now exogenous_supplies and exogenous_sales are both n_steps lists of n_agents_per_process[p] vectors (jagged) catalog_prices = np.zeros(n_products, dtype=int) for p in range(n_products): if p < n_processes: f, L_ = first_agent[p], last_agent[p] total = supplies[f:L_, :].sum() if total < 1: continue supply_unit = total_costs[f:L_, :].sum() / total else: f, L_ = first_agent[p - 1], last_agent[p - 1] total = sales[f:L_, :].sum() if total < 1: continue supply_unit = revenue[f:L_, :].sum() / total catalog_prices[p] = supply_unit profile_info: list[FactoryProfile] = [] nxt = 0 for L_ in range(n_processes): for a in range(n_agents_per_process[L_]): profile_info.append(FactoryProfile(costs=costs[nxt])) nxt += 1 assert nxt == n_agents total_payments = ( total_costs + production_costs.reshape(len(production_costs), 1) * supplies ) revenue_after_costs = revenue - total_payments if initial_balance is None: # every agent at every level will have just enough to do all the needed to do cash_availability fraction of # production (even though it may not have enough lines to do so) cash_availability = _realin(cash_availability) # find maximum money needs initial_balance = total_payments.sum(axis=1) max_money_needs_process = np.zeros(n_processes) for p in range(n_processes): f, L_ = first_agent[p], last_agent[p] max_money_needs_process[p] = initial_balance[f:L_].max() initial_balance[f:L_] = int( math.ceil(max_money_needs_process[p] * cash_availability) ) else: tmp = total_payments.max(axis=1) cash_availability = (initial_balance / tmp).min() b = np.sum(initial_balance) info = dict( simulated_contracts=[tuple(_) for _ in simulated_contracts], active_lines=active_lines, input_quantities=supplies, output_quantities=sales, expected_productivity=float(np.sum(active_lines)) / np.sum(n_lines * n_steps * n_agents_per_process), expected_productivity_products=np.sum(active_lines, axis=-1), expected_income=revenue_after_costs, expected_welfare=float(np.sum(revenue_after_costs)), expected_income_per_step=revenue.sum(axis=0), expected_income_per_process=revenue.sum(axis=-1), expected_mean_profit=( float(np.sum(revenue_after_costs) / b) if b != 0 else np.sum(revenue_after_costs) ), expected_profit_per_agent=( revenue_after_costs.sum(axis=1) / b if b != 0 else revenue_after_costs.sum(axis=1) ), real_cash_availability=cash_availability, ) exogenous = [] for c in simulated_contracts: if not c.is_exogenous or c.quantity == 0: continue if force_signing or exogenous_control <= 0.0: exogenous.append( ExogenousContract( product=c.product, quantity=c.quantity, unit_price=c.unit_price, time=c.time, revelation_time=max(0, c.time - horizon), seller=c.seller, buyer=c.buyer, ) ) else: n_contracts = int(1 + exogenous_control * (c.quantity - 1)) per_contract = integer_cut(c.quantity, n_contracts, 0) for q in per_contract: if q == 0: continue exogenous.append( ExogenousContract( product=c.product, quantity=q, unit_price=c.unit_price, time=c.time, revelation_time=max(0, c.time - horizon), seller=c.seller, buyer=c.buyer, ) ) if can_choose_initial_balance and run_extra_checks: def simulate_run( all_contracts, active_lines, initial_balance, n_products, production_costs, process_of_agent, catalog_prices, catalog_valuation=inventory_valuation_trading + inventory_valuation_catalog, ): n_agents, n_steps = active_lines.shape balances = initial_balance[:].copy() contracts = defaultdict(list) inventory = np.zeros((n_agents, n_products), dtype=int) for c in all_contracts: contracts[c.time].append(c) def _check(s): assert ( balances.min() >= 0 ), f"Contract Simulation Issue: Some agents went bankrupt at step {s}!!\n{balances}\nSupplies:\n{supplies}\nActive:\n{active_lines}\nSales:\n{sales}" assert ( inventory.min() >= 0 ), f"Contract Simulation Issue: Some agents has negative inventory at step {s}!!\nInventory:{inventory}\nSupplies:\n{supplies}\nActive:\n{active_lines}\nSales:\n{sales}" for s in range(n_steps): _check(s) cs = contracts[s] if not cs: continue cs = sorted(cs, key=lambda c: c.product) for c in cs: if c.seller >= 0: inventory[c.seller, c.product] -= c.quantity balances[c.seller] += c.quantity * c.unit_price if c.buyer >= 0: inventory[c.buyer, c.product] += c.quantity balances[c.buyer] -= c.quantity * c.unit_price _check(s) for a in range(n_agents): p = process_of_agent[a] q = inventory[a, p] produced = active_lines[a, s] assert ( produced <= q ), f"Agent {a} (level {p}) should produce {produced} at step {s} but only have {q} items of product {p}" if not produced: continue inventory[a, p + 1] += produced inventory[a, p] -= produced balances[a] -= produced * production_costs[a] assets = catalog_valuation * (inventory * catalog_prices) profit = ( balances + assets.sum(axis=1) - initial_balance ) / initial_balance _check(n_steps) assert ( profit.min() > 0 ), f"Contract Simulation Issue: Some agents lost!!\n{profit}" return profit try: expected_profit = simulate_run( simulated_contracts, active_lines, initial_balance, n_products, production_costs, process_of_agent, catalog_prices, ) except AssertionError as e: raise RecoverableWorldGenerationException(e) info["expected_profits_simulated"] = expected_profit else: info["expected_profits_simulated"] = None return exogenous, catalog_prices, profile_info, initial_balance, info
@classmethod
[docs] def generate_profitable( cls, n_steps: int, n_lines: int, n_agents_per_process: int, process_of_agent: list[int], first_agent: list[int], last_agent: list[int], production_costs: list[int], exogenous_control: float, cash_availability: float, force_signing: bool, horizon: int, exogenous_supplies: list[int], max_productivity_process: list[float], max_productivity_agent: list[float], equal_exogenous_sales: bool, process_inputs: list[int], process_outputs: list[int], exogenous_sales_predictability: float, costs: np.ndarray, profit_stddevs_agent=list[float], profit_means_agent=list[float], initial_balance: np.ndarray | tuple[int, int] | int | None = None, cost_relativity: float = 1.0, profit_basis=np.max, inventory_valuation_trading: float = 0.5, inventory_valuation_catalog: float = 0.0, run_extra_checks: bool = True, ) -> tuple[ list[ExogenousContract], list[int], list[FactoryProfile], list[float], dict[str, Any], ]: """ Generates the prices, contracts and profiles ensuring there is some possibility of profit in the market """ n_processes = len(first_agent) n_agents = len(process_of_agent) n_products = n_processes + 1 n_startup = n_processes # - make sure there is a cool-down period at the end in which no more input is added that cannot be converted # into final products in time quantities = [exogenous_supplies.sum(axis=1)] # - for each level, find the amount of the output product that can be produced given the input amount and # productivity for p in range(n_processes): agents = n_agents_per_process[p] lines = n_lines * agents quantities.append( np.minimum( (quantities[-1] // process_outputs[p]) * process_inputs[p], quantities[-1], ( ( np.round(lines * max_productivity_process[p, :]).astype(int) // process_inputs[p] ) * process_outputs[p] ), ) ) # * shift quantities one step to account for the one step needed to move the produce to the next level. This # step results from having production happen after contract execution. quantities[-1][1:] = quantities[-1][:-1] quantities[-1][0] = 0 assert quantities[-1][-1] == 0 or p >= n_startup - 1 assert quantities[-1][0] == 0 # assert np.sum(quantities[-1] == 0) >= n_startup exogenous_sales = distribute_quantities( equal_exogenous_sales, exogenous_sales_predictability, quantities[-1], n_agents_per_process[-1], n_steps, ) # - now exogenous_supplies and exogenous_sales are both n_steps lists of n_agents_per_process[p] vectors (jagged) # assign prices to the quantities given the profits catalog_prices = np.zeros(n_products, dtype=int) catalog_prices[0] = 10 supply_prices = np.zeros((n_agents_per_process[0], n_steps), dtype=int) supply_prices[:, :] = catalog_prices[0] sale_prices = np.zeros((n_agents_per_process[-1], n_steps), dtype=int) manufacturing_costs = np.zeros((n_processes, n_steps), dtype=int) for p in range(n_processes): manufacturing_costs[p, :] = profit_basis( costs[first_agent[p] : last_agent[p], :, p] ) manufacturing_costs[p, :p] = 0 manufacturing_costs[p, p - n_startup :] = 0 profits = np.zeros((n_processes, n_steps)) for p in range(n_processes): profits[p, :] = ( np.random.randn() * profit_stddevs_agent[p] + profit_means_agent[p] ) input_costs = np.zeros((n_processes, n_steps), dtype=int) for step in range(n_steps): input_costs[0, step] = np.sum( exogenous_supplies[step] * supply_prices[:, step][:] ) input_quantity = np.zeros((n_processes, n_steps), dtype=int) input_quantity[0, :] = quantities[0] active_lines = np.hstack( [(n_lines * n_agents_per_process).reshape((n_processes, 1))] * n_steps ) assert active_lines.shape == (n_processes, n_steps) active_lines[0, :] = input_quantity[0, :] // process_inputs[0] output_quantity = np.zeros((n_processes, n_steps), dtype=int) output_quantity[0, :] = active_lines[0, :] * process_outputs[0] manufacturing_costs[0, :-n_startup] *= active_lines[0, :-n_startup] total_costs = input_costs + manufacturing_costs output_total_prices = np.ceil(total_costs * (1 + profits)).astype(int) for p in range(1, n_processes): input_costs[p, p:] = output_total_prices[p - 1, p - 1 : -1] input_quantity[p, p:] = output_quantity[p - 1, p - 1 : -1] active_lines[p, :] = input_quantity[p, :] // process_inputs[p] output_quantity[p, :] = active_lines[p, :] * process_outputs[p] manufacturing_costs[p, p : p - n_startup] *= active_lines[ p, p : p - n_startup ] total_costs[p, :] = input_costs[p, :] + manufacturing_costs[p, :] output_total_prices[p, :] = np.ceil( total_costs[p, :] * (1 + profits[p, :]) ).astype(int) aa = output_total_prices[-1, n_startup - 1 : -1].astype(float) bb = output_quantity[-1, n_startup - 1 : -1].astype(float) np.errstate(divide="ignore") sale_prices[:, n_startup:] = np.ceil( np.divide(aa, bb, out=np.zeros_like(aa, dtype=float), where=bb != 0) ).astype(int) product_prices = np.zeros((n_products, n_steps)) product_prices[0, :-n_startup] = catalog_prices[0] product_prices[1:, 1:] = np.ceil( np.divide( output_total_prices.astype(float), output_quantity.astype(float), out=np.zeros_like(output_total_prices, dtype=float), where=output_quantity != 0, ) ).astype(int)[:, :-1] catalog_prices = np.ceil( [ profit_basis(product_prices[p, p : p + n_steps - n_startup]) for p in range(n_products) ] ).astype(int) profile_info: list[ tuple[FactoryProfile, np.ndarray, np.ndarray, np.ndarray, np.ndarray] ] = [] nxt = 0 for L_ in range(n_processes): for a in range(n_agents_per_process[L_]): esales = np.zeros((n_steps, n_products), dtype=int) esupplies = np.zeros((n_steps, n_products), dtype=int) esale_prices = np.zeros((n_steps, n_products), dtype=int) esupply_prices = np.zeros((n_steps, n_products), dtype=int) if L_ == 0: esupplies[:, 0] = [exogenous_supplies[s][a] for s in range(n_steps)] esupply_prices[:, 0] = supply_prices[a, :] if L_ == n_processes - 1: esales[:, -1] = [exogenous_sales[s][a] for s in range(n_steps)] esale_prices[:, -1] = sale_prices[a, :] profile_info.append( ( FactoryProfile(costs=costs[nxt]), esales, esale_prices, esupplies, esupply_prices, ) ) nxt += 1 max_income = ( output_quantity * catalog_prices[1:].reshape((n_processes, 1)) - total_costs ) assert nxt == n_agents if initial_balance is None: # every agent at every level will have just enough to do all the needed to do cash_availability fraction of # production (even though it may not have enough lines to do so) cash_availability = _realin(cash_availability) balance = np.ceil( np.sum(total_costs, axis=1) / n_agents_per_process ).astype(int) initial_balance = [] for b, a in zip(balance, n_agents_per_process): initial_balance += [int(math.ceil(b * cash_availability))] * a b = np.sum(initial_balance) info = dict( product_prices=product_prices, active_lines=active_lines, input_quantities=input_quantity, output_quantities=output_quantity, expected_productivity=float(np.sum(active_lines)) / np.sum(n_lines * n_steps * n_agents_per_process), expected_n_products=np.sum(active_lines, axis=-1), expected_income=max_income, expected_welfare=float(np.sum(max_income)), expected_income_per_step=max_income.sum(axis=0), expected_income_per_process=max_income.sum(axis=-1), expected_mean_profit=( float(np.sum(max_income) / b) if b != 0 else np.sum(max_income) ), expected_profit_sum=( float(n_agents * np.sum(max_income) / b) if b != 0 else n_agents * np.sum(max_income) ), ) exogenous = [] for ( indx, (profile, esales, esale_prices, esupplies, esupply_prices), ) in enumerate(profile_info): input_product = process_of_agent[indx] for step, (sale, price) in enumerate( zip(esales[:, input_product + 1], esale_prices[:, input_product + 1]) ): if sale == 0: continue if force_signing or exogenous_control <= 0.0: exogenous.append( ExogenousContract( product=input_product + 1, quantity=sale, unit_price=price, time=step, revelation_time=max(0, step - horizon), seller=indx, buyer=-1, ) ) else: n_contracts = int(1 + exogenous_control * (sale - 1)) per_contract = integer_cut(sale, n_contracts, 0) for q in per_contract: if q == 0: continue exogenous.append( ExogenousContract( product=input_product + 1, quantity=q, unit_price=price, time=step, revelation_time=max(0, step - horizon), seller=indx, buyer=-1, ) ) for step, (supply, price) in enumerate( zip(esupplies[:, input_product], esupply_prices[:, input_product]) ): if supply == 0: continue if force_signing or exogenous_control <= 0.0: exogenous.append( ExogenousContract( product=input_product, quantity=supply, unit_price=price, time=step, revelation_time=max(0, step - horizon), seller=-1, buyer=indx, ) ) else: n_contracts = int(1 + exogenous_control * (supply - 1)) per_contract = integer_cut(supply, n_contracts, 0) for q in per_contract: if q == 0: continue exogenous.append( ExogenousContract( product=input_product, quantity=q, unit_price=price, time=step, revelation_time=max(0, step - horizon), seller=-1, buyer=indx, ) ) return ( exogenous, catalog_prices, [_[0] for _ in profile_info], initial_balance, info, )
[docs] def get_private_state(self, agent: SCML2020Agent) -> dict: return vars(self.a2f[agent.id].state)
[docs] def add_financial_report( self, agent: SCML2020Agent, factory: Factory, reports_agent, reports_time ) -> None: """ Records a financial report for the given agent in the agent indexed reports and time indexed reports Args: agent: The agent factory: Its factory reports_agent: A dictionary of financial reports indexed by agent id reports_time: A dictionary of financial reports indexed by time Returns: """ bankrupt = factory.is_bankrupt inventory = ( int(np.sum(self.trading_prices * factory.current_inventory)) if not bankrupt else 0 ) report = FinancialReport( agent_id=agent.id, step=self.current_step, cash=factory.current_balance, assets=inventory, breach_prob=self.breach_prob[agent.id], breach_level=self._breach_level[agent.id], is_bankrupt=bankrupt, agent_name=agent.name, ) repstr = str(report).replace("\n", " ") self.logdebug(f"{agent.name}: {repstr}") if reports_agent.get(agent.id, None) is None: reports_agent[agent.id] = {} reports_agent[agent.id][self.current_step] = report if reports_time.get(str(self.current_step), None) is None: reports_time[str(self.current_step)] = {} reports_time[str(self.current_step)][agent.id] = report
[docs] def negs_between(self, a1, a2): return self._registered_negs[tuple(sorted([a1, a2]))]
[docs] def current_balance(self, agent_id: str): return self.a2f[agent_id].state.balance
[docs] def can_negotiate(self, a1, a2): return self.negs_between(a1, a2) < self.n_concurrent_negs_between_partners
[docs] def simulation_step(self, stage): self._registered_negs: dict[tuple[str], int] = Counter() s = self.current_step if stage == 0: # pay interests for negative balances # ----------------------------------- if self.interest_rate > 0.0: for agent, factory, _ in self.afp: if factory.current_balance < 0 and not factory.is_bankrupt: to_pay = -int( math.ceil(self.interest_rate * factory.current_balance) ) factory.pay(to_pay) # Register exogenous contracts as concluded # ----------------------------------------- for contract in self.exogenous_contracts[s]: self.on_contract_concluded( contract, to_be_signed_at=contract.to_be_signed_at ) if self.exogenous_force_max: contract.signatures = dict( zip(contract.partners, contract.partners) ) else: if SYSTEM_SELLER_ID in contract.partners: contract.signatures[SYSTEM_SELLER_ID] = SYSTEM_SELLER_ID else: contract.signatures[SYSTEM_BUYER_ID] = SYSTEM_BUYER_ID # publish public information # -------------------------- if self.publish_trading_prices: self.bulletin_board.record( "trading_prices", value=self.trading_prices, key=self.current_step, ) # initialize all agents for this step # =================================== for _, a in self.agents.items(): if (not self.a2f[_].is_bankrupt) and hasattr(a, "before_step"): a.before_step() return # update trading price information # -------------------------------- has_trade = self._sold_quantity[:, s + 1] > 0 self._trading_price[~has_trade, s + 1] = self._trading_price[~has_trade, s] self._betas_sum[~has_trade, s + 1] = self._betas_sum[~has_trade, s] assert not np.any( np.isnan(self._real_price[has_trade, s + 1]) ), f"Nans in _real_price at step {self.current_step}\n{self._real_price}" self._trading_price[has_trade, s + 1] = ( self._trading_price[has_trade, s] * self._betas_sum[has_trade, s] * self.trading_price_discount + self._real_price[has_trade, s + 1] * self._sold_quantity[has_trade, s + 1] ) self._betas_sum[has_trade, s + 1] = ( self._betas_sum[has_trade, s] * self.trading_price_discount + self._sold_quantity[has_trade, s + 1] ) self._trading_price[has_trade, s + 1] /= self._betas_sum[has_trade, s + 1] self._trading_price[:, s + 1 :] = self._trading_price[:, s + 1].reshape( (self.n_products, 1) ) self._traded_quantity += self._sold_quantity[:, s + 1] # self._trading_price[has_trade, s] = ( # np.sum(self._betas[:s+1] * self._real_price[has_trade, s::-1]) # ) / self._betas_sum[s+1] # update agent penalties # ---------------------- self._agent_spot_loss[:, s] += ( self.spot_market_global_loss * self.spot_multiplier * np.sum(self._alphas[: s + 1] * self._spot_quantity[:, s::-1], axis=-1) ) # run factories # -------------- for a, f, p in self.afp: if f.is_bankrupt: continue f.step() # remove contracts saved in factories for this step for factory in self.factories: factory.contracts[self.current_step] = [] # publish financial reports # ------------------------- if self.current_step % self.financial_reports_period == 0: reports_agent = self.bulletin_board.data["reports_agent"] reports_time = self.bulletin_board.data["reports_time"] for agent, factory, _ in self.afp: if is_system_agent(agent.id): continue self.add_financial_report(agent, factory, reports_agent, reports_time)
[docs] def contract_size(self, contract: Contract) -> float: return contract.agreement["quantity"] * contract.agreement["unit_price"]
[docs] def contract_record(self, contract: Contract) -> dict[str, Any]: c = { "id": contract.id, "seller_name": self.agents[contract.annotation["seller"]].name, "buyer_name": self.agents[contract.annotation["buyer"]].name, "seller_type": self.agents[contract.annotation["seller"]].type_name, "buyer_type": self.agents[contract.annotation["buyer"]].type_name, "delivery_time": contract.agreement["time"], "quantity": contract.agreement["quantity"], "unit_price": contract.agreement["unit_price"], "signed_at": contract.signed_at, "nullified_at": contract.nullified_at, "concluded_at": contract.concluded_at, "signatures": "|".join(str(_) for _ in contract.signatures), "issues": contract.issues if not self.compact else None, "seller": contract.annotation["seller"], "buyer": contract.annotation["buyer"], "product_name": "p" + str(contract.annotation["product"]), } if not self.compact: c.update(contract.annotation) c["n_neg_steps"] = ( contract.mechanism_state.step if contract.mechanism_state else 0 ) return c
[docs] def breach_record(self, breach: Breach) -> dict[str, Any]: return { "perpetrator": breach.perpetrator, "perpetrator_name": breach.perpetrator, "level": breach.level, "type": breach.type, "time": breach.step, }
[docs] def execute_action( self, action: Action, agent: SCML2020Agent, callback: Callable = None ) -> bool: if action.type == "schedule": s, _ = self.a2f[agent.id].schedule_production( process=action.params["process"], step=action.params.get("step", (self.current_step, self.n_steps - 1)), line=action.params.get("line", -1), override=action.params.get("override", True), method=action.params.get("method", "latest"), repeats=action.params.get("repeats", 1), ) return s >= 0 elif action.type == "cancel": return self.a2f[agent.id].cancel_production( step=action.params.get("step", -1), line=action.params.get("line", -1) )
[docs] def post_step_stats(self): self._stats["n_contracts_nullified_now"].append(self.__n_nullified) self._stats["n_bankrupt"].append(self.__n_bankrupt) market_size = 0 scores = self.scores() for p in range(self.n_products): self._stats[f"trading_price_{p}"].append( self._trading_price[p, self.current_step + 1] ) self._stats[f"sold_quantity_{p}"].append( self._sold_quantity[p, self.current_step + 1] ) self._stats[f"unit_price_{p}"].append( self._real_price[p, self.current_step + 1] ) prod = [] for a, f, _ in self.afp: if is_system_agent(a.id): continue ind = self.a2i[a.id] self._stats[f"spot_market_quantity_{a.id}"].append( self._spot_quantity[ind, self.current_step] ) self._stats[f"spot_market_loss_{a.id}"].append( self._agent_spot_loss[ind, self.current_step] ) self._stats[f"balance_{a.id}"].append(f.current_balance) for p in a.awi.my_input_products: self._stats[f"inventory_{a.id}_input"].append(f.current_inventory[p]) for p in a.awi.my_output_products: self._stats[f"inventory_{a.id}_output"].append(f.current_inventory[p]) prod.append( np.sum(f.commands[self.current_step, :] != NO_COMMAND) / f.profile.n_lines ) self._stats[f"productivity_{a.id}"].append(prod[-1]) self._stats[f"assets_{a.id}"].append( np.sum(f.current_inventory * self.trading_prices) ) self._stats[f"bankrupt_{a.id}"].append(f.is_bankrupt) self._stats[f"score_{a.id}"].append(scores[a.id]) if not f.is_bankrupt: market_size += f.current_balance self._stats["productivity"].append(float(np.mean(prod))) self._stats["market_size"].append(market_size) self._stats["production_failures"].append( self._n_production_failures / len(self.factories) if len(self.factories) > 0 else np.nan ) self._stats["bankruptcy"].append( np.sum(self.stats["n_bankrupt"]) / len(self.agents) )
# self._stats["business"] = np.sum(self.stats["business_level"])
[docs] def pre_step_stats(self): self._n_production_failures = 0 self.__n_nullified = 0 self.__n_bankrupt = 0
@property
[docs] def productivity(self) -> float: """Fraction of production lines occupied during the simulation""" return np.mean(self.stats["productivity"])
[docs] def welfare(self, include_bankrupt: bool = False) -> float: """Total welfare of all agents""" scores = self.scores() return sum( scores[f.agent_id] * f.initial_balance for f in self.factories if (include_bankrupt or not f.is_bankrupt) and not is_system_agent(f.agent_id) )
[docs] def relative_welfare(self, include_bankrupt: bool = False) -> float | None: """Total welfare relative to expected value. Returns None if no expectation is found in self.info""" if "expected_income" not in self.info.keys(): return None return self.welfare(include_bankrupt) / np.sum(self.info["expected_income"])
@property
[docs] def relative_productivity(self) -> float | None: """Productivity relative to the expected value. Will return None if self.info does not have the expected productivity""" if "expected_productivity" not in self.info.keys(): return None return self.productivity / self.info["expected_productivity"]
@property
[docs] def bankruptcy_rate(self) -> float: """The fraction of factories that went bankrupt""" return sum(f.is_bankrupt for f in self.factories) / len(self.factories)
@property
[docs] def num_bankrupt(self) -> float: """The fraction of factories that went bankrupt""" return sum(f.is_bankrupt for f in self.factories)
[docs] def order_contracts_for_execution( self, contracts: Collection[Contract] ) -> Collection[Contract]: return sorted(contracts, key=lambda x: x.annotation["product"])
[docs] def _execute( self, product: int, q: int, p: int, u: int, buyer_factory: Factory, seller_factory: Factory, has_breaches: bool, ): """Executes the contract""" q, p, u = int(q), int(p), int(u) if seller_factory.is_bankrupt or buyer_factory.is_bankrupt: self.logdebug( f"Bankruptcy prevents transferring {q} of {product} at price {u} ({'breached' if has_breaches else ''})" ) return self.logdebug( f"Transferring {q} of {product} at price {u} ({'breached' if has_breaches else ''})" ) if q == 0 or u == 0: self.logwarning( f"{buyer_factory.agent_name} bought {q} from {seller_factory.agent_name} at {u} dollars" f" ({'with breaches' if has_breaches else 'no breaches'})!! Zero quantity or unit price" ) money = ( p if buyer_factory.current_balance - p > self.bankruptcy_limit else max(0, buyer_factory.current_balance - self.bankruptcy_limit) ) quantity = min(seller_factory.current_inventory[product], q) if quantity == 0 or money == 0: return q = min(quantity, money // u) assert q >= 0, f"executing with quantity {q}" if q == 0: return assert ( seller_factory._inventory[product] >= q ), f"at {self.current_step} Seller has {seller_factory._inventory[product]} but will execute {q} ({'breached' if has_breaches else 'no breaches'})" assert ( buyer_factory._balance - self.bankruptcy_limit >= u * q ), f"at {self.current_step} Buyer has {buyer_factory._balance} (bankrupt at {self.bankruptcy_limit}) but we need q={q} * u={u} ({'breached' if has_breaches else 'no breaches'})" bought, buy_cost = buyer_factory.buy(product, q, u, False, 0.0) sold, sell_cost = seller_factory.buy(product, -q, u, False, 0.0) if bought == 0: return oldq = self._sold_quantity[product, self.current_step + 1] oldp = self._real_price[product, self.current_step + 1] totalp = 0.0 if oldq > 0: totalp = oldp * oldq self._sold_quantity[product, self.current_step + 1] += bought self._real_price[product, self.current_step + 1] = (totalp + buy_cost) / ( oldq + bought ) assert ( bought == sold ), f"Step: {self.current_step} Bought {bought} and sold {sold} ({'breached' if has_breaches else 'no breaches'})\nSeller factory: {vars(seller_factory)}\nBuyer factory {vars(buyer_factory)}" assert ( buy_cost + sell_cost == 0 ), f"Step: {self.current_step} Bought for {buy_cost} and sold for {-sell_cost} ({'breached' if has_breaches else 'no breaches'})\nSeller factory: {vars(seller_factory)}\nBuyer factory {vars(buyer_factory)}"
[docs] def __register_contract(self, agent_id: str, level: float) -> None: """Registers execution of the contract in the agent's stats""" n_contracts = self.agent_n_contracts[agent_id] - 1 self.breach_prob[agent_id] = ( self.breach_prob[agent_id] * n_contracts + (level > 0) ) / (n_contracts + 1) self._breach_level[agent_id] = ( self.breach_prob[agent_id] * n_contracts + level ) / (n_contracts + 1)
[docs] def record_bankrupt(self, factory: Factory) -> None: """Records agent bankruptcy""" agent_id = factory.agent_id # announce bankruptcy reports_agent = self.bulletin_board.data["reports_agent"] reports_time = self.bulletin_board.data["reports_time"] self.add_financial_report( self.agents[agent_id], factory, reports_agent, reports_time ) self.__n_bankrupt += 1
[docs] def on_contract_concluded(self, contract: Contract, to_be_signed_at: int) -> None: if ( any(self.a2f[_].is_bankrupt for _ in contract.partners) or contract.agreement["time"] >= self.n_steps ): return super().on_contract_concluded(contract, to_be_signed_at)
[docs] def is_valid_contact(self, contract: Contract) -> bool: """Checks whether a signed contract is valid""" return ( contract.agreement["time"] >= self.current_step and contract.agreement["time"] < self.n_steps and contract.agreement["unit_price"] > 0 and contract.agreement["quantity"] > 0 )
[docs] def on_contract_signed(self, contract: Contract) -> bool: # we need to cancel this contract if a partner was bankrupt (that is necessary only for # force_signing case as in this case the two partners will be assued to sign no matter what is # their bankruptcy status if ( any(self.a2f[_].is_bankrupt for _ in contract.partners) or contract.agreement["time"] >= self.n_steps ): self.ignore_contract(contract, as_dropped=False) return False result = super().on_contract_signed(contract) if not result: self.ignore_contract(contract, as_dropped=True) return False self.logdebug(f"SIGNED {str(contract)}") t = contract.agreement["time"] u, q = contract.agreement["unit_price"], contract.agreement["quantity"] product = contract.annotation["product"] agent, partner = contract.partners is_seller = agent == contract.annotation["seller"] self.a2f[agent].contracts[t].append( ContractInfo(q, u, product, is_seller, partner, contract) ) self.a2f[partner].contracts[t].append( ContractInfo(q, u, product, not is_seller, agent, contract) ) return True
[docs] def nullify_contract(self, contract: Contract, new_quantity: int): self.__n_nullified += 1 contract.nullified_at = self.current_step contract.annotation["new_quantity"] = new_quantity
[docs] def __register_breach( self, agent_id: str, level: float, contract_total: float, factory: Factory ) -> int: """ Registers a breach of the given level on the given agent. Assume that the contract is already added to the agent_contracts Args: agent_id: The perpetrator of the breach level: The breach level contract_total: The total of the contract breached (quantity * unit_price) factory: The factory corresponding to the perpetrator Returns: If nonzero, the agent should go bankrupt and this amount taken from them """ self.logdebug( f"{self.agents[agent_id].name} breached {level} of {contract_total}" ) return 0
# if factory.is_bankrupt: # return 0 # if level <= 0: # return 0 # penalty = int(math.ceil(level * contract_total)) # if factory.current_balance - penalty < self.bankruptcy_limit: # return penalty # if penalty > 0: # factory.pay(penalty) # self.penalties += penalty # return 0
[docs] def _spot_loss(self, aid: str) -> float: base = ( self._agent_spot_loss[self.a2i[aid], self.current_step] * self.spot_discount ) return ( base + self._agent_spot_quantity[self.a2i[aid], self.current_step] * self.spot_multiplier )
[docs] def start_contract_execution(self, contract: Contract) -> set[Breach] | None: self.logdebug(f"Executing {str(contract)}") s = self.current_step # get contract info breaches = set() if self.compensate_immediately and ( contract.nullified_at >= 0 or any(self.a2f[a].is_bankrupt for a in contract.partners) ): return None product = contract.annotation["product"] buyer_id, seller_id = ( contract.annotation["buyer"], contract.annotation["seller"], ) _buyer, buyer_factory = self.agents[buyer_id], self.a2f[buyer_id] _seller, seller_factory = self.agents[seller_id], self.a2f[seller_id] q, u, t = ( contract.agreement["quantity"], contract.agreement["unit_price"], contract.agreement["time"], ) if q <= 0 or u <= 0: self.logwarning( f"Contract {str(contract)} has zero quantity or unit price!!! will be ignored" ) return None # if the contract is already nullified, take care of it if contract.nullified_at >= 0: self.compensation_factory._inventory[product] = 0 self.compensation_factory._balance = 0 for c in self.compensation_records.get(contract.id, []): q = min(q, c.quantity) if c.product >= 0 and c.quantity > 0: assert c.product == product self.compensation_factory._inventory[product] += c.quantity self.compensation_factory._balance += c.money if c.seller_bankrupt: seller_factory = self.compensation_factory else: buyer_factory = self.compensation_factory else: q = 0 if seller_factory == buyer_factory: # means that both the seller and buyer are bankrupt return None if q == 0: return breaches p = q * u assert t == self.current_step self.agent_n_contracts[buyer_id] += 1 self.agent_n_contracts[seller_id] += 1 missing_product = max(0, q - seller_factory.current_inventory[product]) missing_money = max(0, p - buyer_factory.current_balance) # if there are no breaches, just execute the contract if missing_money <= 0 and missing_product <= 0: self._execute( product, q, p, u, buyer_factory, seller_factory, has_breaches=False ) self.__register_contract(seller_id, 0) self.__register_contract(buyer_id, 0) return breaches # if there is a product breach (the seller does not have enough products), register it if missing_product <= 0: self.__register_contract(seller_id, 0) else: product_breach_level = missing_product / q self.__register_contract(seller_id, product_breach_level) self.__register_breach(seller_id, product_breach_level, p, seller_factory) if seller_factory != self.compensation_factory: seller_indx = self.a2i[seller_id] effective_unit = seller_factory.spot_price( product, self._spot_loss(seller_id) ) paid = seller_factory.pay(missing_product * effective_unit) paid_for = paid // effective_unit self._agent_spot_quantity[seller_indx, self.current_step] += paid_for stored = seller_factory.store( product, paid_for, False, 0.0, no_bankruptcy=True, no_borrowing=True ) missing_product -= stored self._spot_quantity[seller_indx, s] += stored assert seller_factory.current_inventory[product] >= stored if missing_product > 0: product_breach_level = missing_product / q breaches.add( Breach( contract=contract, perpetrator=seller_id, victims=[buyer_id], level=product_breach_level, type="product", ) ) # if there is a money breach (the buyer does not have enough money), register it if missing_money <= 0: self.__register_contract(buyer_id, 0) else: money_breach_level = missing_money / p breaches.add( Breach( contract=contract, perpetrator=buyer_id, victims=[seller_id], level=money_breach_level, type="money", ) ) self.__register_contract(buyer_id, money_breach_level) self.__register_breach(buyer_id, money_breach_level, p, buyer_factory) if self.borrow_on_breach and buyer_factory != self.compensation_factory: # find out the amount to be paid to borrow the needed money penalty = ( (1 + self.spot_market_global_loss) if self.bankruptcy_limit != 0 else 1 ) to_pay = math.ceil(missing_money * penalty) paid = buyer_factory.pay(to_pay) missing_money -= (missing_money * paid) // to_pay # execute the contract to the limit possible # missing_product = max(0, q - seller_factory.current_inventory[product]) # missing_money = max(0, p - buyer_factory.current_balance) self._execute( product, q, p, u, buyer_factory, seller_factory, has_breaches=missing_product > 0 or missing_money > 0, ) # return the list of breaches return breaches
[docs] def complete_contract_execution( self, contract: Contract, breaches: list[Breach], resolution: Contract ) -> None: pass
[docs] def compensate( self, available: int, factory: Factory ) -> dict[str, list[tuple[Contract, int, int]]]: """ Called by a factory when it is going bankrupt after liquidation Args: available: The amount available from liquidation factory: The factory being bankrupted Returns: A mapping from agent ID to nullified contracts, the new quantity for them and compensation_money """ # get all future contracts of the bankrupt agent that are not executed in their delivery time order contracts = list( itertools.chain( *(factory.contracts[s] for s in range(self.current_step, self.n_steps)) ) ) owed = 0 total_owed = 0 nulled_contracts = [] for contract in contracts: total_owed += contract.q * contract.u if ( self.a2f[contract.partner].is_bankrupt or contract.contract.nullified_at >= 0 or is_system_agent(contract.contract.annotation["seller"]) or is_system_agent(contract.contract.annotation["buyer"]) or contract.contract.executed_at >= 0 ): continue nulled_contracts.append(contract) owed += contract.q * contract.u nullified_contracts = defaultdict(list) if available <= 0: for contract in nulled_contracts: nullified_contracts[self.agents[contract.partner].id].append( (contract.contract, 0, 0) ) self.record_bankrupt(factory) return nullified_contracts # calculate compensation fraction # if available >= owed: # fraction = self.compensation_fraction # else: # fraction = self.compensation_fraction * available / owed # calculate compensation and pay it as needed for contract in nulled_contracts: victim = self.agents[contract.partner] victim_factory = self.a2f.get(victim.id, None) # calculate compensation (as money) compensation_quantity = contract.q if contract.is_seller: compensation_unit = factory.spot_price( contract.product, self._agent_spot_loss[ self.a2i[factory.agent_id], self.current_step ], ) else: compensation_unit = contract.u compensation_quantity = min( int(math.floor(available / compensation_unit)), compensation_quantity ) compensation = min(available, compensation_quantity * compensation_unit) if compensation <= 0 and compensation_quantity <= 0: nullified_contracts[victim.id].append( (contract.contract, 0, compensation) ) self.nullify_contract(contract.contract, 0) continue if self.compensate_immediately: # pay immediate compensation if indicated victim_factory.pay(-compensation) else: # add the required products/money to the internal compensation inventory/funds to be paid at the # contract execution time. if contract.is_seller: self.compensation_records[contract.contract.id].append( CompensationRecord( contract.product, compensation_quantity, 0, True, victim_factory, ) ) else: self.compensation_records[contract.contract.id].append( CompensationRecord(-1, 0, compensation, False, victim_factory) ) compensation = 0 available -= compensation nullified_contracts[victim.id].append( (contract.contract, compensation_quantity, compensation) ) self.nullify_contract(contract.contract, compensation_quantity) assert available >= 0 self.record_bankrupt(factory) return nullified_contracts
[docs] def scores( self, assets_multiplier_trading: float | None = None, assets_multiplier_catalog: float | None = None, assets_multiplier: float | None = None, ) -> dict[str, float]: """scores of all agents given the asset multiplier. args: assets_multiplier: a multiplier to multiply the assets with. """ if assets_multiplier is not None and assets_multiplier_trading is None: assets_multiplier_trading = assets_multiplier if assets_multiplier_catalog is None: assets_multiplier_catalog = self.inventory_valuation_catalog if assets_multiplier_catalog is None: assets_multiplier_catalog = 0 if assets_multiplier_trading is None: assets_multiplier_trading = self.inventory_valuation_trading if assets_multiplier_trading is None: assets_multiplier_trading = 0 scores = dict() for aid, agent in self.agents.items(): if is_system_agent(aid): continue factory = self.a2f[aid] try: scores[aid] = ( factory.current_balance + ( assets_multiplier_trading * np.sum(factory.current_inventory * self.trading_prices) ) if assets_multiplier_trading else ( 0.0 + ( assets_multiplier_catalog * np.sum(factory.current_inventory * self.catalog_prices) ) if assets_multiplier_catalog else 0.0 - factory.initial_balance ) ) / (1 if not factory.initial_balance else factory.initial_balance) except Exception: scores[aid] = float("nan") return scores
@property
[docs] def winners(self): """The winners of this world (factory managers with maximum wallet balance""" if len(self.agents) < 1: return [] scores = self.scores() sa = sorted(zip(scores.values(), scores.keys()), reverse=True) max_score = sa[0][0] winners = [] for s, a in sa: if s < max_score: break winners.append(self.agents[a]) return winners
[docs] def trading_prices_for( self, discount: float = 1.0, condition="executed" ) -> np.ndarray: """ Calculates the prices at which all products traded using an optional discount factor Args: discount: A discount factor to treat older prices less importantly (exponential discounting). condition: The condition for contracts to consider. Possible values are executed, signed, concluded, nullified Returns: an n_products vector of trading prices """ prices = np.nan * np.ones((self.n_products, self.n_steps), dtype=float) quantities = np.zeros((self.n_products, self.n_steps), dtype=int) for contract in self.saved_contracts: if contract["condition" + "_at"] < 0: continue p, t, q, u = ( contract["product"], contract["delivery_time"], contract["quantity"], contract["unit_price"], ) prices[p, t] = (prices[p, t] * quantities[p, t] + u * q) / ( quantities[p, t] + q ) quantities[p, t] += q discount = np.cumprod(discount * np.ones(self.n_steps)) discount /= sum(discount) return np.nansum(np.nanprod(prices, discount), axis=-1)
@property
[docs] def trading_prices(self): if self.current_step == self.n_steps: return self._trading_price[:, -1] return self._trading_price[:, self.current_step + 1]
@property
[docs] def stats_df(self) -> pd.DataFrame: """Returns a pandas data frame with the stats""" return pd.DataFrame(super().stats)
@property
[docs] def contracts_df(self) -> pd.DataFrame: """Returns a pandas data frame with the contracts""" contracts = pd.DataFrame(self.saved_contracts) contracts["product_index"] = contracts.product_name.str.replace("p", "").astype( int ) contracts["breached"] = contracts.breaches.str.len() > 0 contracts["executed"] = contracts.executed_at >= 0 contracts["erred"] = contracts.erred_at >= 0 contracts["nullified"] = contracts.nullified_at >= 0 return contracts
@property
[docs] def system_agents(self) -> list[SCML2020Agent]: """Returns the two system agents""" return [_ for _ in self.agents.values() if is_system_agent(_.id)]
@property
[docs] def system_agent_names(self) -> list[str]: """Returns the names two system agents""" return [_ for _ in self.agents.keys() if is_system_agent(_)]
@property
[docs] def non_system_agents(self) -> list[SCML2020Agent]: """Returns all agents except system agents""" return [_ for _ in self.agents.values() if not is_system_agent(_.id)]
@property
[docs] def non_system_agent_names(self) -> list[str]: """Returns names of all agents except system agents""" return [_ for _ in self.agents.keys() if not is_system_agent(_)]
@property
[docs] def agreement_fraction(self) -> float: """Fraction of negotiations ending in agreement and leading to signed contracts""" n_negs = sum(self.stats["n_negotiations"]) n_contracts = self.n_saved_contracts(True) return n_contracts / n_negs if n_negs != 0 else np.nan
[docs] system_agent_ids = system_agent_names
[docs] non_system_agent_ids = non_system_agent_names
[docs] def draw( self, steps: tuple[int, int] | int | None = None, what: Collection[str] = DEFAULT_EDGE_TYPES, who: Callable[[Agent], bool] = None, where: Callable[[Agent], int | tuple[float, float]] = None, together: bool = True, axs: Collection[Axis] = None, ncols: int = 4, figsize: tuple[int, int] = (15, 15), **kwargs, ) -> tuple[Axis, nx.Graph] | tuple[list[Axis], list[nx.Graph]]: if where is None: def where(x): return ( self.n_processes + 1 if x == SYSTEM_BUYER_ID else 0 if x == SYSTEM_SELLER_ID else int(self.agents[x].awi.profile.processes[0] + 1) ) return super().draw( steps, what, who, where, together=together, axs=axs, ncols=ncols, figsize=figsize, **kwargs, )
[docs] class SCML2021World(SCML2020World): def __init__(self, *args, **kwargs): kwargs["n_concurrent_negs_between_partners"] = 5 kwargs["publish_trading_prices"] = True kwargs["publish_exogenous_summary"] = True super().__init__(*args, **kwargs) @classmethod
[docs] def generate( cls, *args, inventory_valuation_trading: np.ndarray | tuple[float, float] | float = ( 0.0, 0.5, ), horizon: tuple[float, float] | float = (0.2, 0.5), **kwargs, ) -> dict[str, Any]: kwargs["inventory_valuation_trading"] = inventory_valuation_trading kwargs["horizon"] = horizon return super().generate(*args, **kwargs)
[docs] class SCML2022World(SCML2021World): pass
[docs] class SCML2023World(SCML2022World): pass
[docs] class SCML2024World(SCML2022World): pass