import copy
import itertools
import math
import sys
from os import PathLike
from pathlib import Path
from random import choices, randint, random, shuffle
import numpy as np
from negmas import Agent
from negmas.helpers import get_class, get_full_type_name, instantiate, unique_name
from negmas.serialization import serialize
from negmas.situated import Entity
from negmas.tournaments import TournamentResults, WorldRunResults, tournament
from scml.scml2019.common import (
DEFAULT_NEGOTIATOR,
Factory,
InputOutput,
ManufacturingProfile,
Process,
Product,
)
from scml.scml2019.consumers import ConsumptionProfile, JustInTimeConsumer
from scml.scml2019.factory_managers.builtins import GreedyFactoryManager
from scml.scml2019.miners import MiningProfile, ReactiveMiner
from scml.scml2019.world import SCML2019World
if True:
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
Tuple,
Type,
Union,
)
from .factory_managers.builtins import FactoryManager
__all__ = [
"anac2019_world",
"anac2019_tournament",
"anac2019_collusion",
"anac2019_std",
"balance_calculator",
"anac2019_sabotage",
"DefaultGreedyManager",
]
[docs]
class DefaultGreedyManager(GreedyFactoryManager):
def __init__(
self,
*args,
reserved_value=0.0,
negotiator_params=None,
optimism=0.0,
negotiator_type=DEFAULT_NEGOTIATOR,
n_retrials=5,
use_consumer=True,
reactive=True,
sign_only_guaranteed_contracts=False,
riskiness=0.0,
max_insurance_premium: float = float("inf"),
**kwargs,
):
r = random()
if r < 0.25:
aspiration_type = "conceder"
elif r < 0.5:
aspiration_type = "linear"
else:
aspiration_type = "boulware"
if negotiator_params is None:
negotiator_params = {}
negotiator_params.update({"aspiration_type": aspiration_type})
super().__init__(
*args,
reserved_value=reserved_value,
negotiator_params=negotiator_params,
optimism=optimism,
n_retrials=n_retrials,
use_consumer=use_consumer,
reactive=reactive,
sign_only_guaranteed_contracts=sign_only_guaranteed_contracts,
riskiness=riskiness,
max_insurance_premium=max_insurance_premium,
negotiator_type=negotiator_type,
**kwargs,
)
def integer_cut(total: int, n: int, mn: Union[int, List[int]]) -> List[int]:
"""
Generates l random integers that sum to n where each of them is at least l_m
Args:
total: total
n: number of levels
mn: minimum per level
Returns:
"""
if not isinstance(mn, Iterable):
mn = [mn] * n
sizes = np.asarray(mn)
if total < sizes.sum():
raise ValueError(
f"Cannot generate {n} numbers summing to {total} with a minimum summing to {sizes.sum()}"
)
while sizes.sum() < total:
sizes[randint(0, n - 1)] += 1
return list(sizes.tolist())
def _realin(rng: Union[Tuple[float, float], float]) -> float:
"""
Selects a random number within a range if given or the input if it was a float
Args:
rng: Range or single value
Returns:
the real within the given range
"""
if isinstance(rng, float):
return rng
if abs(rng[1] - rng[0]) < 1e-8:
return rng[0]
return rng[0] + random() * (rng[1] - rng[0])
def _intin(rng: Union[Tuple[int, int], int]) -> int:
"""
Selects a random number within a range if given or the input if it was an int
Args:
rng: Range or single value
Returns:
the int within the given range
"""
if isinstance(rng, int):
return rng
if rng[0] == rng[1]:
return rng[0]
return randint(rng[0], rng[1])
def anac2019_sabotage_config_generator(
n_competitors: int,
n_agents_per_competitor: int,
agent_names_reveal_type: bool = False,
non_competitors: Optional[Tuple[Union[str, FactoryManager]]] = None,
non_competitor_params: Optional[Tuple[Dict[str, Any]]] = None,
compact: bool = True,
*,
consumption_schedule: Tuple[int, int] = (0, 5),
consumption_horizon: Tuple[int, int] = (10, 15),
n_retrials: Union[int, Tuple[int, int]] = 2,
negotiator_type: str = DEFAULT_NEGOTIATOR,
n_steps: Union[int, Tuple[int, int]] = (50, 100),
n_miners: Union[int, Tuple[int, int]] = 5,
n_consumers: Union[int, Tuple[int, int]] = 5,
profile_cost: Tuple[float, float] = (1, 4),
profile_time: Union[int, Tuple[int, int]] = 1,
n_intermediate: Tuple[int, int] = (1, 4),
min_factories_per_level: int = 5,
n_default_managers: Tuple[int, int] = (1, 4),
n_lines: int = 10,
**kwargs,
) -> List[Dict[str, Any]]:
return anac2019_config_generator(
1,
n_agents_per_competitor=n_agents_per_competitor,
agent_names_reveal_type=agent_names_reveal_type,
consumption_schedule=consumption_schedule,
consumption_horizon=consumption_horizon,
n_retrials=n_retrials,
negotiator_type=negotiator_type,
n_steps=n_steps,
n_miners=n_miners,
n_consumers=n_consumers,
profile_cost=profile_cost,
profile_time=profile_time,
n_intermediate=n_intermediate,
min_factories_per_level=min_factories_per_level,
n_default_managers=n_default_managers,
non_competitors=non_competitors,
non_competitor_params=non_competitor_params,
n_lines=n_lines,
compact=compact,
**kwargs,
)
def anac2019_config_generator(
n_competitors: int,
n_agents_per_competitor: int,
agent_names_reveal_type: bool = False,
non_competitors: Optional[Tuple[Union[str, FactoryManager]]] = None,
non_competitor_params: Optional[Tuple[Dict[str, Any]]] = None,
compact: bool = True,
*,
consumption_schedule: Tuple[int, int] = (0, 5),
consumption_horizon: Tuple[int, int] = (10, 15),
n_retrials: Union[int, Tuple[int, int]] = 2,
negotiator_type: str = DEFAULT_NEGOTIATOR,
n_steps: Union[int, Tuple[int, int]] = (50, 100),
n_miners: Union[int, Tuple[int, int]] = 5,
n_consumers: Union[int, Tuple[int, int]] = 5,
profile_cost: Tuple[float, float] = (1, 4),
profile_time: Union[int, Tuple[int, int]] = 1,
n_intermediate: Tuple[int, int] = (1, 4),
min_factories_per_level: int = 5, # strictly guaranteed
max_factories_per_level: int = 8, # not strictly guaranteed
n_default_managers: Tuple[int, int] = (1, 4),
n_lines: int = 10,
**kwargs,
) -> List[Dict[str, Any]]:
if non_competitors is None:
non_competitors = (DefaultGreedyManager,)
if isinstance(n_intermediate, Iterable):
n_intermediate = list(n_intermediate)
else:
n_intermediate = [n_intermediate, n_intermediate]
n_steps = _intin(n_steps)
miner_type = ReactiveMiner
consumer_type = JustInTimeConsumer
consumer_kwargs = {
"negotiator_type": negotiator_type,
"consumption_horizon": _intin(consumption_horizon),
}
miner_kwargs = {"negotiator_type": negotiator_type, "n_retrials": n_retrials}
if negotiator_type is not None:
for args in (consumer_kwargs, miner_kwargs):
if "negotiator_type" not in args.keys():
args["negotiator_type"] = negotiator_type
n_intermediate_levels = randint(*n_intermediate)
products = [
Product(id=0, name="p0", catalog_price=3.0, production_level=0, expires_in=0)
]
processes = []
miners = [
instantiate(
miner_type,
profiles={products[-1].id: MiningProfile()},
name=f"m_{i}",
**miner_kwargs,
)
for i in range(n_miners)
]
factories = []
def _s(x):
return x if x is not None else 0
if isinstance(profile_cost, tuple):
historical_cost = (profile_cost[0] + profile_cost[1]) / 2.0
else:
historical_cost = profile_cost
historical_cost = (historical_cost * 0.85, historical_cost * 1.15)
for level in range(n_intermediate_levels + 1):
p = Process(
name=f"p{level + 1}",
inputs=[InputOutput(product=level, quantity=1, step=0.0)],
production_level=level + 1,
outputs=[InputOutput(product=level + 1, quantity=1, step=1.0)],
historical_cost=_realin(historical_cost),
id=level,
)
new_product = Product(
name=f"p{level + 1}",
catalog_price=products[-1].catalog_price + p.historical_cost,
# keep this to the world to calculate _s(products[-1].catalog_price) + level + 1
production_level=level + 1,
id=level + 1,
expires_in=0,
)
processes.append(p)
products.append(new_product)
n_defaults = []
for level in range(n_intermediate_levels + 1):
n_defaults.append(_intin(n_default_managers))
n_agents = n_agents_per_competitor * n_competitors
n_a_list = integer_cut(n_agents, n_intermediate_levels + 1, 0)
for i, n_a in enumerate(n_a_list):
if n_a + n_defaults[i] < min_factories_per_level:
n_defaults[i] = min_factories_per_level - n_a
if n_a + n_defaults[i] > max_factories_per_level and n_defaults[i] > 1:
n_defaults[i] = max(1, min_factories_per_level - n_a)
n_f_list = [a + b for a, b in zip(n_defaults, n_a_list)]
n_factories = sum(n_f_list)
if non_competitor_params is None:
non_competitor_params = [{}] * len(non_competitors)
non_competitors = [get_full_type_name(_) for _ in non_competitors]
for c_, p_ in zip(non_competitors, non_competitor_params):
if (
c_.startswith("scml.scml2019.") and c_.endswith("GreedyFactoryManager")
) or "DefaultGreedyManager" in c_:
p_.update({"negotiator_type": negotiator_type, "n_retrials": n_retrials})
max_def_agents = len(non_competitors) - 1
manager_types = [None] * n_factories
manager_params = [None] * n_factories
first_in_level = 0
for level in range(n_intermediate_levels + 1):
n_d = n_defaults[level]
n_f = n_f_list[level]
assert (
n_d <= n_f
), f"Got {n_f} total factories at level {level} out of which {n_d} are default!!"
for j in range(n_f):
profiles = []
factory_time = _intin(profile_time)
factory_cost = _realin(profile_cost)
for k in range(n_lines):
profiles.append(
ManufacturingProfile(
n_steps=factory_time,
cost=factory_cost,
initial_pause_cost=0,
running_pause_cost=0,
resumption_cost=0,
cancellation_cost=0,
line=k,
process=processes[level],
)
)
factory = Factory(
id=f"f{level + 1}_{j}",
max_storage=sys.maxsize,
profiles=profiles,
initial_storage={},
initial_wallet=1000.0,
)
factories.append(factory)
if j >= n_f - n_d: # default managers are last managers in the list
def_indx = randint(0, max_def_agents)
manager_types[first_in_level + j] = non_competitors[def_indx]
params_ = copy.deepcopy(non_competitor_params[def_indx])
if agent_names_reveal_type:
params_["name"] = f"_df_{level + 1}_{j}"
else:
params_[
"name"
] = f"_df_{level + 1}_{j}" # because I use name to know that this is a default agent in evaluate.
# @todo do not use name to identify default agents in evaluation
manager_params[first_in_level + j] = params_
first_in_level += n_f
def create_schedule():
if isinstance(consumption_schedule, tuple) and len(consumption_schedule) == 2:
return list(
np.random.randint(
consumption_schedule[0], consumption_schedule[1], n_steps
).tolist()
)
return consumption_schedule
consumers = [
instantiate(
consumer_type,
profiles={products[-1].id: ConsumptionProfile(schedule=create_schedule())},
name=f"c_{i}",
**consumer_kwargs,
)
for i in range(n_consumers)
]
world_name = unique_name("", add_time=True, rand_digits=4)
world_params = dict(
name=world_name,
time_limit=7200 + 3600,
neg_time_limit=120,
neg_n_steps=20,
neg_step_time_limit=10,
negotiation_speed=21,
default_signing_delay=1,
transportation_delay=0,
no_bank=True,
breach_penalty_society=0.02,
no_insurance=False,
premium=0.03,
premium_time_increment=0.1,
premium_breach_increment=0.001,
max_allowed_breach_level=None,
breach_penalty_society_min=0.0,
breach_penalty_victim=0.0,
breach_move_max_product=True,
initial_wallet_balances=1000.0,
transfer_delay=0,
start_negotiations_immediately=False,
catalog_profit=0.15,
financial_reports_period=10,
default_price_for_products_without_one=1,
compensation_fraction=0.5,
n_steps=n_steps,
compact=compact,
)
world_params.update(kwargs)
config = {
"world_params": world_params,
"products": [serialize(p, add_type_field=False) for p in products],
"processes": [serialize(p, add_type_field=False) for p in processes],
"factories": [
{
"profile": {
"n_steps": f.profiles[0].n_steps,
"cost": f.profiles[0].cost,
"line": f.profiles[0].line,
"process.id": f.profiles[0].process.id,
},
"max_storage": sys.maxsize,
"initial_wallet": 1000.0,
"id": f.id,
"n_lines": n_lines,
}
for f in factories
],
"miners": [
dict(
id=m.id,
name=m.name,
type=get_full_type_name(miner_type),
args=miner_kwargs,
profiles={
k: serialize(v, add_type_field=False) for k, v in m.profiles.items()
},
)
for m in miners
],
"consumers": [
dict(
id=c.id,
name=c.name,
type=get_full_type_name(consumer_type),
args=consumer_kwargs,
profiles={
k: serialize(v, add_type_field=False) for k, v in c.profiles.items()
},
)
for c in consumers
],
"manager_types": [
get_full_type_name(_) if isinstance(_, FactoryManager) else _
for _ in manager_types
],
"manager_params": manager_params,
"n_factories_per_level": n_f_list,
"agent_names_reveal_type": agent_names_reveal_type,
"compact": compact,
"scoring_context": {},
"non_competitors": non_competitors,
"non_competitor_params": non_competitor_params,
}
config.update(kwargs)
return [config]
def anac2019_sabotage_assigner(
config: List[Dict[str, Any]],
max_n_worlds: int,
n_agents_per_competitor: int = 1,
fair: bool = True,
competitors: Sequence[Type[Agent]] = (),
params: Sequence[Dict[str, Any]] = (),
dynamic_non_competitors: Optional[List[Type[Agent]]] = None,
dynamic_non_competitor_params: Optional[List[Dict[str, Any]]] = None,
exclude_competitors_from_reassignment: bool = True,
) -> List[List[Dict[str, Any]]]:
config = config[0]
competitors = list(
get_full_type_name(_) if not isinstance(_, str) and _ is not None else _
for _ in competitors
)
n_competitors = len(competitors)
params = (
list(params) if params is not None else [dict() for _ in range(n_competitors)]
)
agent_names_reveal_type = config.pop("agent_names_reveal_type", False)
n_permutations = 1
manager_types = config["manager_types"]
assignable_factories = [i for i, mtype in enumerate(manager_types) if mtype is None]
shuffle(assignable_factories)
assignable_factories = (
np.asarray(assignable_factories).reshape((1, n_agents_per_competitor)).tolist()
)
configs = []
def shorten(long_name: str, d: Dict[str, Any]) -> str:
name = (
long_name.split(".")[-1]
.lower()
.replace("factory_manager", "")
.replace("manager", "")
)
name = (
name.replace("factory", "")
.replace("agent", "")
.replace("miner", "m")
.replace("consumer", "")
)
if long_name.startswith("jnegmas"):
name = f"j:{name}"
return name
non_competitors = config.get(
"non_competitors", ("scml.scml2019.utils.DefaultGreedyManager",)
)
max_def = len(non_competitors) - 1
non_competitor_params = config.get("non_competitor_params", None)
if non_competitor_params is None:
non_competitor_params = [{}] * (max_def + 1)
def _type_name(c_: str, p_) -> str:
return instantiate(c_, **p_).type_name
def _copy_config(perm_, conf, indx, comp, c_p):
perm_ = list(perm_)
perm1 = copy.deepcopy(perm_)
ctype = _type_name(comp, c_p)
for i, (c_, p_) in enumerate(perm_):
if c_ != "competitor":
perm_[i] = (c_, p_)
else:
perm_[i] = (comp, c_p)
new_config = copy.deepcopy(conf)
new_config["world_params"]["name"] += f".{indx:05d}_with_{shorten(comp, c_p)}"
new_config["scoring_context"].update(
{"competitor": ctype, "competitor_params": c_p}
)
for (a, p_), assignable in zip(perm_, assignable_factories):
for factory in assignable:
new_config["manager_types"][factory] = a
new_config["manager_params"][factory] = copy.deepcopy(p_)
for i, (c_, p_) in enumerate(perm1):
if c_ != "competitor":
perm1[i] = (c_, p_)
else:
def_indx = randint(0, max_def)
perm1[i] = (non_competitors[def_indx], non_competitor_params[def_indx])
no_sabotage_config = copy.deepcopy(conf)
no_sabotage_config["world_params"][
"name"
] += f".{indx:05d}_no_{shorten(comp, c_p)}"
no_sabotage_config["scoring_context"].update(
{"competitor": ctype, "competitor_params": c_p}
)
for (a, p_), assignable in zip(perm1, assignable_factories):
for factory in assignable:
no_sabotage_config["manager_types"][factory] = a
no_sabotage_config["manager_params"][factory] = copy.deepcopy(p_)
return [new_config, no_sabotage_config]
max_n_worlds = (
int(max(1, max_n_worlds // n_competitors)) if max_n_worlds is not None else None
)
if n_permutations is not None and (
max_n_worlds is None or n_permutations <= max_n_worlds
):
k = 0
others = list(choices(list(zip(non_competitors, non_competitor_params))))
agents = ["competitor"] + [_[0] for _ in others]
agent_params = ["competitor"] + [_[1] for _ in others]
permutation = list(zip(agents, agent_params))
for competitor, c_params in zip(competitors, params):
perm = copy.deepcopy(permutation)
configs.append(_copy_config(perm, config, k, competitor, c_params))
k += 1
elif max_n_worlds is None:
raise ValueError("Did not give max_n_worlds and cannot find n_permutations.")
else:
others = list(choices(list(zip(non_competitors, non_competitor_params))))
agents = ["competitor"] + [_[0] for _ in others]
agent_params = ["competitor"] + [_[1] for _ in others]
permutation = list(zip(agents, agent_params))
assert len(permutation) == len(
assignable_factories
), f"assignable {len(assignable_factories)}, permutation {len(permutation)}"
if fair:
k = 0
shuffle(permutation)
for competitor, c_params in zip(competitors, params):
perm = copy.deepcopy(permutation)
configs.append(_copy_config(perm, config, k, competitor, c_params))
else:
for k in range(max_n_worlds):
for competitor, c_params in zip(competitors, params):
perm = copy.deepcopy(permutation)
shuffle(perm)
configs.append(_copy_config(perm, config, k, competitor, c_params))
if agent_names_reveal_type:
for config_set in configs:
for config in config_set:
nxt = 0
for i, (t, p, f) in enumerate(
zip(
config["manager_types"],
config["manager_params"],
config["factories"],
)
):
if p.get("name", "").startswith("_df_"):
continue
p = p.copy()
name_ = (
t.short_type_name
if isinstance(t, Entity)
else get_full_type_name(t)
if not isinstance(t, str)
else shorten(t, config["manager_params"][i])
)
p["name"] = f'{name_}@{f["id"][1:]}'
config["manager_params"][i] = copy.deepcopy(p)
nxt = nxt + 1
return configs
def anac2019_assigner(
config: List[Dict[str, Any]],
max_n_worlds: int,
n_agents_per_competitor: int = 1,
fair: bool = True,
competitors: Sequence[Type[Agent]] = (),
params: Sequence[Dict[str, Any]] = (),
dynamic_non_competitors: Optional[List[Type[Agent]]] = None,
dynamic_non_competitor_params: Optional[List[Dict[str, Any]]] = None,
exclude_competitors_from_reassignment: bool = True,
) -> List[List[Dict[str, Any]]]:
config = config[0]
competitors = list(
get_full_type_name(_) if not isinstance(_, str) and _ is not None else _
for _ in competitors
)
n_competitors = len(competitors)
params = (
list(params) if params is not None else [dict() for _ in range(n_competitors)]
)
agent_names_reveal_type = config.pop("agent_names_reveal_type", False)
try:
n_permutations = n_competitors
except ArithmeticError:
n_permutations = None
manager_types = config["manager_types"]
assignable_factories = [i for i, mtype in enumerate(manager_types) if mtype is None]
shuffle(assignable_factories)
assignable_factories = (
np.asarray(assignable_factories)
.reshape((n_competitors, n_agents_per_competitor))
.tolist()
)
configs = []
def _copy_config(perm_, c, indx):
new_config = copy.deepcopy(c)
new_config["world_params"]["name"] += f".{indx:05d}"
for (a, p_), assignable in zip(perm_, assignable_factories):
for factory in assignable:
new_config["manager_types"][factory] = a
new_config["manager_params"][factory] = copy.deepcopy(p_)
return [new_config]
if n_permutations is not None and max_n_worlds is None:
k = 0
permutation = list(zip(competitors, params))
assert len(permutation) == len(assignable_factories)
shuffle(permutation)
perm = permutation
for __ in range(n_permutations):
k += 1
perm = copy.deepcopy(perm)
perm = perm[-1:] + perm[:-1]
configs.append(_copy_config(perm, config, k))
elif max_n_worlds is None:
raise ValueError("Did not give max_n_worlds and cannot find n_permutations.")
else:
permutation = list(zip(competitors, params))
assert len(permutation) == len(assignable_factories)
if fair:
n_min = len(assignable_factories)
n_rounds = int(max_n_worlds // n_min)
if n_rounds < 1:
raise ValueError(
f"Cannot guarantee fair assignment: n. competitors {len(assignable_factories)}, at least"
f" {n_min} runs are needed for fair assignment"
)
max_n_worlds = n_rounds * n_min
k = 0
for _ in range(n_rounds):
shuffle(permutation)
for __ in range(n_min):
k += 1
perm = copy.deepcopy(permutation)
perm = perm[-1:] + perm[:-1]
configs.append(_copy_config(perm, config, k))
else:
for k in range(max_n_worlds):
perm = copy.deepcopy(permutation)
shuffle(perm)
configs.append(_copy_config(perm, config, k))
def shorten(long_name: str, d: Dict[str, Any]) -> str:
name = (
long_name.split(".")[-1]
.lower()
.replace("factory_manager", "")
.replace("manager", "")
)
name = (
name.replace("factory", "")
.replace("agent", "")
.replace("miner", "m")
.replace("consumer", "")
)
if long_name.startswith("jnegmas"):
name = f"j:{name}"
return name
if agent_names_reveal_type:
for config_set in configs:
for config in config_set:
nxt = 0
for i, (t, p, f) in enumerate(
zip(
config["manager_types"],
config["manager_params"],
config["factories"],
)
):
if p.get("name", "").startswith("_df_"):
continue
p = p.copy()
name_ = (
t.short_type_name
if isinstance(t, Entity)
else get_full_type_name(t)
if not isinstance(t, str)
else shorten(t, config["manager_params"][i])
)
p["name"] = f'{name_}@{f["id"][1:]}'
config["manager_params"][i] = copy.deepcopy(p)
nxt = nxt + 1
return configs
def anac2019_world_generator(**kwargs):
products = [Product(**p) for p in kwargs["products"]]
processes = [Process(**p) for p in kwargs["processes"]]
for process in processes:
process.inputs = [InputOutput(**io) for io in process.inputs]
process.outputs = [InputOutput(**io) for io in process.outputs]
factories = []
for f in kwargs["factories"]:
p = f["profile"]
factories.append(
Factory(
initial_storage={},
initial_wallet=f["initial_wallet"],
max_storage=f["max_storage"],
id=f'{f["id"]}',
profiles=[
ManufacturingProfile(
n_steps=p["n_steps"],
cost=p["cost"],
line=_,
process=processes[p["process.id"]],
cancellation_cost=0.0,
initial_pause_cost=0.0,
resumption_cost=0,
running_pause_cost=0.0,
)
for _ in range(f["n_lines"])
],
)
)
miners = []
for m in kwargs["miners"]:
miner = instantiate(
m["type"],
**m["args"],
name=m["name"],
profiles={k: MiningProfile(**v) for k, v in m["profiles"].items()},
)
miner.id = m["id"]
miners.append(miner)
consumers = []
for c in kwargs["consumers"]:
consumer = instantiate(
c["type"],
**c["args"],
name=c["name"],
profiles={k: ConsumptionProfile(**v) for k, v in c["profiles"].items()},
)
consumer.id = c["id"]
consumers.append(consumer)
kwargs.pop("n_factories_per_level", None)
manager_types = kwargs.pop("manager_types", [])
manager_params = kwargs.pop("manager_params", [])
managers = [
instantiate(mt, **mp)
for mt, mp in zip(manager_types, itertools.cycle(manager_params))
]
world = SCML2019World(
products=products,
processes=processes,
factories=factories,
consumers=consumers,
miners=miners,
factory_managers=managers,
**kwargs["world_params"],
)
return world
[docs]
def anac2019_world(
competitors: Sequence[Union[str, Type[FactoryManager]]] = (),
params: Sequence[Dict[str, Any]] = (),
randomize: bool = True,
log_file_name: str = None,
name: str = None,
agent_names_reveal_type: bool = False,
n_intermediate: Tuple[int, int] = (1, 4),
n_miners=5,
n_factories_per_level=11,
n_agents_per_competitor=1,
n_consumers=5,
n_lines_per_factory=10,
guaranteed_contracts=False,
use_consumer=True,
max_insurance_premium=float("inf"),
n_retrials=5,
negotiator_type: str = DEFAULT_NEGOTIATOR,
transportation_delay=0,
default_signing_delay=0,
max_storage=sys.maxsize,
consumption_horizon=15,
consumption=(3, 5),
negotiation_speed=21,
neg_time_limit=60 * 4,
neg_n_steps=20,
n_steps=100,
time_limit=90 * 90,
n_default_per_level: int = 5,
compact: bool = False,
**kwargs,
) -> SCML2019World:
"""
Creates a world compatible with the ANAC 2019 competition. Note that
Args:
n_agents_per_competitor: Number of instantiations of each competing type.
name: SCML2020World name to use
agent_names_reveal_type: If true, a snake_case version of the agent_type will prefix agent names
randomize: If true, managers are assigned to factories randomly otherwise in the order
they are giving (cycling).
n_intermediate:
n_default_per_level:
competitors: A list of class names for the competitors
params: A list of dictionaries giving parameters to pass to the competitors
n_miners: number of miners of the single raw material
n_factories_per_level: number of factories at every production level
n_consumers: number of consumers of the final product
n_steps: number of simulation steps
n_lines_per_factory: number of lines in each factory
negotiation_speed: The number of negotiation steps per simulation step. None means infinite
default_signing_delay: The number of simulation between contract conclusion and signature
neg_n_steps: The maximum number of steps of a single negotiation (that is double the number of rounds)
neg_time_limit: The total time-limit of a single negotiation
time_limit: The total time-limit of the simulation
transportation_delay: The transportation delay
n_retrials: The number of retrials the `Miner` and `GreedyFactoryManager` will try if negotiations fail
max_insurance_premium: The maximum insurance premium accepted by `GreedyFactoryManager` (-1 to disable)
use_consumer: If true, the `GreedyFactoryManager` will use an internal consumer for buying its needs
guaranteed_contracts: If true, the `GreedyFactoryManager` will only sign contracts that it can guaratnee not to
break.
consumption_horizon: The number of steps for which `Consumer` publishes `CFP` s
consumption: The consumption schedule will be sampled from a uniform distribution with these limits inclusive
log_file_name: File name to store the logs
negotiator_type: The negotiation factory used to create all negotiators
max_storage: maximum storage capacity for all factory managers If None then it is unlimited
compact: If True, then compact logs will be created to reduce memory footprint
kwargs: key-value pairs to be passed as argument to chain_world() and then to SCML2019World()
Returns:
SCML2019World ready to run
Remarks:
- Every production level n has one process only that takes n steps to complete
"""
competitors = list(competitors)
params = (
list(params)
if params is not None
else [dict() for _ in range(len(competitors))]
)
if n_factories_per_level == n_default_per_level and len(competitors) > 0:
raise ValueError(
f"All factories in all levels are occupied by the default factory manager. Either decrease"
f" n_default_per_level ({n_default_per_level}) or increase n_factories_per_level "
f" ({n_factories_per_level})"
)
if isinstance(n_intermediate, Iterable):
n_intermediate = list(n_intermediate)
else:
n_intermediate = [n_intermediate, n_intermediate]
n_competitors = len(competitors)
n_intermediate_levels_min = (
int(math.ceil(n_competitors / (n_factories_per_level - n_default_per_level)))
- 1
)
if n_intermediate_levels_min > n_intermediate[1]:
raise ValueError(
f"Need {n_intermediate_levels_min} intermediate levels to run {n_competitors} competitors"
)
n_intermediate[0] = max(n_intermediate_levels_min, n_intermediate[0])
competitors = [get_class(c) if isinstance(c, str) else c for c in competitors]
if len(competitors) < 1:
competitors.append(DefaultGreedyManager)
params.append(dict())
world = SCML2019World.chain_world(
log_file_name=log_file_name,
n_steps=n_steps,
agent_names_reveal_type=agent_names_reveal_type,
negotiation_speed=negotiation_speed,
n_intermediate_levels=randint(*n_intermediate),
n_miners=n_miners,
n_consumers=n_consumers,
n_factories_per_level=n_factories_per_level,
consumption=consumption,
consumer_kwargs={
"negotiator_type": negotiator_type,
"consumption_horizon": consumption_horizon,
},
miner_kwargs={"negotiator_type": negotiator_type, "n_retrials": n_retrials},
default_manager_params={
"negotiator_type": negotiator_type,
"n_retrials": n_retrials,
"sign_only_guaranteed_contracts": guaranteed_contracts,
"use_consumer": use_consumer,
"max_insurance_premium": max_insurance_premium,
},
transportation_delay=transportation_delay,
time_limit=time_limit,
neg_time_limit=neg_time_limit,
neg_n_steps=neg_n_steps,
default_signing_delay=default_signing_delay,
n_lines_per_factory=n_lines_per_factory,
max_storage=max_storage,
manager_types=competitors,
manager_params=params,
n_default_per_level=n_default_per_level,
randomize=randomize,
name=name,
compact=compact,
**kwargs,
)
return world
[docs]
def balance_calculator(
worlds: List[SCML2019World],
scoring_context: Dict[str, Any],
dry_run: bool,
ignore_default=True,
) -> WorldRunResults:
"""A scoring function that scores factory managers' performance by the final balance only ignoring whatever still
in their inventory.
Args:
worlds: The world which is assumed to be run up to the point at which the scores are to be calculated.
scoring_context: A dict of context parameters passed by the world generator or assigner.
dry_run: A boolean specifying whether this is a dry_run. For dry runs, only names and types are expected in
the returned `WorldRunResults`
Returns:
WorldRunResults giving the names, scores, and types of factory managers.
"""
assert len(worlds) == 1
world = worlds[0]
result = WorldRunResults(
world_names=[world.name], log_file_names=[world.log_file_name]
)
initial_balances = []
for manager in world.factory_managers:
if "_df_" in manager.name and ignore_default:
continue
initial_balances.append(world.a2f[manager.id].initial_balance)
normalize = all(_ != 0 for _ in initial_balances)
for manager in world.factory_managers:
if "_df_" in manager.name and ignore_default:
continue
factory = world.a2f[manager.id]
result.names.append(manager.name)
result.ids.append(manager.id)
result.types.append(manager.type_name)
if dry_run:
result.scores.append(None)
continue
if normalize:
result.scores.append(
(factory.total_balance - factory.initial_balance)
/ factory.initial_balance
)
else:
result.scores.append(factory.total_balance - factory.initial_balance)
return result
def sabotage_effectiveness(
worlds: List[SCML2019World], scoring_context: Dict[str, Any], dry_run: bool
) -> WorldRunResults:
"""A scoring function that scores factory managers' performance by the final balance only ignoring whatever still
in their inventory.
Args:
worlds: The world which is assumed to be run up to the point at which the scores are to be calculated.
scoring_context: A dict of context parameters passed by the world generator or assigner.
dry_run: A boolean specifying whether this is a dry_run. For dry runs, only names and types are expected in
the returned `WorldRunResults`
Returns:
WorldRunResults giving the names, scores, and types of factory managers.
"""
assert len(worlds) == 2
type_scored = scoring_context.get("competitor", None)
if type_scored is None:
raise ValueError("Cannot determine which is the sabotaging agent")
if dry_run:
results = WorldRunResults(world_names=[""], log_file_names=[""])
results.names = [""]
results.ids = [""]
results.types = [type_scored]
results.scores = [None]
return results
results = [
balance_calculator([_], {}, dry_run=False, ignore_default=False) for _ in worlds
]
normal_scores, sabotaged_scores = [], []
sabotaged_indices, normal_indices = [], []
for i in range(len(worlds)):
if type_scored in results[i].types:
sabotaged_indices.append(int(i))
else:
normal_indices.append(int(i))
if len(sabotaged_indices) < 1:
raise ValueError(
f"The sabotaging agent type {type_scored} did not participate in any worlds"
)
if len(normal_indices) < 1:
raise ValueError(
f"The sabotaging agent type {type_scored} participated in ALL worlds"
)
for indx in sabotaged_indices:
sabotaged_scores += [
score
for score, type_ in zip(results[indx].scores, results[indx].types)
if type_ != type_scored
]
for indx in normal_indices:
normal_scores += [
score
for score, type_ in zip(results[indx].scores, results[indx].types)
if type_ != type_scored
]
normal_score = sum(normal_scores) / len(normal_scores)
sabotaged_score = sum(sabotaged_scores) / len(sabotaged_scores)
result = WorldRunResults(
world_names=[_.name for _ in worlds],
log_file_names=[_.log_file_name for _ in worlds],
)
result.names = [""]
result.ids = [""]
result.scores = [(normal_score - sabotaged_score) / (normal_score + 1.0)]
result.types = [type_scored]
return result
[docs]
def anac2019_tournament(
competitors: Sequence[Union[str, Type[FactoryManager]]],
agent_names_reveal_type=False,
n_configs: int = 5,
max_worlds_per_config: int = 1000,
n_runs_per_world: int = 5,
n_agents_per_competitor: int = 5,
tournament_path: str = None,
total_timeout: Optional[int] = None,
parallelism="parallel",
scheduler_ip: Optional[str] = None,
scheduler_port: Optional[str] = None,
tournament_progress_callback: Callable[[Optional[WorldRunResults]], None] = None,
world_progress_callback: Callable[[Optional[SCML2019World]], None] = None,
name: str = None,
verbose: bool = False,
configs_only=False,
compact=False,
**kwargs,
) -> Union[TournamentResults, PathLike]:
"""
The function used to run ANAC 2019 SCML tournament (collusion track).
Args:
name: Tournament name
competitors: A list of class names for the competitors
agent_names_reveal_type: If true then the type of an agent should be readable in its name (most likely at its
beginning).
n_configs: The number of different world configs (up to competitor assignment) to be generated.
max_worlds_per_config: The maximum number of worlds to run per config. If None, then all possible assignments
of competitors within each config will be tried (all permutations).
n_runs_per_world: Number of runs per world. All of these world runs will have identical competitor assignment
and identical world configuration.
n_agents_per_competitor: Number of agents per competitor
total_timeout: Total timeout for the complete process
tournament_path: Path at which to store all results. A scores.csv file will keep the scores and logs folder will
keep detailed logs
parallelism: Type of parallelism. Can be 'serial' for serial, 'parallel' for parallel and 'distributed' for distributed
scheduler_port: Port of the dask scheduler if parallelism is dask, dist, or distributed
scheduler_ip: IP Address of the dask scheduler if parallelism is dask, dist, or distributed
world_progress_callback: A function to be called after everystep of every world run (only allowed for serial
evaluation and should be used with cautious).
tournament_progress_callback: A function to be called with `WorldRunResults` after each world finished
processing
verbose: Verbosity
configs_only: If true, a config file for each
compact: If true, effort will be made to reduce memory footprint including disableing most logs
kwargs: Arguments to pass to the `world_generator` function
Returns:
`TournamentResults` The results of the tournament or a `PathLike` giving the location where configs were saved
Remarks:
Default parameters will be used in the league with the exception of `parallelism` which may use distributed
processing
"""
return anac2019_collusion(
competitors=competitors,
agent_names_reveal_type=agent_names_reveal_type,
n_configs=n_configs,
max_worlds_per_config=max_worlds_per_config,
n_runs_per_world=n_runs_per_world,
n_agents_per_competitor=n_agents_per_competitor,
tournament_path=tournament_path,
total_timeout=total_timeout,
parallelism=parallelism,
scheduler_ip=scheduler_ip,
scheduler_port=scheduler_port,
tournament_progress_callback=tournament_progress_callback,
world_progress_callback=world_progress_callback,
name=name,
verbose=verbose,
compact=compact,
configs_only=configs_only,
non_competitors=None,
non_competitor_params=None,
**kwargs,
)
[docs]
def anac2019_std(
competitors: Sequence[Union[str, Type[FactoryManager]]],
competitor_params: Optional[Sequence[Dict[str, Any]]] = None,
agent_names_reveal_type=False,
n_configs: int = 5,
max_worlds_per_config: Optional[int] = 1000,
n_runs_per_world: int = 5,
min_factories_per_level: int = 5,
tournament_path: str = None,
total_timeout: Optional[int] = None,
parallelism="parallel",
scheduler_ip: Optional[str] = None,
scheduler_port: Optional[str] = None,
tournament_progress_callback: Callable[[Optional[WorldRunResults]], None] = None,
world_progress_callback: Callable[[Optional[SCML2019World]], None] = None,
non_competitors: Optional[Sequence[Union[str, Type[FactoryManager]]]] = None,
non_competitor_params: Optional[Sequence[Union[str, Type[FactoryManager]]]] = None,
name: str = None,
verbose: bool = False,
configs_only=False,
compact=False,
**kwargs,
) -> Union[TournamentResults, PathLike]:
"""
The function used to run ANAC 2019 SCML tournament (standard track).
Args:
name: Tournament name
competitors: A list of class names for the competitors
competitor_params: A list of competitor parameters (used to initialize the competitors).
agent_names_reveal_type: If true then the type of an agent should be readable in its name (most likely at its
beginning).
n_configs: The number of different world configs (up to competitor assignment) to be generated.
max_worlds_per_config: The maximum number of worlds to run per config. If None, then all possible assignments
of competitors within each config will be tried (all permutations).
n_runs_per_world: Number of runs per world. All of these world runs will have identical competitor assignment
and identical world configuration.
min_factories_per_level: Minimum number of factories for each production level
total_timeout: Total timeout for the complete process
tournament_path: Path at which to store all results. A scores.csv file will keep the scores and logs folder will
keep detailed logs
parallelism: Type of parallelism. Can be 'serial' for serial, 'parallel' for parallel and 'distributed' for
distributed
scheduler_port: Port of the dask scheduler if parallelism is dask, dist, or distributed
scheduler_ip: IP Address of the dask scheduler if parallelism is dask, dist, or distributed
world_progress_callback: A function to be called after everystep of every world run (only allowed for serial
evaluation and should be used with cautious).
tournament_progress_callback: A function to be called with `WorldRunResults` after each world finished
processing
non_competitors: A list of agent types that will not be competing in the sabotage competition but will exist
in the world
non_competitor_params: parameters of non competitor agents
verbose: Verbosity
configs_only: If true, a config file for each
compact: If true, compact logs will be created and effort will be made to reduce the memory footprint
kwargs: Arguments to pass to the `world_generator` function
Returns:
`TournamentResults` The results of the tournament or a `PathLike` giving the location where configs were saved
Remarks:
Default parameters will be used in the league with the exception of `parallelism` which may use distributed
processing
"""
if non_competitors is None:
non_competitors = (DefaultGreedyManager,)
non_competitor_params = ({},)
return tournament(
competitors=competitors,
competitor_params=competitor_params,
non_competitors=non_competitors,
non_competitor_params=non_competitor_params,
agent_names_reveal_type=agent_names_reveal_type,
n_configs=n_configs,
n_runs_per_world=n_runs_per_world,
max_worlds_per_config=max_worlds_per_config,
tournament_path=tournament_path,
total_timeout=total_timeout,
parallelism=parallelism,
scheduler_ip=scheduler_ip,
scheduler_port=scheduler_port,
tournament_progress_callback=tournament_progress_callback,
world_progress_callback=world_progress_callback,
name=name,
verbose=verbose,
configs_only=configs_only,
n_agents_per_competitor=1,
world_generator=anac2019_world_generator,
config_generator=anac2019_config_generator,
config_assigner=anac2019_assigner,
score_calculator=balance_calculator,
min_factories_per_level=min_factories_per_level,
compact=compact,
**kwargs,
)
[docs]
def anac2019_collusion(
competitors: Sequence[Union[str, Type[FactoryManager]]],
competitor_params: Optional[Sequence[Dict[str, Any]]] = None,
agent_names_reveal_type=False,
n_configs: int = 5,
max_worlds_per_config: Optional[int] = 1000,
n_runs_per_world: int = 5,
n_agents_per_competitor: int = 5,
min_factories_per_level: int = 5,
tournament_path: str = None,
total_timeout: Optional[int] = None,
parallelism="parallel",
scheduler_ip: Optional[str] = None,
scheduler_port: Optional[str] = None,
tournament_progress_callback: Callable[[Optional[WorldRunResults]], None]
| None = None,
world_progress_callback: Callable[[Optional[SCML2019World]], None] | None = None,
non_competitors: Optional[Sequence[Union[str, Type[FactoryManager]]]] = None,
non_competitor_params: Optional[Sequence[Dict[str, Any]]] = None,
name: str | None = None,
verbose: bool = False,
configs_only=False,
compact=False,
**kwargs,
) -> Union[TournamentResults, PathLike]:
"""
The function used to run ANAC 2019 SCML tournament (collusion track).
Args:
name: Tournament name
competitors: A list of class names for the competitors
competitor_params: A list of competitor parameters (used to initialize the competitors).
agent_names_reveal_type: If true then the type of an agent should be readable in its name (most likely at its
beginning).
n_configs: The number of different world configs (up to competitor assignment) to be generated.
max_worlds_per_config: The maximum number of worlds to run per config. If None, then all possible assignments
of competitors within each config will be tried (all permutations).
n_runs_per_world: Number of runs per world. All of these world runs will have identical competitor assignment
and identical world configuration.
n_agents_per_competitor: Number of agents per competitor
min_factories_per_level: Minimum number of factories for each production level
total_timeout: Total timeout for the complete process
tournament_path: Path at which to store all results. A scores.csv file will keep the scores and logs folder will
keep detailed logs
parallelism: Type of parallelism. Can be 'serial' for serial, 'parallel' for parallel and 'distributed' for
distributed
scheduler_port: Port of the dask scheduler if parallelism is dask, dist, or distributed
scheduler_ip: IP Address of the dask scheduler if parallelism is dask, dist, or distributed
world_progress_callback: A function to be called after everystep of every world run (only allowed for serial
evaluation and should be used with cautious).
tournament_progress_callback: A function to be called with `WorldRunResults` after each world finished
processing
non_competitors: A list of agent types that will not be competing in the sabotage competition but will exist
in the world
non_competitor_params: parameters of non competitor agents
verbose: Verbosity
configs_only: If true, a config file for each
compact: If true, compact logs will be created and effort will be made to reduce the memory footprint
kwargs: Arguments to pass to the `world_generator` function
Returns:
`TournamentResults` The results of the tournament or a `PathLike` giving the location where configs were saved
Remarks:
Default parameters will be used in the league with the exception of `parallelism` which may use distributed
processing
"""
if non_competitors is None:
non_competitors = (DefaultGreedyManager,)
non_competitor_params = ({},)
return tournament(
competitors=tuple(competitors),
competitor_params=competitor_params,
non_competitors=non_competitors,
non_competitor_params=non_competitor_params,
agent_names_reveal_type=agent_names_reveal_type,
n_configs=n_configs,
n_runs_per_world=n_runs_per_world,
max_worlds_per_config=max_worlds_per_config,
tournament_path=tournament_path,
total_timeout=total_timeout,
n_agents_per_competitor=n_agents_per_competitor,
parallelism=parallelism,
scheduler_ip=scheduler_ip,
scheduler_port=scheduler_port,
tournament_progress_callback=tournament_progress_callback,
world_progress_callback=world_progress_callback,
name=name,
verbose=verbose,
configs_only=configs_only,
world_generator=anac2019_world_generator,
config_generator=anac2019_config_generator,
config_assigner=anac2019_assigner,
score_calculator=balance_calculator,
min_factories_per_level=min_factories_per_level,
compact=compact,
**kwargs,
)
[docs]
def anac2019_sabotage(
competitors: Sequence[Union[str, Type[FactoryManager]]],
competitor_params: Optional[Sequence[Dict[str, Any]]] = None,
agent_names_reveal_type=False,
n_configs: int = 5,
max_worlds_per_config: Optional[int] = 1000,
n_runs_per_world: int = 5,
n_agents_per_competitor: int = 5,
min_factories_per_level: int = 5,
tournament_path: str | Path | None = None,
total_timeout: Optional[int] = None,
parallelism="parallel",
scheduler_ip: Optional[str] = None,
scheduler_port: Optional[str] = None,
tournament_progress_callback: Callable[[Optional[WorldRunResults]], None] = None,
world_progress_callback: Callable[[Optional[SCML2019World]], None] = None,
non_competitors: Optional[Sequence[Union[str, Type[FactoryManager]]]] = None,
non_competitor_params: Optional[Sequence[Union[str, Type[FactoryManager]]]] = None,
name: str = None,
verbose: bool = False,
configs_only=False,
compact=False,
**kwargs,
) -> Union[TournamentResults, PathLike]:
"""
The function used to run ANAC 2019 SCML tournament (collusion track).
Args:
name: Tournament name
competitors: A list of class names for the competitors
competitor_params: A list of competitor parameters (used to initialize the competitors).
agent_names_reveal_type: If true then the type of an agent should be readable in its name (most likely at its
beginning).
n_configs: The number of different world configs (up to competitor assignment) to be generated.
max_worlds_per_config: The maximum number of worlds to run per config. If None, then all possible assignments
of competitors within each config will be tried (all permutations).
n_runs_per_world: Number of runs per world. All of these world runs will have identical competitor assignment
and identical world configuration.
n_agents_per_competitor: Number of agents per competitor
min_factories_per_level: Minimum number of factories for each production level
total_timeout: Total timeout for the complete process
tournament_path: Path at which to store all results. A scores.csv file will keep the scores and logs folder will
keep detailed logs
parallelism: Type of parallelism. Can be 'serial' for serial, 'parallel' for parallel and 'distributed' for
distributed
scheduler_port: Port of the dask scheduler if parallelism is dask, dist, or distributed
scheduler_ip: IP Address of the dask scheduler if parallelism is dask, dist, or distributed
world_progress_callback: A function to be called after every step of every world run (only allowed for serial
evaluation and should be used with cautious).
tournament_progress_callback: A function to be called with `WorldRunResults` after each world finished
processing
non_competitors: A list of agent types that will not be competing in the sabotage competition but will exist
in the world
non_competitor_params: parameters of non competitor agents
verbose: Verbosity
configs_only: If true, a config file for each
compact: If true, compact logs will be created and effort will be made to reduce the memory footprint
kwargs: Arguments to pass to the `world_generator` function
Returns:
`TournamentResults` The results of the tournament or a `PathLike` giving the location where configs were saved
Remarks:
Default parameters will be used in the league with the exception of `parallelism` which may use distributed
processing
"""
if non_competitors is None:
non_competitors = (DefaultGreedyManager,)
non_competitor_params = ({},)
return tournament(
competitors=competitors,
competitor_params=competitor_params,
agent_names_reveal_type=agent_names_reveal_type,
non_competitors=non_competitors,
non_competitor_params=non_competitor_params,
n_configs=n_configs,
n_runs_per_world=n_runs_per_world,
max_worlds_per_config=max_worlds_per_config,
tournament_path=tournament_path,
total_timeout=total_timeout,
n_agents_per_competitor=n_agents_per_competitor,
parallelism=parallelism,
scheduler_ip=scheduler_ip,
scheduler_port=scheduler_port,
tournament_progress_callback=tournament_progress_callback,
world_progress_callback=world_progress_callback,
name=name,
verbose=verbose,
configs_only=configs_only,
world_generator=anac2019_world_generator,
config_generator=anac2019_sabotage_config_generator,
config_assigner=anac2019_sabotage_assigner,
score_calculator=sabotage_effectiveness,
compact=compact,
min_factories_per_level=min_factories_per_level,
**kwargs,
)