Source code for scml.oneshot.world

from __future__ import annotations

import copy
import itertools
import logging
import math
import random
import sys
import warnings
from collections import defaultdict
from typing import Any, Callable, Collection, Iterable, Literal

import networkx as nx
import numpy as np
import pandas as pd
from matplotlib.axis import Axis
from negmas import (
    DEFAULT_EDGE_TYPES,
    Outcome,
    Agent,
    Breach,
    BreachProcessing,
    ContiguousIssue,
    Contract,
    Operations,
    SAOResponse,
    TimeInAgreementMixin,
    World,
    make_issue,
)
from negmas.helpers import get_class, get_full_type_name, instantiate, unique_name
from negmas.sao import ControlledSAONegotiator, SAOController, SAONegotiator
from negmas.situated import NegotiationInfo
from .awi import OneShotAWI

from scml.oneshot.ufun import OneShotUFun

from ..common import (
    distribute_quantities,
    integer_cut,
    intin,
    make_array,
    realin,
    strin,
)
from .adapter import OneShotSCML2020Adapter
from .agent import OneShotAgent
from .common import (
    INFINITE_COST,
    QUANTITY,
    SYSTEM_BUYER_ID,
    SYSTEM_SELLER_ID,
    TIME,
    UNIT_PRICE,
    FinancialReport,
    NegotiationDetails,
    OneShotExogenousContract,
    OneShotProfile,
    is_system_agent,
)
from .sysagents import DefaultOneShotAdapter, _StdSystemAgent

"""
Implements the one shot version of SCML
"""


# from rich import print


