Source code for scml.scml2019.world

import copy
import itertools
import logging
import math
import sys
import uuid
from collections import defaultdict
from random import choices, randint, random, sample, shuffle
from typing import (
    Any,
    Callable,
    Collection,
    Dict,
    Iterable,
    List,
    Optional,
    Sequence,
    Set,
    Tuple,
    Type,
    Union,
)

import numpy as np
import yaml
from negmas import Negotiator, NegotiatorMechanismInterface, UtilityFunction
from negmas.events import Event, EventSource
from negmas.helpers import instantiate, unique_name
from negmas.outcomes import Issue
from negmas.situated import (
    Action,
    Agent,
    Breach,
    BreachProcessing,
    Contract,
    Operations,
    TimeInAgreementMixin,
    World,
)

from .agent import SCML2019Agent
from .bank import DefaultBank
from .common import (
    DEFAULT_NEGOTIATOR,
    Factory,
    FactoryState,
    FinancialReport,
    InputOutput,
    Job,
    ManufacturingProfile,
    Process,
    Product,
    SCMLAgreement,
)
from .consumers import Consumer, ConsumptionProfile, JustInTimeConsumer
from .factory_managers.builtins import FactoryManager, GreedyFactoryManager
from .insurance import DefaultInsuranceCompany
from .miners import Miner, MiningProfile, ReactiveMiner

CONSUMER_SIMULATION_PRIORITY = 1
MANAGER_SIMULATION_PRIORITY = 2
MINER_SIMULATION_PRIORITY = 3

__all__ = ["SCML2019World", "Factory"]


def _realin(rng: Union[Tuple[float, float], float]) -> float:
    """
    Selects a random number within a range if given or the input if it was a float

    Args:
        rng: Range or single value

    Returns:

        the real within the given range
    """
    if not isinstance(rng, Iterable):
        return rng
    if abs(rng[1] - rng[0]) < 1e-8:
        return rng[0]
    return rng[0] + random() * (rng[1] - rng[0])


def _intin(rng: Union[Tuple[int, int], int]) -> int:
    """
    Selects a random number within a range if given or the input if it was an int

    Args:
        rng: Range or single value

    Returns:

        the int within the given range
    """
    if isinstance(rng, int):
        return rng
    if rng[0] == rng[1]:
        return rng[0]
    return randint(rng[0], rng[1])


