"""Implements the world class for the SCML2020 world """
from __future__ import annotations
import copy
import itertools
import logging
import math
import random
import sys
from collections import Counter, defaultdict, namedtuple
from typing import Any, Callable, Collection, Iterable
import networkx as nx
import numpy as np
import pandas as pd
from matplotlib.axis import Axis
# from negmas import Adapter
from negmas import Action, Agent, Breach, Contract
from negmas.helpers import get_class, get_full_type_name, instantiate, unique_name
from negmas.situated import (
DEFAULT_EDGE_TYPES,
BreachProcessing,
Operations,
TimeInAgreementMixin,
World,
)
from ..common import (
distribute_quantities,
fraction_cut,
integer_cut,
intin,
make_array,
realin,
)
from ..oneshot.agent import OneShotAgent
from .agent import OneShotAdapter, SCML2020Agent, _SystemAgent
from .awi import AWI
from .common import (
COMPENSATION_ID,
INFINITE_COST,
NO_COMMAND,
SYSTEM_BUYER_ID,
SYSTEM_SELLER_ID,
ContractInfo,
ExogenousContract,
FactoryProfile,
Failure,
FinancialReport,
is_system_agent,
)
from .factory import Factory
__all__ = [
"SCML2020World",
"SCML2021World",
"SCML2022World",
"SCML2023World",
"SCML2024World",
"Failure",
"AWI",
]
MAX_WORLD_GENERATION_TRIALS = 100
"""Maximum number of world generation trials for recoverable errors"""
def _realin(rng: tuple[float, float] | float) -> float:
"""
Selects a random number within a range if given or the input if it was a float
Args:
rng: Range or single value
Returns:
the real within the given range
"""
if not isinstance(rng, Iterable):
return rng
if abs(rng[1] - rng[0]) < 1e-8:
return rng[0]
return rng[0] + random.random() * (rng[1] - rng[0])
class RecoverableWorldGenerationException(Exception):
"""Thrown internally by world generation functions if they cannot
generate the world but may be able to do so with the same inputs
in another call"""
pass
CompensationRecord = namedtuple(
"CompensationRecord", ["product", "quantity", "money", "seller_bankrupt", "factory"]
)
"""A record of delayed compensation used when a factory goes bankrupt to keep
honoring its future contracts to the limit possible"""
ContractRecord = namedtuple(
"ContractRecord",
[
"product",
"signed",
"time",
"seller",
"buyer",
"quantity",
"unit_price",
"is_exogenous",
],
)
"""A record of a contract using during world generation"""
[docs]
class SCML2020World(TimeInAgreementMixin, World):
"""A Supply Chain SCML2020World simulation as described for the SCML league of ANAC @ IJCAI 2020.
Args:
process_inputs: An n_processes vector specifying the number of inputs from each product needed to execute
each process.
process_outputs: An n_processes vector specifying the number of inputs from each product generated by
executing each process.
catalog_prices: An n_products vector (i.e. n_processes+1 vector) giving the catalog price of all products
profiles: An n_agents list of `FactoryProfile` objects specifying the private profile of the factory
associated with each agent.
agent_types: An n_agents list of strings/ `SCML2020Agent` classes specifying the type of each agent
agent_params: An n_agents dictionaries giving the parameters of each agent
initial_balance: The initial balance in each agent's wallet. All agents will start with this same value.
allow_selling_input: Allows agents to sell their input product(s) through negotiation
allow_buying_output: Allows agents to buy their output product(s) through negotiation
catalog_quantities: The quantities in the past for which catalog_prices are the average unit prices. This
is used when updating the trading prices. If set to zero then the trading price will
follow the market price and will not use the catalog_price (except for products that are
never sold in the market for which the trading price will take the default value of the
catalog price). If set to a large value (e.g. 10000), the price at which a product is sold
will not affect the trading price
spot_market_global_loss: Buying from the spot market will cost trading-price * (1+`spot_market_global_loss) and
selling to it will cost trading-price / (1+ spot_market_global_loss) for agents with
unit spot-market-loss-multiplier
financial_report_period: The number of steps between financial reports. If < 1, it is a fraction of n_steps
borrow_on_breach: If true, agents will be forced to borrow money on breach as much as possible to honor the
contract
interest_rate: The interest at which loans grow over time (it only affect a factory when its balance is
negative)
bankruptcy_limit: The maximum amount that be be borrowed (including interest). The balance of any factory cannot
go lower than - borrow_limit or the agent will go bankrupt immediately
liquidation_rate: The rate at which future contracts get liquidated when an agent gets bankrupt. It should be
between zero and one.
compensation_fraction: Fraction of a contract to be compensated (at most) if a partner goes bankrupt. Notice
that this fraction is not guaranteed because the bankrupt agent may not have enough
assets to pay all of its standing contracts to this level of compensation. In such
cases, a smaller fraction will be used.
compensate_immediately: If true, compensation will happen immediately when an agent goes bankrupt and in
in money. This means that agents with contracts involving the bankrupt agent will
just have these contracts be nullified and receive monetary compensation immediately
. If false, compensation will not happen immediately but at the contract execution
time. In this case, agents with contracts involving the bankrupt agent will be
informed of the compensation fraction (instead of the compensation money) at the
time of bankruptcy and will receive the compensation in kind (money if they are
sellers and products if they are buyers) at the normal execution time of the
contract. In the special case of no-compensation (i.e. `compensation_fraction` is
zero or the bankrupt agent has no assets), the two options will behave similarity.
compensate_before_past_debt: If true, then compensations will be paid before past debt is considered,
otherwise, the money from liquidating bankrupt agents will first be used to
pay past debt then whatever remains will be used for compensation. Notice that
in all cases, the trigger of bankruptcy will be paid before compensation and
past debts.
exogenous_horizon: The horizon for revealing external contracts
exogenous_force_max: If true, exogenous contracts are forced to be signed independent of the setting of
`force_signing`
production_no_borrow: If true, agents will not borrow if they fail to satisfy its production need to execute
a scheduled production command
production_no_bankruptcy: If true, agents will not go bankrupt because of an production related transaction.
production_penalty: The penalty paid when buying from spot-market to satisfy production needs
production_confirm: If true, the factory will confirm running processes at every time-step just before running
them by calling `confirm_production` on the agent controlling it.
compact: If True, no logs will be kept and the whole simulation will use a smaller memory footprint
n_steps: Number of simulation steps (can be considered as days).
time_limit: Total time allowed for the complete simulation in seconds.
neg_n_steps: Number of negotiation steps allowed for all negotiations.
neg_time_limit: Total time allowed for a complete negotiation in seconds.
neg_step_time_limit: Total time allowed for a single step of a negotiation. in seconds.
negotiation_speed: The number of negotiation steps that pass in every simulation step. If 0, negotiations
will be guaranteed to finish within a single simulation step
signing_delay: The number of simulation steps to pass between a contract is concluded and signed
name: The name of the simulations
**kwargs: Other parameters that are passed directly to `SCML2020World` constructor.
"""
def __init__(
self,
# SCML2020 specific parameters
process_inputs: np.ndarray,
process_outputs: np.ndarray,
catalog_prices: np.ndarray,
profiles: list[FactoryProfile],
agent_types: list[type[SCML2020Agent]],
agent_params: list[dict[str, Any]] | None = None,
exogenous_contracts: Collection[ExogenousContract] = (),
initial_balance: np.ndarray | tuple[int, int] | int = 1000,
allow_buying_output=False,
allow_selling_input=False,
catalog_quantities: int | np.ndarray = 50,
# breach processing parameters
buy_missing_products=True,
borrow_on_breach=True,
bankruptcy_limit=0.0,
liquidation_rate=1.0,
spot_market_global_loss=0.30,
interest_rate=0.05,
financial_report_period: int = 5,
# compensation parameters (for victims of bankrupt agents)
compensation_fraction: float = 1.0,
compensate_immediately=False,
compensate_before_past_debt=True,
# external contracts parameters
exogenous_horizon: int | None = None,
exogenous_force_max: bool = False,
# production failure parameters
production_confirm=False,
production_buy_missing=False,
production_no_borrow=True,
production_no_bankruptcy=False,
production_penalty=0.15,
# General SCML2020World Parameters
compact=False,
no_logs=False,
n_steps=1000,
time_limit=60 * 90,
# mechanism params
neg_n_steps=20,
neg_time_limit=2 * 60,
neg_step_time_limit=60,
negotiation_speed=21,
negotiation_quota_per_step=None,
negotiation_quota_per_simulation=float("inf"),
n_concurrent_negs_between_partners=float("inf"),
shuffle_negotiations=False,
end_negotiation_on_refusal_to_propose=True,
# trading price parameters
trading_price_discount=0.9,
# spot market parameters
spot_discount=0.9,
spot_multiplier=0.05,
# simulation parameters
signing_delay=0,
force_signing=False,
batch_signing=True,
name: str = None,
# public information
publish_exogenous_summary=True,
publish_trading_prices=True,
# debugging parameters
agent_name_reveals_position: bool = True,
agent_name_reveals_type: bool = True,
# evaluation paramters
inventory_valuation_trading: float = 0.5,
inventory_valuation_catalog: float = 0.0,
**kwargs,
):
if negotiation_quota_per_step is None:
negotiation_quota_per_step = len(agent_types) * n_steps // 2
if exogenous_horizon is None:
exogenous_horizon = n_steps
[docs]
self.publish_exogenous_summary = publish_exogenous_summary
[docs]
self.publish_trading_prices = publish_trading_prices
[docs]
self.allow_buying_output = allow_buying_output
[docs]
self.exogenous_horizon = exogenous_horizon
[docs]
self.buy_missing_products = buy_missing_products
[docs]
self.production_buy_missing = production_buy_missing
[docs]
self.liquidation_rate = liquidation_rate
[docs]
self.trading_price_discount = trading_price_discount
[docs]
self.spot_discount = spot_discount
[docs]
self.spot_multiplier = spot_multiplier
[docs]
self.catalog_quantities = catalog_quantities
[docs]
self.inventory_valuation_trading = inventory_valuation_trading
[docs]
self.inventory_valuation_catalog = inventory_valuation_catalog
[docs]
self.n_concurrent_negs_between_partners = n_concurrent_negs_between_partners
kwargs["log_to_file"] = not no_logs
if compact:
kwargs["event_file_name"] = None
kwargs["event_types"] = []
kwargs["log_screen_level"] = logging.CRITICAL
kwargs["log_file_level"] = logging.ERROR
kwargs["log_negotiations"] = False
kwargs["log_ufuns"] = False
# kwargs["save_mechanism_state_in_contract"] = False
kwargs["save_cancelled_contracts"] = False
kwargs["save_resolved_breaches"] = False
kwargs["save_negotiations"] = True
else:
kwargs["save_negotiations"] = True
if negotiation_speed == 0:
negotiation_speed = neg_n_steps + 1
mechanisms = kwargs.pop("mechanisms", {})
super().__init__(
bulletin_board=None,
breach_processing=BreachProcessing.NONE,
awi_type="scml.scml2020.AWI",
shuffle_negotiations=shuffle_negotiations,
mechanisms={
"negmas.sao.SAOMechanism": mechanisms.get(
"negmas.sao.SAOMechanism",
dict(
end_on_no_response=end_negotiation_on_refusal_to_propose,
dynamic_entry=False,
max_wait=negotiation_quota_per_step,
),
)
},
default_signing_delay=signing_delay,
n_steps=n_steps,
time_limit=time_limit,
negotiation_speed=negotiation_speed,
neg_n_steps=neg_n_steps,
neg_time_limit=neg_time_limit,
neg_step_time_limit=neg_step_time_limit,
force_signing=force_signing,
batch_signing=batch_signing,
negotiation_quota_per_step=negotiation_quota_per_step,
negotiation_quota_per_simulation=negotiation_quota_per_simulation,
no_logs=no_logs,
operations=(
Operations.StatsUpdate,
Operations.SimulationStep,
Operations.Negotiations,
Operations.ContractSigning,
Operations.ContractExecution,
Operations.AgentSteps,
Operations.SimulationStep,
Operations.StatsUpdate,
),
name=name,
**kwargs,
)
self.bulletin_board.record(
"settings", publish_trading_prices, "public_trading_prices"
)
self.bulletin_board.record(
"settings", publish_exogenous_summary, "public_exogenous_summary"
)
self.bulletin_board.record(
"settings", buy_missing_products, "buy_missing_products"
)
self.bulletin_board.record(
"settings", inventory_valuation_trading, "inventory_valuation_trading"
)
self.bulletin_board.record(
"settings", inventory_valuation_catalog, "inventory_valuation_catalog"
)
self.bulletin_board.record(
"settings",
inventory_valuation_trading + inventory_valuation_catalog,
"inventory_valuation",
)
self.bulletin_board.record("settings", borrow_on_breach, "borrow_on_breach")
self.bulletin_board.record("settings", bankruptcy_limit, "bankruptcy_limit")
self.bulletin_board.record(
"settings", spot_market_global_loss, "spot_market_global_loss"
)
self.bulletin_board.record(
"settings", financial_report_period, "financial_report_period"
)
self.bulletin_board.record("settings", interest_rate, "interest_rate")
self.bulletin_board.record("settings", exogenous_horizon, "exogenous_horizon")
self.bulletin_board.record(
"settings", compensation_fraction, "compensation_fraction"
)
self.bulletin_board.record(
"settings", compensate_immediately, "compensate_immediately"
)
self.bulletin_board.record(
"settings", compensate_before_past_debt, "compensate_before_past_debt"
)
self.bulletin_board.record(
"settings", exogenous_force_max, "exogenous_force_max"
)
self.bulletin_board.record("settings", production_confirm, "production_confirm")
self.bulletin_board.record(
"settings", production_buy_missing, "production_buy_missing"
)
self.bulletin_board.record(
"settings", production_no_borrow, "production_no_borrow"
)
self.bulletin_board.record(
"settings", production_no_bankruptcy, "production_no_bankruptcy"
)
self.bulletin_board.record("settings", production_penalty, "production_penalty")
self.bulletin_board.record(
"settings", len(exogenous_contracts) > 0, "has_exogenous_contracts"
)
self.bulletin_board.record(
"settings",
n_concurrent_negs_between_partners,
"n_concurrent_negs_between_partners",
)
if self.info is None:
self.info = {}
self.info.update(
shuffle_negotiations=shuffle_negotiations,
end_negotiation_on_refusal_to_propose=end_negotiation_on_refusal_to_propose,
process_inputs=process_inputs,
process_outputs=process_outputs,
catalog_prices=catalog_prices,
agent_types_final=[get_full_type_name(_) for _ in agent_types],
agent_params_final=agent_params,
initial_balance_final=initial_balance,
buy_missing_products=buy_missing_products,
production_buy_missing=production_buy_missing,
borrow_on_breach=borrow_on_breach,
bankruptcy_limit=bankruptcy_limit,
spot_market_global_loss=spot_market_global_loss,
financial_report_period=financial_report_period,
interest_rate=interest_rate,
compensation_fraction=compensation_fraction,
compensate_immediately=compensate_immediately,
compensate_before_past_debt=compensate_before_past_debt,
exogenous_force_max=exogenous_force_max,
production_no_borrow=production_no_borrow,
production_no_bankruptcy=production_no_bankruptcy,
production_penalty=production_penalty,
compact=compact,
no_logs=no_logs,
n_steps=n_steps,
time_limit=time_limit,
neg_n_steps=neg_n_steps,
neg_time_limit=neg_time_limit,
neg_step_time_limit=neg_step_time_limit,
negotiation_speed=negotiation_speed,
signing_delay=signing_delay,
agent_name_reveals_position=agent_name_reveals_position,
agent_name_reveals_type=agent_name_reveals_type,
publish_exogenous_summary=publish_exogenous_summary,
publish_trading_prices=publish_trading_prices,
)
TimeInAgreementMixin.init(self, time_field="time")
[docs]
self.spot_market_global_loss = spot_market_global_loss
self.bulletin_board.add_section("reports_time")
self.bulletin_board.add_section("reports_agent")
if self.publish_exogenous_summary:
self.bulletin_board.add_section("exogenous_contracts_summary")
if self.publish_trading_prices:
self.bulletin_board.add_section("trading_prices")
[docs]
self.production_no_borrow = production_no_borrow
[docs]
self.production_no_bankruptcy = production_no_bankruptcy
[docs]
self.production_penalty = production_penalty
[docs]
self.compensation_fraction = compensation_fraction
if not isinstance(agent_types, Iterable):
agent_types = [agent_types] * len(profiles)
assert len(profiles) == len(agent_types)
[docs]
self.profiles = profiles
[docs]
self.catalog_prices = catalog_prices
[docs]
self.process_outputs = process_outputs
[docs]
self.n_products = len(catalog_prices)
[docs]
self.n_processes = len(process_inputs)
[docs]
self.borrow_on_breach = borrow_on_breach
[docs]
self.interest_rate = interest_rate
[docs]
self.exogenous_force_max = exogenous_force_max
[docs]
self.compensate_before_past_debt = compensate_before_past_debt
[docs]
self.confirm_production = production_confirm
[docs]
self.financial_reports_period = (
financial_report_period
if financial_report_period >= 1
else int(0.5 + financial_report_period * n_steps)
)
self.compensation_fraction = compensation_fraction
initial_balance = make_array(initial_balance, len(profiles), dtype=int)
# assert all([_> 1e-5 for _ in initial_balance]), "Some initial balances are zero or negative {initial_balance}"
agent_types = [get_class(_) for _ in agent_types]
[docs]
self.bankruptcy_limit = (
-bankruptcy_limit
if isinstance(bankruptcy_limit, int)
else -int(0.5 + bankruptcy_limit * initial_balance.mean())
)
assert self.n_products == self.n_processes + 1
n_agents = len(profiles)
if agent_name_reveals_position or agent_name_reveals_type:
default_names = [f"{_:02}" for _ in range(n_agents)]
else:
default_names = [unique_name("", add_time=False) for _ in range(n_agents)]
if agent_name_reveals_type:
for i, (at, ap) in enumerate(zip(agent_types, agent_params)):
if issubclass(at, OneShotAdapter):
s2 = (
get_class(ap["oneshot_type"])
._type_name()
.split(".")[-1]
.replace("Agent", "")
.replace("Adapter", "")
.replace("OneShot", "")
)
s2 += f'O({at._type_name().split(".")[-1].replace("Agent", "").replace("OneShot", "").replace("Adapter", "")})'
else:
s2 = at._type_name().split(".")[-1].replace("Agent", "")
s = "".join([c for c in s2 if c.isupper()])[:3]
try:
if len(s) < 3:
if len(s2) > 3:
s = s2[:2]
elif len(s2) >= 2:
s = s2[0] + s2[1 : 1 + (3 - len(s))] + s2[1:]
elif len(s2) > 0:
s = s2[0] * 3
else:
s = "Agt"
except Exception:
pass
default_names[i] += f"{s}"
agent_levels = [
int(np.nonzero(np.max(p.costs != INFINITE_COST, axis=0).flatten())[0])
for p in profiles
]
if agent_name_reveals_position:
for i, L_ in enumerate(agent_levels):
default_names[i] += f"@{L_:01}"
# for i, (t_, p_ ) in enumerate(zip(agent_types, agent_params)):
# if issubclass(get_class(t_), Adapter):
# p_["obj"] = get_class(p_["oneshot_type"])(**p_.get("oneshot_params", dict()))
if agent_params is None:
agent_params = [dict(name=name) for i, name in enumerate(default_names)]
elif isinstance(agent_params, dict):
a = copy.copy(agent_params)
agent_params = []
for i, name in enumerate(default_names):
b = copy.deepcopy(a)
b["name"] = name
agent_params.append(b)
elif len(agent_params) == 1:
a = copy.copy(agent_params[0])
agent_params = []
for i, _ in enumerate(default_names):
b = copy.deepcopy(a)
b["name"] = name
agent_params.append(b)
else:
if agent_name_reveals_type or agent_name_reveals_position:
for i, (ns, ps) in enumerate(zip(default_names, agent_params)):
agent_params[i] = dict(**ps)
agent_params[i]["name"] = ns
n_processes = len(process_inputs)
n_products = n_processes + 1
agent_types += [_SystemAgent, _SystemAgent]
agent_params += [{"role": SYSTEM_SELLER_ID}, {"role": SYSTEM_BUYER_ID}]
initial_balance = initial_balance.tolist() + [
sys.maxsize // 4,
sys.maxsize // 4,
]
profiles.append(FactoryProfile(INFINITE_COST * np.ones(n_processes, dtype=int)))
profiles.append(FactoryProfile(INFINITE_COST * np.ones(n_processes, dtype=int)))
agents = []
for i, (atype, aparams) in enumerate(zip(agent_types, agent_params)):
# aparams = {k:v for k, v in aparams if k not in ("oneshot_params", "oneshot_type")}
a = instantiate(atype, **aparams)
a.id = a.name
self.join(a, i)
agents.append(a)
[docs]
self.agent_types = [_.type_name for _ in agents]
[docs]
self.agent_params = [
{k: v for k, v in _.items() if k not in ("name",)} for _ in agent_params
]
agent_params = [
{
k: v
for k, v in _.items()
if k not in ("name", "oneshot_type", "oneshot_params", "obj")
}
for _ in agent_params
]
[docs]
self.agent_unique_types = [
f"{t}{hash(str(p))}" if len(p) > 0 else t
for t, p in zip(self.agent_types, agent_params)
]
assert isinstance(initial_balance, Iterable)
[docs]
self.factories = [
Factory(
world=self,
profile=profile,
initial_balance=initial_balance[i],
inputs=process_inputs,
outputs=process_outputs,
agent_id=agents[i].id,
catalog_prices=catalog_prices,
compensate_before_past_debt=self.compensate_before_past_debt,
buy_missing_products=self.buy_missing_products,
production_buy_missing=self.production_buy_missing,
production_penalty=self.production_penalty,
production_no_borrow=self.production_no_borrow,
production_no_bankruptcy=self.production_no_bankruptcy,
confirm_production=self.confirm_production,
initial_inventory=(
None
if i < len(profiles) - 2
else sys.maxsize // 4 * np.ones(n_products, dtype=int)
),
)
for i, profile in enumerate(profiles)
]
[docs]
self.a2f = dict(zip((_.id for _ in agents), self.factories))
[docs]
self.afp = list(zip(agents, self.factories, profiles))
self.f2i = self.a2i = dict(zip((_.id for _ in agents), range(n_agents)))
[docs]
self.i2f = self.factories
[docs]
self.breach_prob = dict(zip((_.id for _ in agents), itertools.repeat(0.0)))
[docs]
self._breach_level = dict(zip((_.id for _ in agents), itertools.repeat(0.0)))
[docs]
self.agent_n_contracts = dict(zip((_.id for _ in agents), itertools.repeat(0)))
[docs]
self.suppliers: list[list[str]] = [[] for _ in range(n_products)]
[docs]
self.consumers: list[list[str]] = [[] for _ in range(n_products)]
[docs]
self.agent_processes: dict[str, list[int]] = defaultdict(list)
[docs]
self.agent_outputs: dict[str, list[int]] = defaultdict(list)
[docs]
self.agent_consumers: dict[str, list[str]] = defaultdict(list)
[docs]
self.agent_suppliers: dict[str, list[str]] = defaultdict(list)
self.consumers[n_products - 1].append(SYSTEM_BUYER_ID)
self.agent_processes[SYSTEM_BUYER_ID] = []
self.agent_inputs[SYSTEM_BUYER_ID] = [n_products - 1]
self.agent_outputs[SYSTEM_BUYER_ID] = []
self.suppliers[0].append(SYSTEM_SELLER_ID)
self.agent_processes[SYSTEM_SELLER_ID] = []
self.agent_inputs[SYSTEM_SELLER_ID] = []
self.agent_outputs[SYSTEM_SELLER_ID] = [0]
[docs]
self.agent_profiles: dict[str, Any] = dict()
[docs]
self.initial_balances: dict[str, Any] = dict()
for agent_id, profile in zip(self.agents.keys(), profiles):
self.agent_profiles[agent_id] = profile
for agent_id, ib in zip(self.agents.keys(), initial_balance):
self.initial_balances[agent_id] = ib
for p in range(n_processes):
for agent_id, profile in zip(self.agents.keys(), profiles):
self.agent_profiles[agent_id] = profile
if is_system_agent(agent_id):
continue
if np.all(profile.costs[:, p] == INFINITE_COST):
continue
self.suppliers[p + 1].append(agent_id)
self.consumers[p].append(agent_id)
self.agent_processes[agent_id].append(p)
self.agent_inputs[agent_id].append(p)
self.agent_outputs[agent_id].append(p + 1)
for p in range(n_products):
for a in self.suppliers[p]:
self.agent_consumers[a] = self.consumers[p]
for a in self.consumers[p]:
self.agent_suppliers[a] = self.suppliers[p]
self.agent_processes = {k: np.array(v) for k, v in self.agent_processes.items()}
self.agent_inputs = {k: np.array(v) for k, v in self.agent_inputs.items()}
self.agent_outputs = {k: np.array(v) for k, v in self.agent_outputs.items()}
assert all(
len(v) == 1 or is_system_agent(self.agents[k].id)
for k, v in self.agent_outputs.items()
), f"Not all agent outputs are singular:\n{self.agent_outputs}"
assert all(
len(v) == 1 or is_system_agent(self.agents[k].id)
for k, v in self.agent_inputs.items()
), f"Not all agent inputs are singular:\n{self.agent_outputs}"
assert all(
is_system_agent(k) or self.agent_inputs[k][0] == self.agent_outputs[k] - 1
for k in self.agent_inputs.keys()
), f"Some agents have outputs != input+1\n{self.agent_outputs}\n{self.agent_inputs}"
[docs]
self._n_production_failures = 0
# self.is_bankrupt: dict[str, bool] = dict(
# zip(self.agents.keys(), itertools.repeat(False))
# )
[docs]
self.compensation_balance = 0
[docs]
self.compensation_records: dict[str, list[CompensationRecord]] = defaultdict(
list
)
[docs]
self.exogenous_contracts: dict[int : list[Contract]] = defaultdict(list)
for c in exogenous_contracts:
seller_id = agents[c.seller].id if c.seller >= 0 else SYSTEM_SELLER_ID
buyer_id = agents[c.buyer].id if c.buyer >= 0 else SYSTEM_BUYER_ID
if not c.quantity:
continue
contract = Contract(
agreement={
"time": c.time,
"quantity": c.quantity,
"unit_price": c.unit_price,
},
partners=[buyer_id, seller_id],
issues=[],
signatures=dict(),
signed_at=-1,
to_be_signed_at=c.revelation_time,
annotation={
"seller": seller_id,
"buyer": buyer_id,
"caller": (
SYSTEM_SELLER_ID
if seller_id == SYSTEM_SELLER_ID
else SYSTEM_BUYER_ID
),
"is_buy": random.random() > 0.5,
"product": c.product,
},
)
self.exogenous_contracts[c.revelation_time].append(contract)
[docs]
self.compensation_factory = Factory(
FactoryProfile(np.zeros((n_steps, n_processes), dtype=int)),
initial_balance=0,
inputs=self.process_inputs,
outputs=self.process_outputs,
world=self,
agent_id=COMPENSATION_ID,
agent_name=COMPENSATION_ID,
catalog_prices=catalog_prices,
compensate_before_past_debt=True,
buy_missing_products=False,
production_buy_missing=False,
production_penalty=0.0,
production_no_borrow=True,
production_no_bankruptcy=True,
confirm_production=False,
)
# _real_prices are the prices at which products are actually traded (it can be nan if the product was
# never traded at some step). _trading_prices are the effective prices taking catalog prices and past trade
# into account
# Note that we go to step n_steps not only to n_steps-1 which means that the value at s is the value BEFORE
# step s which is what we really need usually.
[docs]
self._agent_output = np.zeros((n_agents, n_products), dtype=int)
for i, profile in enumerate(profiles[:-2]):
self._agent_output[i, profile.processes + 1] = 1
self._agent_input[i, profile.processes] = 1
[docs]
self._traded_quantity = np.ones(n_products) * self.catalog_quantities
[docs]
self._real_price = np.nan * np.ones((n_products, n_steps + 1))
[docs]
self._sold_quantity = np.zeros((n_products, n_steps + 1), dtype=int)
# self._real_price[0, :] = self.catalog_prices[0]
# self._real_price[-1, :] = self.catalog_prices[-1]
self._real_price[:, 0] = self.catalog_prices
[docs]
self._trading_price = np.tile(
self._real_price[:, 0].reshape((n_products, 1)), (1, n_steps + 1)
)
[docs]
self._betas = np.ones(n_steps + 1)
self._betas[1] = self.trading_price_discount
self._betas[1:] = np.cumprod(self._betas[1:])
[docs]
self._betas_sum = self.catalog_quantities * np.ones((n_products, n_steps + 1))
[docs]
self._spot_quantity = np.zeros((n_agents, n_steps), dtype=int)
[docs]
self._alphas = np.ones(n_steps + 1)
self._alphas[1] = self.spot_discount
self._alphas[1:] = np.cumprod(self._alphas[1:])
[docs]
self._agent_spot_loss = self.spot_market_global_loss * np.ones(
(n_agents, n_steps)
)
[docs]
self._agent_spot_quantity = np.zeros((n_agents, n_steps), dtype=int)
[docs]
self._registered_negs: dict[tuple[str], int] = Counter()
if self.publish_trading_prices:
self.bulletin_board.record("trading_prices", self._trading_price[:, 1])
[docs]
self.exogenous_contracts_summary = None
if self.publish_exogenous_summary:
q = np.zeros((self.n_products, self.n_steps, 2))
for s in range(self.n_steps):
for contract in self.exogenous_contracts[s]:
product = contract.annotation["product"]
quantity, unit_price, t = (
contract.agreement["quantity"],
contract.agreement["unit_price"],
contract.agreement["time"],
)
q[product, t, 0] += quantity
if quantity:
q[product, t, 1] += quantity * unit_price
self.exogenous_contracts_summary = q
for s in range(self.n_steps):
self.bulletin_board.record(
"exogenous_contracts_summary",
value=self.exogenous_contracts_summary,
key=s,
)
self.info.update(
dict(
agent_profiles={
k: dict(
costs=v.costs.tolist(),
n_lines=v.n_lines,
input_product=int(
v.input_products[0]
if v.input_products is not None and len(v.input_products)
else -1
),
output_product=int(
v.output_products[0]
if v.output_products is not None and len(v.output_products)
else -1
),
)
for k, v in self.agent_profiles.items()
}
)
)
self.info.update(dict(agent_inputs=self.agent_inputs))
self.info.update(dict(agent_outputs=self.agent_outputs))
self.info.update(dict(agent_processes=self.agent_processes))
self.info.update(dict(agent_initial_balances=self.initial_balances))
@classmethod
[docs]
def generate(
cls,
agent_types: list[type[SCML2020Agent] | str],
agent_params: list[dict[str, Any]] | None = None,
agent_processes: list[int] | None = None,
n_steps: tuple[int, int] | int = (50, 200),
n_processes: tuple[int, int] | int = (2, 4),
n_lines: np.ndarray | tuple[int, int] | int = 10,
n_agents_per_process: np.ndarray | tuple[int, int] | int = (2, 4),
process_inputs: np.ndarray | tuple[int, int] | int = 1,
process_outputs: np.ndarray | tuple[int, int] | int = 1,
production_costs: np.ndarray | tuple[int, int] | int = (1, 4),
profit_means: np.ndarray | tuple[float, float] | float = (0.15, 0.2),
profit_stddevs: np.ndarray | tuple[float, float] | float = 0.001,
max_productivity: np.ndarray | tuple[float, float] | float = 1.0,
initial_balance: np.ndarray | tuple[int, int] | int | None = None,
cost_increases_with_level=True,
equal_exogenous_supply=False,
equal_exogenous_sales=False,
exogenous_supply_predictability: tuple[float, float] | float = (0.6, 0.9),
exogenous_sales_predictability: tuple[float, float] | float = (0.6, 0.9),
exogenous_control: tuple[float, float] | float = (0.2, 0.8),
cash_availability: tuple[float, float] | float = (1.5, 2.5),
force_signing=False,
profit_basis=np.max,
horizon: tuple[float, float] | float = (0.2, 0.5),
inventory_valuation_trading: np.ndarray | tuple[float, float] | float = 0.5,
inventory_valuation_catalog: np.ndarray | tuple[float, float] | float = 0.0,
random_agent_types: bool = False,
cost_relativity: float = 1.0,
exogenous_generation_method="profitable",
exogenous_supply_surplus: tuple[float, float] | float = 0.0,
exogenous_sales_surplus: tuple[float, float] | float = 0.0,
run_extra_checks: bool = True,
**kwargs,
) -> dict[str, Any]:
"""
Generates the configuration for a world
Args:
agent_types: All agent types
agent_params: Agent parameters used to initialize them
n_steps: Number of simulation steps
n_processes: Number of processes in the production chain
n_lines: Number of lines per factory
process_inputs: Number of input units per process
process_outputs: Number of output units per process
production_costs: Production cost per factory
profit_means: Mean profitability per production level (i.e. process).
profit_stddevs: Std. Dev. of the profitability of every level (i.e. process).
inventory_valuation_catalog: The fraction of catalog price to value items at the end.
inventory_valuation_trading: The fraction of trading price to value items at the end.
max_productivity: Maximum possible productivity per level (i.e. process).
initial_balance: The initial balance of all agents
n_agents_per_process: Number of agents per process
agent_processes: The process for each agent. If not `None` , it will override `n_agents_per_process` and must be a list/tuple
of the same length as `agent_types` . Morevoer, `random_agent_types` must be False in this case
cost_increases_with_level: If true, production cost will be higher for processes nearer to the final
product.
profit_basis: The statistic used when controlling catalog prices by profit arguments. It can be np.mean,
np.median, np.min, np.max or any Callable[[list[float]], float] and is used to summarize
production costs at every level.
horizon: The horizon used for revealing external supply/sales as a fraction of n_steps
equal_exogenous_supply: If true, external supply will be distributed equally among all agents in the first
layer
equal_exogenous_sales: If true, external sales will be distributed equally among all agents in the last
layer
exogenous_supply_predictability: How predictable are exogenous supplies of each agent over time. 1.0 means
that every agent will have the same quantity for all of its contracts over
time. 0.0 means quantities per agent are completely random
exogenous_sales_predictability: How predictable are exogenous supplies of each agent over time. 1.0 means
that every agent will have the same quantity for all of its contracts over
time. 0.0 means quantities per agent are completely random
cash_availability: The fraction of the total money needs of the agent to work at maximum capacity that
is available as `initial_balance` . This is only effective if `initial_balance` is set
to `None` .
force_signing: Whether to force contract signatures (exogenous contracts are treated in the same way).
exogenous_control: How much control does the agent have over exogenous contract signing. Only effective if
force_signing is False and use_exogenous_contracts is True
random_agent_types: If True, the final agent types used by the generato wil always be sampled from the given types.
If False, this random sampling will only happin if len(agent_types) != n_agents.
cost_relativity: The exponent of production cost used to distribute contracts during generation
method: The method used for world generation. Available methods are "profitable" and "guaranteed_profit"
exogenous_supply_surplus: The surpolus exogenous supply contract quantity to add to the system as a fraction of the
a fraction of the contracts generated by the given method.
exogenous_sales_surplus: The surpolus exogenous sales contract quantity to add to the system as a fraction of the
a fraction of the contracts generated by the given method.
run_extra_checks: If given, the world generation method will check
whether the genrated world "makes sense" given its
internal criteria. May slow down world generation
**kwargs:
Returns:
world configuration as a dict[str, Any]. A world can be generated from this dict by calling SCML2020World(**d)
Remarks:
- There are two general ways to use this generator:
1. Pass `random_agent_types = True`, and pass `agent_types`, `agent_processes` to control placement of each
agent in each level of the production graph.
2. Pass `random_agent_types = False` and pass `agent_types`, `n_agents_per_process` to make the system randomly
place the specified number of agents in each production level
- Most parameters (i.e. `process_inputs` , `process_outputs` , `n_agents_per_process` , `costs` ) can
take a single value, a tuple of two values, or a list of values.
If it has a single value, it is repeated for all processes/factories as appropriate. If it is a
tuple of two numbers $(i, j)$, each process will take a number sampled from a uniform distribution
supported on $[i, j]$ inclusive. If it is a list of values, of the length `n_processes` , it is used as
it is otherwise, it is used to sample values for each process.
"""
if agent_processes is not None and random_agent_types:
raise ValueError(
"You cannot pass `agent_processes` and use `random_agent_types`. The first is only used when you want to fix the assignment of all agents to specific processes which is compatible with randomizing agnet types"
)
if agent_processes is not None and len(agent_processes) != len(agent_types):
raise ValueError(
f"Length of `agent_processes` ({len(agent_processes)}) must equal the length of `agent_types` ({len(agent_types)})"
)
runner = dict(
profitable=cls.generate_profitable,
guaranteed_profit=cls.generate_guaranteed_profit,
)
info = dict(
n_steps=n_steps,
n_processes=n_processes,
n_lines=n_lines,
force_signing=force_signing,
agent_processes=agent_processes,
n_agents_per_process=n_agents_per_process,
process_inputs=process_inputs,
process_outputs=process_outputs,
production_costs=production_costs,
profit_means=profit_means,
profit_stddevs=profit_stddevs,
max_productivity=max_productivity,
initial_balance=initial_balance,
cost_increases_with_level=cost_increases_with_level,
equal_exogenous_sales=equal_exogenous_sales,
equal_exogenous_supply=equal_exogenous_supply,
exogenous_supply_predictability=exogenous_supply_predictability,
exogenous_sales_predictability=exogenous_sales_predictability,
cash_availability=cash_availability,
inventory_valuation_trading=inventory_valuation_trading,
inventory_valuation_catalog=inventory_valuation_catalog,
cost_relativity=cost_relativity,
profit_basis=(
"min"
if profit_basis == np.min
else (
"mean"
if profit_basis == np.mean
else (
"max"
if profit_basis == np.max
else "median"
if profit_basis == np.median
else "unknown"
)
)
),
exogenous_supply_surplus=exogenous_supply_surplus,
exogenous_sales_surplus=exogenous_sales_surplus,
)
exogenous_supply_surplus = realin(exogenous_supply_surplus)
exogenous_sales_surplus = realin(exogenous_sales_surplus)
inventory_valuation_trading = realin(inventory_valuation_trading)
inventory_valuation_catalog = realin(inventory_valuation_catalog)
n_processes = intin(n_processes)
n_steps = intin(n_steps)
exogenous_sales_predictability = realin(exogenous_sales_predictability)
exogenous_supply_predictability = realin(exogenous_supply_predictability)
exogenous_control = realin(exogenous_control)
np.errstate(divide="ignore")
# n_startup = n_processes
n_startup = n_processes
if n_steps <= n_startup:
raise ValueError(
f"Cannot generate a world with n_steps <= n_processes: {n_steps} <= {n_startup}"
)
horizon = max(1, min(n_steps, int(realin(horizon) * n_steps)))
process_inputs = make_array(process_inputs, n_processes, dtype=int)
process_outputs = make_array(process_outputs, n_processes, dtype=int)
fixed_assignment = agent_processes is not None and not random_agent_types
if agent_processes is not None:
pcount = defaultdict(int)
for i in agent_processes:
pcount[i] += 1
pnums = list(pcount.keys())
assert (
min(pnums) == 0 and max(pnums) == len(pnums) - 1
), f"`agent_processes` is invalid: {agent_processes} as it leads to the following `n_agents_per_process`: {dict(pcount)}"
n_agents_per_process = np.asarray([pcount[i] for i in range(len(pnums))])
assert not any(
_ <= 0 for _ in n_agents_per_process
), "We have some levels with no processes"
else:
n_agents_per_process = make_array(
n_agents_per_process, n_processes, dtype=int
)
# profit_means = make_array(profit_means, n_processes, dtype=float)
# profit_stddevs = make_array(profit_stddevs, n_processes, dtype=float)
max_productivity_process = make_array(
max_productivity, n_processes * n_steps, dtype=float
).reshape((n_processes, n_steps))
n_agents = int(n_agents_per_process.sum())
assert n_agents >= n_processes
profit_means_agent = make_array(profit_means, n_agents, dtype=float)
profit_stddevs_agent = make_array(profit_stddevs, n_agents, dtype=float)
max_productivity_agent = make_array(max_productivity, n_agents, dtype=float)
n_processes + 1
production_costs = make_array(production_costs, n_agents, dtype=int)
if initial_balance is not None:
initial_balance = make_array(initial_balance, n_agents, dtype=int)
if not isinstance(agent_types, Iterable):
agent_types = [agent_types] * n_agents
if agent_params is None:
agent_params = dict()
if isinstance(agent_params, dict):
agent_params = [copy.copy(agent_params) for _ in range(n_agents)]
else:
assert len(agent_params) == 1
agent_params = [copy.copy(agent_params[0]) for _ in range(n_agents)]
elif not fixed_assignment:
if agent_params is None:
agent_params = [dict() for _ in range(len(agent_types))]
if isinstance(agent_params, dict):
agent_params = [
copy.copy(agent_params) for _ in range(len(agent_types))
]
assert len(agent_types) == len(agent_params)
tp = random.choices(list(range(len(agent_types))), k=n_agents)
agent_types = [copy.copy(agent_types[_]) for _ in tp]
agent_params = [copy.copy(agent_params[_]) for _ in tp]
else:
if agent_params is None:
agent_params = [dict() for _ in range(len(agent_types))]
if isinstance(agent_params, dict):
agent_params = [
copy.copy(agent_params) for _ in range(len(agent_types))
]
agent_types = list(agent_types)
agent_params = list(agent_params)
assert len(agent_types) == len(agent_params)
# find first and last agent for each process
n_agents_cumsum = n_agents_per_process.cumsum().tolist()
first_agent = [0] + n_agents_cumsum[:-1]
last_agent = n_agents_cumsum[:-1] + [n_agents]
# generate production costs making sure that every agent can do exactly one process
process_of_agent = np.empty(n_agents, dtype=int)
for i, (f, L_) in enumerate(zip(first_agent, last_agent)):
process_of_agent[f:L_] = i
if cost_increases_with_level:
production_costs[f:L_] = np.round(
production_costs[f:L_] * (i + 1) # math.sqrt(i + 1)
).astype(int)
# costs is the same as production costs but repeated for all n_lines
costs = INFINITE_COST * np.ones((n_agents, n_lines, n_processes), dtype=int)
for p, (f, L_) in enumerate(zip(first_agent, last_agent)):
costs[f:L_, :, p] = production_costs[f:L_].reshape((L_ - f), 1)
# generate external contract amounts (controlled by productivity):
generated_exogenous, trial = False, 0
while not generated_exogenous and trial < MAX_WORLD_GENERATION_TRIALS:
# - generate total amount of input to the market (it will end up being an n_products list of n_steps vectors)
first_quantities = np.round(
n_lines * n_agents_per_process[0] * max_productivity_process[0, :]
).astype(int)
first_quantities[-n_startup:] = 0
# - divide the quantity at every level between factories
exogenous_supplies = distribute_quantities(
equal_exogenous_supply,
exogenous_supply_predictability,
first_quantities,
n_agents_per_process[0],
n_steps,
)
exogenous_supplies = np.asarray(exogenous_supplies)
# remove extra quantities that cannot be produced and sold
possible_production = n_lines * (n_steps - n_processes)
extra_supply = exogenous_supplies.sum(axis=0) - possible_production
for a in range(n_agents_per_process[0]):
if extra_supply[a] <= 0:
continue
for s in range(n_steps - 1, -1, -1):
to_use = min(extra_supply[a], exogenous_supplies[s, a])
if to_use <= 0:
continue
extra_supply[a] -= to_use
exogenous_supplies[s, a] -= to_use
if extra_supply[a] <= 0:
break
assert np.all(exogenous_supplies[-n_startup:] == 0)
generated_exogenous = np.any(exogenous_supplies > 0)
trial += 1
assert generated_exogenous, f"Cannot generate this world because we cannot generate any exogenous supply: n_steps: {n_steps}, n_processes: {n_processes}"
params = dict(
n_steps=n_steps,
n_lines=n_lines,
n_agents_per_process=n_agents_per_process,
process_of_agent=process_of_agent,
first_agent=first_agent,
last_agent=last_agent,
production_costs=production_costs,
exogenous_control=exogenous_control,
cash_availability=cash_availability,
force_signing=force_signing,
horizon=horizon,
exogenous_supplies=exogenous_supplies,
initial_balance=initial_balance,
cost_relativity=cost_relativity,
profit_basis=profit_basis,
inventory_valuation_trading=inventory_valuation_trading,
inventory_valuation_catalog=inventory_valuation_catalog,
exogenous_sales_predictability=exogenous_sales_predictability,
max_productivity_process=max_productivity_process,
max_productivity_agent=max_productivity_agent,
equal_exogenous_sales=equal_exogenous_sales,
process_inputs=process_inputs,
process_outputs=process_outputs,
costs=costs,
profit_stddevs_agent=profit_stddevs_agent,
profit_means_agent=profit_means_agent,
run_extra_checks=run_extra_checks,
)
generated, n_trials = False, 0
while not generated:
try:
(
exogenous,
catalog_prices,
profiles,
initial_balance,
extra_info,
) = runner[exogenous_generation_method](**params)
generated = True
except RecoverableWorldGenerationException:
n_trials += 1
if n_trials > MAX_WORLD_GENERATION_TRIALS:
raise ValueError(
f"Tried to generate the world {n_trials} times using {exogenous_generation_method} but failed"
)
# add surplus exogneous contracts if needed
for surplus, is_sale in [
(exogenous_supply_surplus, False),
(exogenous_sales_surplus, True),
]:
if surplus < 1e-6:
continue
process = n_processes - 1 if is_sale else 0
for a in range(first_agent[process], last_agent[process]):
contracts = [
_
for _ in exogenous
if (_.seller == a and is_sale) or (_.buyer == a and not is_sale)
]
if len(contracts) < 1:
continue
q = int(round(sum(_.quantity for _ in contracts) * surplus))
if q < 1:
continue
unit_price = int(catalog_prices[process + 1 if is_sale else process])
unit_price = random.randint(
int(unit_price * 0.8), int(unit_price * 1.2)
)
for step, quantity in enumerate(integer_cut(q, n_steps, 0)):
exogenous.append(
ExogenousContract(
product=process + 1 if is_sale else process,
quantity=quantity,
unit_price=unit_price,
time=step,
revelation_time=max(0, step - horizon),
seller=a if is_sale else -1,
buyer=a if not is_sale else -1,
)
)
if run_extra_checks:
for c in exogenous:
assert c.revelation_time <= c.time
if c.seller < 0:
assert c.product == 0, f"Invalid Exogenous Contract (Seller): {c}"
else:
assert (
c.product == process_of_agent[c.seller] + 1
), f"Invalid Contract (seller): {c}"
if c.buyer < 0:
assert (
c.product == n_processes
), f"Invalid Exogenous Contract (buyer): {c}"
else:
assert (
c.product == process_of_agent[c.buyer]
), f"Invalid Contract (Buyer): {c}"
info = dict(**info, **extra_info)
for i, (t, p) in enumerate(zip(agent_types, agent_params)):
if t is not None and issubclass(get_class(t), OneShotAgent):
agent_types[i] = get_full_type_name(OneShotAdapter)
agent_params[i] = dict(oneshot_type=t, oneshot_params=p, obj=None)
return dict(
process_inputs=process_inputs,
process_outputs=process_outputs,
catalog_prices=catalog_prices,
profiles=profiles,
exogenous_contracts=exogenous,
agent_types=agent_types,
agent_params=agent_params,
initial_balance=initial_balance,
n_steps=n_steps,
info=info,
force_signing=force_signing,
exogenous_horizon=horizon,
inventory_valuation_trading=inventory_valuation_trading,
inventory_valuation_catalog=inventory_valuation_catalog,
**kwargs,
)
@classmethod
[docs]
def generate_guaranteed_profit(
cls,
n_steps: int,
n_lines: int,
n_agents_per_process: int,
process_of_agent: list[int],
first_agent: list[int],
last_agent: list[int],
production_costs: list[int],
exogenous_control: float,
cash_availability: float,
force_signing: bool,
horizon: int,
exogenous_supplies: list[int],
max_productivity_process: list[float],
max_productivity_agent: list[float],
equal_exogenous_sales: bool,
process_inputs: list[int],
process_outputs: list[int],
exogenous_sales_predictability: float,
costs: np.ndarray,
profit_stddevs_agent=list[float],
profit_means_agent=list[float],
initial_balance: np.ndarray | tuple[int, int] | int | None = None,
cost_relativity: float = 1.0,
profit_basis=np.max,
inventory_valuation_trading: float = 0.5,
inventory_valuation_catalog: float = 0.0,
run_extra_checks=True,
) -> tuple[
list[ExogenousContract],
list[int],
list[FactoryProfile],
list[float],
dict[str, Any],
]:
"""
Generates prices, contracts and profiles ensuring that all agents can
profit and returning a set of explict contracts that can achieve this profit
"""
n_processes = len(first_agent)
n_agents = len(process_of_agent)
n_products = n_processes + 1
can_choose_initial_balance = initial_balance is None
COST0 = 10
supplies = np.zeros((n_agents, n_steps), dtype=np.int64)
revenue = np.zeros((n_agents, n_steps), dtype=np.int64)
np.zeros((n_agents, n_steps), dtype=np.int64)
sales = np.zeros((n_agents, n_steps), dtype=np.int64)
total_costs = np.zeros((n_agents, n_steps), dtype=np.int64)
active_lines = np.zeros((n_agents, n_steps), dtype=np.int64)
supplies[first_agent[0] : last_agent[0], :] = exogenous_supplies.transpose()
exogenous_prices = (
exogenous_supplies
* (COST0 + 0.5 * np.random.randn(*exogenous_supplies.shape))
.round()
.astype(int)
).transpose()
total_costs[first_agent[0] : last_agent[0], :] = exogenous_prices
agent_profits = (
np.random.randn(n_agents) * profit_stddevs_agent + profit_means_agent
)
simulated_contracts: list[ContractRecord] = []
# signing step, delivery step, seller, buyer, quantity, unit price, is_exogenous
# generate distribution variables that sum
pc = 0.05 + (production_costs.max() - production_costs) / production_costs.max()
normalized_production_cost = np.ones_like(pc, dtype=float)
for p in range(0, n_processes):
f, L_ = first_agent[p], last_agent[p]
normalized_production_cost[f:L_] = np.float_power(
production_costs[f:L_] / production_costs[f:L_].max(), cost_relativity
)
def distribute_sales(
active_lines: np.ndarray,
sales: np.ndarray,
supplies: np.ndarray,
revenue: np.ndarray,
total_costs: np.ndarray,
production_costs: np.ndarray,
agent_profits: np.ndarray,
p: np.ndarray,
first_seller: int,
last_seller: int,
first_buyer: int,
last_buyer: int,
horizon: int,
n_lines: int,
product: int,
production_time: int = 1,
final=False,
):
simulated = []
n_agents, n_steps = supplies.shape
if final:
n_buyers = 1
else:
n_buyers = last_buyer - first_buyer
# distribute production:
# the goal here is to produce everything we have as supplies
# we cannot produce after the time when our output can propagate
# to the end of the network and be sold to the BUYER agent
production_limit = n_steps - (n_processes - product)
# we cannot produce before we can get our first supplies. This is
# a speedup no more
production_start = product
if run_extra_checks:
assert supplies[first_seller:last_seller, :production_start].sum() == 0
for s in range(production_start, production_limit):
n = production_limit - s
not_produced = 0
for i in range(first_seller, last_seller):
if supplies[i, s] < 1:
continue
to_produce, not_produced = supplies[i, s] + not_produced, 0
for k in range(s, production_limit):
can_use = min(
n_lines - active_lines[i, k],
supplies[i, : k + 1].sum() - active_lines[i, : k + 1].sum(),
to_produce,
)
active_lines[i, k] += can_use
to_produce -= can_use
if to_produce <= 0:
break
else:
not_produced = to_produce
# distribute sales
# I can always sell one step after final production
distribution_limit = production_limit + production_time
# production_start + production_time
# this loop is for selling production so it goes on the production
# rather than distribution limits
for s in range(production_start, production_limit):
n = production_limit - s
# find the probability distribution with which to distribute
# the products to the bueyrs.
if n_buyers == 1:
p1 = np.asarray([1.0])
else:
# the probability of getting products go up with available
# production capacity and with production_costs
available = np.ones(n_buyers)
active = active_lines[first_buyer:last_buyer, s:]
if active.sum() > 0:
active /= active.max()
available = n_lines * n - active.sum(axis=1)
if available.sum() > 0:
available = available / available.sum()
p1 = (production_limit - s) * p[
first_buyer:last_buyer
] + s * available
if p1.sum() < 1e-5:
p1 = np.ones_like(p1) / len(p1)
else:
p1 = p1 / p1.sum()
for i in range(first_seller, last_seller):
beg = min(production_time + s, distribution_limit)
end = min(beg + 1, distribution_limit)
n = end - beg
if n <= 0:
continue
if active_lines[i, s] < 1:
continue
d = fraction_cut(active_lines[i, s], p1).reshape(n_buyers, n)
unit_price = max(
1,
int(
np.ceil(
(
total_costs[i, s] / active_lines[i, s]
+ production_costs[i]
)
* (1 + agent_profits[i])
)
),
)
sales[i, beg:end] += d.sum(axis=0)
revenue[i, beg:end] += unit_price * d.sum(axis=0)
if not final:
supplies[first_buyer:last_buyer, beg:end] += d
total_costs[first_buyer:last_buyer, beg:end] += unit_price * d
for j in range(n_buyers):
for k in range(n):
q = d[j, k]
if q < 1:
continue
simulated.append(
ContractRecord(
product + 1,
s if not final else k + beg - horizon,
k + beg,
i,
j + first_buyer if not final else -1,
q,
unit_price,
final,
)
)
# signing step, delivery step, seller, buyer, quantity, unit price, is_exogenous
return simulated
# generate a good set of contracts that achieve the givne productivity
for p in range(0, n_processes):
first_seller, last_seller = first_agent[p], last_agent[p]
if p < n_processes - 1:
first_buyer, last_buyer = first_agent[p + 1], last_agent[p + 1]
else:
first_buyer, last_buyer = None, None
simulated_contracts += distribute_sales(
active_lines,
sales,
supplies,
revenue,
total_costs,
production_costs,
agent_profits,
normalized_production_cost,
first_seller,
last_seller,
first_buyer,
last_buyer,
horizon,
n_lines,
p,
1,
final=first_buyer is None,
)
assert sales.sum() <= supplies.sum()
assert np.all(active_lines.cumsum(axis=1) >= sales.cumsum(axis=1))
assert np.all(active_lines.cumsum(axis=1) <= supplies.cumsum(axis=1))
assert np.all(active_lines <= n_lines)
# sales_per_agent = sales.sum(axis=1)
# supplies_per_agent = supplies.sum(axis=1)
# # remove extra supplies in the first layer that are not needed
# for a in range(n_agents_per_process[0]):
# diff = supplies_per_agent[a] - sales_per_agent[a]
# if diff <= 0:
# continue
# for s in range(n_steps - 1, -1, -1):
# to_remove = min(diff, supplies[a, s])
# supplies[a, s] -= to_remove
# diff -= to_remove
# if diff <= 0:
# break
# assert (
# sales.sum() == supplies.sum()
# ), f"{supplies.sum(axis=1)}\n{sales.sum(axis=1)}\nerror: {supplies.sum() - sales.sum()}"
# add correct exogenous supply contracts
exogenous_supplies = supplies[first_agent[0] : last_agent[0], :].transpose()
exogenous_prices = total_costs[first_agent[0] : last_agent[0], :]
for j in range(n_agents_per_process[0]):
for k in range(n_steps):
q = exogenous_supplies[k, j]
if q < 1:
continue
simulated_contracts.append(
ContractRecord(
0,
k - horizon,
k,
-1,
j,
q,
exogenous_prices[j, k] // q,
True,
)
)
exogenous_supplies = supplies[first_agent[0] : last_agent[0], :].transpose()
# - now exogenous_supplies and exogenous_sales are both n_steps lists of n_agents_per_process[p] vectors (jagged)
catalog_prices = np.zeros(n_products, dtype=int)
for p in range(n_products):
if p < n_processes:
f, L_ = first_agent[p], last_agent[p]
total = supplies[f:L_, :].sum()
if total < 1:
continue
supply_unit = total_costs[f:L_, :].sum() / total
else:
f, L_ = first_agent[p - 1], last_agent[p - 1]
total = sales[f:L_, :].sum()
if total < 1:
continue
supply_unit = revenue[f:L_, :].sum() / total
catalog_prices[p] = supply_unit
profile_info: list[FactoryProfile] = []
nxt = 0
for L_ in range(n_processes):
for a in range(n_agents_per_process[L_]):
profile_info.append(FactoryProfile(costs=costs[nxt]))
nxt += 1
assert nxt == n_agents
total_payments = (
total_costs + production_costs.reshape(len(production_costs), 1) * supplies
)
revenue_after_costs = revenue - total_payments
if initial_balance is None:
# every agent at every level will have just enough to do all the needed to do cash_availability fraction of
# production (even though it may not have enough lines to do so)
cash_availability = _realin(cash_availability)
# find maximum money needs
initial_balance = total_payments.sum(axis=1)
max_money_needs_process = np.zeros(n_processes)
for p in range(n_processes):
f, L_ = first_agent[p], last_agent[p]
max_money_needs_process[p] = initial_balance[f:L_].max()
initial_balance[f:L_] = int(
math.ceil(max_money_needs_process[p] * cash_availability)
)
else:
tmp = total_payments.max(axis=1)
cash_availability = (initial_balance / tmp).min()
b = np.sum(initial_balance)
info = dict(
simulated_contracts=[tuple(_) for _ in simulated_contracts],
active_lines=active_lines,
input_quantities=supplies,
output_quantities=sales,
expected_productivity=float(np.sum(active_lines))
/ np.sum(n_lines * n_steps * n_agents_per_process),
expected_productivity_products=np.sum(active_lines, axis=-1),
expected_income=revenue_after_costs,
expected_welfare=float(np.sum(revenue_after_costs)),
expected_income_per_step=revenue.sum(axis=0),
expected_income_per_process=revenue.sum(axis=-1),
expected_mean_profit=(
float(np.sum(revenue_after_costs) / b)
if b != 0
else np.sum(revenue_after_costs)
),
expected_profit_per_agent=(
revenue_after_costs.sum(axis=1) / b
if b != 0
else revenue_after_costs.sum(axis=1)
),
real_cash_availability=cash_availability,
)
exogenous = []
for c in simulated_contracts:
if not c.is_exogenous or c.quantity == 0:
continue
if force_signing or exogenous_control <= 0.0:
exogenous.append(
ExogenousContract(
product=c.product,
quantity=c.quantity,
unit_price=c.unit_price,
time=c.time,
revelation_time=max(0, c.time - horizon),
seller=c.seller,
buyer=c.buyer,
)
)
else:
n_contracts = int(1 + exogenous_control * (c.quantity - 1))
per_contract = integer_cut(c.quantity, n_contracts, 0)
for q in per_contract:
if q == 0:
continue
exogenous.append(
ExogenousContract(
product=c.product,
quantity=q,
unit_price=c.unit_price,
time=c.time,
revelation_time=max(0, c.time - horizon),
seller=c.seller,
buyer=c.buyer,
)
)
if can_choose_initial_balance and run_extra_checks:
def simulate_run(
all_contracts,
active_lines,
initial_balance,
n_products,
production_costs,
process_of_agent,
catalog_prices,
catalog_valuation=inventory_valuation_trading
+ inventory_valuation_catalog,
):
n_agents, n_steps = active_lines.shape
balances = initial_balance[:].copy()
contracts = defaultdict(list)
inventory = np.zeros((n_agents, n_products), dtype=int)
for c in all_contracts:
contracts[c.time].append(c)
def _check(s):
assert (
balances.min() >= 0
), f"Contract Simulation Issue: Some agents went bankrupt at step {s}!!\n{balances}\nSupplies:\n{supplies}\nActive:\n{active_lines}\nSales:\n{sales}"
assert (
inventory.min() >= 0
), f"Contract Simulation Issue: Some agents has negative inventory at step {s}!!\nInventory:{inventory}\nSupplies:\n{supplies}\nActive:\n{active_lines}\nSales:\n{sales}"
for s in range(n_steps):
_check(s)
cs = contracts[s]
if not cs:
continue
cs = sorted(cs, key=lambda c: c.product)
for c in cs:
if c.seller >= 0:
inventory[c.seller, c.product] -= c.quantity
balances[c.seller] += c.quantity * c.unit_price
if c.buyer >= 0:
inventory[c.buyer, c.product] += c.quantity
balances[c.buyer] -= c.quantity * c.unit_price
_check(s)
for a in range(n_agents):
p = process_of_agent[a]
q = inventory[a, p]
produced = active_lines[a, s]
assert (
produced <= q
), f"Agent {a} (level {p}) should produce {produced} at step {s} but only have {q} items of product {p}"
if not produced:
continue
inventory[a, p + 1] += produced
inventory[a, p] -= produced
balances[a] -= produced * production_costs[a]
assets = catalog_valuation * (inventory * catalog_prices)
profit = (
balances + assets.sum(axis=1) - initial_balance
) / initial_balance
_check(n_steps)
assert (
profit.min() > 0
), f"Contract Simulation Issue: Some agents lost!!\n{profit}"
return profit
try:
expected_profit = simulate_run(
simulated_contracts,
active_lines,
initial_balance,
n_products,
production_costs,
process_of_agent,
catalog_prices,
)
except AssertionError as e:
raise RecoverableWorldGenerationException(e)
info["expected_profits_simulated"] = expected_profit
else:
info["expected_profits_simulated"] = None
return exogenous, catalog_prices, profile_info, initial_balance, info
@classmethod
[docs]
def generate_profitable(
cls,
n_steps: int,
n_lines: int,
n_agents_per_process: int,
process_of_agent: list[int],
first_agent: list[int],
last_agent: list[int],
production_costs: list[int],
exogenous_control: float,
cash_availability: float,
force_signing: bool,
horizon: int,
exogenous_supplies: list[int],
max_productivity_process: list[float],
max_productivity_agent: list[float],
equal_exogenous_sales: bool,
process_inputs: list[int],
process_outputs: list[int],
exogenous_sales_predictability: float,
costs: np.ndarray,
profit_stddevs_agent=list[float],
profit_means_agent=list[float],
initial_balance: np.ndarray | tuple[int, int] | int | None = None,
cost_relativity: float = 1.0,
profit_basis=np.max,
inventory_valuation_trading: float = 0.5,
inventory_valuation_catalog: float = 0.0,
run_extra_checks: bool = True,
) -> tuple[
list[ExogenousContract],
list[int],
list[FactoryProfile],
list[float],
dict[str, Any],
]:
"""
Generates the prices, contracts and profiles ensuring there is some
possibility of profit in the market
"""
n_processes = len(first_agent)
n_agents = len(process_of_agent)
n_products = n_processes + 1
n_startup = n_processes
# - make sure there is a cool-down period at the end in which no more input is added that cannot be converted
# into final products in time
quantities = [exogenous_supplies.sum(axis=1)]
# - for each level, find the amount of the output product that can be produced given the input amount and
# productivity
for p in range(n_processes):
agents = n_agents_per_process[p]
lines = n_lines * agents
quantities.append(
np.minimum(
(quantities[-1] // process_outputs[p]) * process_inputs[p],
quantities[-1],
(
(
np.round(lines * max_productivity_process[p, :]).astype(int)
// process_inputs[p]
)
* process_outputs[p]
),
)
)
# * shift quantities one step to account for the one step needed to move the produce to the next level. This
# step results from having production happen after contract execution.
quantities[-1][1:] = quantities[-1][:-1]
quantities[-1][0] = 0
assert quantities[-1][-1] == 0 or p >= n_startup - 1
assert quantities[-1][0] == 0
# assert np.sum(quantities[-1] == 0) >= n_startup
exogenous_sales = distribute_quantities(
equal_exogenous_sales,
exogenous_sales_predictability,
quantities[-1],
n_agents_per_process[-1],
n_steps,
)
# - now exogenous_supplies and exogenous_sales are both n_steps lists of n_agents_per_process[p] vectors (jagged)
# assign prices to the quantities given the profits
catalog_prices = np.zeros(n_products, dtype=int)
catalog_prices[0] = 10
supply_prices = np.zeros((n_agents_per_process[0], n_steps), dtype=int)
supply_prices[:, :] = catalog_prices[0]
sale_prices = np.zeros((n_agents_per_process[-1], n_steps), dtype=int)
manufacturing_costs = np.zeros((n_processes, n_steps), dtype=int)
for p in range(n_processes):
manufacturing_costs[p, :] = profit_basis(
costs[first_agent[p] : last_agent[p], :, p]
)
manufacturing_costs[p, :p] = 0
manufacturing_costs[p, p - n_startup :] = 0
profits = np.zeros((n_processes, n_steps))
for p in range(n_processes):
profits[p, :] = (
np.random.randn() * profit_stddevs_agent[p] + profit_means_agent[p]
)
input_costs = np.zeros((n_processes, n_steps), dtype=int)
for step in range(n_steps):
input_costs[0, step] = np.sum(
exogenous_supplies[step] * supply_prices[:, step][:]
)
input_quantity = np.zeros((n_processes, n_steps), dtype=int)
input_quantity[0, :] = quantities[0]
active_lines = np.hstack(
[(n_lines * n_agents_per_process).reshape((n_processes, 1))] * n_steps
)
assert active_lines.shape == (n_processes, n_steps)
active_lines[0, :] = input_quantity[0, :] // process_inputs[0]
output_quantity = np.zeros((n_processes, n_steps), dtype=int)
output_quantity[0, :] = active_lines[0, :] * process_outputs[0]
manufacturing_costs[0, :-n_startup] *= active_lines[0, :-n_startup]
total_costs = input_costs + manufacturing_costs
output_total_prices = np.ceil(total_costs * (1 + profits)).astype(int)
for p in range(1, n_processes):
input_costs[p, p:] = output_total_prices[p - 1, p - 1 : -1]
input_quantity[p, p:] = output_quantity[p - 1, p - 1 : -1]
active_lines[p, :] = input_quantity[p, :] // process_inputs[p]
output_quantity[p, :] = active_lines[p, :] * process_outputs[p]
manufacturing_costs[p, p : p - n_startup] *= active_lines[
p, p : p - n_startup
]
total_costs[p, :] = input_costs[p, :] + manufacturing_costs[p, :]
output_total_prices[p, :] = np.ceil(
total_costs[p, :] * (1 + profits[p, :])
).astype(int)
aa = output_total_prices[-1, n_startup - 1 : -1].astype(float)
bb = output_quantity[-1, n_startup - 1 : -1].astype(float)
np.errstate(divide="ignore")
sale_prices[:, n_startup:] = np.ceil(
np.divide(aa, bb, out=np.zeros_like(aa, dtype=float), where=bb != 0)
).astype(int)
product_prices = np.zeros((n_products, n_steps))
product_prices[0, :-n_startup] = catalog_prices[0]
product_prices[1:, 1:] = np.ceil(
np.divide(
output_total_prices.astype(float),
output_quantity.astype(float),
out=np.zeros_like(output_total_prices, dtype=float),
where=output_quantity != 0,
)
).astype(int)[:, :-1]
catalog_prices = np.ceil(
[
profit_basis(product_prices[p, p : p + n_steps - n_startup])
for p in range(n_products)
]
).astype(int)
profile_info: list[
tuple[FactoryProfile, np.ndarray, np.ndarray, np.ndarray, np.ndarray]
] = []
nxt = 0
for L_ in range(n_processes):
for a in range(n_agents_per_process[L_]):
esales = np.zeros((n_steps, n_products), dtype=int)
esupplies = np.zeros((n_steps, n_products), dtype=int)
esale_prices = np.zeros((n_steps, n_products), dtype=int)
esupply_prices = np.zeros((n_steps, n_products), dtype=int)
if L_ == 0:
esupplies[:, 0] = [exogenous_supplies[s][a] for s in range(n_steps)]
esupply_prices[:, 0] = supply_prices[a, :]
if L_ == n_processes - 1:
esales[:, -1] = [exogenous_sales[s][a] for s in range(n_steps)]
esale_prices[:, -1] = sale_prices[a, :]
profile_info.append(
(
FactoryProfile(costs=costs[nxt]),
esales,
esale_prices,
esupplies,
esupply_prices,
)
)
nxt += 1
max_income = (
output_quantity * catalog_prices[1:].reshape((n_processes, 1)) - total_costs
)
assert nxt == n_agents
if initial_balance is None:
# every agent at every level will have just enough to do all the needed to do cash_availability fraction of
# production (even though it may not have enough lines to do so)
cash_availability = _realin(cash_availability)
balance = np.ceil(
np.sum(total_costs, axis=1) / n_agents_per_process
).astype(int)
initial_balance = []
for b, a in zip(balance, n_agents_per_process):
initial_balance += [int(math.ceil(b * cash_availability))] * a
b = np.sum(initial_balance)
info = dict(
product_prices=product_prices,
active_lines=active_lines,
input_quantities=input_quantity,
output_quantities=output_quantity,
expected_productivity=float(np.sum(active_lines))
/ np.sum(n_lines * n_steps * n_agents_per_process),
expected_n_products=np.sum(active_lines, axis=-1),
expected_income=max_income,
expected_welfare=float(np.sum(max_income)),
expected_income_per_step=max_income.sum(axis=0),
expected_income_per_process=max_income.sum(axis=-1),
expected_mean_profit=(
float(np.sum(max_income) / b) if b != 0 else np.sum(max_income)
),
expected_profit_sum=(
float(n_agents * np.sum(max_income) / b)
if b != 0
else n_agents * np.sum(max_income)
),
)
exogenous = []
for (
indx,
(profile, esales, esale_prices, esupplies, esupply_prices),
) in enumerate(profile_info):
input_product = process_of_agent[indx]
for step, (sale, price) in enumerate(
zip(esales[:, input_product + 1], esale_prices[:, input_product + 1])
):
if sale == 0:
continue
if force_signing or exogenous_control <= 0.0:
exogenous.append(
ExogenousContract(
product=input_product + 1,
quantity=sale,
unit_price=price,
time=step,
revelation_time=max(0, step - horizon),
seller=indx,
buyer=-1,
)
)
else:
n_contracts = int(1 + exogenous_control * (sale - 1))
per_contract = integer_cut(sale, n_contracts, 0)
for q in per_contract:
if q == 0:
continue
exogenous.append(
ExogenousContract(
product=input_product + 1,
quantity=q,
unit_price=price,
time=step,
revelation_time=max(0, step - horizon),
seller=indx,
buyer=-1,
)
)
for step, (supply, price) in enumerate(
zip(esupplies[:, input_product], esupply_prices[:, input_product])
):
if supply == 0:
continue
if force_signing or exogenous_control <= 0.0:
exogenous.append(
ExogenousContract(
product=input_product,
quantity=supply,
unit_price=price,
time=step,
revelation_time=max(0, step - horizon),
seller=-1,
buyer=indx,
)
)
else:
n_contracts = int(1 + exogenous_control * (supply - 1))
per_contract = integer_cut(supply, n_contracts, 0)
for q in per_contract:
if q == 0:
continue
exogenous.append(
ExogenousContract(
product=input_product,
quantity=q,
unit_price=price,
time=step,
revelation_time=max(0, step - horizon),
seller=-1,
buyer=indx,
)
)
return (
exogenous,
catalog_prices,
[_[0] for _ in profile_info],
initial_balance,
info,
)
[docs]
def get_private_state(self, agent: SCML2020Agent) -> dict:
return vars(self.a2f[agent.id].state)
[docs]
def add_financial_report(
self, agent: SCML2020Agent, factory: Factory, reports_agent, reports_time
) -> None:
"""
Records a financial report for the given agent in the agent indexed reports and time indexed reports
Args:
agent: The agent
factory: Its factory
reports_agent: A dictionary of financial reports indexed by agent id
reports_time: A dictionary of financial reports indexed by time
Returns:
"""
bankrupt = factory.is_bankrupt
inventory = (
int(np.sum(self.trading_prices * factory.current_inventory))
if not bankrupt
else 0
)
report = FinancialReport(
agent_id=agent.id,
step=self.current_step,
cash=factory.current_balance,
assets=inventory,
breach_prob=self.breach_prob[agent.id],
breach_level=self._breach_level[agent.id],
is_bankrupt=bankrupt,
agent_name=agent.name,
)
repstr = str(report).replace("\n", " ")
self.logdebug(f"{agent.name}: {repstr}")
if reports_agent.get(agent.id, None) is None:
reports_agent[agent.id] = {}
reports_agent[agent.id][self.current_step] = report
if reports_time.get(str(self.current_step), None) is None:
reports_time[str(self.current_step)] = {}
reports_time[str(self.current_step)][agent.id] = report
[docs]
def negs_between(self, a1, a2):
return self._registered_negs[tuple(sorted([a1, a2]))]
[docs]
def current_balance(self, agent_id: str):
return self.a2f[agent_id].state.balance
[docs]
def can_negotiate(self, a1, a2):
return self.negs_between(a1, a2) < self.n_concurrent_negs_between_partners
[docs]
def simulation_step(self, stage):
self._registered_negs: dict[tuple[str], int] = Counter()
s = self.current_step
if stage == 0:
# pay interests for negative balances
# -----------------------------------
if self.interest_rate > 0.0:
for agent, factory, _ in self.afp:
if factory.current_balance < 0 and not factory.is_bankrupt:
to_pay = -int(
math.ceil(self.interest_rate * factory.current_balance)
)
factory.pay(to_pay)
# Register exogenous contracts as concluded
# -----------------------------------------
for contract in self.exogenous_contracts[s]:
self.on_contract_concluded(
contract, to_be_signed_at=contract.to_be_signed_at
)
if self.exogenous_force_max:
contract.signatures = dict(
zip(contract.partners, contract.partners)
)
else:
if SYSTEM_SELLER_ID in contract.partners:
contract.signatures[SYSTEM_SELLER_ID] = SYSTEM_SELLER_ID
else:
contract.signatures[SYSTEM_BUYER_ID] = SYSTEM_BUYER_ID
# publish public information
# --------------------------
if self.publish_trading_prices:
self.bulletin_board.record(
"trading_prices",
value=self.trading_prices,
key=self.current_step,
)
# initialize all agents for this step
# ===================================
for _, a in self.agents.items():
if (not self.a2f[_].is_bankrupt) and hasattr(a, "before_step"):
a.before_step()
return
# update trading price information
# --------------------------------
has_trade = self._sold_quantity[:, s + 1] > 0
self._trading_price[~has_trade, s + 1] = self._trading_price[~has_trade, s]
self._betas_sum[~has_trade, s + 1] = self._betas_sum[~has_trade, s]
assert not np.any(
np.isnan(self._real_price[has_trade, s + 1])
), f"Nans in _real_price at step {self.current_step}\n{self._real_price}"
self._trading_price[has_trade, s + 1] = (
self._trading_price[has_trade, s]
* self._betas_sum[has_trade, s]
* self.trading_price_discount
+ self._real_price[has_trade, s + 1] * self._sold_quantity[has_trade, s + 1]
)
self._betas_sum[has_trade, s + 1] = (
self._betas_sum[has_trade, s] * self.trading_price_discount
+ self._sold_quantity[has_trade, s + 1]
)
self._trading_price[has_trade, s + 1] /= self._betas_sum[has_trade, s + 1]
self._trading_price[:, s + 1 :] = self._trading_price[:, s + 1].reshape(
(self.n_products, 1)
)
self._traded_quantity += self._sold_quantity[:, s + 1]
# self._trading_price[has_trade, s] = (
# np.sum(self._betas[:s+1] * self._real_price[has_trade, s::-1])
# ) / self._betas_sum[s+1]
# update agent penalties
# ----------------------
self._agent_spot_loss[:, s] += (
self.spot_market_global_loss
* self.spot_multiplier
* np.sum(self._alphas[: s + 1] * self._spot_quantity[:, s::-1], axis=-1)
)
# run factories
# --------------
for a, f, p in self.afp:
if f.is_bankrupt:
continue
f.step()
# remove contracts saved in factories for this step
for factory in self.factories:
factory.contracts[self.current_step] = []
# publish financial reports
# -------------------------
if self.current_step % self.financial_reports_period == 0:
reports_agent = self.bulletin_board.data["reports_agent"]
reports_time = self.bulletin_board.data["reports_time"]
for agent, factory, _ in self.afp:
if is_system_agent(agent.id):
continue
self.add_financial_report(agent, factory, reports_agent, reports_time)
[docs]
def contract_size(self, contract: Contract) -> float:
return contract.agreement["quantity"] * contract.agreement["unit_price"]
[docs]
def contract_record(self, contract: Contract) -> dict[str, Any]:
c = {
"id": contract.id,
"seller_name": self.agents[contract.annotation["seller"]].name,
"buyer_name": self.agents[contract.annotation["buyer"]].name,
"seller_type": self.agents[contract.annotation["seller"]].type_name,
"buyer_type": self.agents[contract.annotation["buyer"]].type_name,
"delivery_time": contract.agreement["time"],
"quantity": contract.agreement["quantity"],
"unit_price": contract.agreement["unit_price"],
"signed_at": contract.signed_at,
"nullified_at": contract.nullified_at,
"concluded_at": contract.concluded_at,
"signatures": "|".join(str(_) for _ in contract.signatures),
"issues": contract.issues if not self.compact else None,
"seller": contract.annotation["seller"],
"buyer": contract.annotation["buyer"],
"product_name": "p" + str(contract.annotation["product"]),
}
if not self.compact:
c.update(contract.annotation)
c["n_neg_steps"] = (
contract.mechanism_state.step if contract.mechanism_state else 0
)
return c
[docs]
def breach_record(self, breach: Breach) -> dict[str, Any]:
return {
"perpetrator": breach.perpetrator,
"perpetrator_name": breach.perpetrator,
"level": breach.level,
"type": breach.type,
"time": breach.step,
}
[docs]
def execute_action(
self, action: Action, agent: SCML2020Agent, callback: Callable = None
) -> bool:
if action.type == "schedule":
s, _ = self.a2f[agent.id].schedule_production(
process=action.params["process"],
step=action.params.get("step", (self.current_step, self.n_steps - 1)),
line=action.params.get("line", -1),
override=action.params.get("override", True),
method=action.params.get("method", "latest"),
repeats=action.params.get("repeats", 1),
)
return s >= 0
elif action.type == "cancel":
return self.a2f[agent.id].cancel_production(
step=action.params.get("step", -1), line=action.params.get("line", -1)
)
[docs]
def post_step_stats(self):
self._stats["n_contracts_nullified_now"].append(self.__n_nullified)
self._stats["n_bankrupt"].append(self.__n_bankrupt)
market_size = 0
scores = self.scores()
for p in range(self.n_products):
self._stats[f"trading_price_{p}"].append(
self._trading_price[p, self.current_step + 1]
)
self._stats[f"sold_quantity_{p}"].append(
self._sold_quantity[p, self.current_step + 1]
)
self._stats[f"unit_price_{p}"].append(
self._real_price[p, self.current_step + 1]
)
prod = []
for a, f, _ in self.afp:
if is_system_agent(a.id):
continue
ind = self.a2i[a.id]
self._stats[f"spot_market_quantity_{a.id}"].append(
self._spot_quantity[ind, self.current_step]
)
self._stats[f"spot_market_loss_{a.id}"].append(
self._agent_spot_loss[ind, self.current_step]
)
self._stats[f"balance_{a.id}"].append(f.current_balance)
for p in a.awi.my_input_products:
self._stats[f"inventory_{a.id}_input"].append(f.current_inventory[p])
for p in a.awi.my_output_products:
self._stats[f"inventory_{a.id}_output"].append(f.current_inventory[p])
prod.append(
np.sum(f.commands[self.current_step, :] != NO_COMMAND)
/ f.profile.n_lines
)
self._stats[f"productivity_{a.id}"].append(prod[-1])
self._stats[f"assets_{a.id}"].append(
np.sum(f.current_inventory * self.trading_prices)
)
self._stats[f"bankrupt_{a.id}"].append(f.is_bankrupt)
self._stats[f"score_{a.id}"].append(scores[a.id])
if not f.is_bankrupt:
market_size += f.current_balance
self._stats["productivity"].append(float(np.mean(prod)))
self._stats["market_size"].append(market_size)
self._stats["production_failures"].append(
self._n_production_failures / len(self.factories)
if len(self.factories) > 0
else np.nan
)
self._stats["bankruptcy"].append(
np.sum(self.stats["n_bankrupt"]) / len(self.agents)
)
# self._stats["business"] = np.sum(self.stats["business_level"])
[docs]
def pre_step_stats(self):
self._n_production_failures = 0
self.__n_nullified = 0
self.__n_bankrupt = 0
@property
[docs]
def productivity(self) -> float:
"""Fraction of production lines occupied during the simulation"""
return np.mean(self.stats["productivity"])
[docs]
def welfare(self, include_bankrupt: bool = False) -> float:
"""Total welfare of all agents"""
scores = self.scores()
return sum(
scores[f.agent_id] * f.initial_balance
for f in self.factories
if (include_bankrupt or not f.is_bankrupt)
and not is_system_agent(f.agent_id)
)
[docs]
def relative_welfare(self, include_bankrupt: bool = False) -> float | None:
"""Total welfare relative to expected value. Returns None if no expectation is found in self.info"""
if "expected_income" not in self.info.keys():
return None
return self.welfare(include_bankrupt) / np.sum(self.info["expected_income"])
@property
[docs]
def relative_productivity(self) -> float | None:
"""Productivity relative to the expected value. Will return None if self.info does not have
the expected productivity"""
if "expected_productivity" not in self.info.keys():
return None
return self.productivity / self.info["expected_productivity"]
@property
[docs]
def bankruptcy_rate(self) -> float:
"""The fraction of factories that went bankrupt"""
return sum(f.is_bankrupt for f in self.factories) / len(self.factories)
@property
[docs]
def num_bankrupt(self) -> float:
"""The fraction of factories that went bankrupt"""
return sum(f.is_bankrupt for f in self.factories)
[docs]
def order_contracts_for_execution(
self, contracts: Collection[Contract]
) -> Collection[Contract]:
return sorted(contracts, key=lambda x: x.annotation["product"])
[docs]
def _execute(
self,
product: int,
q: int,
p: int,
u: int,
buyer_factory: Factory,
seller_factory: Factory,
has_breaches: bool,
):
"""Executes the contract"""
q, p, u = int(q), int(p), int(u)
if seller_factory.is_bankrupt or buyer_factory.is_bankrupt:
self.logdebug(
f"Bankruptcy prevents transferring {q} of {product} at price {u} ({'breached' if has_breaches else ''})"
)
return
self.logdebug(
f"Transferring {q} of {product} at price {u} ({'breached' if has_breaches else ''})"
)
if q == 0 or u == 0:
self.logwarning(
f"{buyer_factory.agent_name} bought {q} from {seller_factory.agent_name} at {u} dollars"
f" ({'with breaches' if has_breaches else 'no breaches'})!! Zero quantity or unit price"
)
money = (
p
if buyer_factory.current_balance - p > self.bankruptcy_limit
else max(0, buyer_factory.current_balance - self.bankruptcy_limit)
)
quantity = min(seller_factory.current_inventory[product], q)
if quantity == 0 or money == 0:
return
q = min(quantity, money // u)
assert q >= 0, f"executing with quantity {q}"
if q == 0:
return
assert (
seller_factory._inventory[product] >= q
), f"at {self.current_step} Seller has {seller_factory._inventory[product]} but will execute {q} ({'breached' if has_breaches else 'no breaches'})"
assert (
buyer_factory._balance - self.bankruptcy_limit >= u * q
), f"at {self.current_step} Buyer has {buyer_factory._balance} (bankrupt at {self.bankruptcy_limit}) but we need q={q} * u={u} ({'breached' if has_breaches else 'no breaches'})"
bought, buy_cost = buyer_factory.buy(product, q, u, False, 0.0)
sold, sell_cost = seller_factory.buy(product, -q, u, False, 0.0)
if bought == 0:
return
oldq = self._sold_quantity[product, self.current_step + 1]
oldp = self._real_price[product, self.current_step + 1]
totalp = 0.0
if oldq > 0:
totalp = oldp * oldq
self._sold_quantity[product, self.current_step + 1] += bought
self._real_price[product, self.current_step + 1] = (totalp + buy_cost) / (
oldq + bought
)
assert (
bought == sold
), f"Step: {self.current_step} Bought {bought} and sold {sold} ({'breached' if has_breaches else 'no breaches'})\nSeller factory: {vars(seller_factory)}\nBuyer factory {vars(buyer_factory)}"
assert (
buy_cost + sell_cost == 0
), f"Step: {self.current_step} Bought for {buy_cost} and sold for {-sell_cost} ({'breached' if has_breaches else 'no breaches'})\nSeller factory: {vars(seller_factory)}\nBuyer factory {vars(buyer_factory)}"
[docs]
def __register_contract(self, agent_id: str, level: float) -> None:
"""Registers execution of the contract in the agent's stats"""
n_contracts = self.agent_n_contracts[agent_id] - 1
self.breach_prob[agent_id] = (
self.breach_prob[agent_id] * n_contracts + (level > 0)
) / (n_contracts + 1)
self._breach_level[agent_id] = (
self.breach_prob[agent_id] * n_contracts + level
) / (n_contracts + 1)
[docs]
def record_bankrupt(self, factory: Factory) -> None:
"""Records agent bankruptcy"""
agent_id = factory.agent_id
# announce bankruptcy
reports_agent = self.bulletin_board.data["reports_agent"]
reports_time = self.bulletin_board.data["reports_time"]
self.add_financial_report(
self.agents[agent_id], factory, reports_agent, reports_time
)
self.__n_bankrupt += 1
[docs]
def on_contract_concluded(self, contract: Contract, to_be_signed_at: int) -> None:
if (
any(self.a2f[_].is_bankrupt for _ in contract.partners)
or contract.agreement["time"] >= self.n_steps
):
return
super().on_contract_concluded(contract, to_be_signed_at)
[docs]
def on_contract_signed(self, contract: Contract) -> bool:
# we need to cancel this contract if a partner was bankrupt (that is necessary only for
# force_signing case as in this case the two partners will be assued to sign no matter what is
# their bankruptcy status
if (
any(self.a2f[_].is_bankrupt for _ in contract.partners)
or contract.agreement["time"] >= self.n_steps
):
self.ignore_contract(contract, as_dropped=False)
return False
result = super().on_contract_signed(contract)
if not result:
self.ignore_contract(contract, as_dropped=True)
return False
self.logdebug(f"SIGNED {str(contract)}")
t = contract.agreement["time"]
u, q = contract.agreement["unit_price"], contract.agreement["quantity"]
product = contract.annotation["product"]
agent, partner = contract.partners
is_seller = agent == contract.annotation["seller"]
self.a2f[agent].contracts[t].append(
ContractInfo(q, u, product, is_seller, partner, contract)
)
self.a2f[partner].contracts[t].append(
ContractInfo(q, u, product, not is_seller, agent, contract)
)
return True
[docs]
def nullify_contract(self, contract: Contract, new_quantity: int):
self.__n_nullified += 1
contract.nullified_at = self.current_step
contract.annotation["new_quantity"] = new_quantity
[docs]
def __register_breach(
self, agent_id: str, level: float, contract_total: float, factory: Factory
) -> int:
"""
Registers a breach of the given level on the given agent. Assume that the contract is already added
to the agent_contracts
Args:
agent_id: The perpetrator of the breach
level: The breach level
contract_total: The total of the contract breached (quantity * unit_price)
factory: The factory corresponding to the perpetrator
Returns:
If nonzero, the agent should go bankrupt and this amount taken from them
"""
self.logdebug(
f"{self.agents[agent_id].name} breached {level} of {contract_total}"
)
return 0
# if factory.is_bankrupt:
# return 0
# if level <= 0:
# return 0
# penalty = int(math.ceil(level * contract_total))
# if factory.current_balance - penalty < self.bankruptcy_limit:
# return penalty
# if penalty > 0:
# factory.pay(penalty)
# self.penalties += penalty
# return 0
[docs]
def _spot_loss(self, aid: str) -> float:
base = (
self._agent_spot_loss[self.a2i[aid], self.current_step] * self.spot_discount
)
return (
base
+ self._agent_spot_quantity[self.a2i[aid], self.current_step]
* self.spot_multiplier
)
[docs]
def start_contract_execution(self, contract: Contract) -> set[Breach] | None:
self.logdebug(f"Executing {str(contract)}")
s = self.current_step
# get contract info
breaches = set()
if self.compensate_immediately and (
contract.nullified_at >= 0
or any(self.a2f[a].is_bankrupt for a in contract.partners)
):
return None
product = contract.annotation["product"]
buyer_id, seller_id = (
contract.annotation["buyer"],
contract.annotation["seller"],
)
_buyer, buyer_factory = self.agents[buyer_id], self.a2f[buyer_id]
_seller, seller_factory = self.agents[seller_id], self.a2f[seller_id]
q, u, t = (
contract.agreement["quantity"],
contract.agreement["unit_price"],
contract.agreement["time"],
)
if q <= 0 or u <= 0:
self.logwarning(
f"Contract {str(contract)} has zero quantity or unit price!!! will be ignored"
)
return None
# if the contract is already nullified, take care of it
if contract.nullified_at >= 0:
self.compensation_factory._inventory[product] = 0
self.compensation_factory._balance = 0
for c in self.compensation_records.get(contract.id, []):
q = min(q, c.quantity)
if c.product >= 0 and c.quantity > 0:
assert c.product == product
self.compensation_factory._inventory[product] += c.quantity
self.compensation_factory._balance += c.money
if c.seller_bankrupt:
seller_factory = self.compensation_factory
else:
buyer_factory = self.compensation_factory
else:
q = 0
if seller_factory == buyer_factory:
# means that both the seller and buyer are bankrupt
return None
if q == 0:
return breaches
p = q * u
assert t == self.current_step
self.agent_n_contracts[buyer_id] += 1
self.agent_n_contracts[seller_id] += 1
missing_product = max(0, q - seller_factory.current_inventory[product])
missing_money = max(0, p - buyer_factory.current_balance)
# if there are no breaches, just execute the contract
if missing_money <= 0 and missing_product <= 0:
self._execute(
product, q, p, u, buyer_factory, seller_factory, has_breaches=False
)
self.__register_contract(seller_id, 0)
self.__register_contract(buyer_id, 0)
return breaches
# if there is a product breach (the seller does not have enough products), register it
if missing_product <= 0:
self.__register_contract(seller_id, 0)
else:
product_breach_level = missing_product / q
self.__register_contract(seller_id, product_breach_level)
self.__register_breach(seller_id, product_breach_level, p, seller_factory)
if seller_factory != self.compensation_factory:
seller_indx = self.a2i[seller_id]
effective_unit = seller_factory.spot_price(
product, self._spot_loss(seller_id)
)
paid = seller_factory.pay(missing_product * effective_unit)
paid_for = paid // effective_unit
self._agent_spot_quantity[seller_indx, self.current_step] += paid_for
stored = seller_factory.store(
product, paid_for, False, 0.0, no_bankruptcy=True, no_borrowing=True
)
missing_product -= stored
self._spot_quantity[seller_indx, s] += stored
assert seller_factory.current_inventory[product] >= stored
if missing_product > 0:
product_breach_level = missing_product / q
breaches.add(
Breach(
contract=contract,
perpetrator=seller_id,
victims=[buyer_id],
level=product_breach_level,
type="product",
)
)
# if there is a money breach (the buyer does not have enough money), register it
if missing_money <= 0:
self.__register_contract(buyer_id, 0)
else:
money_breach_level = missing_money / p
breaches.add(
Breach(
contract=contract,
perpetrator=buyer_id,
victims=[seller_id],
level=money_breach_level,
type="money",
)
)
self.__register_contract(buyer_id, money_breach_level)
self.__register_breach(buyer_id, money_breach_level, p, buyer_factory)
if self.borrow_on_breach and buyer_factory != self.compensation_factory:
# find out the amount to be paid to borrow the needed money
penalty = (
(1 + self.spot_market_global_loss)
if self.bankruptcy_limit != 0
else 1
)
to_pay = math.ceil(missing_money * penalty)
paid = buyer_factory.pay(to_pay)
missing_money -= (missing_money * paid) // to_pay
# execute the contract to the limit possible
# missing_product = max(0, q - seller_factory.current_inventory[product])
# missing_money = max(0, p - buyer_factory.current_balance)
self._execute(
product,
q,
p,
u,
buyer_factory,
seller_factory,
has_breaches=missing_product > 0 or missing_money > 0,
)
# return the list of breaches
return breaches
[docs]
def complete_contract_execution(
self, contract: Contract, breaches: list[Breach], resolution: Contract
) -> None:
pass
[docs]
def compensate(
self, available: int, factory: Factory
) -> dict[str, list[tuple[Contract, int, int]]]:
"""
Called by a factory when it is going bankrupt after liquidation
Args:
available: The amount available from liquidation
factory: The factory being bankrupted
Returns:
A mapping from agent ID to nullified contracts, the new quantity for them and compensation_money
"""
# get all future contracts of the bankrupt agent that are not executed in their delivery time order
contracts = list(
itertools.chain(
*(factory.contracts[s] for s in range(self.current_step, self.n_steps))
)
)
owed = 0
total_owed = 0
nulled_contracts = []
for contract in contracts:
total_owed += contract.q * contract.u
if (
self.a2f[contract.partner].is_bankrupt
or contract.contract.nullified_at >= 0
or is_system_agent(contract.contract.annotation["seller"])
or is_system_agent(contract.contract.annotation["buyer"])
or contract.contract.executed_at >= 0
):
continue
nulled_contracts.append(contract)
owed += contract.q * contract.u
nullified_contracts = defaultdict(list)
if available <= 0:
for contract in nulled_contracts:
nullified_contracts[self.agents[contract.partner].id].append(
(contract.contract, 0, 0)
)
self.record_bankrupt(factory)
return nullified_contracts
# calculate compensation fraction
# if available >= owed:
# fraction = self.compensation_fraction
# else:
# fraction = self.compensation_fraction * available / owed
# calculate compensation and pay it as needed
for contract in nulled_contracts:
victim = self.agents[contract.partner]
victim_factory = self.a2f.get(victim.id, None)
# calculate compensation (as money)
compensation_quantity = contract.q
if contract.is_seller:
compensation_unit = factory.spot_price(
contract.product,
self._agent_spot_loss[
self.a2i[factory.agent_id], self.current_step
],
)
else:
compensation_unit = contract.u
compensation_quantity = min(
int(math.floor(available / compensation_unit)), compensation_quantity
)
compensation = min(available, compensation_quantity * compensation_unit)
if compensation <= 0 and compensation_quantity <= 0:
nullified_contracts[victim.id].append(
(contract.contract, 0, compensation)
)
self.nullify_contract(contract.contract, 0)
continue
if self.compensate_immediately:
# pay immediate compensation if indicated
victim_factory.pay(-compensation)
else:
# add the required products/money to the internal compensation inventory/funds to be paid at the
# contract execution time.
if contract.is_seller:
self.compensation_records[contract.contract.id].append(
CompensationRecord(
contract.product,
compensation_quantity,
0,
True,
victim_factory,
)
)
else:
self.compensation_records[contract.contract.id].append(
CompensationRecord(-1, 0, compensation, False, victim_factory)
)
compensation = 0
available -= compensation
nullified_contracts[victim.id].append(
(contract.contract, compensation_quantity, compensation)
)
self.nullify_contract(contract.contract, compensation_quantity)
assert available >= 0
self.record_bankrupt(factory)
return nullified_contracts
[docs]
def scores(
self,
assets_multiplier_trading: float | None = None,
assets_multiplier_catalog: float | None = None,
assets_multiplier: float | None = None,
) -> dict[str, float]:
"""scores of all agents given the asset multiplier.
args:
assets_multiplier: a multiplier to multiply the assets with.
"""
if assets_multiplier is not None and assets_multiplier_trading is None:
assets_multiplier_trading = assets_multiplier
if assets_multiplier_catalog is None:
assets_multiplier_catalog = self.inventory_valuation_catalog
if assets_multiplier_catalog is None:
assets_multiplier_catalog = 0
if assets_multiplier_trading is None:
assets_multiplier_trading = self.inventory_valuation_trading
if assets_multiplier_trading is None:
assets_multiplier_trading = 0
scores = dict()
for aid, agent in self.agents.items():
if is_system_agent(aid):
continue
factory = self.a2f[aid]
try:
scores[aid] = (
factory.current_balance
+ (
assets_multiplier_trading
* np.sum(factory.current_inventory * self.trading_prices)
)
if assets_multiplier_trading
else (
0.0
+ (
assets_multiplier_catalog
* np.sum(factory.current_inventory * self.catalog_prices)
)
if assets_multiplier_catalog
else 0.0 - factory.initial_balance
)
) / (1 if not factory.initial_balance else factory.initial_balance)
except Exception:
scores[aid] = float("nan")
return scores
@property
[docs]
def winners(self):
"""The winners of this world (factory managers with maximum wallet balance"""
if len(self.agents) < 1:
return []
scores = self.scores()
sa = sorted(zip(scores.values(), scores.keys()), reverse=True)
max_score = sa[0][0]
winners = []
for s, a in sa:
if s < max_score:
break
winners.append(self.agents[a])
return winners
[docs]
def trading_prices_for(
self, discount: float = 1.0, condition="executed"
) -> np.ndarray:
"""
Calculates the prices at which all products traded using an optional discount factor
Args:
discount: A discount factor to treat older prices less importantly (exponential discounting).
condition: The condition for contracts to consider. Possible values are executed, signed, concluded,
nullified
Returns:
an n_products vector of trading prices
"""
prices = np.nan * np.ones((self.n_products, self.n_steps), dtype=float)
quantities = np.zeros((self.n_products, self.n_steps), dtype=int)
for contract in self.saved_contracts:
if contract["condition" + "_at"] < 0:
continue
p, t, q, u = (
contract["product"],
contract["delivery_time"],
contract["quantity"],
contract["unit_price"],
)
prices[p, t] = (prices[p, t] * quantities[p, t] + u * q) / (
quantities[p, t] + q
)
quantities[p, t] += q
discount = np.cumprod(discount * np.ones(self.n_steps))
discount /= sum(discount)
return np.nansum(np.nanprod(prices, discount), axis=-1)
@property
[docs]
def trading_prices(self):
if self.current_step == self.n_steps:
return self._trading_price[:, -1]
return self._trading_price[:, self.current_step + 1]
@property
[docs]
def stats_df(self) -> pd.DataFrame:
"""Returns a pandas data frame with the stats"""
return pd.DataFrame(super().stats)
@property
[docs]
def contracts_df(self) -> pd.DataFrame:
"""Returns a pandas data frame with the contracts"""
contracts = pd.DataFrame(self.saved_contracts)
contracts["product_index"] = contracts.product_name.str.replace("p", "").astype(
int
)
contracts["breached"] = contracts.breaches.str.len() > 0
contracts["executed"] = contracts.executed_at >= 0
contracts["erred"] = contracts.erred_at >= 0
contracts["nullified"] = contracts.nullified_at >= 0
return contracts
@property
[docs]
def system_agents(self) -> list[SCML2020Agent]:
"""Returns the two system agents"""
return [_ for _ in self.agents.values() if is_system_agent(_.id)]
@property
[docs]
def system_agent_names(self) -> list[str]:
"""Returns the names two system agents"""
return [_ for _ in self.agents.keys() if is_system_agent(_)]
@property
[docs]
def non_system_agents(self) -> list[SCML2020Agent]:
"""Returns all agents except system agents"""
return [_ for _ in self.agents.values() if not is_system_agent(_.id)]
@property
[docs]
def non_system_agent_names(self) -> list[str]:
"""Returns names of all agents except system agents"""
return [_ for _ in self.agents.keys() if not is_system_agent(_)]
@property
[docs]
def agreement_fraction(self) -> float:
"""Fraction of negotiations ending in agreement and leading to signed contracts"""
n_negs = sum(self.stats["n_negotiations"])
n_contracts = self.n_saved_contracts(True)
return n_contracts / n_negs if n_negs != 0 else np.nan
[docs]
system_agent_ids = system_agent_names
[docs]
non_system_agent_ids = non_system_agent_names
[docs]
def draw(
self,
steps: tuple[int, int] | int | None = None,
what: Collection[str] = DEFAULT_EDGE_TYPES,
who: Callable[[Agent], bool] = None,
where: Callable[[Agent], int | tuple[float, float]] = None,
together: bool = True,
axs: Collection[Axis] = None,
ncols: int = 4,
figsize: tuple[int, int] = (15, 15),
**kwargs,
) -> tuple[Axis, nx.Graph] | tuple[list[Axis], list[nx.Graph]]:
if where is None:
def where(x):
return (
self.n_processes + 1
if x == SYSTEM_BUYER_ID
else 0
if x == SYSTEM_SELLER_ID
else int(self.agents[x].awi.profile.processes[0] + 1)
)
return super().draw(
steps,
what,
who,
where,
together=together,
axs=axs,
ncols=ncols,
figsize=figsize,
**kwargs,
)
[docs]
class SCML2021World(SCML2020World):
def __init__(self, *args, **kwargs):
kwargs["n_concurrent_negs_between_partners"] = 5
kwargs["publish_trading_prices"] = True
kwargs["publish_exogenous_summary"] = True
super().__init__(*args, **kwargs)
@classmethod
[docs]
def generate(
cls,
*args,
inventory_valuation_trading: np.ndarray | tuple[float, float] | float = (
0.0,
0.5,
),
horizon: tuple[float, float] | float = (0.2, 0.5),
**kwargs,
) -> dict[str, Any]:
kwargs["inventory_valuation_trading"] = inventory_valuation_trading
kwargs["horizon"] = horizon
return super().generate(*args, **kwargs)
[docs]
class SCML2022World(SCML2021World):
pass
[docs]
class SCML2023World(SCML2022World):
pass
[docs]
class SCML2024World(SCML2022World):
pass