__all__ = [
    "SCMLBaseWorld",
    "OneShotWorld",
    "SCML2020OneShotWorld",
    "SCML2021OneShotWorld",
    "SCML2022OneShotWorld",
    "SCML2023OneShotWorld",
    "SCML2024OneShotWorld",
    "PLACEHOLDER_AGENT_PREFIX",
]
[docs] PLACEHOLDER_AGENT_PREFIX = "PlaceHolder__"
def get_n_lines(config: dict[str, Any]) -> tuple[int, int]: if "profiles" in config: mn_lines, mx_lines = float("inf"), float("-inf") for profile in config["profiles"]: mn_lines = min(mn_lines, profile.n_lines) mx_lines = max(mx_lines, profile.n_lines) return (int(mn_lines), int(mx_lines)) if "n_lines" in config: n_lines = config["n_lines"] if isinstance(n_lines, Iterable): n_lines = tuple(n_lines) assert len(n_lines) == 2, f"Found {n_lines=}" return n_lines return (n_lines, n_lines) raise ValueError( "Cannot find profiles, n_agents_per_profile, agent_processes. I cannot determine the number of agents per level" ) def get_n_agents_per_process(config: dict[str, Any]) -> list[int]: if "profiles" in config: counts = defaultdict(int) for profile in config["profiles"]: counts[profile.level] += 1 mx = max(counts.keys()) return [counts.get(i, 0) for i in range(mx + 1)] if "n_agents_per_process" in config: return config["n_agents_per_process"] if "agent_processes" in config: counts = defaultdict(int) for i in config["agent_processes"]: counts[i] += 1 mx = max(counts.keys()) return [counts.get(i, 0) for i in range(mx + 1)] raise ValueError( "Cannot find profiles, n_agents_per_profile, agent_processes. I cannot determine the number of agents per level" ) def values(x: int | tuple[int, int]): if not isinstance(x, Iterable): return int(x), int(x) return int(x[0]), int(x[1]) def to_lists(d): return { k: v.tolist() if isinstance(v, np.ndarray) else list(v) for k, v in d.items() }
[docs] class SCMLBaseWorld(TimeInAgreementMixin, World[OneShotAWI, DefaultOneShotAdapter]): """Implements the a generalized form of SCML-OneShot game which supports both oneshot and standard simulations Args: catalog_prices: An n_products vector (i.e. n_processes+1 vector) giving the catalog price of all products profiles: An n_agents list of `OneShotFactoryProfile` objects specifying the private profile of the factory associated with each agent. agent_types: An n_agents list of strings/ `OneShotAgent` classes specifying the type of each agent agent_params: An n_agents dictionaries giving the parameters of each agent catalog_quantities: The quantities in the past for which catalog_prices are the average unit prices. This is used when updating the trading prices. If set to zero then the trading price will follow the market price and will not use the catalog_price (except for products that are never sold in the market for which the trading price will take the default value of the catalog price). If set to a large value (e.g. 10000), the price at which a product is sold will not affect the trading price financial_report_period: The number of steps between financial reports. If < 1, it is a fraction of n_steps exogenous_force_max: If true, exogenous contracts are forced to be signed independent of the setting of `force_signing` compact: If True, no logs will be kept and the whole simulation will use a smaller memory footprint n_steps: Number of simulation steps (can be considered as days). time_limit: Total time allowed for the complete simulation in seconds. neg_n_steps: Number of negotiation steps allowed for all negotiations. neg_time_limit: Total time allowed for a complete negotiation in seconds. neg_step_time_limit: Total time allowed for a single step of a negotiation. in seconds. negotiation_speed: The number of negotiation steps that pass in every simulation step. If 0, negotiations will be guaranteed to finish within a single simulation step signing_delay: The number of simulation steps to pass between a contract is concluded and signed name: The name of the simulations **kwargs: Other parameters that are passed directly to `SCML2020World` constructor. """ def __init__( self, # SCML2020 specific parameters catalog_prices: np.ndarray, profiles: list[OneShotProfile], agent_types: list[type[OneShotAgent]], agent_params: list[dict[str, Any]], catalog_quantities: int | np.ndarray = 50, # breach processing parameters financial_report_period=5, bankruptcy_limit=0.0, penalize_bankrupt_for_future_contracts=True, penalties_scale: Literal["trading", "catalog", "unit", "none"] = "trading", # external contracts parameters exogenous_contracts: Collection[OneShotExogenousContract] = tuple(), exogenous_dynamic: bool = False, exogenous_force_max: bool = False, # factory parameters initial_balance: np.ndarray | tuple[int, int] | int = 1000, # General SCML2020World Parameters compact=True, no_logs=True, fast=True, n_steps=1000, time_limit=60 * 15, sync_calls=False, # mechanism params neg_n_steps=20, neg_time_limit=None, neg_hidden_time_limit=60, neg_step_time_limit=20, negotiation_speed=None, shuffle_negotiations=False, one_offer_per_step=False, # public information publish_exogenous_summary=True, publish_trading_prices=True, publish_assets=False, publish_production_capacity=True, # negotiation params, price_multiplier=0.0, price_range_fraction=0.0, wide_price_range=False, allow_zero_quantity: bool = False, # trading price parameters trading_price_discount=0.9, # simulation parameters signing_delay=0, force_signing=False, batch_signing=True, name: str | None = None, # debugging parameters agent_name_reveals_position: bool = True, agent_name_reveals_type: bool = True, # evaluation paramters (for compatibility with SCML2020World) inventory_valuation_catalog=0, inventory_valuation_trading=0, # parameters for the geenarilzed std version of oneshot perishable=True, horizon=0, one_time_per_negotiation=True, quantity_multiplier: float = 1.0, nullify_bankrupt_contracts: bool = False, # set to True to add more assertions during debuging debug: bool = False, verbose: bool = False, **kwargs, ): self._verbose = verbose if fast: compact, no_logs, debug = True, True, False if horizon == 0: one_time_per_negotiation = True self._debug = debug # neg_n_steps is ALWAYS the number of rounds. We multiply it by 2 if mechanisms are stepped one offer at a time if one_offer_per_step and neg_n_steps is not None: neg_n_steps *= 2 self.agents: dict[str, DefaultOneShotAdapter] self.publish_assets = publish_assets self.publish_production_capacity = publish_production_capacity self.perishable = perishable self.horizon = horizon self.price_range_fraction = price_range_fraction self.nullify_bankrupt_contracts = nullify_bankrupt_contracts self.inventory_valuation_catalog = inventory_valuation_catalog self.inventory_valuation_trading = inventory_valuation_trading self.allow_zero_quantity = allow_zero_quantity self._profits: dict[str, list[float]] = defaultdict(list) self._breach_levels: dict[str, list[float]] = defaultdict(list) self._breaches_of: dict[str, list[bool]] = defaultdict(list) self._inventory_input: dict[str, int] = defaultdict(int) self._inventory_output: dict[str, int] = defaultdict(int) self._productivity: dict[str, float] = defaultdict(float) self._shortfall_quantity: dict[str, int] = defaultdict(int) self._shortfall_penalty: dict[str, float] = defaultdict(float) self._storage_cost: dict[str, float] = defaultdict(float) self._disposal_cost: dict[str, float] = defaultdict(float) self._penalized_quantity: dict[str, int] = defaultdict(int) self._n_nullified: int = 0 self._nullified_quantity: int = 0 self._nullified_price: float = 0 self._activity = 0 self.trading_price_discount = trading_price_discount self.catalog_quantities = catalog_quantities self.publish_exogenous_summary = publish_exogenous_summary self.price_multiplier = price_multiplier self.wide_price_range = wide_price_range self.publish_trading_prices = publish_trading_prices self.penalize_bankrupt_for_future_contracts = ( penalize_bankrupt_for_future_contracts ) self.agent_disposal_cost: dict[str, list[float]] = dict() self.agent_storage_cost: dict[str, list[float]] = dict() self.agent_shortfall_penalty: dict[str, list[float]] = dict() kwargs["log_to_file"] = not no_logs if compact: kwargs["event_file_name"] = None kwargs["event_types"] = [] kwargs["log_screen_level"] = logging.CRITICAL kwargs["log_file_level"] = logging.ERROR kwargs["log_negotiations"] = False kwargs["log_ufuns"] = False # kwargs["save_mechanism_state_in_contract"] = False kwargs["save_cancelled_contracts"] = False kwargs["save_resolved_breaches"] = False kwargs["save_negotiations"] = True else: kwargs["save_negotiations"] = True self.compact = compact # if negotiation_speed == 0: # negotiation_speed = neg_n_steps + 1 mechanisms = kwargs.pop("mechanisms", {}) mech_params = dict( end_on_no_response=not debug, dynamic_entry=False, max_wait=len(agent_types), check_offers=True, enforce_issue_types=True, cast_offers=True, hidden_time_limit=neg_hidden_time_limit, sync_calls=sync_calls, one_offer_per_step=one_offer_per_step, ignore_negotiator_exceptions=False, ) super().__init__( bulletin_board=None, breach_processing=BreachProcessing.NONE, awi_type="scml.oneshot.OneShotAWI", shuffle_negotiations=shuffle_negotiations, mechanisms={ "negmas.sao.SAOMechanism": mech_params | mechanisms.get("negmas.sao.SAOMechanism", dict()) }, default_signing_delay=signing_delay, n_steps=n_steps, time_limit=time_limit, negotiation_speed=negotiation_speed, neg_n_steps=neg_n_steps, neg_time_limit=neg_time_limit, neg_step_time_limit=neg_step_time_limit, force_signing=force_signing, batch_signing=batch_signing, negotiation_quota_per_step=float("inf"), # type: ignore negotiation_quota_per_simulation=float("inf"), # type: ignore no_logs=no_logs, operations=( Operations.StatsUpdate, Operations.SimulationStep, Operations.Negotiations, Operations.ContractSigning, Operations.ContractExecution, Operations.AgentSteps, Operations.SimulationStep, Operations.StatsUpdate, ), name=name, debug=debug, **kwargs, ) if not self.bulletin_board: raise ValueError("Cannot find the bulletin-board") self.bulletin_board.record("settings", self.horizon, "horizon") self.quantity_multiplier = quantity_multiplier self.one_time_per_negotiation = one_time_per_negotiation self.bulletin_board.record( "settings", self.one_time_per_negotiation, "one_time_per_negotiation" ) self.bulletin_board.record( "settings", self.quantity_multiplier, "quantity_multiplier" ) self.bulletin_board.record("settings", self.perishable, "perishable") self.bulletin_board.record("settings", self.publish_assets, "publish_assets") self.bulletin_board.record( "settings", self.publish_production_capacity, "publisher_production_capacity", ) self.bulletin_board.record( "settings", self.nullify_bankrupt_contracts, "nullify_bankrupt_contracts" ) self.bulletin_board.record( "settings", self.price_range_fraction, "price_range_fraction" ) self.bulletin_board.record( "settings", publish_trading_prices, "public_trading_prices" ) self.bulletin_board.record( "settings", publish_exogenous_summary, "public_exogenous_summary" ) self.bulletin_board.record( "settings", financial_report_period, "financial_report_period" ) self.bulletin_board.record( "settings", penalize_bankrupt_for_future_contracts, "penalize_bankrupt_for_future_contracts", ) self.bulletin_board.record( "settings", exogenous_force_max, "exogenous_force_max" ) self.bulletin_board.record( "settings", self.allow_zero_quantity, "allow_zero_quantity" ) self.bulletin_board.record( "settings", self.inventory_valuation_trading, "inventory_valuation_trading" ) self.bulletin_board.record( "settings", self.inventory_valuation_catalog, "inventory_valuation_catalog" ) # self.bulletin_board.record("settings", disposal_cost, "ufun_disposal_cost") # self.bulletin_board.record( # "settings", shortfall_penalty, "ufun_shortfall_penalty" # ) self.bulletin_board.record("settings", True, "has_exogenous_contracts") self.bulletin_board.record("settings", bankruptcy_limit, "bankruptcy_limit") if self.info is None: self.info = {} n_products = len(catalog_prices) n_processes = n_products - 1 process_inputs = [[i] for i in range(n_processes)] process_outputs = [[i + 1] for i in range(n_processes)] self.exogenous_dynamic = exogenous_dynamic agent_params = ( [copy.deepcopy(_) for _ in agent_params] if agent_params else agent_params ) self.penalties_scale = penalties_scale TimeInAgreementMixin.init(self, time_field="time") self.bulletin_board.add_section("reports_agent") self.bulletin_board.add_section("reports_time") if self.publish_exogenous_summary: self.bulletin_board.add_section("exogenous_contracts_summary") if self.publish_trading_prices: self.bulletin_board.add_section("trading_prices") initial_balance = make_array(initial_balance, len(profiles), dtype=int) self.bankruptcy_limit = ( -bankruptcy_limit if isinstance(bankruptcy_limit, int) else -int(0.5 + bankruptcy_limit * initial_balance.mean()) ) self.info.update( shuffle_negotiations=shuffle_negotiations, process_inputs=process_inputs, process_outputs=process_outputs, catalog_prices=catalog_prices, agent_types_final=[get_full_type_name(_) for _ in agent_types], agent_params_final=( [copy.deepcopy(_) for _ in agent_params] if agent_params is not None else agent_params ), initial_balance_final=initial_balance, penalties_scale_final=penalties_scale, penalize_bankrupt_for_future_contracts=penalize_bankrupt_for_future_contracts, perishable=perishable, publish_assets=publish_assets, publish_production_capacity=publish_production_capacity, bankruptcy_limit=bankruptcy_limit, price_range_fraction=self.price_range_fraction, financial_report_period=financial_report_period, exogenous_force_max=exogenous_force_max, compact=compact, no_logs=no_logs, n_steps=n_steps, time_limit=time_limit, neg_n_steps=neg_n_steps, neg_time_limit=neg_time_limit, neg_step_time_limit=neg_step_time_limit, negotiation_speed=negotiation_speed, allow_zero_quantity=allow_zero_quantity, signing_delay=signing_delay, agent_name_reveals_position=agent_name_reveals_position, agent_name_reveals_type=agent_name_reveals_type, # disposal_cost=disposal_cost, # shortfall_penalty=shortfall_penalty, exogenous_dynamic=exogenous_dynamic, publish_exogenous_summary=publish_exogenous_summary, publish_trading_prices=publish_trading_prices, selected_price_multiplier=price_multiplier, wide_price_range=wide_price_range, nullify_bankrupt_contracts=nullify_bankrupt_contracts, horizon=horizon, one_time_per_negotiation=one_time_per_negotiation, quantity_multiplier=quantity_multiplier, inventory_valuation_catalog=inventory_valuation_catalog, inventory_valuation_trading=inventory_valuation_trading, ) if not isinstance(agent_types, Iterable): agent_types = [agent_types] * len(profiles) profiles = [_ for _ in profiles if _.level not in (-1, n_processes)] if not len(profiles) == len(agent_types): raise AssertionError(f"{len(profiles)=} but {len(agent_types)=}") self.profiles = profiles self.catalog_prices = catalog_prices self.process_inputs = process_inputs self.process_outputs = process_outputs self.n_products = len(catalog_prices) self.n_processes = len(process_inputs) self.exogenous_force_max = exogenous_force_max self.financial_reports_period = ( financial_report_period if financial_report_period >= 1 else int(0.5 + financial_report_period * n_steps) ) agent_types = [get_class(_) for _ in agent_types] for p in agent_params: p["obj"] = get_class(p["controller_type"])( **p.get("controller_params", dict()) ) del p["controller_type"] if "controller_params" in p.keys(): del p["controller_params"] self.controller_types = [ get_class(_["obj"])._type_name() if _["obj"] else "system_agent" for _ in agent_params ] assert ( self.n_products == self.n_processes + 1 ), f"{self.n_products, self.n_processes}" n_agents = len(profiles) if agent_name_reveals_position or agent_name_reveals_type: default_names = [f"{_:02}" for _ in range(n_agents)] else: default_names = [unique_name("", add_time=False) for _ in range(n_agents)] if agent_name_reveals_type: for i, at in enumerate(agent_params): s2 = get_class(at["obj"]).__class__.__name__ s = s2.replace("Agent", "").replace("OneShot", "") s = "".join([c for c in s if c.isupper()])[:3] try: if len(s) < 3: if len(s2) > 3: s = s2[:2] elif len(s2) >= 2: s = s2[0] + s2[1 : 1 + (3 - len(s))] + s2[1:] elif len(s2) > 0: s = s2[0] * 3 else: s = "Agt" except Exception: pass default_names[i] += f"{s}" agent_levels = [p.level for p in profiles] if agent_name_reveals_position: for i, k in enumerate(agent_levels): default_names[i] += f"@{k:01}" if agent_params is None: agent_params = [dict(name=name) for name in default_names] elif isinstance(agent_params, dict): a = copy.copy(agent_params) agent_params = [] for i, name in enumerate(default_names): b = copy.deepcopy(a) b["name"] = name # type: ignore agent_params.append(b) elif len(agent_params) == 1: a = copy.copy(agent_params[0]) agent_params = [] for i, _ in enumerate(default_names): b = copy.deepcopy(a) b["name"] = name agent_params.append(b) else: if agent_name_reveals_type or agent_name_reveals_position: for i, (ns, ps) in enumerate(zip(default_names, agent_params)): agent_params[i] = dict(**ps) agent_params[i]["name"] = ns agent_types += [_StdSystemAgent, _StdSystemAgent] # type: ignore agent_params += [ {"role": SYSTEM_SELLER_ID, "obj": None}, {"role": SYSTEM_BUYER_ID, "obj": None}, ] profiles.append( OneShotProfile( cost=INFINITE_COST, input_product=-1, n_lines=0, disposal_cost_mean=0.0, storage_cost_mean=0.0, shortfall_penalty_mean=0.0, disposal_cost_dev=0.0, storage_cost_dev=0.0, shortfall_penalty_dev=0.0, ) ) profiles.append( OneShotProfile( cost=INFINITE_COST, input_product=n_processes, n_lines=0, disposal_cost_mean=0.0, storage_cost_mean=0.0, shortfall_penalty_mean=0.0, disposal_cost_dev=0.0, storage_cost_dev=0.0, shortfall_penalty_dev=0.0, ) ) initial_balance = initial_balance.tolist() + [ sys.maxsize // 4, sys.maxsize // 4, ] agents = [] for i, (atype, aparams) in enumerate(zip(agent_types, agent_params)): a = instantiate(atype, **aparams) a.id = a.name if a.adapted_object: a.adapted_object.id = a.id a.adapted_object.name = a.name if isinstance(a.adapted_object, OneShotAgent): a.adapted_object.connect_to_oneshot_adapter(a) else: a.adapted_object._owner = a self.join(a, i) agents.append(a) self.agent_types = [_.type_name for _ in agents] self.agent_params = [ {k: v for k, v in _.items() if k != "name" and k != "obj"} for _ in agent_params ] self.agent_unique_types = [ f"{t}{hash(str(p))}" if len(p) > 0 else t for t, p in zip(self.agent_types, self.agent_params) ] self.agent_n_contracts = dict(zip((_.id for _ in agents), itertools.repeat(0))) self.suppliers: list[list[str]] = [[] for _ in range(n_products)] self.consumers: list[list[str]] = [[] for _ in range(n_products)] self.production_capacity: list[int] = [ 0 if self.publish_production_capacity else -1 ] * n_processes self.agent_processes: dict[str, list[int]] = defaultdict(list) self.agent_inputs: dict[str, list[int]] = defaultdict(list) self.agent_outputs: dict[str, list[int]] = defaultdict(list) self.agent_consumers: dict[str, list[str]] = defaultdict(list) self.agent_suppliers: dict[str, list[str]] = defaultdict(list) self.agent_profiles: dict[str, OneShotProfile] = dict() self.consumers[n_products - 1].append(SYSTEM_BUYER_ID) self.agent_processes[SYSTEM_BUYER_ID] = [] self.agent_inputs[SYSTEM_BUYER_ID] = [n_products - 1] self.agent_outputs[SYSTEM_BUYER_ID] = [] self.suppliers[0].append(SYSTEM_SELLER_ID) self.agent_processes[SYSTEM_SELLER_ID] = [] self.agent_inputs[SYSTEM_SELLER_ID] = [] self.agent_outputs[SYSTEM_SELLER_ID] = [0] for agent_id, profile in zip(self.agents.keys(), profiles): # if is_system_agent(agent_id): # continue if profile.cost == INFINITE_COST: continue p = profile.level if self.publish_production_capacity: self.production_capacity[p] += profile.n_lines self.suppliers[p + 1].append(agent_id) self.consumers[p].append(agent_id) self.agent_processes[agent_id].append(p) self.agent_inputs[agent_id].append(p) self.agent_outputs[agent_id].append(p + 1) self.agent_profiles[agent_id] = profile self.agents[agent_id].profile = profile # type: ignore for aid, agent in self.agents.items(): if is_system_agent(aid): continue profile: OneShotProfile = agent.profile # type: ignore self.agent_disposal_cost[aid] = np.abs( # type: ignore np.random.randn(self.n_steps) * profile.disposal_cost_dev + profile.disposal_cost_mean ) self.agent_storage_cost[aid] = np.abs( # type: ignore np.random.randn(self.n_steps) * profile.storage_cost_dev + profile.storage_cost_mean ) self.agent_shortfall_penalty[aid] = np.abs( # type: ignore np.random.randn(self.n_steps) * profile.shortfall_penalty_dev + profile.shortfall_penalty_mean ) for p in range(n_products): for a in self.suppliers[p]: self.agent_consumers[a] = self.consumers[p] for a in self.consumers[p]: self.agent_suppliers[a] = self.suppliers[p] self.agent_processes = {k: np.array(v) for k, v in self.agent_processes.items()} # type: ignore self.agent_inputs = {k: np.array(v) for k, v in self.agent_inputs.items()} # type: ignore # type: ignore self.agent_outputs = {k: np.array(v) for k, v in self.agent_outputs.items()} # type: ignore assert all( len(v) == 1 or is_system_agent(self.agents[k].id) for k, v in self.agent_outputs.items() ), f"Not all agent outputs are singular:\n{self.agent_outputs}" assert all( len(v) == 1 or is_system_agent(self.agents[k].id) for k, v in self.agent_inputs.items() ), f"Not all agent inputs are singular:\n{self.agent_outputs}" assert all( is_system_agent(k) or self.agent_inputs[k][0] == self.agent_outputs[k] - 1 for k in self.agent_inputs.keys() ), f"Some agents have outputs != input+1\n{self.agent_outputs}\n{self.agent_inputs}" self.is_bankrupt: dict[str, bool] = dict( zip(self.agents.keys(), itertools.repeat(False)) ) self.exogenous_contracts: dict[int : list[Contract]] = defaultdict(list) # type: ignore for c in exogenous_contracts: seller_id = agents[c.seller].id if c.seller >= 0 else SYSTEM_SELLER_ID # type: ignore buyer_id = agents[c.buyer].id if c.buyer >= 0 else SYSTEM_BUYER_ID # type: ignore # type: ignore # type: ignore contract = Contract( agreement={ "time": c.time, "quantity": c.quantity, "unit_price": c.unit_price, }, partners=[buyer_id, seller_id], # type: ignore issues=[], # type: ignore # type: ignore signatures=dict(), signed_at=-1, to_be_signed_at=c.time, annotation={ "seller": seller_id, "buyer": buyer_id, "caller": ( SYSTEM_SELLER_ID if seller_id == SYSTEM_SELLER_ID else SYSTEM_BUYER_ID ), "is_buy": True, "product": c.product, "sim_step": self.current_step, }, ) self.exogenous_contracts[c.time].append(contract) self._traded_quantity = np.ones(n_products) * self.catalog_quantities self._real_price = np.nan * np.ones((n_products, n_steps + 1)) self._sold_quantity = np.zeros((n_products, n_steps + 1), dtype=int) self._real_price[:, 0] = self.catalog_prices self._trading_price = np.tile( self._real_price[:, 0].reshape((n_products, 1)), (1, n_steps + 1) ) self._betas = np.ones(n_steps + 1) self._betas[1] = self.trading_price_discount self._betas[1:] = np.cumprod(self._betas[1:]) self._betas_sum = self.catalog_quantities * np.ones((n_products, n_steps + 1)) if self.publish_trading_prices: self.bulletin_board.record("trading_prices", self._trading_price[:, 1]) # temporary variables for calculating scores self._input_quantity = defaultdict(int) self._input_price = defaultdict(int) self._output_quantity = defaultdict(int) self._output_price = defaultdict(int) self.exogenous_qout = defaultdict(int) self.exogenous_qin = defaultdict(int) self.exogenous_pout = defaultdict(int) self.exogenous_pin = defaultdict(int) self.exogenous_contracts_summary = [] self.initial_balances = dict(zip(self.agents.keys(), initial_balance)) # type: ignore self._max_n_lines = max(_.n_lines for _ in self.profiles) self.a2i = dict(zip((_.id for _ in agents), range(n_agents))) self._current_issues: list[list[ContiguousIssue]] = [] self.__contracts: dict[str, list[Contract]] = defaultdict(list) for product in range(self.n_products): unit_price, time, quantity = self._make_issues(product) _issues = [ make_issue(values(quantity), name="quantity"), make_issue(values(time), name="time"), make_issue(values(unit_price), name="unit_price"), ] if self._debug: assert all(isinstance(_, ContiguousIssue) for _ in _issues) self._current_issues.append(_issues) # type: ignore # type: ignore self.info.update( dict( agent_profiles={ k: dict( cost=v.cost, n_lines=v.n_lines, input_product=v.input_product, shortfall_penalty_mean=v.shortfall_penalty_mean, shortfall_penalty_dev=v.shortfall_penalty_dev, disposal_cost_mean=v.disposal_cost_mean, disposal_cost_dev=v.disposal_cost_dev, storage_cost_mean=v.storage_cost_mean, storage_cost_dev=v.storage_cost_dev, ) for k, v in self.agent_profiles.items() } ) ) self.info.update(dict(agent_inputs=to_lists(self.agent_inputs))) self.info.update(dict(agent_outputs=to_lists(self.agent_outputs))) self.info.update(dict(agent_processes=to_lists(self.agent_processes))) self.info.update(dict(agent_initial_balances=self.initial_balances)) self._update_exogenous(0) self._agent_negotiations: dict[ str, dict[str, dict[str, NegotiationDetails]] ] = dict()
[docs] def action_info_cols(self) -> list[tuple[str, type]]: return [ ("quantity", int), ("delivery_step", int), ("unit_price", int), ]
[docs] def extract_action_info(self, action: Any) -> list[int]: return [ action[QUANTITY] if action else 0, action[TIME] if action else 0, action[UNIT_PRICE] if action else 0, ]
[docs] def agreement_info_cols(self) -> list[tuple[str, type]]: return [ ("quantity", int), ("delivery_step", int), ("unit_price", int), ]
[docs] def extract_agreement_info(self, agreement: Outcome | None) -> list[int]: return [ agreement[QUANTITY] if agreement else 0, agreement[TIME] if agreement else 0, agreement[UNIT_PRICE] if agreement else 0, ]
[docs] def extra_neg_info(self, info: NegotiationInfo) -> dict[str, Any]: first = info.partners[0] last = info.partners[1] product = info.annotation["product"] results = dict() results["product"] = info.annotation["product"] results["exogenous_quantity0"] = ( 0 if not first else first.awi.current_exogenous_input_quantity if first.awi.is_first_level else first.awi.current_exogenous_output_quantity if first.awi.is_last_level else 0 ) results["exogenous_quantity1"] = ( 0 if not last else last.awi.current_exogenous_input_quantity if last.awi.is_first_level else last.awi.current_exogenous_output_quantity if last.awi.is_last_level else 0 ) results["exogenous_unit_price0"] = ( ( ( 0 if not first else first.awi.current_exogenous_input_price if first.awi.is_first_level else first.awi.current_exogenous_output_price if first.awi.is_last_level else 0 ) / results["exogenous_quantity0"] ) if results["exogenous_quantity0"] else 0 ) results["exogenous_unit_price1"] = ( ( ( 0 if not last else last.awi.current_exogenous_input_price if last.awi.is_first_level else last.awi.current_exogenous_output_price if last.awi.is_last_level else 0 ) / results["exogenous_quantity1"] ) if results["exogenous_quantity1"] else 0 ) results["needed_sales0"] = first.awi.needed_sales if first else 0 results["needed_sales1"] = last.awi.needed_sales if first else 0 results["needed_supplies0"] = first.awi.needed_supplies if first else 0 results["needed_supplies1"] = last.awi.needed_supplies if first else 0 results["trading_price"] = self.trading_prices[product] return results
@classmethod
[docs] def replace_agents( cls, config: dict, old_types: tuple[str | type[OneShotAgent], ...] | list[str | type[OneShotAgent]], types: tuple[str | type[OneShotAgent], ...] | list[str | type[OneShotAgent]], params: list[dict[str, Any]] | tuple[dict[str, Any], ...] | None = None, ): """ Replaces all agents of a given type by agents of a new type """ assert len(old_types) == len(types) config = copy.deepcopy(config) if not params: params = [dict() for _ in types] found_types = [_["controller_type"] for _ in config["agent_params"]] found_type_names = [ get_full_type_name(_) if not isinstance(_, str) else _ for _ in found_types ] type_names = [ get_full_type_name(_) if not isinstance(_, str) else _ for _ in types ] old_type_names = [ get_full_type_name(_) if not isinstance(_, str) else _ for _ in old_types ] mapping = dict(zip(old_type_names, type_names, strict=True)) params_map = dict(zip(type_names, params, strict=True)) for i, found in enumerate(found_type_names): if found not in mapping: continue t = mapping[found] config["agent_params"][i] = dict() config["agent_params"][i]["controller_type"] = t p = params_map[t] if p: config["agent_params"][i]["controller_params"] = p return config
@classmethod
[docs] def generate( cls, agent_types: ( tuple[str | type[OneShotAgent], ...] | list[str | type[OneShotAgent]] | type[OneShotAgent] | str ), agent_params: list[dict[str, Any]] | tuple[dict[str, Any], ...] | None = None, agent_processes: list[int] | None = None, n_steps: tuple[int, int] | int = (50, 200), n_processes: tuple[int, int] | int = 2, n_lines: np.ndarray | tuple[int, int] | int = 10, n_agents_per_process: np.ndarray | tuple[int, int] | int = (4, 8), process_inputs: np.ndarray | tuple[int, int] | int = 1, process_outputs: np.ndarray | tuple[int, int] | int = 1, production_costs: np.ndarray | tuple[int, int] | int = (1, 4), profit_means: np.ndarray | tuple[float, float] | float = (0.1, 0.2), profit_stddevs: np.ndarray | tuple[float, float] | float = 0.05, max_productivity: np.ndarray | tuple[float, float] | float = (0.8, 1.0), initial_balance: np.ndarray | tuple[int, int] | int | None = None, exogenous_supply_predictability: tuple[float, float] | float = (0.6, 0.9), exogenous_sales_predictability: tuple[float, float] | float = (0.6, 0.9), exogenous_control: tuple[float, float] | float = -1, cash_availability: tuple[float, float] | float = (1.5, 2.5), shortfall_penalty: tuple[float, float] | float = (0.2, 1.0), shortfall_penalty_dev: tuple[float, float] | float = (0.0, 0.1), disposal_cost: tuple[float, float] | float = (0.0, 0.2), disposal_cost_dev: tuple[float, float] | float = (0.0, 0.02), storage_cost: tuple[float, float] | float = (0.0, 0.02), storage_cost_dev: tuple[float, float] | float = 0, exogenous_price_dev: tuple[float, float] | float = (0.1, 0.2), price_multiplier: np.ndarray | tuple[float, float] | float = (1.5, 2.0), cost_increases_with_level=True, equal_exogenous_supply=False, equal_exogenous_sales=False, force_signing=True, profit_basis=np.max, random_agent_types: bool = False, penalties_scale: str | list[str] = "trading", cap_exogenous_quantities: bool = True, exogenous_generation_method="profitable", perishable: bool | None = True, max_supply: np.ndarray | tuple[float, float] | float = (0.8, 1.0), **kwargs, ) -> dict[str, Any]: """ Generates the configuration for a world Args: agent_types: All agent types agent_params: Agent parameters used to initialize them n_steps: Number of simulation steps n_processes: Number of processes in the production chain n_lines: Number of lines per factory process_inputs: Number of input units per process process_outputs: Number of output units per process production_costs: Production cost per factory profit_means: Mean profitability per production level (i.e. process). profit_stddevs: Std. Dev. of the profitability of every level (i.e. process). max_productivity: Maximum possible productivity per level (i.e. process). max_supply: Maximum possible supply level to the market, initial_balance: The initial balance of all agents n_agents_per_process: Number of agents per process agent_processes: The process for each agent. If not `None` , it will override `n_agents_per_process` and must be a list/tuple of the same length as `agent_types` . Morevoer, `random_agent_types` must be False in this case cost_increases_with_level: If true, production cost will be higher for processes nearer to the final product. profit_basis: The statistic used when controlling catalog prices by profit arguments. It can be np.mean, np.median, np.min, np.max or any Callable[[list[float]], float] and is used to summarize production costs at every level. equal_exogenous_supply: If true, external supply will be distributed equally among all agents in the first layer equal_exogenous_sales: If true, external sales will be distributed equally among all agents in the last layer exogenous_supply_predictability: How predictable are exogenous supplies of each agent over time. 1.0 means that every agent will have the same quantity for all of its contracts over time. 0.0 means quantities per agent are completely random exogenous_sales_predictability: How predictable are exogenous supplies of each agent over time. 1.0 means that every agent will have the same quantity for all of its contracts over time. 0.0 means quantities per agent are completely random force_signing: Whether to force contract signatures (exogenous contracts are treated in the same way). exogenous_control: How much control does the agent have over exogenous contract signing. Only effective if force_signing is False and use_exogenous_contracts is True cap_exogenous_quantities: If True, all exogenous quantities in all contracts are capped to be no more than the number of lines cash_availability: The fraction of the total money needs of the agent to work at maximum capacity that is available as `initial_balance` . This is only effective if `initial_balance` is set to `None` . exogenous_control: How much control does the agent have over exogenous contract signing. Only effective if force_signing is False and use_exogenous_contracts is True disposal_cost: A range to sample mean-disposal costs for all factories from (only used if perishable is True) shortfall_penalty: A range to sample mean-shortfall penalty for all factories from storage_cost: A range to sample mean-storage costs fro all factories from (only used if perishable is False) disposal_cost_dev: A range to sample std. dev of disposal costs for all factories from shortfall_penalty_dev: A range to sample std. dev of shortfall penalty for all factories from storage_cost_dev: The standard deviation of storage cost relative to the mean price exogenous_price_dev: The standard deviation of exogenous contract prices relative to the mean price price_multiplier: A value to multiply with trading/catalog price to get the upper limit on prices for all negotiations random_agent_types: If True, the final agent types used by the generator will always be sampled from the given types. If False, this random sampling will only happen if len(agent_types) != n_agents. penalties_scale: What are `disposal_cost` and `shortfall_penalty` relative to. There are four options: `trading`, `catalog` mean trading and catalog prices of the product. `unit` means the unit price in the contract and `none` means the `storage-cost` and `shortfall_penalty` are absolute values (in money unit). If not given will be read through the AWI exogenous_generation_method: the generation method. This is only for compatibility with SCML2020World and is not used. perishable: If True, storage_cost is set to zero as there is no storage and if False, disposal_cost is set to zero as there is no disposal. If None, neither is overridden. **kwargs: Returns: world configuration as a Dict[str, Any]. A world can be generated from this dict by calling OneShotWorld(**d) Remarks: - There are two general ways to use this generator: 1. Pass `random_agent_types = False`, and pass `agent_types`, `agent_processes` to control placement of each agent in each level of the production graph. 2. Pass `random_agent_types = True` and pass `agent_types`, `n_agents_per_process` to make the system randomly place the specified number of agents in each production level - Most parameters (i.e. `process_inputs` , `process_outputs` , `n_agents_per_process` , `costs` ) can take a single value, a tuple of two values, or a list of values. If it has a single value, it is repeated for all processes/factories as appropriate. If it is a tuple of two numbers $(i, j)$, each process will take a number sampled from a uniform distribution supported on $[i, j]$ inclusive. If it is a list of values, of the length `n_processes` , it is used as it is otherwise, it is used to sample values for each process. """ if perishable is not None: if perishable: storage_cost = storage_cost_dev = 0 else: disposal_cost = disposal_cost_dev = 0 if agent_processes is not None and random_agent_types: raise ValueError( "You cannot pass `agent_processes` and use `random_agent_types`. The first is only " "used when you want to fix the assignment of all agents to specific processes" ) if agent_processes is not None and len(agent_processes) != len(agent_types): # type: ignore raise ValueError( f"Length of `agent_processes` ({len(agent_processes)}) must equal the length of `agent_types` ({len(agent_types)})" # type: ignore ) info = dict( perishable=perishable, n_steps=n_steps, n_processes=n_processes, n_lines=n_lines, force_signing=force_signing, agent_processes=agent_processes, n_agents_per_process=n_agents_per_process, process_inputs=process_inputs, process_outputs=process_outputs, process_inputs_generator=process_inputs, process_outputs_generator=process_outputs, production_costs=production_costs, profit_means=profit_means, profit_stddevs=profit_stddevs, max_productivity=max_productivity, max_supply=max_supply, initial_balance=initial_balance, cost_increases_with_level=cost_increases_with_level, equal_exogenous_sales=equal_exogenous_sales, equal_exogenous_supply=equal_exogenous_supply, exogenous_supply_predictability=exogenous_supply_predictability, exogenous_sales_predictability=exogenous_sales_predictability, cash_availability=cash_availability, price_multiplier=price_multiplier, exogenous_price_dev=exogenous_price_dev, penalties_scale=penalties_scale, exogenous_control=exogenous_control, shortfall_penalty=shortfall_penalty, shortfall_penalty_dev=shortfall_penalty_dev, disposal_cost=disposal_cost, disposal_cost_dev=disposal_cost_dev, storage_cost=storage_cost, storage_cost_dev=storage_cost_dev, cap_exogenous_quantities=cap_exogenous_quantities, random_agent_types=random_agent_types, exogenous_generation_method=exogenous_generation_method, profit_basis=( "min" if profit_basis == np.min else ( "mean" if profit_basis == np.mean else ( "max" if profit_basis == np.max else "median" if profit_basis == np.median else "unknown" ) ) ), ) exogenous_price_dev = realin(exogenous_price_dev) penalties_scale = strin(penalties_scale) price_multiplier = realin(price_multiplier) # type: ignore n_processes = intin(n_processes) n_steps = intin(n_steps) exogenous_control = realin(exogenous_control) exogenous_sales_predictability = realin(exogenous_sales_predictability) exogenous_supply_predictability = realin(exogenous_supply_predictability) np.errstate(divide="ignore") # n_startup = n_processes # if n_steps <= n_startup: # raise ValueError( # f"Cannot generate a world with n_steps <= n_processes: {n_steps} <= {n_startup}" # ) process_inputs = make_array(process_inputs, n_processes, dtype=int) process_outputs = make_array(process_outputs, n_processes, dtype=int) fixed_assignment = agent_processes is not None and not random_agent_types if agent_processes is not None: pcount = defaultdict(int) for i in agent_processes: pcount[i] += 1 pnums = list(pcount.keys()) assert ( min(pnums) == 0 and max(pnums) == len(pnums) - 1 ), f"`agent_processes` is invalid: {agent_processes} as it leads to the following `n_agents_per_process`: {dict(pcount)}" n_agents_per_process = np.asarray([pcount[i] for i in range(len(pnums))]) assert not any( _ <= 0 for _ in n_agents_per_process ), "We have some levels with no processes" else: n_agents_per_process = make_array( n_agents_per_process, n_processes, dtype=int ) profit_means = make_array(profit_means, n_processes, dtype=float) profit_stddevs = make_array(profit_stddevs, n_processes, dtype=float) max_productivity = make_array( max_productivity, n_processes * n_steps, dtype=float ).reshape((n_processes, n_steps)) max_supply = make_array(max_supply, n_steps, dtype=float).reshape((1, n_steps)) n_agents = n_agents_per_process.sum() assert n_agents >= n_processes n_products = n_processes + 1 production_costs = make_array(production_costs, n_agents, dtype=int) if initial_balance is not None: initial_balance = make_array(initial_balance, n_agents, dtype=int) if not isinstance(agent_types, Iterable): agent_types = [agent_types] * n_agents if agent_params is None: agent_params = dict() # type: ignore if isinstance(agent_params, dict): agent_params = [copy.deepcopy(agent_params) for _ in range(n_agents)] else: assert len(agent_params) == 1 # type: ignore agent_params = [copy.deepcopy(agent_params[0]) for _ in range(n_agents)] # type: ignore elif not fixed_assignment: if agent_params is None: agent_params = [dict() for _ in range(len(agent_types))] if isinstance(agent_params, dict): agent_params = [ copy.deepcopy(agent_params) for _ in range(len(agent_types)) ] assert len(agent_types) == len(agent_params) tp = random.choices(list(range(len(agent_types))), k=n_agents) agent_types = [copy.copy(agent_types[_]) for _ in tp] agent_params = [copy.copy(agent_params[_]) for _ in tp] else: if agent_params is None: agent_params = [dict() for _ in range(len(agent_types))] if isinstance(agent_params, dict): agent_params = [ copy.deepcopy(agent_params) for _ in range(len(agent_types)) ] agent_types = list(agent_types) agent_params = list(agent_params) assert len(agent_types) == len(agent_params) agent_params = [_ if _ is not None else dict() for _ in agent_params] for t, p in zip(agent_types, agent_params): # type: ignore # type: ignore p["controller_type"] = t agent_types = [ # type: ignore ( DefaultOneShotAdapter if at and ( (isinstance(at, str) and at.startswith(PLACEHOLDER_AGENT_PREFIX)) or issubclass(get_class(at), OneShotAgent) ) else OneShotSCML2020Adapter if at else None ) for at in agent_types # type: ignore ] # generate production costs making sure that every agent can do exactly one process n_agents_cumsum = n_agents_per_process.cumsum().tolist() first_agent = [0] + n_agents_cumsum[:-1] last_agent = n_agents_cumsum[:-1] + [n_agents] process_of_agent = np.empty(n_agents, dtype=int) for i, (f, k) in enumerate(zip(first_agent, last_agent)): process_of_agent[f:k] = i if cost_increases_with_level: production_costs[f:k] = np.round( production_costs[f:k] * (i + 1) # math.sqrt(i + 1) ).astype(int) n_lines = intin(n_lines) costs = INFINITE_COST * np.ones((n_agents, n_lines, n_processes), dtype=int) # type: ignore for p, (f, k) in enumerate(zip(first_agent, last_agent)): costs[f:k, :, p] = production_costs[f:k].reshape((k - f), 1) # generate external contract amounts (controlled by productivity): # - generate total amount of input to the market # (it will end up being an n_products list of n_steps vectors) quantities = [ np.round(n_lines * n_agents_per_process[0] * max_supply).astype(int) ] # - for each level, find the amount of the output product that can be produced given the input amount and # productivity for p in range(n_processes): agents = n_agents_per_process[p] lines = n_lines * agents quantities.append( np.minimum( (quantities[-1] // process_outputs[p]) * process_inputs[p], ( np.round(lines * max_productivity[p, :]).astype(int) // process_inputs[p] ) * process_outputs[p], ) ) # - divide the quantity at every level between factories exogenous_supplies = distribute_quantities( equal_exogenous_supply, exogenous_supply_predictability, quantities[0], n_agents_per_process[0], n_steps, n_lines if cap_exogenous_quantities else None, # type: ignore ) quantities[0] = [sum(_) for _ in exogenous_supplies] exogenous_sales = distribute_quantities( equal_exogenous_sales, exogenous_sales_predictability, quantities[-1], n_agents_per_process[-1], n_steps, n_lines if cap_exogenous_quantities else None, # type: ignore # type: ignore ) quantities[-1] = [sum(_) for _ in exogenous_sales] # - now exogenous_supplies and exogenous_sales are both n_steps lists # of n_agents_per_process[p] vectors (jagged) # assign prices to the quantities given the profits catalog_prices = np.zeros(n_products, dtype=int) catalog_prices[0] = 10 supply_prices = catalog_prices[0] * np.ones( (n_agents_per_process[0], n_steps), dtype=int ) # We will calculate these later sale_prices = np.zeros((n_agents_per_process[-1], n_steps), dtype=int) # calculate manufacturing cost per process per step (this is per line) # we will multiply this by the number of active lines later manufacturing_costs = np.zeros((n_processes, n_steps), dtype=int) for p in range(n_processes): manufacturing_costs[p, :] = profit_basis( costs[first_agent[p] : last_agent[p], :, p] ) # calculate an "average" profit per process per step profits = np.zeros((n_processes, n_steps)) for p in range(n_processes): profits[p, :] = np.random.randn() * profit_stddevs[p] + profit_means[p] # total input costs come from buying exogenous supplies (quantity * unit price) input_costs = np.zeros((n_processes, n_steps), dtype=int) for step in range(n_steps): input_costs[0, step] = np.sum( exogenous_supplies[step] * supply_prices[:, step][:] ) # total input quantities per process are simply inputs of the corresponding # product type in quantities. input_quantity = np.zeros((n_processes, n_steps), dtype=int) input_quantity[0, :] = quantities[0] # the number of active lines come from dividing input by n. inputs consumed # by each line active_lines = np.hstack( [(n_lines * n_agents_per_process).reshape((n_processes, 1))] * n_steps # type: ignore ) assert active_lines.shape == (n_processes, n_steps) active_lines[0, :] = input_quantity[0, :] // process_inputs[0] # output_quantity = np.zeros((n_processes, n_steps), dtype=int) output_quantity = np.zeros((n_processes, n_steps), dtype=int) output_quantity[0, :] = active_lines[0, :] * process_outputs[0] # find the total manufacturing_costs per process manufacturing_costs[0, :] *= active_lines[0, :] # cost = cost of input + cost of manufacturing total_costs = input_costs + manufacturing_costs # should sell at the cost plus profit output_total_prices = np.ceil(total_costs * (1 + profits)).astype(int) for p in range(1, n_processes): input_costs[p, :] = output_total_prices[p - 1, :] input_quantity[p, :] = output_quantity[p - 1, :] active_lines[p, :] = input_quantity[p, :] // process_inputs[p] output_quantity[p, :] = active_lines[p, :] * process_outputs[p] manufacturing_costs[p, :] *= active_lines[p - 1, :] total_costs[p, :] = input_costs[p, :] + manufacturing_costs[p, :] output_total_prices[p, :] = np.ceil( total_costs[p, :] * (1 + profits[p, :]) ).astype(int) sale_prices[:, :] = np.ceil( output_total_prices[-1, :] / output_quantity[-1, :] ).astype(int) product_prices = np.zeros((n_products, n_steps)) product_prices[0, :] = catalog_prices[0] product_prices[1:, :] = np.ceil( np.divide( output_total_prices.astype(float), output_quantity.astype(float), out=np.zeros_like(output_total_prices, dtype=float), where=output_quantity != 0, ) ).astype(int) catalog_prices = np.ceil( [ profit_basis(product_prices[p, p : p + n_steps]) for p in range(n_products) ] ).astype(int) profile_info: list[ tuple[OneShotProfile, np.ndarray, np.ndarray, np.ndarray, np.ndarray] ] = [] nxt = 0 for k in range(n_processes): for a in range(n_agents_per_process[k]): esales = np.zeros((n_steps, n_products), dtype=int) esupplies = np.zeros((n_steps, n_products), dtype=int) esale_prices = np.zeros((n_steps, n_products), dtype=int) esupply_prices = np.zeros((n_steps, n_products), dtype=int) # TODO make sale_prices vary around a mean if k == 0: esupplies[:, 0] = [exogenous_supplies[s][a] for s in range(n_steps)] esupply_prices[:, 0] = supply_prices[a, :] if k == n_processes - 1: esales[:, -1] = [exogenous_sales[s][a] for s in range(n_steps)] esale_prices[:, -1] = sale_prices[a, :] dp = realin(shortfall_penalty) sc = realin(disposal_cost) storagec = realin(storage_cost) profile_info.append( ( OneShotProfile( cost=[_ for _ in costs[nxt][0] if _ != INFINITE_COST][0], input_product=k, n_lines=n_lines, # type: ignore disposal_cost_mean=sc, shortfall_penalty_mean=dp, storage_cost_mean=storagec, disposal_cost_dev=realin(disposal_cost_dev) * sc, shortfall_penalty_dev=realin(shortfall_penalty_dev) * dp, storage_cost_dev=realin(storage_cost_dev) * storagec, ), esales, esale_prices, esupplies, esupply_prices, ) ) nxt += 1 for p in profile_info: p = p[0] if perishable: assert ( p.storage_cost_dev == p.storage_cost_mean == 0 ), f"{storage_cost=}, {storage_cost_dev=}" else: assert ( p.disposal_cost_dev == p.disposal_cost_mean == 0 ), f"{disposal_cost=}, {disposal_cost_dev=}" max_income = ( output_quantity * catalog_prices[1:].reshape((n_processes, 1)) - total_costs ) assert nxt == n_agents if initial_balance is None: # every agent at every level will have just enough to do all the needed to do cash_availability fraction of # production (even though it may not have enough lines to do so) cash_availability = realin(cash_availability) balance = np.ceil( np.sum(total_costs, axis=1) / n_agents_per_process ).astype(int) initial_balance = [] # type: ignore for b, a in zip(balance, n_agents_per_process): initial_balance += [int(math.ceil(b * cash_availability))] * a b = np.sum(initial_balance) # type: ignore info.update( dict( product_prices=product_prices, active_lines=active_lines, input_quantities=input_quantity, output_quantities=output_quantity, exogenous_supplies=exogenous_supplies, exogenous_sales=exogenous_sales, expected_productivity=float(np.sum(active_lines)) / np.sum(n_lines * n_steps * n_agents_per_process), expected_n_products=np.sum(active_lines, axis=-1), expected_income=max_income, expected_welfare=float(np.sum(max_income)), expected_income_per_step=max_income.sum(axis=0), expected_income_per_process=max_income.sum(axis=-1), expected_mean_profit=( float(np.sum(max_income)) if b != 0 else np.sum(max_income) ), expected_profit_sum=( float(n_agents * np.sum(max_income) / b) if b != 0 else n_agents * np.sum(max_income) ), ) ) exogenous = [] for ( indx, (_, esales, esale_prices, esupplies, esupply_prices), # type: ignore Not using profile!! ) in enumerate(profile_info): input_product = process_of_agent[indx] for step, (sale, price) in enumerate( zip(esales[:, input_product + 1], esale_prices[:, input_product + 1]) ): if sale == 0: continue thisprice = int( 0.5 + price + np.random.randn() * exogenous_price_dev * price ) if force_signing or exogenous_control <= 0.0: exogenous.append( OneShotExogenousContract( product=input_product + 1, quantity=sale, unit_price=thisprice, time=step, revelation_time=step, seller=indx, # type: ignore buyer=-1, # type: ignore ) ) else: n_contracts = int(1 + exogenous_control * (sale - 1)) per_contract = integer_cut(sale, n_contracts, 0) for q in per_contract: if q == 0: continue thisprice = int( 0.5 + price + np.random.randn() * exogenous_price_dev * price ) exogenous.append( OneShotExogenousContract( product=input_product + 1, quantity=q, unit_price=thisprice, time=step, revelation_time=step, seller=indx, # type: ignore # type: ignore buyer=-1, # type: ignore ) ) for step, (supply, price) in enumerate( zip(esupplies[:, input_product], esupply_prices[:, input_product]) ): if supply == 0: continue thisprice = int( 0.5 + price + np.random.randn() * exogenous_price_dev * price ) if force_signing or exogenous_control <= 0.0: exogenous.append( OneShotExogenousContract( product=input_product, quantity=supply, unit_price=thisprice, time=step, revelation_time=step, seller=-1, # type: ignore buyer=indx, # type: ignore ) ) else: n_contracts = int(1 + exogenous_control * (supply - 1)) per_contract = integer_cut(supply, n_contracts, 0) for q in per_contract: if q == 0: continue thisprice = int( 0.5 + price + np.random.randn() * exogenous_price_dev * price ) exogenous.append( OneShotExogenousContract( product=input_product, quantity=q, unit_price=thisprice, time=step, revelation_time=step, seller=-1, # type: ignore buyer=indx, # type: ignore ) ) return dict( # process_inputs=process_inputs, # process_outputs=process_outputs, catalog_prices=catalog_prices, profiles=[_[0] for _ in profile_info], perishable=perishable, exogenous_contracts=exogenous, agent_types=agent_types, agent_params=agent_params, initial_balance=initial_balance, n_steps=n_steps, info=info, force_signing=force_signing, price_multiplier=price_multiplier, inventory_valuation_trading=0, inventory_valuation_catalog=0, penalties_scale=penalties_scale, **kwargs, )
[docs] def type_name_for_logs(self, agent: OneShotAgent | None) -> str | None: if not agent: return None x = agent.type_name.split(":")[-1] if x.startswith("scml"): x = x.split(".")[-1] return x
@property
[docs] def negotiated_contract_records(self) -> list[dict[str, Any]]: return [ _ for _ in self._saved_contracts.values() if all(not is_system_agent(a) for a in _["partners"]) ]
@property
[docs] def exogenous_contract_records(self) -> list[dict[str, Any]]: return [ _ for _ in self._saved_contracts.values() if any(is_system_agent(a) for a in _["partners"]) ]
[docs] def current_balance(self, agent_id: str): return sum(self._profits[agent_id]) + self.initial_balances[agent_id] # type: ignore
[docs] def add_financial_report( self, agent: DefaultOneShotAdapter, reports_agent, reports_time ) -> None: """ Records a financial report for the given agent in the agent indexed reports and time indexed reports Args: agent: The agent reports_agent: A dictionary of financial reports indexed by agent id reports_time: A dictionary of financial reports indexed by time Returns: """ current_balance = sum(self._profits[agent.id]) + self.initial_balances[agent.id] # type: ignore self.is_bankrupt[agent.id] = ( current_balance < self.bankruptcy_limit ) or self.is_bankrupt[agent.id] assets = ( ( self._inventory_input[agent.id] * self.trading_prices[self.agent_profiles[agent.id].input_product] + self._inventory_output[agent.id] * self.trading_prices[self.agent_profiles[agent.id].output_product] ) if self.publish_assets else 0 ) report = FinancialReport( agent_id=agent.id, step=self.current_step, cash=current_balance, assets=assets, breach_prob=len([_ for _ in self._breaches_of[agent.id] if _]) / len(self._breaches_of[agent.id]), breach_level=sum(self._breach_levels[agent.id]) / len(self._breach_levels[agent.id]), is_bankrupt=self.is_bankrupt[agent.id], agent_name=agent.name, ) repstr = str(report).replace("\n", " ") self.logdebug(f"{agent.name}: {repstr}") if reports_agent.get(agent.id, None) is None: reports_agent[agent.id] = {} reports_agent[agent.id][self.current_step] = report if reports_time.get(str(self.current_step), None) is None: reports_time[str(self.current_step)] = {} reports_time[str(self.current_step)][agent.id] = report
@property
[docs] def agent_contracts(self): return self.__contracts
[docs] def _update_exogenous(self, s): self.exogenous_qout = defaultdict(int) self.exogenous_qin = defaultdict(int) self.exogenous_pout = defaultdict(int) self.exogenous_pin = defaultdict(int) # self.__contracts = defaultdict(list) # Register exogenous contracts as concluded # ----------------------------------------- for contract in self.exogenous_contracts[s]: seller = contract.annotation["seller"] buyer = contract.annotation["buyer"] quantity = contract.agreement["quantity"] unit_price = contract.agreement["unit_price"] self.exogenous_qout[seller] += quantity self.exogenous_pout[seller] += quantity * unit_price self.exogenous_qin[buyer] += quantity self.exogenous_pin[buyer] += quantity * unit_price self.on_contract_concluded(contract, to_be_signed_at=self.current_step) if self.exogenous_force_max: contract.signatures = dict(zip(contract.partners, contract.partners)) else: if SYSTEM_SELLER_ID in contract.partners: contract.signatures[SYSTEM_SELLER_ID] = SYSTEM_SELLER_ID else: contract.signatures[SYSTEM_BUYER_ID] = SYSTEM_BUYER_ID if self.exogenous_dynamic: raise NotImplementedError("Exogenous-dynamic is not yet implemented")
[docs] def step_with(self, actions: dict[str, dict[str, SAOResponse]], init=False) -> bool: """ Runs a simulation step for the agents given in keys passing the corresponding values as counter offers. Returns: False if this is the last negotiation. Remarks: - You must call this with `init=True` once at the beginning of every simulation to make sure that `init()` and other initialization code is called correctly. - Every step advances all negotiations one step. - Negotiators belonging to the given agents are never called as long as a corresponding action (response) is given in the agents dict. - The world MUST be created with `one_offer_per_step` passed as `True` (default is `False`). """ neg_actions = dict() existing = set(self._negotiations.keys()) # existing = set(_.nmi.id for _ in self._current_negotiations) for agent, responses in actions.items(): awi: OneShotAWI = self.agents[agent].awi # type: ignore negotiations = awi.current_negotiation_details["buy"].copy() negotiations.update(awi.current_negotiation_details["sell"]) for partner, neg in negotiations.items(): neg: NegotiationDetails mynegs = [ _ for _ in neg.nmi._mechanism.negotiators if _.owner and _.owner.id == agent ] assert len(mynegs) == 1 assert neg.nmi._mechanism._one_offer_per_step # type: ignore response = responses.get(partner, None) mid = neg.nmi.mechanism_id # if mid not in existing: # continue if self._debug: assert ( mid in existing ), f"{mid} mechanism (with {partner}) does not exist for {agent}" if response is not None: neg_actions[mid] = {mynegs[0].id: response} else: warnings.warn(f"{agent=} has no response for partner {partner}") return self.step(n_neg_steps=int(not init), neg_actions=neg_actions)
[docs] def simulation_step(self, stage=0): s = self.current_step if self._verbose: print(f"{self.id}: Simulation step {s} stage: {stage}") if stage == 0: self._update_exogenous(s) # publish public information # -------------------------- if self.publish_trading_prices: self.bulletin_board.record( "trading_prices", value=self.trading_prices, key=str(self.current_step), ) q, p = np.zeros(self.n_products), np.zeros(self.n_products) for contract in self.exogenous_contracts[s]: product = contract.annotation["product"] quantity, unit_price = ( contract.agreement["quantity"], contract.agreement["unit_price"], ) q[product] += quantity p[product] += quantity * unit_price self.exogenous_contracts_summary = [(a, b) for a, b in zip(q, p)] if self.publish_exogenous_summary: self.bulletin_board.record( "exogenous_contracts_summary", value=self.exogenous_contracts_summary, key=str(self.current_step), ) # make agent ufuns # ================ for aid, a in self.agents.items(): if is_system_agent(aid): continue a.make_ufun(add_exogenous=True) # type: ignore # zero quantities and prices # ========================== self._input_quantity = defaultdict(int) self._input_price = defaultdict(int) self._output_quantity = defaultdict(int) self._output_price = defaultdict(int) self._n_nullified = 0 self._nullified_price = 0 self._nullified_quantity = 0 self._activity = 0 # Reset all agents # ================ for aid, a in self.agents.items(): a.reset() assert ( a.is_clean() ), f"Agent {aid} has unclean state: Negotiations: {a._negotiations}" # # Clean negotiation details # # ------------------------- # self._agent_negotiations = dict( # zip( # [_ for _ in self.agents.keys()], # [dict(buy=dict(), sell=dict()) for _ in self.agents.keys()], # ) # ) # request all negotiations # ======================== self._make_negotiations() # initialize all agents for this step # =================================== for aid, a in self.agents.items(): if hasattr(a, "before_step"): a.before_step() # type: ignore return # update trading price information # -------------------------------- has_trade = self._sold_quantity[:, s + 1] > 0 self._trading_price[~has_trade, s + 1] = self._trading_price[~has_trade, s] self._betas_sum[~has_trade, s + 1] = self._betas_sum[~has_trade, s] assert not np.any( np.isnan(self._real_price[has_trade, s + 1]) ), f"Nans in _real_price at step {self.current_step}\n{self._real_price}" self._trading_price[has_trade, s + 1] = ( self._trading_price[has_trade, s] * self._betas_sum[has_trade, s] * self.trading_price_discount + self._real_price[has_trade, s + 1] * self._sold_quantity[has_trade, s + 1] ) self._betas_sum[has_trade, s + 1] = ( self._betas_sum[has_trade, s] * self.trading_price_discount + self._sold_quantity[has_trade, s + 1] ) self._trading_price[has_trade, s + 1] /= self._betas_sum[has_trade, s + 1] self._trading_price[:, s + 1 :] = self._trading_price[:, s + 1].reshape( (self.n_products, 1) ) self._traded_quantity += self._sold_quantity[:, s + 1] # self._trading_price[has_trade, s] = ( # np.sum(self._betas[:s+1] * self._real_price[has_trade, s::-1]) # ) / self._betas_sum[s+1] # update agent profits # --------------------- for aid, agent in self.agents.items(): if is_system_agent(aid): continue if ( not self.penalize_bankrupt_for_future_contracts and self.is_bankrupt[aid] ): continue # agent.profile # todo: I am accessing the ufun of the agent directly to avoid running # unnecessary optimizations to find best and worst utility. This is # dangerous because the agent can change its own ufun. May be I should # directly create the ufun here using a global ufun method defined in # the ufun.py module that takes a world and an agent (or just an AWI) qin, qout = ( self._input_quantity[aid], self._output_quantity[aid], ) ufun = agent.ufun assert isinstance(ufun, OneShotUFun) info = ufun.from_contracts( [ _ for _ in self.__contracts[aid] if _.agreement["time"] == self.current_step ], return_info=True, ) ucon, producible = info.utility, info.producible # print(f"{aid}: {info}") if not self.perishable: remaining_in = max(0, self._input_quantity[aid] - producible) remaining_out = max(0, producible - self._output_quantity[aid]) self._inventory_input[aid] += remaining_in self._inventory_output[aid] += remaining_out else: assert ( self._inventory_input.get(aid, 0) == self._inventory_output.get(aid, 0) == 0 ), f"Perishable but with inventory remaining: {self._inventory_input[aid]=}, {self._inventory_output[aid]=}, {producible=}, {qin=}, {qout=}" self._productivity[aid] = producible / self.agent_profiles[aid].n_lines self._shortfall_penalty[aid] = info.shortfall_penalty self._shortfall_quantity[aid] = info.shortfall_quantity self._storage_cost[aid] = info.storage_cost self._disposal_cost[aid] = info.disposal_cost self._penalized_quantity[aid] = info.remaining_quantity self._profits[aid].append(ucon) self._breach_levels[aid].append(ufun.breach_level(qin, qout)) self._breaches_of[aid].append(ufun.is_breach(qin, qout)) current_balance = self.current_balance(aid) self.is_bankrupt[aid] = ( current_balance < self.bankruptcy_limit or self.is_bankrupt[aid] ) # TODO nullify all contracts of the bankrupt agent if self.is_bankrupt[aid]: for partner in self.agents.keys(): for contract in self.__contracts.get(partner, []): if not ( aid in contract.partners and contract.executed_at < 0 and contract.agreement["time"] >= self.current_step ): continue contract.nullified_at = self.current_step if self._breaches_of[aid][-1]: self.bulletin_board.record( section="breaches", key=unique_name("", add_time=False), value=self._breach_record( aid, self._breach_levels[aid][-1], "product" ), ) # publish financial reports # ------------------------- if self.current_step % self.financial_reports_period == 0: reports_agent = self.bulletin_board.data["reports_agent"] reports_time = self.bulletin_board.data["reports_time"] for aid, agent in self.agents.items(): if is_system_agent(agent.id): continue self.add_financial_report(agent, reports_agent, reports_time) # type: ignore
# # Clean negotiation details # # ------------------------- # self._agent_negotiations = dict( # zip( # [_ for _ in self.agents.keys()], # [dict(buy=dict(), sell=dict()) for _ in self.agents.keys()], # ) # )
[docs] def _breach_record( self, perpetrator, level, type_, ) -> dict[str, Any]: return { "perpetrator": perpetrator, "perpetrator_name": perpetrator, "level": level, "type": type_, "time": self.current_step, }
[docs] def _adjust_contract_types(self, contract): for k in ("unit_price", "quantity"): if not isinstance(contract.agreement[k], int): contract.agreement[k] = int(contract.agreement[k] + 0.5) return contract
[docs] def on_contract_signed(self, contract: Contract) -> bool: contract = self._adjust_contract_types(contract) self.__contracts[contract.annotation["buyer"]].append(contract) self.__contracts[contract.annotation["seller"]].append(contract) return super().on_contract_signed(contract)
[docs] def contract_record(self, contract: Contract) -> dict[str, Any]: contract = self._adjust_contract_types(contract) c = { "id": contract.id, "seller_name": self.agents[contract.annotation["seller"]].name, "buyer_name": self.agents[contract.annotation["buyer"]].name, "seller_type": self.agents[contract.annotation["seller"]].type_name, "buyer_type": self.agents[contract.annotation["buyer"]].type_name, "delivery_time": contract.agreement["time"], "quantity": contract.agreement["quantity"], "unit_price": contract.agreement["unit_price"], "signed_at": contract.signed_at, "nullified_at": contract.nullified_at, "concluded_at": contract.concluded_at, "signatures": "|".join(str(_) for _ in contract.signatures), "issues": contract.issues, "seller": contract.annotation["seller"], "buyer": contract.annotation["buyer"], "partners": contract.partners, "product_name": "p" + str(contract.annotation["product"]), } if not self.compact: c.update(contract.annotation) c["n_neg_steps"] = ( contract.mechanism_state.step if contract.mechanism_state else 0 ) return c
[docs] def breach_record(self, breach: Breach) -> dict[str, Any]: return { "perpetrator": breach.perpetrator, "perpetrator_name": breach.perpetrator, "level": breach.level, "type": breach.type, "time": breach.step, }
[docs] def execute_action(self, action, agent, callback: Callable | None = None) -> bool: _ = action, agent, callback return True
[docs] def contract_size(self, contract: Contract) -> float: if contract.nullified_at >= 0: return 0 return int(contract.agreement["quantity"] + 0.5) * int( contract.agreement["unit_price"] + 0.5 )
[docs] def post_step_stats(self): self._stats["n_contracts_nullified_now"].append(self._n_nullified) self._stats["n_contracts_nullified_quantity"].append(self._nullified_quantity) self._stats["n_contracts_nullified_price"].append(self._nullified_price) scores = self.scores() for p in range(self.n_products): self._stats[f"trading_price_{p}"].append( self._trading_price[p, self.current_step + 1] ) self._stats[f"sold_quantity_{p}"].append( self._sold_quantity[p, self.current_step + 1] ) self._stats[f"unit_price_{p}"].append( self._real_price[p, self.current_step + 1] ) if self.exogenous_contracts_summary: qq = self.exogenous_contracts_summary[p][0] self._stats[f"exogenous_quantity_{p}"].append(qq) self._stats[f"exogenous_unit_price_{p}"].append( (self.exogenous_contracts_summary[p][1] / qq) if qq else 0.0 ) for aid in self.agents.keys(): if is_system_agent(aid): continue self._stats[f"score_{aid}"].append(scores[aid]) self._stats[f"balance_{aid}"].append(self.current_balance(aid)) self._stats[f"bankrupt_{aid}"].append(self.is_bankrupt.get(aid, False)) self._stats[f"productivity_{aid}"].append(self._productivity[aid]) self._stats[f"shortfall_quantity_{aid}"].append( self._shortfall_quantity[aid] ) self._stats[f"shortfall_penalty_{aid}"].append(self._shortfall_penalty[aid]) self._stats[f"storage_cost_{aid}"].append(self._storage_cost[aid]) self._stats[f"disposal_cost_{aid}"].append(self._disposal_cost[aid]) self._stats[f"inventory_penalized_{aid}"].append( self._penalized_quantity[aid] ) self._stats[f"inventory_input_{aid}"].append(self._inventory_input[aid]) self._stats[f"inventory_output_{aid}"].append(self._inventory_output[aid])
[docs] def pre_step_stats(self): self._n_nullified = 0 self._nullified_price = 0 self._nullified_quantity = 0 self._activity = 0
[docs] def welfare(self, include_bankrupt: bool = False) -> float: """Total welfare of all agents""" scores = self.scores() return sum( scores[a.id] for a in self.agents.values() if not is_system_agent(a.id) and (include_bankrupt or not self.is_bankrupt[a.id]) )
[docs] def relative_welfare(self, include_bankrupt: bool = False) -> float | None: """Total welfare relative to expected value. Returns None if no expectation is found in self.info""" if "expected_income" not in self.info.keys(): return None return self.welfare(include_bankrupt) / np.sum(self.info["expected_income"])
[docs] def is_valid_contact(self, contract: Contract) -> bool: """Checks whether a signed contract is valid""" return ( contract.agreement["time"] >= self.current_step and contract.agreement["time"] < self.n_steps and contract.agreement["unit_price"] > 0 and contract.agreement["quantity"] > 0 )
[docs] def scores(self, assets_multiplier: float = 0.0) -> dict[str, float]: """ Scores of all agents given the asset multiplier. Args: assets_multiplier: A multiplier to multiply the assets with. """ scores = dict() for aid in self.agents.keys(): if is_system_agent(aid): continue if not self.initial_balances[aid]: # type: ignore scores[aid] = self.initial_balances[aid] + sum(self._profits[aid]) # type: ignore continue scores[aid] = self.initial_balances[aid] + sum( # type: ignore self._profits[aid] ) # type: ignore if abs(assets_multiplier) > 1e-6: scores[aid] += ( self._inventory_input[aid] * self.trading_prices[self.agent_profiles[aid].input_product] ) scores[aid] += ( self._inventory_output[aid] * self.trading_prices[self.agent_profiles[aid].output_product] ) scores[aid] /= self.initial_balances[aid] # type: ignore return scores
@property
[docs] def winners(self): """The winners of this world (factory managers with maximum wallet balance""" if len(self.agents) < 1: return [] scores = self.scores() sa = sorted(zip(scores.values(), scores.keys()), reverse=True) max_score = sa[0][0] winners = [] for s, a in sa: if s < max_score: break winners.append(self.agents[a]) return winners
[docs] def trading_prices_for( self, discount: float = 1.0, condition="executed" ) -> np.ndarray: """ Calculates the prices at which all products traded using an optional discount factor Args: discount: A discount factor to treat older prices less importantly (exponential discounting). condition: The condition for contracts to consider. Possible values are executed, signed, concluded, nullified Returns: an n_products vector of trading prices """ prices = np.nan * np.ones((self.n_products, self.n_steps), dtype=float) quantities = np.zeros((self.n_products, self.n_steps), dtype=int) for contract in self.saved_contracts: if contract[condition + "_at"] < 0: continue p, t, q, u = ( contract["product"], contract["delivery_time"], contract["quantity"], contract["unit_price"], ) prices[p, t] = (prices[p, t] * quantities[p, t] + u * q) / ( quantities[p, t] + q ) quantities[p, t] += q discount = np.cumprod(discount * np.ones(self.n_steps)) # type: ignore discount /= sum(discount) # type: ignore return np.nansum(np.nanprod(prices, discount), axis=-1) # type: ignore
@property
[docs] def trading_prices(self): if self.current_step == self.n_steps: return self._trading_price[:, -1] return self._trading_price[:, self.current_step + 1]
@property
[docs] def stats_df(self) -> pd.DataFrame: """Returns a pandas data frame with the stats""" return pd.DataFrame(self.stats)
@property
[docs] def contracts_df(self) -> pd.DataFrame: """Returns a pandas data frame with the contracts""" contracts = pd.DataFrame(self.saved_contracts) contracts["product_index"] = contracts.product_name.str.replace("p", "").astype( int ) contracts["breached"] = contracts.breaches.str.len() > 0 contracts["executed"] = contracts.executed_at >= 0 contracts["erred"] = contracts.erred_at >= 0 contracts["nullified"] = contracts.nullified_at >= 0 return contracts
@property
[docs] def system_agents(self) -> list[_StdSystemAgent]: """Returns the two system agents""" return [_ for _ in self.agents.values() if is_system_agent(_.id)] # type: ignore
@property
[docs] def system_agent_names(self) -> list[str]: """Returns the names two system agents""" return [_ for _ in self.agents.keys() if is_system_agent(_)]
@property
[docs] def non_system_agents(self) -> list[DefaultOneShotAdapter]: """Returns all agents except system agents""" return [_ for _ in self.agents.values() if not is_system_agent(_.id)] # type: ignore
@property
[docs] def non_system_agent_names(self) -> list[str]: """Returns names of all agents except system agents""" return [_ for _ in self.agents.keys() if not is_system_agent(_)]
@property
[docs] def agreement_fraction(self) -> float: """Fraction of negotiations ending in agreement and leading to signed contracts""" n_negs = sum(self.stats["n_negotiations"]) n_contracts = self.n_saved_contracts(True) return n_contracts / n_negs if n_negs != 0 else np.nan
[docs] system_agent_ids = system_agent_names
[docs] non_system_agent_ids = non_system_agent_names
[docs] def draw( # type: ignore self, steps: tuple[int, int] | int | None = None, what: Collection[str] = DEFAULT_EDGE_TYPES, who: Callable[[Agent], bool] | None = None, where: Callable[[Agent], int | tuple[float, float]] | None = None, together: bool = True, axs: Collection[Axis] | None = None, ncols: int = 4, figsize: tuple[int, int] = (15, 15), **kwargs, ) -> tuple[Axis, nx.Graph] | tuple[list[Axis], list[nx.Graph]]: if where is None: def mywhere(x): return ( self.n_processes + 1 if x == SYSTEM_BUYER_ID else 0 if x == SYSTEM_SELLER_ID else int(self.agents[x].awi.profile.level + 1) ) # type: ignore where = mywhere return super().draw( # type: ignore steps, what, who, where, together=together, axs=axs, # type: ignore ncols=ncols, figsize=figsize, **kwargs, )
[docs] def _request_negotiations( self, agent_id: str, # quantity: int | tuple[int, int], # unit_price: int | tuple[int, int], # time: int | tuple[int, int], controller: SAOController | None = None, negotiators: list[SAONegotiator] | None = None, extra: dict[str, Any] | None = None, # consumer_starts: bool = True, ) -> bool: """ Requests negotiations (used internally) Args: agent_id: the agent requesting product: The product to negotiate about quantity: The minimum and maximum quantities. Passing a single value q is equivalent to passing (q,q) unit_price: The minimum and maximum unit prices. Passing a single value u is equivalent to passing (u,u) time: The minimum and maximum delivery step. Passing a single value t is equivalent to passing (t,t) controller: The controller to manage the complete set of negotiations negotiators: An optional list of negotiators to use for negotiating with the given partners (in the same order). extra: Extra information accessible through the negotiation annotation to the caller # consumer_starts: Whether the consumer or supplier sends the first offer in the negotiation Returns: `True` if the partner accepted and the negotiation is ready to start """ if self.is_bankrupt[agent_id] or is_system_agent(agent_id): return True if controller is not None and negotiators is not None: raise ValueError( "You cannot pass both controller and negotiators to request_negotiations" ) if controller is None and negotiators is None: raise ValueError( "You MUST pass either controller or negotiators to request_negotiations" ) if extra is None: extra = dict() # responding_agents = ( # self.suppliers[product] if consumer_starts else self.consumers[product] # ) results = [] for responding_agents, is_buying, product in ( ( self.agent_suppliers[agent_id], True, self.agent_profiles[agent_id].input_product, ), ( self.agent_consumers[agent_id], False, self.agent_profiles[agent_id].output_product, ), ): responding_agents = [ _ for _ in responding_agents if not is_system_agent(_) and not self.is_bankrupt[_] ] if not responding_agents: continue # print( # f"{is_buying=}: {agent_id} requesting negotiations with {responding_agents} for product {product}" # ) partners = [ _ for _ in responding_agents if not self.is_bankrupt[_] and not is_system_agent(_) ] if not partners: return True if negotiators is None: assert controller is not None negotiators = [ controller.create_negotiator(ControlledSAONegotiator, name=_, id=_) # type: ignore for _ in partners ] assert negotiators is not None results += [ self._request_negotiation( agent_id=agent_id, product=product, # quantity=quantity, # unit_price=unit_price, # time=time, partner=partner, negotiator=negotiator, extra=extra, is_buy=is_buying, ) for partner, negotiator in zip(partners, negotiators) ] # for p, r in zip(partners, results): # if r: # self._world._registered_negs.add(tuple(sorted([P, self.agent.id]))) if self._debug and not all(results): failed = set() for r, p in zip(results, partners): if not r: failed.add(p) assert failed raise AssertionError( f"Partners {failed} failed to accept negotiation request from {agent_id}" ) return all(results)
[docs] def _request_negotiation( self, agent_id: str, product: int, # quantity: int | tuple[int, int], # unit_price: int | tuple[int, int], # time: int | tuple[int, int], partner: str, negotiator: SAONegotiator, extra: dict[str, Any] | None = None, is_buy: bool = True, ) -> NegotiationInfo | None: """ Requests a negotiation Args: product: The product to negotiate about quantity: The minimum and maximum quantities. Passing a single value q is equivalent to passing (q,q) unit_price: The minimum and maximum unit prices. Passing a single value u is equivalent to passing (u,u) time: The minimum and maximum delivery step. Passing a single value t is equivalent to passing (t,t) partner: ID of the partner to negotiate with. negotiator: The negotiator to use for this negotiation (if the partner accepted to negotiate) extra: Extra information accessible through the negotiation annotation to the caller is_buy: whether the consumer starts the negotiation Returns: `True` if the partner accepted and the negotiation is ready to start """ if is_buy: assert ( self.agent_profiles[agent_id].input_product == product ), f"{agent_id=}: Buying {product=} but my input product is {self.agent_profiles[agent_id].input_product}" else: assert ( self.agent_profiles[agent_id].output_product == product ), f"{agent_id=}: Selling {product=} but my output product is {self.agent_profiles[agent_id].output_product}" agent = self.agents[agent_id] if extra is None: extra = dict() if self._debug: quantity = self._current_issues[product][QUANTITY] time = self._current_issues[product][TIME] unit_price = self._current_issues[product][UNIT_PRICE] self.logdebug( f"{agent.name} requested to {'buy' if is_buy else 'sell'} {product} to {partner}" f" q: {quantity}, u: {unit_price}, t: {time}" ) annotation = { "product": product, "is_buy": is_buy, "buyer": agent_id if is_buy else partner, "seller": partner if is_buy else agent_id, "caller": agent_id, "sim_step": self.current_step, } issues = self._current_issues[product] partners = [agent_id, partner] extra["negotiator_id"] = negotiator.id req_id = agent.create_negotiation_request( issues=issues, # type: ignore partners=partners, negotiator=negotiator, annotation=annotation, extra=dict(**extra), ) result = self.request_negotiation_about( caller=agent, issues=issues, # type: ignore partners=[self.agents[_] for _ in partners], req_id=req_id, annotation=annotation, ) # signal failure if the request is rejected if not result: return result partner_ = [_ for _ in partners if _ != agent.id][0] if is_buy: buyer, seller = agent.id, partner_ else: seller, buyer = agent.id, partner_ assert result.mechanism is not None info = NegotiationDetails( seller=seller, buyer=buyer, nmi=result.mechanism.nmi, product=product, ) if self._debug: assert ( buyer in self.agents[seller].awi.my_consumers # type: ignore ), f"{seller=}, {buyer=}" assert ( seller in self.agents[buyer].awi.my_suppliers # type: ignore ), f"{seller=}, {buyer=}" # self._current_negotiations.append(info) self._agent_negotiations[seller]["sell"][buyer] = info self._agent_negotiations[buyer]["buy"][seller] = info return result
[docs] def _make_issues( self, product ) -> tuple[tuple[int, int], tuple[int, int], tuple[int, int]]: """ Creates the negotiation agendas Args: product (int): The product to be negotiated about Returns: A tuple of minimum and maximum values for unit-price, time, and quantity in that order """ price_of_product = ( self.trading_prices[product] if self.publish_trading_prices else self.catalog_prices[product] ) if self.wide_price_range: if product: p = ( self.trading_prices[product - 1] if self.publish_trading_prices else self.catalog_prices[product - 1] ) else: p = 0 else: p = price_of_product if self.price_range_fraction > 1e-6: unit_price = ( max(1, int(p * (1.0 - self.price_range_fraction))), int((1.0 + self.price_range_fraction) * price_of_product), ) if unit_price[1] < unit_price[0]: unit_price = (unit_price[1], unit_price[0]) elif self.price_multiplier > 1e-6: unit_price = ( max( 1, int(p // self.price_multiplier), ), int(self.price_multiplier * price_of_product), ) else: ceil = int(math.ceil(price_of_product)) unit_price = ( max(1, ceil - 1), max(1, ceil), ) assert unit_price[0] + 1 == unit_price[1] or unit_price[1] == 1 time = ( self.current_step, self.current_step + (self.horizon if not self.one_time_per_negotiation else 0), ) quantity = ( int(not self.allow_zero_quantity), max(1, int(self._max_n_lines * self.quantity_multiplier + 0.5)), ) return unit_price, time, quantity
[docs] def _make_negotiations(self): # consumer_starts = random.random() > 0.5 # initialize negotiation details self._agent_negotiations = dict( zip( [_ for _ in self.agents.keys()], [dict(buy=dict(), sell=dict()) for _ in self.agents.keys()], ) ) if self._verbose: print(f"{self.id} ({self.current_step}): Making Negotiations") def values(x: int | tuple[int, int]): if not isinstance(x, Iterable): return int(x), int(x) return int(min(x)), int(max(x)) controllers = dict() for aid, a in self.agents.items(): if is_system_agent(aid) or isinstance(a, OneShotSCML2020Adapter): continue controllers[aid] = a.adapted_object # type: ignore a.adapted_object.make_ufun(add_exogenous=True) # type: ignore assert ( a.is_clean() ), f"Agent {aid} has unclean state: Negotiations: {a._negotiations}" expected_negs = set() if self._debug: if len(self._negotiations) != 0: warnings.warn( f"Found unexpected negotiations at step {self.current_step}" f"\n{[(_.partners, _.mechanism.state if _.mechanism else None) for _ in self._negotiations.values() ]}" ) for product in range(1, self.n_products): unit_price, time, quantity = self._make_issues(product) self._current_issues[product] = [ # type: ignore make_issue(values(quantity), name="quantity"), make_issue(values(time), name="time"), make_issue(values(unit_price), name="unit_price"), ] assert ( not self.one_time_per_negotiation or time[0] == time[1] == self.current_step ), f"{time=}, {self.current_step=} but {self.one_time_per_negotiation=}" for level in range(self.n_products - 1, -1, -2): requesting_agents = self.suppliers[level] requesting_agents = [ _ for _ in requesting_agents if not is_system_agent(_) and not self.is_bankrupt[_] ] if not requesting_agents: continue for aid in requesting_agents: if is_system_agent(aid) or isinstance( self.agents[aid], OneShotSCML2020Adapter ): continue self._request_negotiations( agent_id=aid, controller=controllers[aid], negotiators=None, extra=None, # consumer_starts=consumer_starts, ) # request negotiations about the future if needed if self.one_time_per_negotiation and self.horizon: for _ in range(self.horizon): t = time[0] + 1 # type: ignore if t >= self.n_steps: continue time = (t, t) self._request_negotiations( agent_id=aid, controller=controllers[aid], negotiators=None, extra=None, # consumer_starts=consumer_starts, ) if self._debug: for level in range(self.n_products): for c in self.consumers[level]: if is_system_agent(c) or self.is_bankrupt[c]: continue for s in self.suppliers[level]: if is_system_agent(s) or self.is_bankrupt[s]: continue expected_negs.add(tuple(sorted((c, s)))) found_negs = set() for n in self._negotiations.values(): found_negs.add(tuple(sorted(_.id for _ in n.partners))) assert ( found_negs == expected_negs ), f"{expected_negs=}\n\n{found_negs=}\n\n{found_negs.difference(expected_negs)=}\n\n{expected_negs.difference(found_negs)=}"
# if not success: # raise ValueError( # f"Failed to start negotiations for product " f"{product}" # )
[docs] def order_contracts_for_execution( self, contracts: Collection[Contract] ) -> Collection[Contract]: # for contract in contracts: # contract.executed_at = self.current_step return contracts
[docs] def get_private_state(self, agent: Agent) -> dict: return agent.awi.state
[docs] def _contract_record(self, contract): record = super()._contract_record(contract) # record["executed_at"] = self.current_step return record
[docs] def start_contract_execution(self, contract: Contract) -> set[Breach] | None: # print(f"Started executing {contract.agreement} on {self.current_step} signed on {contract.signed_at}") breaches = set() # do not process if the partner is bankrupt if ( self.nullify_bankrupt_contracts and any(self.is_bankrupt.get(a, False) for a in contract.partners) and contract.nullified_at < 0 ): self._n_nullified += 1 q = contract.agreement["quantity"] self._activity += 0 self._nullified_quantity += q self._nullified_price += contract.agreement["price"] * q contract.nullified_at = self.current_step self.loginfo( f"Nullified contract because a partner was bankrupt: {contract}" ) if contract.nullified_at >= 0: return breaches assert ( contract.agreement["time"] == self.current_step ), f"{contract.agreement=} executed on {self.current_step} signed on {contract.signed_at}" contract.executed_at = self.current_step product = contract.annotation["product"] bought = contract.agreement["quantity"] if bought < 1: contract.nullified_at = self.current_step return breaches total_price = bought * contract.agreement["unit_price"] oldq = self._sold_quantity[product, self.current_step + 1] oldp = self._real_price[product, self.current_step + 1] totalp = 0.0 if oldq > 0: totalp = oldp * oldq self._sold_quantity[product, self.current_step + 1] += bought self._real_price[product, self.current_step + 1] = (totalp + total_price) / ( oldq + bought ) self._input_quantity[contract.annotation["buyer"]] += bought self._input_price[contract.annotation["buyer"]] += total_price self._output_quantity[contract.annotation["seller"]] += bought self._output_price[contract.annotation["seller"]] += total_price self._activity += total_price return breaches
[docs] def complete_contract_execution( self, contract: Contract, breaches: list[Breach], resolution: Contract ) -> None: super().complete_contract_execution(contract, breaches, resolution)
@classmethod
[docs] def plot_combined_stats( # type: ignore cls, worlds: tuple[SCMLBaseWorld, ...] | SCMLBaseWorld, stats: str | tuple[str, ...] | None = None, pertype=False, makefig=False, title=True, ylabel=False, xlabel=False, legend=True, figsize=None, perishable: bool = False, **kwargs, ): """Plots combined statistics of multiple worlds in a single plot Args: stats: The statistics to plot. If `None`, some selected stats will be displayed pertype: combine agent-statistics per type use_sum: plot sum for type statistics instead of mean title: If given a title will be added to each subplot ylabel: If given, the ylabel will be added to each subplot xlabel: If given The xlabel will be added (Simulation Step) legend: If given, a legend will be displayed makefig: If given a new figure will be started figsize: Size of the figure to host the plot ylegend: y-axis of legend for cases with large number of labels legend_n_cols: number of columns in the legend """ import matplotlib.pyplot as plt if not stats: if makefig: fig = plt.figure(figsize=figsize) axes = fig.subplots(4 - int(perishable), 2) else: _, axes = plt.subplots(4 - int(perishable), 2) plt.sca(axes[0, 0]) cls.plot_combined_stats( worlds, "shortfall_penalty", pertype=pertype, legend=legend, xlabel=False, title=title, ylabel=ylabel, **kwargs, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[1, 0]) cls.plot_combined_stats( worlds, "disposal_cost" if perishable else "storage_cost", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[0, 1]) cls.plot_combined_stats( worlds, "score", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[1, 1]) cls.plot_combined_stats( worlds, "productivity", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off if not perishable: plt.sca(axes[2, 0]) cls.plot_combined_stats( worlds, "inventory_penalized", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[2, 1]) cls.plot_combined_stats( worlds, "inventory_input", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[3 - int(perishable), 0]) cls.plot_combined_stats( worlds, "sold_quantity", pertype=False, xlabel=True, title=title, ylabel=ylabel, ) plt.sca(axes[3 - int(perishable), 1]) cls.plot_combined_stats( worlds, "trading_price", pertype=False, xlabel=True, title=title, ylabel=ylabel, ) return return super().plot_combined_stats( worlds=worlds, stats=stats, pertype=pertype, makefig=makefig, title=title, ylabel=ylabel, xlabel=xlabel, legend=legend, figsize=figsize, **kwargs, )
[docs] def plot_stats( self, stats: str | tuple[str, ...] | None = None, pertype=False, use_sum=False, makefig=False, title=True, ylabel=False, xlabel=False, legend=True, figsize=None, ylegend=2.0, legend_ncols=8, ): """Plots statistics of the world in a single plot Args: stats: The statistics to plot. If `None`, some selected stats will be displayed pertype: combine agent-statistics per type use_sum: plot sum for type statistics instead of mean title: If given a title will be added to each subplot ylabel: If given, the ylabel will be added to each subplot xlabel: If given The xlabel will be added (Simulation Step) legend: If given, a legend will be displayed makefig: If given a new figure will be started figsize: Size of the figure to host the plot ylegend: y-axis of legend for cases with large number of labels """ import matplotlib.pyplot as plt if not stats: if makefig: fig = plt.figure(figsize=figsize) axes = fig.subplots(4 - int(self.perishable), 2) else: _, axes = plt.subplots(4 - int(self.perishable), 2) plt.sca(axes[0, 0]) self.plot_stats( "shortfall_penalty", pertype=pertype, legend=legend, xlabel=False, title=title, ylabel=ylabel, legend_ncols=legend_ncols, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[1, 0]) self.plot_stats( "disposal_cost" if self.perishable else "storage_cost", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[0, 1]) self.plot_stats( "score", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[1, 1]) self.plot_stats( "productivity", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off if not self.perishable: plt.sca(axes[2, 0]) self.plot_stats( "inventory_penalized", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[2, 1]) self.plot_stats( "inventory_input", pertype=pertype, legend=False, xlabel=False, title=title, ylabel=ylabel, ) plt.tick_params( axis="x", # changes apply to the x-axis which="both", # both major and minor ticks are affected bottom=False, # ticks along the bottom edge are off top=False, # ticks along the top edge are off labelbottom=False, ) # labels along the bottom edge are off plt.sca(axes[3 - int(self.perishable), 0]) self.plot_stats( "sold_quantity", pertype=False, xlabel=True, title=title, ylabel=ylabel, ) plt.sca(axes[3 - int(self.perishable), 1]) self.plot_stats( "trading_price", pertype=False, xlabel=True, title=title, ylabel=ylabel, ) return return super().plot_stats( stats=stats, pertype=pertype, use_sum=use_sum, makefig=makefig, title=title, ylabel=ylabel, xlabel=xlabel, legend=legend, figsize=figsize, ylegend=ylegend, legend_ncols=legend_ncols, )
[docs] class OneShotWorld(SCMLBaseWorld): """Basic oneshot simulation""" pass
[docs] class SCML2020OneShotWorld(OneShotWorld): """Oneshot simulation as used in SCML 2020 competition""" pass
[docs] class SCML2021OneShotWorld(SCML2020OneShotWorld): """Oneshot simulation as used in SCML 2021 competition""" def __init__(self, *args, **kwargs): kwargs["price_multiplier"] = 2.0 kwargs["wide_price_range"] = True kwargs["perishable"] = True super().__init__(*args, **kwargs)
[docs] class SCML2022OneShotWorld(SCML2021OneShotWorld): """Oneshot simulation as used in SCML 2022 competition""" pass
[docs] class SCML2023OneShotWorld(SCML2020OneShotWorld): """Oneshot simulation as used in SCML 2023 competition""" def __init__(self, *args, **kwargs): kwargs["price_multiplier"] = 0.0 kwargs["wide_price_range"] = False super().__init__(*args, **kwargs)
[docs] class SCML2024OneShotWorld(SCML2023OneShotWorld): """Oneshot simulation as used in SCML 2024 competition""" def __init__(self, *args, **kwargs): kwargs["perishable"] = True super().__init__(*args, **kwargs)