[docs] class SCML2019World(TimeInAgreementMixin, World): """The `SCML2020World` class running a simulation of supply chain management.""" def __init__( self, products: Collection[Product], processes: Collection[Process], factories: List[Factory], consumers: List[Consumer], miners: List[Miner], factory_managers: Optional[List[FactoryManager]] = None, # timing parameters n_steps=100, time_limit=60 * 90, # mechanism params mechanisms: Optional[Dict[str, Dict[str, Any]]] = None, neg_n_steps=20, neg_time_limit=2 * 60, neg_step_time_limit=60, negotiation_speed=21, # bank parameters no_bank=False, minimum_balance=0, interest_rate=0.1, interest_max=0.3, installment_interest=0.2, interest_time_increment=0.02, balance_at_max_interest=None, # loan parameters loan_installments=1, # insurance company parameters no_insurance=False, premium=0.03, premium_time_increment=0.03, premium_breach_increment=0.001, # breach processing max_allowed_breach_level=None, breach_processing=BreachProcessing.VICTIM_THEN_PERPETRATOR, breach_penalty_society=0.1, breach_penalty_society_min=0.0, breach_penalty_victim=0.0, breach_move_max_product=True, # simulation parameters initial_wallet_balances: Optional[int] = None, money_resolution=0.5, default_signing_delay=0, transportation_delay: int = 0, transfer_delay: int = 0, start_negotiations_immediately=False, catalog_profit=0.15, avg_process_cost_is_public=True, catalog_prices_are_public=True, strip_annotations=True, financial_reports_period=10, ignore_negotiated_penalties=False, prevent_cfp_tampering=False, # bankruptcy parameters default_price_for_products_without_one=1, compensation_fraction=0.5, # general parameters compact=False, log_folder=None, log_to_file: bool = False, log_to_screen: bool = False, log_file_level=logging.DEBUG, log_screen_level=logging.ERROR, log_file_name: str = "log.txt", log_ufuns: bool = False, log_negotiations: bool = False, save_mechanism_state_in_contract=False, save_signed_contracts: bool = True, save_cancelled_contracts: bool = True, save_negotiations: bool = True, save_resolved_breaches: bool = True, save_unresolved_breaches: bool = True, ignore_agent_exceptions: bool = False, ignore_contract_execution_exceptions: bool = False, name: str | None = None, **kwargs, ): """ Args: products: processes: factories: consumers: miners: factory_managers: initial_wallet_balances: If not none, all factories will be forced to have this initial wallet balance n_steps: time_limit: negotiation_speed: neg_n_steps: neg_time_limit: minimum_balance: interest_rate: interest_max: max_allowed_breach_level: catalog_profit: avg_process_cost_is_public: catalog_prices_are_public: breach_processing: breach_penalty_society: breach_penalty_society_min: breach_penalty_victim: breach_move_max_product: premium: money_resolution: premium_time_increment: premium_breach_increment: default_signing_delay: transportation_delay: loan_installments: strip_annotations: If true, annotations for all negotiations will be stripped from any information other than the following: partners, seller, buyer, cfp log_file_name: name: """ if compact: kwargs["event_file_name"] = None kwargs["event_types"] = [] save_mechanism_state_in_contract = False log_file_level = logging.ERROR log_negotiations = False log_ufuns = False log_screen_level = logging.CRITICAL save_cancelled_contracts = ( save_resolved_breaches ) = save_negotiations = False self.compact = compact super().__init__( bulletin_board=None, n_steps=n_steps, time_limit=time_limit, negotiation_speed=negotiation_speed, neg_n_steps=neg_n_steps, neg_time_limit=neg_time_limit, breach_processing=breach_processing, log_to_file=log_to_file, awi_type="scml.scml2019.SCMLAWI", default_signing_delay=default_signing_delay, start_negotiations_immediately=start_negotiations_immediately, neg_step_time_limit=neg_step_time_limit, mechanisms={"negmas.sao.SAOMechanism": {}} if mechanisms is None else mechanisms, log_to_screen=log_to_screen, log_file_level=log_file_level, log_screen_level=log_screen_level, log_negotiations=log_negotiations, name=name, save_signed_contracts=save_signed_contracts, save_negotiations=save_negotiations, save_cancelled_contracts=save_cancelled_contracts, save_resolved_breaches=save_resolved_breaches, save_unresolved_breaches=save_unresolved_breaches, log_ufuns=log_ufuns, log_folder=log_folder, log_file_name=log_file_name, ignore_agent_exceptions=ignore_agent_exceptions, ignore_contract_execution_exceptions=ignore_contract_execution_exceptions, batch_signing=False, no_logs=compact, force_signing=False, operations=( Operations.StatsUpdate, Operations.Negotiations, Operations.ContractSigning, Operations.AgentSteps, Operations.ContractExecution, Operations.ContractSigning, Operations.StatsUpdate, ), **kwargs, ) TimeInAgreementMixin.init(self, time_field="time") self.prevent_cfp_tampering = prevent_cfp_tampering self.ignore_negotiated_penalties = ignore_negotiated_penalties self.compensation_fraction = compensation_fraction self.save_mechanism_state_in_contract = save_mechanism_state_in_contract self.default_price_for_products_without_one = ( default_price_for_products_without_one ) self.agents: Dict[str, SCML2019Agent] = {} # type: ignore just to help static type checkers if balance_at_max_interest is None: balance_at_max_interest = initial_wallet_balances self.strip_annotations = strip_annotations self.bulletin_board.register_listener(event_type="new_record", listener=self) self.bulletin_board.register_listener( event_type="will_remove_record", listener=self ) self.bulletin_board.add_section("cfps") self.bulletin_board.add_section("products") self.bulletin_board.add_section("processes") self.bulletin_board.add_section("bankruptcy") self.bulletin_board.add_section("reports_time") self.bulletin_board.add_section("reports_agent") self.minimum_balance = minimum_balance self.money_resolution = money_resolution self.transportation_delay = transportation_delay self.breach_penalty_society = breach_penalty_society self.breach_move_max_product = breach_move_max_product self.breach_penalty_society_min = breach_penalty_society_min self.penalties = 0.0 self.financial_reports_period = financial_reports_period self.max_allowed_breach_level = max_allowed_breach_level self.catalog_profit = catalog_profit self.loan_installments = loan_installments self.breach_penalty_victim = breach_penalty_victim self.bulletin_board.record( section="settings", key="breach_penalty_society", value=breach_penalty_society, ) self.bulletin_board.record( section="settings", key="breach_penalty_victim", value=breach_penalty_victim ) self.bulletin_board.record( section="settings", key="immediate_negotiations", value=start_negotiations_immediately, ) self.bulletin_board.record( section="settings", key="negotiation_speed_multiple", value=negotiation_speed, ) self.bulletin_board.record( section="settings", key="negotiation_n_steps", value=neg_n_steps ) self.bulletin_board.record( section="settings", key="negotiation_step_time_limit", value=neg_step_time_limit, ) self.bulletin_board.record( section="settings", key="negotiation_time_limit", value=neg_time_limit ) self.bulletin_board.record( section="settings", key="transportation_delay", value=transportation_delay ) self.bulletin_board.record( section="settings", key="default_signing_delay", value=default_signing_delay ) self.bulletin_board.record( section="settings", key="breach_penalty_society_min", value=breach_penalty_society_min, ) self.bulletin_board.record( section="settings", key="financial_reports_period", value=financial_reports_period, ) self.bulletin_board.record( section="settings", key="transfer_delay", value=transfer_delay ) self.bulletin_board.record(section="settings", key="n_steps", value=n_steps) self.bulletin_board.record( section="settings", key="time_limit", value=time_limit ) self.avg_process_cost_is_public = avg_process_cost_is_public self.catalog_prices_are_public = catalog_prices_are_public self.initial_wallet_balances = initial_wallet_balances self.products: List[Product] = [] self.processes: List[Process] = [] self.factory_managers: List[FactoryManager] = [] self.miners: List[Miner] = [] self.consumers: List[Consumer] = [] self.set_products(products) self.set_processes(processes) self.factories = factories for factory in self.factories: factory.attach_to_world(self) self.set_miners(miners) self.set_consumers(consumers) self.set_factory_managers(factory_managers) self._report_receivers: Dict[str, Set[SCML2019Agent]] = defaultdict(set) # self._remove_processes_not_used_by_factories() # self._remove_products_not_used_by_processes() if catalog_prices_are_public or avg_process_cost_is_public: self._update_dynamic_product_process_info() if initial_wallet_balances is not None: for factory in self.factories: factory._wallet = initial_wallet_balances self.f2a: Dict[str, SCML2019Agent] = {} self.a2f: Dict[str, Factory] = {} for factory, agent in zip(self.factories, self.factory_managers): self.f2a[factory.id] = agent self.a2f[agent.id] = factory for agent in self.consumers: factory = Factory( initial_storage={}, initial_wallet=0, profiles=[], min_balance=-np.inf ) factories.append(factory) self.f2a[factory.id] = agent self.a2f[agent.id] = factory for agent in self.miners: factory = Factory( initial_storage={}, initial_wallet=0.0, profiles=[], min_storage=-np.inf ) factories.append(factory) self.f2a[factory.id] = agent self.a2f[agent.id] = factory self.__interested_agents: List[List[SCML2019Agent]] = [[]] * len(self.products) self.n_new_cfps = 0 self.__n_nullified = 0 self.__n_bankrupt = 0 self._transport: Dict[int, List[Tuple[SCML2019Agent, int, int]]] = defaultdict( list ) self._transfer: Dict[int, List[Tuple[SCML2019Agent, float]]] = defaultdict(list) self.transfer_delay = transfer_delay self._n_production_failures = 0 self.bank = DefaultBank( minimum_balance=minimum_balance, interest_rate=interest_rate, interest_max=interest_max, balance_at_max_interest=balance_at_max_interest, installment_interest=installment_interest, time_increment=interest_time_increment, a2f=self.a2f, disabled=no_bank, name="bank", ) self.join(self.bank, simulation_priority=-1) self.insurance_company = DefaultInsuranceCompany( premium=premium, disabled=no_insurance, premium_breach_increment=premium_breach_increment, premium_time_increment=premium_time_increment, a2f=self.a2f, name="insurance_company", ) self.join(self.insurance_company) self.traders = {} for agent in itertools.chain( self.miners, self.consumers, self.factory_managers ): # type: ignore agent.init_() self.traders[agent.id] = agent # self.standing_jobs: Dict[int, List[Tuple[Factory, Job]]] = defaultdict(list)
[docs] def join(self, x: "Agent", simulation_priority: int = 0): """Add an agent to the world. Args: x: The agent to be registered simulation_priority: The simulation priority. Entities with lower priorities will be stepped first during Returns: """ super().join(x=x, simulation_priority=simulation_priority)
[docs] def save_config(self, file_name: str) -> None: d = {k: v for k, v in self.__dict__.items()} d["factory_manager_types"] = { manager.id: manager.short_type_name for manager in self.factory_managers } with open(file_name, "w") as file: yaml.safe_dump(d, file)
[docs] def assign_managers( self, factory_managers=Iterable[Union[str, Type[FactoryManager], FactoryManager]], params: Optional[Iterable[Dict[str, Any]]] = None, ) -> None: """ Assigns existing factories to new factory managers created from the given types and parameters or manager objects. Args: factory_managers: An iterable of `FactoryManager` objects type names or `FactoryManager` types to assign to params: parameters of the newly created managers Remarks: - factories are assigned in the same order they exist in the local `factories` attribute cycling through the input managers or types/params - If a `FactoryManager` object is given instead of a type or a string in the `factory_managers` collection, and the number of `factory_managers` is less than the number of factories in the world causing this object to cycle for more than one factory, it is assigned to the first such factory but then deep copies of it with new ids and names are assigned to the rest of the factories. That ensures that each manager has exactly one factory and that all factories are assigned exactly one unique manager. """ if params is None: params = [dict()] # todo add an exit function to agents and call it as they are leaving the world self.factory_managers, self.a2f, self.f2a = [], {}, {} for factory, (manager_type, manager_params) in zip( self.factories, itertools.cycle(zip(factory_managers, itertools.cycle(params))), ): if isinstance(manager_type, FactoryManager): manager = manager_type if manager.id in self.a2f.keys(): manager = copy.deepcopy(manager) manager.id = uuid.uuid4() manager.name = unique_name( manager.name, add_time=False, rand_digits=4 ) else: manager = instantiate(manager_type, **manager_params) self.join(manager, simulation_priority=MANAGER_SIMULATION_PRIORITY) self.factory_managers.append(manager) self.a2f[manager.id] = factory self.f2a[factory.id] = manager manager.init_()
@classmethod
[docs] def random_small( cls, n_production_levels: int = 1, n_factories: int = 10, factory_kwargs: Dict[str, Any] = None, miner_kwargs: Dict[str, Any] = None, consumer_kwargs: Dict[str, Any] = None, **kwargs, ): return cls.random( n_raw_materials=3, raw_material_price=(1, 3), n_final_products=3, n_production_levels=n_production_levels, n_products_per_level=4, n_processes_per_level=2, n_inputs_per_process=(1, 2), bias_toward_last_level_products=1.0, quantity_per_input=1, quantity_per_output=1, process_relative_cost=0.15, n_outputs_per_process=1, n_lines=2, lines_are_similar=True, n_processes_per_line=None, cost_for_line=(1.0, 5.0), n_production_steps=(1, 5), n_factories=n_factories, n_consumers=1, n_products_per_consumer=None, n_miners=1, n_products_per_miner=None, factory_kwargs=factory_kwargs, miner_kwargs=miner_kwargs, consumer_kwargs=consumer_kwargs, **kwargs, )
@classmethod
[docs] def chain_world( cls, n_intermediate_levels=0, n_miners=5, n_factories_per_level=5, n_consumers: Union[int, Tuple[int, int], List[int]] = 5, n_steps=100, n_lines_per_factory=10, n_max_assignable_factories=None, log_file_name: str = None, agent_names_reveal_type: bool = False, negotiator_type: str = DEFAULT_NEGOTIATOR, miner_type: Union[str, Type[Miner]] = ReactiveMiner, consumer_type: Union[str, Type[Consumer]] = JustInTimeConsumer, max_storage: int = sys.maxsize, default_manager_params: Dict[str, Any] = None, miner_kwargs: Dict[str, Any] = None, consumption: Union[int, Tuple[int, int]] = (0, 5), consumer_kwargs: Dict[str, Any] = None, negotiation_speed: Optional[int] = 21, manager_types: Sequence[Type[FactoryManager]] = (GreedyFactoryManager,), manager_params: Optional[Sequence[Dict[str, Any]]] = None, n_default_per_level: int = 0, default_factory_manager_type: Type[FactoryManager] = GreedyFactoryManager, randomize: bool = True, initial_wallet_balances=1000, process_cost: Union[float, Tuple[float, float]] = (1.0, 5.0), process_time: Union[int, Tuple[int, int]] = 1, interest_rate=float("inf"), interest_max=float("inf"), shared_profile_per_factory=False, **kwargs, ): """ Creates a very small world in which only one raw material and one final product. The production graph is a series with `n_intermediate_levels` intermediate levels between the single raw material and single final product Args: n_max_assignable_factories: The maximum number of factories assigned to managers other than the default randomize: If true, the factory assignment is randomized n_default_per_level: The number of `GreedyFactoryManager` objects guaranteed at every level default_factory_manager_type: The `FactoryManager` type to use as the base for default_factory_managers. You can specify how many of this type exist at every level by specifying `n_default_per_level`. If `n_default_per_level` is zero, this parameter has no effect. manager_types: A sequence of factory manager types to control the factories. manager_params: An optional sequence of dictionaries giving the parameters to pass to `manager_types`. consumer_type: Consumer type to use for all consumers miner_type: Miner type to use for all miners consumption: Consumption schedule n_intermediate_levels: The number of intermediate products n_miners: number of miners of the single raw material n_factories_per_level: number of factories at every production level n_consumers: number of consumers of the final product n_steps: number of simulation steps n_lines_per_factory: number of lines in each factory process_cost: The range of process costs. A uniform distribution will be used process_time: The range of process times. A uniform distribution will be used log_file_name: File name to store the logs agent_names_reveal_type: If true, agent names will start with a snake_case version of their type name negotiator_type: The negotiation factory used to create all negotiators max_storage: maximum storage capacity for all factory managers If None then it is unlimited default_manager_params: keyword arguments to be used for constructing factory managers consumer_kwargs: keyword arguments to be used for constructing consumers miner_kwargs: keyword arguments to be used for constructing miners negotiation_speed: The number of negotiation steps per simulation step. None means infinite interest_max: Maximum interest rate interest_rate: Minimum interest rate initial_wallet_balances: initial wallet balances for all factories shared_profile_per_factory: If true, all lines in the same factory will have the same profile costs kwargs: Any other parameters are just passed to the world constructor Returns: SCML2019World ready to run Remarks: - Every production level n has one process only that takes n steps to complete """ if default_manager_params is None: default_manager_params = {} if consumer_kwargs is None: consumer_kwargs = {} if miner_kwargs is None: miner_kwargs = {} if manager_params is None and len(manager_types) > 0: manager_params = [dict() for _ in range(len(manager_types))] if negotiator_type is not None: for args in (default_manager_params, consumer_kwargs, miner_kwargs): if "negotiator_type" not in args.keys(): args["negotiator_type"] = negotiator_type products = [ Product( id=0, name="p0", catalog_price=1.0, production_level=0, expires_in=0 ) ] processes = [] miners = [ instantiate( miner_type, profiles={products[-1].id: MiningProfile()}, name=f"m_{i}", **miner_kwargs, ) for i in range(n_miners) ] factories, managers = ( [], [None] * ((n_intermediate_levels + 1) * n_factories_per_level), ) def _s(x): return x if x is not None else 0 for level in range(n_intermediate_levels + 1): new_product = Product( name=f"p{level + 1}", catalog_price=None, # keep this to the world to calculate _s(products[-1].catalog_price) + level + 1 production_level=level + 1, id=level + 1, expires_in=0, ) p = Process( name=f"p{level + 1}", inputs=[InputOutput(product=level, quantity=1, step=0.0)], production_level=level + 1, outputs=[InputOutput(product=level + 1, quantity=1, step=1.0)], historical_cost=None, # keep this to the world to calculate level + 1 id=level, ) processes.append(p) products.append(new_product) assignable_factories = [] _DefaultFactoryManager = default_factory_manager_type for level in range(n_intermediate_levels + 1): default_factories = [] for j in range(n_factories_per_level): profiles = [] shared_cost = _realin(process_cost) for k in range(n_lines_per_factory): profiles.append( ManufacturingProfile( n_steps=_intin(process_time), cost=_realin(process_cost) if not shared_profile_per_factory else shared_cost, initial_pause_cost=0, running_pause_cost=0, resumption_cost=0, cancellation_cost=0, line=k, process=processes[level], ) ) factory = Factory( id=f"f{level + 1}_{j}", max_storage=max_storage, profiles=profiles, initial_storage={}, initial_wallet=initial_wallet_balances, ) factories.append(factory) if j >= n_default_per_level and ( n_max_assignable_factories is None or len(assignable_factories) < n_max_assignable_factories ): assignable_factories.append(j + level * n_factories_per_level) else: default_factories.append(j + level * n_factories_per_level) for j, indx in enumerate(default_factories): manager_name = unique_name(base="_df_", add_time=False, rand_digits=12) manager = _DefaultFactoryManager( name=manager_name, **default_manager_params ) if agent_names_reveal_type: manager.name = f"_df_{manager.short_type_name}@{level + 1}_{j}" managers[indx] = manager if randomize: shuffle(assignable_factories) for j, (index, (params, manager_type)) in enumerate( zip( assignable_factories, itertools.cycle(zip(manager_params, manager_types)), ) ): factory = factories[index] manager_name = ( f"{unique_name(base='', add_time=False, rand_digits=12)}@{factory.id}" ) manager = instantiate(manager_type, name=manager_name, **params) if agent_names_reveal_type: manager.name = f"{manager.short_type_name}@{factory.id[1:]}" managers[index] = manager def create_schedule(): if isinstance(consumption, tuple) and len(consumption) == 2: return np.random.randint( consumption[0], consumption[1], n_steps ).tolist() return consumption consumers = [ instantiate( consumer_type, profiles={ products[-1].id: ConsumptionProfile(schedule=create_schedule()) }, name=f"c_{i}", **consumer_kwargs, ) for i in range(n_consumers) ] return SCML2019World( products=products, processes=processes, factories=factories, consumers=consumers, miners=miners, n_steps=_intin(n_steps), factory_managers=managers, initial_wallet_balances=initial_wallet_balances, interest_rate=interest_rate, interest_max=interest_max, log_file_name=log_file_name, negotiation_speed=negotiation_speed, **kwargs, )
@classmethod
[docs] def random( cls, n_raw_materials: Union[int, Tuple[int, int]] = (5, 10), raw_material_price: Union[float, Tuple[float, float]] = (1.0, 30.0), n_final_products: Union[int, Tuple[int, int]] = (3, 5), n_production_levels: Union[int, Tuple[int, int]] = (3, 5), n_products_per_level: Union[int, Tuple[int, int]] = (3, 5), n_processes_per_level: Union[int, Tuple[int, int]] = (6, 10), n_inputs_per_process: Union[int, Tuple[int, int]] = (2, 5), bias_toward_last_level_products: float = 0.0, quantity_per_input: Union[int, Tuple[int, int]] = (1, 10), input_step: Union[float, Tuple[float, float]] = 0.0, quantity_per_output: Union[int, Tuple[int, int]] = (1, 1), output_step: Union[float, Tuple[float, float]] = 1.0, process_relative_cost: Union[float, Tuple[float, float]] = (0.05, 0.4), n_outputs_per_process: Union[int, Tuple[int, int]] = (1, 1), n_lines: Union[int, Tuple[int, int]] = (3, 5), lines_are_similar: bool = False, n_processes_per_line: Union[int, Tuple[int, int]] = None, cost_for_line: Union[float, Tuple[float, float]] = (5.0, 50.0), n_production_steps: Union[int, Tuple[int, int]] = (2, 10), max_storage: Union[int, Tuple[int, int]] = 2000, n_factories: Union[int, Tuple[int, int]] = 20, n_consumers: Union[int, Tuple[int, int]] = 5, n_products_per_consumer: Union[int, Tuple[int, int]] = None, n_miners: Union[int, Tuple[int, int]] = 5, n_products_per_miner: Optional[Union[int, Tuple[int, int]]] = None, factory_manager_types: Union[ Type[FactoryManager], List[Type[FactoryManager]] ] = GreedyFactoryManager, consumer_types: Union[ Type[Consumer], List[Type[Consumer]] ] = JustInTimeConsumer, miner_types: Union[Type[Miner], List[Type[Miner]]] = ReactiveMiner, negotiator_type=DEFAULT_NEGOTIATOR, initial_wallet_balance: Union[float, Tuple[float, float]] = 1000, factory_kwargs: Dict[str, Any] = None, miner_kwargs: Dict[str, Any] = None, consumer_kwargs: Dict[str, Any] = None, **kwargs, ): """ Creates a random SCML scenario with adjustable parameters. Args: n_raw_materials: Number of raw materials. Can be a value or a range. raw_material_price: Catalog prices for raw materials. Can be a value or a range. n_final_products: Number of final products. Can be a value or a range. n_production_levels: How deep is the production graph (number of intermediate products). Can be a value or a range. n_products_per_level: How many intermediate products per intermediate level. Can be a value or a range. n_processes_per_level: Number of processes in intermediate levels. Can be a value or a range. n_inputs_per_process: Number of inputs per process. Can be a value or a range. bias_toward_last_level_products: How biased are production processes toward using products from the last level below them (0 means not bias, 1 means only sample from this last level). A value between 0 and 1. quantity_per_input: How many items are needed for each input to a process. Can be a value or a range. input_step: When are inputs consumed during the production process. Can be a value or a range. Default 0 quantity_per_output: How many items are produced per output. Can be a value or a range. output_step: When are outputs created during the production process. Can be a value or a range. Default 1 process_relative_cost: Intrinsic relative cost of processes [Outputs will be produced at a cost of sum(input costs) * (1 + process_relative_cost)]. Can be a value or a range. n_outputs_per_process: Number of outputs per process. Can be a value or a range. n_lines: Number of lines per factory. Can be a value or a range. lines_are_similar: If true then all lins of the same factory will have the same production processes. n_processes_per_line: Number of processes that can be run on each line per factory. Can be a value or a range. cost_for_line: Cost for running a process on a line. Can be a value or a range. n_production_steps: Number of production steps per line. Can be a value or a range. max_storage: Maximum storage per factory. Can be a value or a range. n_factories: Number of factories. Can be a value or a range. n_consumers: Number of consumers. Can be a value or a range. n_products_per_consumer: Number of products per miner. If None then all final products will be assigned to every customer. Can be a value or a range. n_miners: Number of miners. Can be a value or a range. n_products_per_miner: Number of products per miner. If None then all raw materials will be assigned to every miner. Can be a value or a range. factory_manager_types: A callable for creating factory managers for the factories consumer_types: A callable for creating `Consumer` objects miner_types: A callable for creating `Miner` objects negotiator_type: A string that can be `eval`uated to a negotiator. initial_wallet_balance: The initial balance of all wallets factory_kwargs: keyword arguments to be used for constructing factory managers consumer_kwargs: keyword arguments to be used for constructing consumers miner_kwargs: keyword arguments to be used for constructing miners **kwargs: Returns: `SCML2019World` The random world generated Remarks: - Most parameters accept either a single value or a 2-valued tuple. In the later case, it will sample a value within the range specified by the tuple (low, high) inclusive. For example the number of lines (n_lines) follows this pattern """ if factory_kwargs is None: factory_kwargs = {} if consumer_kwargs is None: consumer_kwargs = {} if miner_kwargs is None: miner_kwargs = {} if negotiator_type is not None: for args in (factory_kwargs, consumer_kwargs, miner_kwargs): if "negotiator_type" not in args.keys(): args["negotiator_type"] = negotiator_type def _sample_product( products: list, old_products: list, last_level_products: list, k: int ): if bias_toward_last_level_products < 1e-7: return sample(products, k=min(k, len(products))) elif bias_toward_last_level_products > 1 - 1e-7: return sample(last_level_products, min(k, len(last_level_products))) n_old, n_last = len(old_products), len(last_level_products) p_old = n_old / (n_old + n_last / bias_toward_last_level_products) if random() < p_old: return sample(old_products, min(k, len(old_products))) else: return sample(last_level_products, min(k, len(last_level_products))) products = [ Product( name=f"r_{ind}", catalog_price=_realin(raw_material_price), production_level=0, expires_in=0, id=ind, ) for ind in range(_intin(n_raw_materials)) ] raw_materials = products.copy() last_level_products = products # last level of products old_products: List[Product] = [] # products not including last level processes: List[Process] = [] def _adjust_level_of_production(new_products, new_processes): product_prices: Dict[Product, List[float]] = defaultdict( list ) # will keep the costs for generating products for process in new_processes: process.inputs = { InputOutput( product=_.index, quantity=_intin(quantity_per_input), step=_realin(input_step), ) for _ in _sample_product( products=products, old_products=old_products, last_level_products=last_level_products, k=_intin(n_inputs_per_process), ) } process.outputs = { InputOutput( product=_.index, quantity=_intin(quantity_per_output), step=_realin(output_step), ) for _ in sample(new_products, _intin(n_outputs_per_process)) } process.historical_cost = sum( products[_.product].catalog_price * _.quantity for _ in process.inputs ) process.historical_cost *= 1 + _realin(process_relative_cost) for output in process.outputs: product_prices[products[output.product]].append( process.historical_cost ) new_products = [_ for _ in product_prices.keys()] for product in new_products: product.catalog_price = sum(product_prices[product]) / len( product_prices[product] ) return new_products, new_processes n_levels = _intin(n_production_levels) if n_levels > 0: for level in range(n_levels): new_products = [ Product( name=f"intermediate_{level}_{ind}", production_level=level + 1, id=len(products) + ind, catalog_price=0, expires_in=0, ) for ind in range(_intin(n_products_per_level)) ] new_processes = [ Process( name=f"process_{level}_{ind}", production_level=level + 1, id=len(processes) + ind, historical_cost=0, inputs=set(), outputs=set(), ) for ind in range(_intin(n_processes_per_level)) ] new_products, new_processes = _adjust_level_of_production( new_products, new_processes ) products += new_products old_products += last_level_products last_level_products = new_products processes += new_processes final_products = [ Product( name=f"f_{ind}", production_level=n_levels + 1, id=len(products) + ind, catalog_price=0, expires_in=0, ) for ind in range(_intin(n_final_products)) ] new_processes = [ Process( name=f"process_final_{ind}", production_level=n_levels + 1, id=len(processes) + ind, historical_cost=0, inputs=set(), outputs=set(), ) for ind in range(_intin(n_processes_per_level)) ] final_products, new_processes = _adjust_level_of_production( final_products, new_processes ) products += final_products processes += new_processes else: final_products = raw_materials if n_processes_per_line is None: n_processes_per_line = len(processes) n_factories = _intin(n_factories) factories = [] for i in range(n_factories): profiles = [] if lines_are_similar: line_processes = sample(processes, _intin(n_processes_per_line)) profiles.extend( [ ManufacturingProfile( n_steps=_intin(n_production_steps), cost=_realin(cost_for_line), initial_pause_cost=0, running_pause_cost=0, resumption_cost=0, cancellation_cost=0, line=i, process=_, ) for _ in line_processes ] ) else: for _ in range(_intin(n_lines)): line_processes = sample(processes, _intin(n_processes_per_line)) profiles.extend( [ ManufacturingProfile( n_steps=_intin(n_production_steps), cost=_realin(cost_for_line), initial_pause_cost=0, running_pause_cost=0, resumption_cost=0, cancellation_cost=0, line=i, process=_, ) for _ in line_processes ] ) factories.append( Factory( profiles=profiles, max_storage=_intin(max_storage), initial_storage={}, initial_wallet=_realin(initial_wallet_balance), ) ) def _ensure_list(x): if isinstance(x, Iterable): return list(x) else: return [x] miner_types_list, consumer_types_list, factory_manager_types_list = ( _ensure_list(_) for _ in (miner_types, consumer_types, factory_manager_types) ) factory_managers = [ current(**factory_kwargs) for current in choices(factory_manager_types_list, k=n_factories) ] miners = [ current(**miner_kwargs) for current in choices(miner_types_list, k=_intin(n_miners)) ] if n_products_per_miner is None: n_products_per_miner = len(raw_materials) if n_products_per_consumer is None: n_products_per_consumer = len(final_products) n_products_per_miner = min(n_products_per_miner, len(raw_materials)) n_products_per_consumer = min(n_products_per_consumer, len(final_products)) for miner in miners: _n = _intin(n_products_per_miner) mining_profiles = dict( zip( (_.index for _ in sample(raw_materials, _n)), [MiningProfile.random() for _ in range(_n)], ) ) miner.set_profiles(mining_profiles) consumers = [ current(**consumer_kwargs) for current in choices(consumer_types_list, k=_intin(n_consumers)) ] for consumer in consumers: _n = _intin(n_products_per_consumer) consumer_profiles = dict( zip( (_.index for _ in sample(final_products, _n)), [ConsumptionProfile.random() for _ in range(_n)], ) ) consumer.set_profiles(consumer_profiles) return SCML2019World( products=products, processes=processes, factories=factories, consumers=consumers, miners=miners, factory_managers=factory_managers, initial_wallet_balances=None, **kwargs, )
[docs] def _update_dynamic_product_process_info(self): """Updates the catalog prices of all products based on the prices of their inputs""" # for process in self.processes: # for current in process.inputs: # self.products[current.product].consuming_processes.add((process, current.quantity)) # for current in process.outputs: # self.products[current.product].generating_processes.add((process, current.quantity)) # noinspection PyUnusedLocal def _or_for_none(a, b): return (a is None and b) or a or b product_costs: Dict[Product, List[float]] = defaultdict(list) process_costs: Dict[Process, List[float]] = defaultdict(list) producers: Dict[Product, List[Tuple[Process, int]]] = defaultdict(list) consumers: Dict[Product, List[Tuple[Process, int]]] = defaultdict(list) def production_record(process_: Process, product_: Product): for output_ in process_.outputs: if output_.product == product_.id: return process_, output_.quantity return None def consumption_record(process_: Process, product_: Product): for input_ in process_.inputs: if input_.product == product_.id: return process_, input_.quantity # sort products and processes by the production level. We assume here that production levels are well behaved products = sorted(self.products, key=lambda x: x.production_level) processes = sorted(self.processes, key=lambda x: x.production_level) # find the producers and consumers for every product. This builds the production graph and its inverse for product in products: for process in processes: precord = production_record(process, product) if precord is not None: producers[product].append(precord) crecord = consumption_record(process, product) if crecord is not None: consumers[product].append(crecord) # find the average manufacturing profile cost for every process for factory in self.factories: for profile_index, profile in enumerate(factory.profiles): process = profile.process if self.avg_process_cost_is_public: process_costs[process].append(profile.cost) process_costs_avg = { k: sum(v) / len(v) if len(v) > 0 else math.inf for k, v in process_costs.items() } # loop over all products finding the processes that can produce it and add a new cost example for this product for product in products: for process, quantity in producers[product]: if quantity == 0: continue # find the total input cost for a process that can produce the current product input_cost = 0.0 for input_ in process.inputs: iproduct = self.products[input_.product] if iproduct.catalog_price is None: if ( self.catalog_prices_are_public or self.avg_process_cost_is_public ): raise ValueError( f"Catalog prices should be public but product {product} is needed for process {process}" f" without a catalog price" ) return input_cost += iproduct.catalog_price * input_.quantity # append a new unit price for this product product_costs[product].append( (input_cost + process_costs_avg[process]) / quantity ) # now we have the product cost for all processes producing this product. Average them if ( product_costs.get(product, None) is None or len(product_costs[product]) < 1 ): continue avg_cost = sum(product_costs[product]) / len(product_costs[product]) if product.catalog_price is None: product.catalog_price = avg_cost * (1 + self.catalog_profit) # update the historical cost for processes if needed if self.avg_process_cost_is_public: for process in self.processes: if process.historical_cost is None: process.historical_cost = process_costs_avg[process] else: for process in self.processes: process.historical_cost = None # update the catalog prices by averaging all possible ways to create any product. if not self.catalog_prices_are_public: for product in self.products: product.catalog_price = None if self.money_resolution is not None: for product in self.products: if product.catalog_price is not None: product.catalog_price = ( math.floor(product.catalog_price / self.money_resolution) * self.money_resolution ) for process in self.processes: if process.historical_cost is not None: process.historical_cost = ( math.floor(process.historical_cost / self.money_resolution) * self.money_resolution )
[docs] def set_consumers(self, consumers: List[Consumer]): self.consumers = consumers for f in consumers: self.join(f, simulation_priority=CONSUMER_SIMULATION_PRIORITY)
[docs] def set_miners(self, miners: List[Miner]): self.miners = miners for f in miners: self.join(f, simulation_priority=MINER_SIMULATION_PRIORITY)
[docs] def set_factory_managers(self, factory_managers: Optional[List[FactoryManager]]): if factory_managers is None: factory_managers = [] self.factory_managers = factory_managers for f in factory_managers: self.join(f, simulation_priority=MANAGER_SIMULATION_PRIORITY)
[docs] def set_processes(self, processes: Collection[Process]): if processes is None: self.processes = [] else: self.processes = list(processes) for v in self.processes: self.bulletin_board.record("processes", key=str(v), value=v)
[docs] def set_products(self, products: Collection[Product]): if products is None: self.products = [] else: self.products = list(products) for v in self.products: self.bulletin_board.record("products", key=str(v), value=v)
[docs] def order_contracts_for_execution(self, contracts: Collection[Contract]): def order(x: Contract): o = self.products[x.annotation["cfp"].product].production_level if o: return o return 0 return sorted(contracts, key=order)
[docs] def execute_action( self, action: Action, agent: "Agent", callback: Callable[[Action, bool], Any] = None, ) -> bool: if not isinstance(agent, FactoryManager): if callback is not None: callback(action, False) self.logerror( f"{str(action)} received from {agent.id} which is {agent.__class__.__name__} not a FactoryManager" ) return False factory = self.a2f[agent.id] if factory is None: if callback is not None: callback(action, False) self.logerror(f"{str(action)} from {agent.id}: Unknown factory") return False if action.type == "hide_funds": factory.hide_funds(amount=action.params.get("amount", 0.0)) return True elif action.type == "unhide_funds": factory.unhide_funds(amount=action.params.get("amount", 0.0)) return True elif action.type in ("hide_product", "hide_products", "hide_inventory"): factory.hide_product( product=action.params.get("product", -1), quantity=action.params.get("quantity", 0), ) elif action.type in ("unhide_product", "unhide_products", "unhide_inventory"): factory.unhide_product( product=action.params.get("product", -1), quantity=action.params.get("quantity", 0), ) elif action.type not in ("run", "start", "stop", "pause", "resume"): raise ValueError(f"Unknown action {action.type} received {str(action)}") line, profile_index = ( action.params.get("line", None), action.params.get("profile", None), ) t = action.params.get("time", None) contract = action.params.get("contract", None) time = action.params.get("time", None) override = action.params.get("override", True) if ( (profile_index is None and line is None) or time is None or time < 0 or time > self.n_steps - 1 ): if callback is not None: callback(action, False) self.logerror( f"{str(action)} from {agent.id}: Neither profile index nor line is given or invalid time" ) return False if profile_index is not None: profile = factory.profiles[profile_index] if line is not None and profile.line != line: if callback is not None: callback(action, False) self.logerror( f"{str(action)} from {agent.id}: profile's line {profile.line} != given line {line}" ) return False line = profile.line job = Job( action=action.type, profile=profile_index, line=line, time=t, contract=contract, override=override, ) factory.schedule(job=job, override=override) if callback is not None: self.logdebug(f"{str(action)} from {agent.id}: Executed successfully") callback(action, True) return True
[docs] def get_private_state(self, agent: "Agent") -> FactoryState: f = self.a2f[agent.id] return FactoryState( storage=f.storage, wallet=f.wallet, loans=f.loans, max_storage=f.max_storage, line_schedules=f.line_schedules, hidden_money=f.hidden_money, hidden_storage=f.hidden_storage, n_lines=f.n_lines, profiles=f.profiles, next_step=f.next_step, commands=f.commands, jobs=f.jobs, )
[docs] def receive_financial_reports( self, agent: SCML2019Agent, receive: bool, agents: Optional[List[str]] ): """Registers interest/disinterest in receiving financial reports""" if agents is None: agents = self.agents.keys() if receive: for aid in agents: self._report_receivers[aid].add(agent) else: for aid in agents: self._report_receivers[aid].discard(agent)
[docs] def simulation_step(self, stage): """A step of SCML simulation""" # publish financial reports # ------------------------- if self.current_step % self.financial_reports_period == 0: reports_agent = self.bulletin_board.data["reports_agent"] reports_time = self.bulletin_board.data["reports_time"] for agent in self.agents.values(): factory = self.a2f.get(agent.id, None) if factory is None: continue try: inventory = sum( self.products[product].catalog_price * quantity for product, quantity in factory.storage.items() ) except ArithmeticError: inventory = None report = FinancialReport( agent=agent.id, step=self.current_step, cash=factory.wallet, liabilities=factory.loans, inventory=inventory, credit_rating=self.bank.credit_rating(agent.id), ) repstr = str(report).replace("\n", " ") self.logdebug(f"{agent.name}: {repstr}") if reports_agent.get(agent.id, None) is None: reports_agent[agent.id] = [] reports_agent[agent.id].append(report) if reports_time.get(self.current_step, None) is None: reports_time[self.current_step] = {} reports_time[self.current_step][agent.id] = report for receiver in self._report_receivers[agent.id]: receiver.on_new_report(report) # run standing jobs # ----------------- # for factory, job in self.standing_jobs.get(self.current_step, []): # factory.schedule_job(job=job, end=self.n_steps) # run factories # ------------- for factory in self.factories: manager = self.f2a[factory.id] reports = factory.step() if isinstance(manager, FactoryManager): self.logdebug( f"{manager.name} (Factory {factory.id}): money={factory.wallet}" f", storage={str(dict(factory.storage))}" f", loans={factory.loans}" ) nonempty = [] for report in reports: if not report.is_empty: self.logdebug(f"PRODUCTION>> {manager.name}: {str(report)}") if report.finished: nonempty.append(report) failures = [] for report in reports: if report.failed: failures.append(report.failure) if len(failures) > 0: manager.on_production_failure(failures=failures) else: manager.on_production_success(nonempty) else: self.logdebug( f"{manager.name}: money={factory.wallet}" f", storage={str(dict(factory.storage))}" ) # finish transportation and money transfer # ----------------------------------------- transports = self._transport.get(self.current_step, []) for transport in transports: manager, product_id, q = transport self.a2f[manager.id].transport_to(product_id, q) manager.on_inventory_change(product_id, q, "transport") transfers = self._transfer.get(self.current_step, []) for transfer in transfers: manager, money = transfer self.a2f[manager.id].receive(money) manager.on_cash_transfer(money, "transfer") # remove expired CFPs # ------------------- cfps = self.bulletin_board.query(section="cfps", query=None) if cfps is not None: # new_cfps = dict() toremove = [] for key, cfp in cfps.items(): # we remove CFP with a max_time less than *or equal* to current step as all processing for current step # should already be complete by now if cfp.max_time <= self.current_step: toremove.append(key) for key in toremove: self.bulletin_board.remove(section="cfps", query=key, query_keys=True)
[docs] def pre_step_stats(self): # noinspection PyProtectedMember cfps = self.bulletin_board._data["cfps"] self._stats["n_cfps_on_board_before"].append(len(cfps) if cfps else 0) self._n_production_failures = 0 self.__n_nullified = 0 self.__n_bankrupt = 0
[docs] def post_step_stats(self): """Saves relevant stats""" self._stats["n_cfps"].append(self.n_new_cfps) self.n_new_cfps = 0 # noinspection PyProtectedMember cfps = self.bulletin_board._data["cfps"] self._stats["n_cfps_on_board_after"].append(len(cfps) if cfps else 0) self._stats["n_contracts_nullified_now"].append(self.__n_nullified) self._stats["n_bankrupt"].append(self.__n_bankrupt) market_size = 0 self._stats["_balance_bank"].append(self.bank.wallet) self._stats["_balance_society"].append(self.penalties) self._stats["_balance_insurance"].append(self.insurance_company.wallet) self._stats["_storage_insurance"].append( sum(self.insurance_company.storage.values()) ) internal_market_size = ( self.bank.wallet + self.penalties + self.insurance_company.wallet ) for a in itertools.chain(self.miners, self.consumers, self.factory_managers): self._stats[f"balance_{a.name}"].append(self.a2f[a.id].balance) self._stats[f"storage_{a.name}"].append(self.a2f[a.id].total_storage) market_size += self.a2f[a.id].balance self._stats["market_size"].append(market_size) self._stats["production_failures"].append( self._n_production_failures / len(self.factories) if len(self.factories) > 0 else np.nan ) self._stats["_market_size_total"].append(market_size + internal_market_size)
[docs] def start_contract_execution(self, contract: Contract) -> Set[Breach]: partners, agreement = ( {self.agents[_] for _ in contract.partners}, contract.agreement, ) cfp = contract.annotation["cfp"] # type: ignore breaches = set() quantity, unit_price = agreement["quantity"], agreement["unit_price"] if quantity < 1 or unit_price < 0.0: self.loginfo( f"Contract with quantity {quantity} and unit price {unit_price} will be ignored: " f"{str(contract)}" ) return breaches if unit_price < 1e-7: self.logdebug(f"Contract with {unit_price} unit_price: {str(contract)}") # find out the values for vicitm and social penalties penalty_victim = ( agreement.get("penalty", None) if not self.ignore_negotiated_penalties else None ) if penalty_victim is not None and self.breach_penalty_victim is not None: # there is a defined penalty, find its value penalty_victim = penalty_victim if penalty_victim is not None else 0.0 penalty_victim += ( self.breach_penalty_victim if self.breach_penalty_victim is not None else 0.0 ) penalty_society = self.breach_penalty_society penalty_value = 0.0 # ask each partner to confirm the execution for partner in partners: if not partner.confirm_contract_execution(contract=contract): self.logdebug( f"{partner.name} refused execution og Contract {contract.id}" ) breaches.add( Breach( contract=contract, perpetrator=partner.id, # type: ignore victims=[_.id for _ in list(partners - {partner})], level=1.0, type="refusal", ) ) if len(breaches) > 0: return breaches # all partners agreed to execute the agreement -> execute it. pind = cfp.product buyer_id, seller_id = ( contract.annotation["buyer"], contract.annotation["seller"], ) buyer, seller = self.agents[buyer_id], self.agents[seller_id] seller_factory, buyer_factory = self.a2f[seller.id], self.a2f[buyer.id] product_breach, money_breach, penalty_breach_victim, penalty_breach_society = ( None, None, None, None, ) money = unit_price * quantity # first we will try to blindly execute the contract and will fall back to the more complex algorithm checking # all possibilities only if that failed try: if money > 0 or quantity > 0: self._move_product( buyer=buyer, seller=seller, quantity=quantity, money=money, product_id=pind, ) else: self.logdebug(f"Contract {str(contract)} has no transfers") return breaches except ValueError: pass # check the seller available_quantity = ( seller_factory.storage.get(pind, 0) if not isinstance(seller, Miner) else quantity ) missing_quantity = max(0, quantity - available_quantity) original_quantity = quantity if missing_quantity > 0: product_breach = missing_quantity / quantity penalty_values = [] for penalty, is_victim in ( (penalty_victim, True), (penalty_society, False), ): if penalty is None: penalty_values.append(0.0) continue # find out how much need to be paid for this penalty penalty_value = penalty * product_breach * quantity * unit_price # society penalty may have a minimum. If so, make sure the penalty value is at least as large as that if not is_victim and self.breach_penalty_society_min is not None: penalty_value = max(penalty_value, self.breach_penalty_society_min) penalty_values.append(penalty_value) # that is how much money is available in the wallet right now. Notice that loans do not count seller_balance = seller_factory.wallet # if the seller needs more than what she has in her wallet to pay the penalty, try a loan # Loans are mandatory for society penalty but the agent can refuse to pay a victim penalty if seller_balance < penalty_value: self.bank.buy_loan( agent=seller, amount=penalty_value - seller_balance, n_installments=self.loan_installments, force=True, beneficiary=buyer, contract=contract, ) for penalty, is_victim, penalty_value in ( (penalty_victim, True, penalty_values[0]), (penalty_society, False, penalty_values[1]), ): if penalty is None: continue # if the seller can pay the penalty then pay it and now there is no breach, otherwise pay as much as # possible. Notice that for society penalties, it should always be the case that the whole penalty # value is available in the wallet by now as loans are forced. paid_for_quantity, paid_penalties = 0, 0.0 if seller_factory.wallet >= penalty_value: # if there is enough money then pay the whole penalty paid_penalties = penalty_value else: # if there is not enough money then pay enough for the maximum number of items that can be paid if unit_price == 0: paid_for_quantity = missing_quantity else: paid_for_quantity = int( math.floor(seller_factory.wallet / unit_price) ) paid_penalties = unit_price * paid_for_quantity missing_quantity_unpaid_for = missing_quantity - paid_for_quantity if is_victim: penalty_breach_victim = missing_quantity_unpaid_for / quantity else: penalty_breach_society = missing_quantity_unpaid_for / quantity # actually pay the payable amount self.logdebug( f'Penalty: {seller.name} paid {paid_penalties} to {buyer.name if is_victim else "society"}' ) seller_factory.pay(paid_penalties) if is_victim: buyer_factory.receive(paid_penalties) # if we agreed on a penalty and the buyer paid it, then clear the product breach quantity -= paid_for_quantity money -= paid_for_quantity * unit_price missing_quantity -= paid_for_quantity if missing_quantity <= 0: product_breach = None else: product_breach = missing_quantity / original_quantity else: # if this is the society penalty, it does not affect the product_breach self.penalties += paid_penalties # pay penalties if there are any. Notice that penalties apply only to to seller. It makes no sense to have a # penalty on the buyer who already have no money to pay the contract value anyway. if penalty_breach_society is not None: breaches.add( Breach( contract=contract, perpetrator=seller.id, victims=[], level=penalty_breach_society, type="penalty_society", step=self.current_step, ) ) if penalty_breach_victim is not None: breaches.add( Breach( contract=contract, perpetrator=seller.id, victims=[buyer.id], level=penalty_breach_victim, type="penalty", step=self.current_step, ) ) # check the buyer available_money = ( buyer_factory.wallet if not isinstance(buyer, Consumer) else money ) missing_money = max(0.0, money - available_money) if missing_money > 0.0: # if the buyer cannot pay, then offer him a loan. The loan is always optional self.bank.buy_loan( agent=buyer, amount=missing_money, n_installments=self.loan_installments, force=False, beneficiary=seller, contract=contract, ) available_money = buyer_factory.wallet missing_money = max(0.0, money - available_money) # if there is still missing money after the loan is offered, then create a breach if missing_money > 0.0: money_breach = missing_money / money if product_breach is not None: # register the breach independent of insurance breaches.add( Breach( contract=contract, perpetrator=seller.id, victims=[buyer.id], level=product_breach, type="product", step=self.current_step, ) ) if money_breach is not None: # register the breach independent of insurance breaches.add( Breach( contract=contract, perpetrator=buyer.id, victims=[seller.id], level=money_breach, type="money", step=self.current_step, ) ) if len(breaches) > 0: return breaches # should never arrive here assert missing_quantity == 0 assert missing_money == 0 if money > 0 or quantity > 0: self._move_product( buyer=buyer, seller=seller, quantity=quantity, money=money, product_id=pind, ) else: self.logdebug(f"Contract {contract.id} has no transfers") return breaches
[docs] def _move_product( self, buyer: SCML2019Agent, seller: SCML2019Agent, product_id: int, quantity: int, money: float, ): """Moves as much product and money between the buyer and seller""" seller_factory, buyer_factory = self.a2f[seller.id], self.a2f[buyer.id] if quantity > 0: seller_factory.transport_from(product_id, quantity) seller.on_inventory_change(product_id, -quantity, "contract") if self.transportation_delay < 1: buyer_factory.transport_to(product_id, quantity) buyer.on_inventory_change(product_id, quantity, "contract") else: self._transport[self.current_step + self.transportation_delay].append( (buyer, product_id, quantity) ) if money > 0: buyer_factory.pay(money) if self.transfer_delay < 1: seller_factory.receive(money) seller.on_cash_transfer(money, "contract") else: self._transfer[self.current_step + self.transfer_delay].append( (seller, money) ) self.logdebug( f"Moved {quantity} units of {self.products[product_id].name} from {seller.name} to {buyer.name} " f"for {money} dollars" )
[docs] def complete_contract_execution( self, contract: Contract, breaches: List[Breach], resolution: Optional[Contract] ): """The resolution can either be None or a contract with the following items: The issues can be any or all of the following: immediate_quantity: int immediate_unit_price: float later_quantity: int later_unit_price: int later_penalty: float later_time: int """ partners, agreement = ( {self.agents[_] for _ in contract.partners}, contract.agreement, ) cfp = contract.annotation["cfp"] # type: ignore quantity, unit_price = agreement["quantity"], agreement["unit_price"] if quantity < 1 and unit_price <= 0.0: return penalty_victim = ( agreement.get("penalty", None) if not self.ignore_negotiated_penalties else None ) if penalty_victim is not None and self.breach_penalty_victim is not None: # there is a defined penalty, find its value penalty_victim = penalty_victim if penalty_victim is not None else 0.0 penalty_victim += ( self.breach_penalty_victim if self.breach_penalty_victim is not None else 0.0 ) pind = cfp.product buyer_id, seller_id = ( contract.annotation["buyer"], contract.annotation["seller"], ) buyer, seller = self.agents[buyer_id], self.agents[seller_id] seller_factory, buyer_factory = self.a2f[seller.id], self.a2f[buyer.id] ( product_breach, money_breach, _penalty_breach_victim, _penalty_breach_society, ) = ( None, None, None, None, ) if resolution is None: for breach in breaches: if breach.type == "product": product_breach = breach.level elif breach.type == "money": money_breach = breach.level elif breach.type == "penalty": pass elif breach.type == "penalty_society": pass else: quantity = resolution.agreement.get("immediate_quantity", quantity) unit_price = resolution.agreement.get("immediate_unit_price", unit_price) # make and register new contract contract.annotation["renegotiation"] = True future_agreement = { "time": agreement.get("later_time", -1), "penalty": agreement.get("later_penalty", -1), "quantity": agreement.get("later_quantity", None), "unit_price": agreement.get("later_unit_price", 0.0), } new_contract = Contract( partners=contract.partners, annotation=contract.annotation, issues=contract.issues, agreement=SCMLAgreement(**future_agreement), concluded_at=self.current_step, to_be_signed_at=self.current_step, signed_at=self.current_step, mechanism_state=None, signatures=dict(zip(contract.partners, contract.partners)), ) self.on_contract_concluded(new_contract, to_be_signed_at=self.current_step) self.on_contract_signed(contract=new_contract) for partner in partners: partner.on_contract_signed_(contract=new_contract) money = unit_price * quantity # check the seller available_quantity = ( max(0, seller_factory.storage.get(pind, 0)) if not isinstance(seller, Miner) else quantity ) missing_quantity = max(0, quantity - available_quantity) if missing_quantity > 0 and quantity > 0: product_breach = missing_quantity / quantity # check the buyer available_money = ( max(0.0, buyer_factory.wallet) if not isinstance(buyer, Consumer) else money ) if unit_price > 0.0: available_money = (available_money // unit_price) * unit_price missing_money = max(0.0, money - available_money) if missing_money > 0.0: money_breach = missing_money / money insured_quantity, insured_quantity_cost = 0, 0.0 insured_money, insured_money_quantity = 0.0, 0 # confirm partial execution perpetrators = {b.perpetrator for b in breaches} victims = set() if 0 < len(perpetrators) < len(partners): victims = {_.id for _ in partners} - set(perpetrators) execute = ( all( self.agents[victim].confirm_partial_execution( contract=contract, breaches=list(breaches) ) for victim in victims ) if len(victims) > 0 else True ) if product_breach is not None and resolution is None: # apply insurances if they exist # register the breach independent of insurance if self.insurance_company.is_insured(contract=contract, perpetrator=seller): # if the buyer has an insurance against the seller for this contract, then just give him the missing # quantity and proceed as if the contract was for the remaining quantity. Notice that the breach on the # seller is already registered by this time. # the insurance company can give as much as the buyer can buy. No loan is allowed here as the buyer was # already offered a loan earlier because surely if they have a deficit they committed a funds breach # buyer_deficit = missing_quantity * unit_price - buyer_factory.wallet # if buyer_deficit > 0: # self.bank.buy_loan(agent=buyer, amount=buyer_deficit, n_installments=self.loan_installments) if unit_price > 0: insured_quantity = min( missing_quantity, int(available_money // unit_price) ) else: insured_quantity = missing_quantity self.logdebug( f"Insurance: {buyer.name} got {insured_quantity} of {self.products[pind].name} " f"from insurance ({missing_quantity}) was missing" ) insured_quantity_cost = insured_quantity * unit_price buyer_factory.transport_to(product=pind, quantity=insured_quantity) buyer_factory.pay(insured_quantity_cost) buyer.on_inventory_change(pind, insured_quantity, "insurance") buyer.on_cash_transfer(-insured_quantity_cost, "insurance") self.insurance_company.storage[pind] -= insured_quantity self.insurance_company.wallet += insured_quantity_cost # we will only transfer the remaining quantity. missing_quantity -= insured_quantity quantity -= insured_quantity money -= insured_quantity_cost if money_breach is not None and resolution is None: # apply insurances if they exist. if self.insurance_company.is_insured(contract=contract, perpetrator=buyer): # if the seller has an insurance against the buyer for this contract, then just give him the missing # money and proceed as if the contract was for the remaining amount. Notice that the breach on the # seller is already registered by this time. # the insurance company will provide enough money to buy whatever actually exist of the contract in the # seller's storage insured_money = min(missing_money, available_quantity * unit_price) insured_money_quantity = int( insured_money // unit_price ) # I never come here if unit_price is zero. insured_money = insured_money_quantity * unit_price self.logdebug( f"Insurance: {seller.name} got {insured_money} dollars from insurance" ) seller_factory.receive(insured_money) seller_factory.transport_from( product=pind, quantity=insured_money_quantity ) seller.on_inventory_change(pind, -insured_money_quantity, "insurance") seller.on_cash_transfer(insured_money, "insurance") self.insurance_company.wallet -= insured_money self.insurance_company.storage[pind] += insured_money_quantity # we will only transfer the remaining money. missing_money -= insured_money quantity -= insured_money_quantity money -= insured_money # recalculate total and quantity to avoid rounding errors money = unit_price * quantity # check the seller available_quantity = ( max(0, seller_factory.storage.get(pind, 0)) if not isinstance(seller, Miner) else quantity ) missing_quantity = max(0, quantity - available_quantity) # check the buyer available_money = ( max(0.0, buyer_factory.wallet) if not isinstance(buyer, Consumer) else money ) if unit_price > 0.0: available_money = (available_money // unit_price) * unit_price missing_money = max(0.0, money - available_money) # missing quantity/money is now fully handled. Just remove them from the contract and execute the rest if missing_money > 0.0 and missing_quantity == 0: quantity -= ( int(math.ceil(missing_money / unit_price)) if unit_price != 0.0 else 0 ) elif missing_money <= 0.0 and missing_quantity > 0: quantity -= missing_quantity # int(math.floor(money / unit_price)) if unit_price != 0.0 else 0 elif missing_money > 0.0 and missing_quantity > 0: money_for_available_quantity = (quantity - missing_quantity) * unit_price quantity_for_available_money = int( math.floor((money - missing_money) / unit_price) ) money_for_available_money = quantity_for_available_money * unit_price available_money = money - missing_money money = max( 0.0, min( ( available_money, money_for_available_quantity, money_for_available_money, ) ), ) quantity = max( 0, int(math.floor(money / unit_price)) if unit_price != 0.0 else 0 ) money = quantity * unit_price # confirm that the money and quantity match given the unit price. if not (money >= 0.0 and quantity >= 0): self.logerror( f"invalid contract ({str(contract)})!! negative money ({money}) or quantity ({quantity})" f", unit price {unit_price}, missing quantity {missing_quantity}, missing money {missing_money}" f", breaches: {[str(_) for _ in breaches]}, insured_quantity {insured_quantity}" f", insured_quantity_cost {insured_quantity_cost}, insured_money {insured_money}" f', insured_money_quantity {insured_money_quantity}, original quantity {agreement["quantity"]}' f', original money {agreement["unit_price"] * agreement["quantity"]}' ) if not abs(money - unit_price * quantity) < 1e-5: self.logerror( f"invalid contract ({str(contract)})!! money {money}, quantity {quantity}" f", unit price {unit_price}, missing quantity {missing_quantity}, missing money {missing_money}" f", breaches: {[str(_) for _ in breaches]}, insured_quantity {insured_quantity}" f", insured_quantity_cost {insured_quantity_cost}, insured_money {insured_money}" f', insured_money_quantity {insured_money_quantity}, original quantity {agreement["quantity"]}' f', original money {agreement["unit_price"] * agreement["quantity"]}' ) # if money > unit_price * quantity: # money = unit_price * quantity # if unit_price != 0.0 and quantity > math.floor(money / unit_price): # quantity = int(math.floor(money / unit_price)) if money > 0 or quantity > 0: if execute: self._move_product( buyer=buyer, seller=seller, quantity=quantity, money=money, product_id=pind, ) else: self.logdebug( f"Contract {contract.id}: one of {[_.id for _ in victims]} refused partial execution." ) else: self.logdebug(f"Contract {contract.id} has no transfers")
[docs] def _move_product_force( self, buyer: SCML2019Agent, seller: SCML2019Agent, product_id: int, quantity: int, money: float, ): """Moves as much product and money between the buyer and seller""" seller_factory, buyer_factory = self.a2f[seller.id], self.a2f[buyer.id] if isinstance(seller, Miner): available_quantity = quantity else: available_quantity = min( quantity, seller_factory.storage.get(product_id, 0) ) if isinstance(buyer, Consumer): available_money = money else: available_money = min(money, buyer_factory.wallet) self.logdebug( f"Moving {quantity} (available {available_quantity}) units of " f"{self.products[product_id].name} from {seller.name} " f"to {buyer.name} for {money} (available {available_money}) dollars" ) if available_quantity > 0: seller_factory.transport_from(product_id, available_quantity) seller.on_inventory_change(product_id, -available_quantity, "contract") if self.transportation_delay < 1: buyer_factory.transport_to(product_id, available_quantity) buyer.on_inventory_change(product_id, -available_quantity, "contract") else: self._transport[self.current_step + self.transportation_delay].append( (buyer, product_id, available_quantity) ) if available_money > 0: buyer_factory.pay(available_money) if self.transfer_delay < 1: seller_factory.receive(available_money) seller.on_cash_transfer(available_money, "contract") else: self._transfer[self.current_step + self.transfer_delay].append( (seller, available_money) )
[docs] def register_interest(self, agent: SCML2019Agent, products: List[int]) -> None: for product in products: self.__interested_agents[product] = list( set(self.__interested_agents[product] + [agent]) )
[docs] def unregister_interest(self, agent: SCML2019Agent, products: List[int]) -> None: for product in products: try: self.__interested_agents[product].remove(agent) except ValueError: pass
[docs] def make_bankrupt( self, agent: SCML2019Agent, amount: float, beneficiary: Agent, contract: Optional[Contract], ) -> None: """Marks the agent as bankrupt""" self.bulletin_board.record( "bankruptcy", {"time": self.current_step}, key=agent.id ) for receiver in itertools.chain( self.miners, self.consumers, self.factory_managers ): receiver.on_agent_bankrupt(agent.id) # liquidate the bankrupt agent factory = self.a2f.get(agent.id, None) if factory is None: return # first sell everything the agent has in its factory at catalog prices for product_index, quantity in factory.storage.items(): price = self.products[product_index].catalog_price if price is None: price = self.default_price_for_products_without_one _saved_min_storage, factory.min_storage = factory.min_storage, 0 factory.sell(product=product_index, quantity=quantity, price=price) agent.on_inventory_change(product_index, -quantity, "bankruptcy") agent.on_cash_transfer(price, "bankruptcy") payable = min(factory.wallet, amount) # second pay the beneficiary if contract is None: # beneficiary is the bank beneficiary.wallet += payable factory.pay(payable) keep_for_beneficiary = 0 else: # beneficiary is another agent keep_for_beneficiary = payable bfactory = self.a2f.get(beneficiary.id, None) if bfactory is not None: bfactory.receive(payable) factory.pay(payable) # nullify all future contracts available = factory.balance - keep_for_beneficiary owed = 0.0 nulled_contracts = [] for time, contracts in self.contracts_per_step.items(): if time < self.current_step: continue for contract in contracts: if agent.id in contract.partners: victim = [_ for _ in contract.partners if _ != agent.id][0] nulled_contracts.append((victim, contract)) owed += ( contract.agreement["quantity"] * contract.agreement["unit_price"] ) self.__n_bankrupt += 1 if owed <= 0.0: return # calculate compensation fraction if available >= owed: fraction = self.compensation_fraction else: fraction = self.compensation_fraction * available / owed for victim, contract in nulled_contracts: victim = self.traders.get(victim, None) if victim is None: continue bfactory = self.a2f.get(victim.id, None) if bfactory is None: continue compensation = min( factory.wallet, fraction * contract.agreement["quantity"] * contract.agreement["unit_price"], ) if compensation > 0.0: bfactory.receive(compensation) factory.pay(compensation) victim.on_contract_nullified( contract=contract, bankrupt_partner=agent.id, compensation=compensation ) self.nullify_contract(contract)
[docs] def nullify_contract(self, contract: Contract): self.__n_nullified += 1 contract.nullified_at = self.current_step
[docs] def evaluate_insurance( self, contract: Contract, agent: SCML2019Agent, t: int = None ) -> Optional[float]: """Can be called to evaluate the premium for insuring the given contract against breachs committed by others Args: agent: The agent buying the contract contract: hypothetical contract t: time at which the policy is to be bought. If None, it means current step """ against = [self.agents[_] for _ in contract.partners if _ != agent.id] if len(against) == 0: against = [agent.id] return self.insurance_company.evaluate_insurance( contract=contract, insured=agent, against=against[0], t=t )
[docs] def buy_insurance(self, contract: Contract, agent: SCML2019Agent) -> bool: """Buys insurance for the contract by the premium calculated by the insurance company. Remarks: The agent can call `evaluate_insurance` to find the premium that will be used. """ against = [self.agents[_] for _ in contract.partners if _ != agent.id] if len(against) != 1: raise ValueError("Cannot find partner while evaluating insurance") return ( self.insurance_company.buy_insurance( contract=contract, insured=agent, against=against[0] ) is not None )
[docs] def _process_annotation( self, annotation: Optional[Dict[str, Any]] ) -> Optional[Dict[str, Any]]: """Processes an annotation stripping any extra information not allowed if necessary. Will return None if the annotation is suspecious""" if annotation is None: return {} if not self.strip_annotations: return annotation annotation = { k: v for k, v in annotation.items() if k in ("partners", "cfp", "buyer", "seller") } if self.prevent_cfp_tampering: cfp = annotation.get("cfp", None) if cfp is not None: cfp_ = self.bulletin_board.read("cfps", cfp.id) if cfp_ is None: self.logerror( f"CFP {str(cfp)} with id {cfp.id} was tampered with by {self.agent.name} and will be ignored" ) return None else: annotation["cfp"] = cfp_ return annotation
# def run_negotiation( # self, # caller: "Agent", # issues: Collection[Issue], # partners: Collection["Agent"], # roles: Collection[str] = None, # annotation: Optional[Dict[str, Any]] = None, # mechanism_name: str = None, # mechanism_params: Dict[str, Any] = None, # ) -> Optional[Tuple[Contract, NegotiatorMechanismInterface]]:
[docs] def run_negotiation( self, caller: "Agent", issues: Collection[Issue], partners: Collection["Agent"], negotiator: Negotiator, ufun: UtilityFunction = None, caller_role: str = None, roles: Collection[str] = None, annotation: Optional[Dict[str, Any]] = None, mechanism_name: str = None, mechanism_params: Dict[str, Any] = None, ) -> Optional[Tuple[Contract, NegotiatorMechanismInterface]]: annotation = self._process_annotation(annotation) if annotation is None: return None return super().run_negotiation( caller=caller, issues=issues, annotation=annotation, partners=partners, roles=roles, mechanism_name=mechanism_name, mechanism_params=mechanism_params, negotiator=negotiator, ufun=ufun, )
[docs] def run_negotiations( self, caller: "Agent", issues: Union[List[Issue], List[List[Issue]]], partners: List[List["Agent"]], negotiators: List[Negotiator], ufuns: List[UtilityFunction] = None, caller_roles: List[str] = None, roles: Optional[List[Optional[List[str]]]] = None, annotations: Optional[List[Optional[Dict[str, Any]]]] = None, mechanism_names: Optional[Union[str, List[str]]] = None, mechanism_params: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None, all_or_none: bool = False, ) -> List[Tuple[Contract, NegotiatorMechanismInterface]]: if annotations is None: return None for i, annotation in enumerate(annotations): annotations[i] = self._process_annotation(annotation) return super().run_negotiations( caller=caller, issues=issues, roles=roles, annotations=annotations, mechanism_names=mechanism_names, mechanism_params=mechanism_params, partners=partners, negotiators=negotiators, ufuns=ufuns, )
[docs] def request_negotiation_about( self, req_id: str, caller: "Agent", issues: List[Issue], partners: List["Agent"], roles: List[str] = None, annotation: Optional[Dict[str, Any]] = None, mechanism_name: str = None, mechanism_params: Dict[str, Any] = None, group=None, ): annotation = self._process_annotation(annotation) if annotation is None: return False return super().request_negotiation_about( req_id=req_id, caller=caller, issues=issues, annotation=annotation, partners=partners, roles=roles, mechanism_name=mechanism_name, mechanism_params=mechanism_params, group=group, )
@property
[docs] def winners(self): """The winners of this world (factory managers with maximum wallet balance""" if len(self.factory_managers) < 1: return [] if 0.0 in [self.a2f[_.id].initial_balance for _ in self.factory_managers]: balances = sorted( ((self.a2f[_.id].balance, _) for _ in self.factory_managers), key=lambda x: x[0], reverse=True, ) else: balances = sorted( ( (self.a2f[_.id].balance / self.a2f[_.id].initial_balance, _) for _ in self.factory_managers ), key=lambda x: x[0], reverse=True, ) max_balance = balances[0][0] return [_[1] for _ in balances if _[0] >= max_balance]
[docs] def on_event(self, event: Event, sender: "EventSource") -> None: """ Called whenever an event is raised for which the `SCML2020World` is registered asa listener Args: event: The event sender: The sender Returns: None """ if event.type == "new_record" and event.data["section"] == "cfps": cfp = event.data["value"] product = cfp.product for m in self.__interested_agents[product]: if m.id != cfp.publisher: m.on_new_cfp(copy.deepcopy(cfp)) elif event.type == "will_remove_record" and event.data["section"] == "cfps": cfp = event.data["value"] product = cfp.product for m in self.__interested_agents[product]: if m.id != cfp.publisher: m.on_remove_cfp(copy.deepcopy(cfp))
[docs] def contract_record(self, contract: Contract) -> Dict[str, Any]: c = { "id": contract.id, "seller_name": self.agents[contract.annotation["seller"]].name, "buyer_name": self.agents[contract.annotation["buyer"]].name, "seller_type": self.agents[ contract.annotation["seller"] ].__class__.__name__, "buyer_type": self.agents[contract.annotation["buyer"]].__class__.__name__, "product_name": self.products[contract.annotation["cfp"].product], "delivery_time": contract.agreement["time"], "quantity": contract.agreement["quantity"], "unit_price": contract.agreement["unit_price"], "signed_at": contract.signed_at if contract.signed_at is not None else -1, "nullified_at": contract.nullified_at, "concluded_at": contract.concluded_at, "penalty": contract.agreement.get("penalty", np.nan), "signing_delay": contract.agreement.get("signing_delay", 0), "signatures": "|".join(str(_) for _ in contract.signatures.values()), "issues": contract.issues if not self.compact else None, "seller": contract.annotation["seller"], "buyer": contract.annotation["buyer"], } if not self.compact: c.update(contract.annotation) c["n_neg_steps"] = contract.mechanism_state.step return c
[docs] def breach_record(self, breach: Breach) -> Dict[str, Any]: return { "perpetrator": breach.perpetrator, "perpetrator_name": breach.perpetrator, "level": breach.level, "type": breach.type, "time": breach.step, }
[docs] def contract_size(self, contract: Contract) -> float: return contract.agreement["unit_price"] * contract.agreement["quantity"]