from __future__ import annotations
from attr import define
import random
import numbers
from collections import defaultdict
from scml.oneshot.agent import OneShotAgent
import pandas as pd
import numpy as np
from typing import Any, Callable, Iterable
from scml.oneshot.context import BaseContext, RepeatingContext
from scml.oneshot.sysagents import DefaultOneShotAdapter
from scml.oneshot.world import SCMLBaseWorld
from scml.std.agent import StdAgent
from scml.std.world import StdWorld
from negmas.helpers import get_full_type_name
from negmas.negotiators.modular import itertools
from negmas.helpers.numeric import truncated_mean
from copy import deepcopy
# Names exported by a star-import of this module.
# NOTE(review): "median" is listed here but no definition or import of that
# name is visible in this file -- confirm it exists, otherwise a star-import
# of this module will fail on the missing attribute.
__all__ = ["WorldRunner", "mean", "median", "truncated_mean"]
# An agent *class* (not an instance): either a one-shot or a standard agent type.
AgentType = type[OneShotAgent] | type[StdAgent]
@define
class WorldInfo:
    """A record describing one world created by the runner."""

    # the world instance
    world: SCMLBaseWorld
    # positions of the evaluated agents within the world's non-system agents
    indices: list[int]
    # name under which the evaluated type(s) were registered
    type_name: str
    # repetition number of this world for its configuration
    trial: int
    # name of the configuration the world was built from
    config_name: str
    # the evaluated agents as found in the world
    agents: list[DefaultOneShotAdapter]

    def done(self):
        """Tell whether the world exists and has reached its last step."""
        world = self.world
        if world is None:
            return False
        return world.current_step >= world.n_steps
[docs]
def mean(lst: list[float]) -> float:
    """Return the arithmetic mean of ``lst``, or NaN when it is empty."""
    return sum(lst) / len(lst) if lst else float("nan")
def is_agent_stat(s: str, ids: list[str]):
    """Tell whether the statistic name ``s`` ends with ``_<id>`` for any of ``ids``."""
    suffixes = tuple(f"_{agent_id}" for agent_id in ids)
    return s.endswith(suffixes)
def is_common_stat(s: str, ids: list[str]):
    """Tell whether the statistic name ``s`` does not end with ``_<id>`` for any of ``ids``."""
    suffixes = tuple(f"_{agent_id}" for agent_id in ids)
    return not s.endswith(suffixes)
def remove_agent_id(s: str, ids: list[str]) -> str:
    """Return ``s`` with every occurrence of ``_<id>`` removed, for each id in ``ids``.

    The ids are processed in the given order and each removal works on the
    result of the previous one.
    """
    cleaned = s
    for agent_id in ids:
        marker = f"_{agent_id}"
        cleaned = cleaned.replace(marker, "")
    return cleaned
[docs]
class WorldRunner:
    """A utility class to run controlled simulations for different agent types.

    Args:
        generator: The context used to generate world configurations
        n_configs: The number of configurations
        n_repetitions: Number of repetitions of each configuration
        save_worlds: If given worlds will be saved internally. You need this to use worlds_of and the like
        save_common_stats: Save general agent-independent statistics over time
        save_agent_stats: Save agent statistics over time
        combiner: How to combine scores and statistics if the number of agents of the tested type in a world is more than one
        control_all_agents: If given, the agent type(s) passed to __call__ will control the whole market.
        shorten_names: If given, shorter versions of type names will be used when compiling type-names
        progress: Stored on the runner as the progress-bar preference. Note that the
            run methods take their own `progress` argument.
    """
def __init__(
    self,
    generator: BaseContext,
    n_configs: int = 10,
    n_repetitions: int = 1,
    save_worlds: bool = True,
    save_common_stats: bool = True,
    save_agent_stats: bool = True,
    combiner: Callable[[list[float]], float] = np.mean,
    control_all_agents: bool = False,
    shorten_names: bool = True,
    progress: bool = False,
) -> None:
    """Store the settings and ask the generator for ``n_configs`` configurations."""
    # how many worlds to build
    self.n_configs = n_configs
    self.n_repetitions = n_repetitions
    # where configurations come from
    self.generator = generator
    self.configs = [generator.make_config() for _ in range(n_configs)]
    # worlds are keyed by (type name, config name)
    self.worlds: dict[tuple[str, str], list[WorldInfo]] = defaultdict(list)
    # what to record
    self.save_common_stats = save_common_stats
    self.save_agent_stats = save_agent_stats
    self.save_worlds = save_worlds
    self.control_all_agents = control_all_agents
    # accumulated results
    self._stats = pd.DataFrame()
    self._scores = pd.DataFrame(columns=["world", "type", "trial", "score"])
    self._combiner = combiner
    # configurations are named after the generator, falling back to "c"
    self.config_names = [
        f"{generator.name if generator.name else 'c'}{i}" for i in range(n_configs)
    ]
    self.shorten_names = shorten_names
    self.progress = progress
    self.type_names: set[str] = set()
@classmethod
[docs]
def from_runner(cls, src: WorldRunner, **kwargs) -> WorldRunner:
    """
    Creates a `WorldRunner` from another one.

    Args:
        src: The runner to copy the generator, configurations and settings from.
        **kwargs: Constructor arguments that override the ones copied from `src`.

    Returns:
        A new runner (of type `cls`) using the same configurations as `src`.

    Remarks:
        - It simply copies the configurations created by the source runner
          (the configuration list is shared, not duplicated).
        - You can override any parameters by passing them as keyword arguments
    """
    settings = (
        dict(
            n_repetitions=src.n_repetitions,
            # copied like every other setting so the new runner keeps it
            save_worlds=src.save_worlds,
            save_common_stats=src.save_common_stats,
            save_agent_stats=src.save_agent_stats,
            combiner=src._combiner,
            control_all_agents=src.control_all_agents,
            shorten_names=src.shorten_names,
            progress=src.progress,
        )
        | kwargs
    )
    # build through `cls` so subclasses get an instance of their own type
    runner = cls(src.generator, len(src.configs), **settings)  # type: ignore
    runner.configs = src.configs
    return runner
@classmethod
[docs]
def from_configs(
    cls,
    world_type: type[SCMLBaseWorld],
    configs: tuple[dict[str, Any], ...],
    name: str | None = None,
    **kwargs,
) -> WorldRunner:
    """
    Creates a `WorldRunner` from a collection of configs.

    Args:
        world_type: The world type to use.
        configs: The collection of configs to use.
        name: Name given to the internally created context.
        **kwargs: Extra constructor arguments for the new runner.

    Returns:
        A new runner (of type `cls`) whose configurations are exactly `configs`.

    Remarks:
        - Uses a `RepeatingContext` internally.
        - You can override any parameters of the new runner by passing them as keyword arguments
    """
    context = RepeatingContext(world_type=world_type, configs=configs, name=name)
    # build through `cls` so subclasses get an instance of their own type
    runner = cls(context, len(configs), **kwargs)
    # the constructor asks the context for configurations; make sure both the
    # runner and the context end up holding the ones that were passed in
    runner.configs = configs
    context.configs = configs
    return runner
@property
[docs]
def existing_config_names(self) -> list[str]:
    """Names of the configurations that already have recorded scores."""
    recorded = self._scores
    has_rows = len(recorded) >= 1
    return recorded["config"].unique().tolist() if has_rows else []
@property
[docs]
def existing_type_names(self) -> list[str]:
    """Names of the types that already have recorded scores."""
    recorded = self._scores
    has_rows = len(recorded) >= 1
    return recorded["type"].unique().tolist() if has_rows else []
[docs]
def get_type_name(self, type_: type[AgentType] | str) -> str:
    """Used to get a name for the given type.

    Args:
        type_: The type (or a string) to name; it is passed to `get_full_type_name`.

    Returns:
        The full type name when `shorten_names` is off, otherwise a shortened
        dotted name derived from it.
    """
    n = get_full_type_name(type_)
    if not self.shorten_names:
        return n
    # type names that already appear in the recorded scores
    existing = self.scores["type"].unique()
    # NOTE(review): `n` is overwritten with its own shortened form on each
    # iteration, so after the first pass (i == 1) it only holds the last dotted
    # component and later passes cannot add components back. In effect the
    # result is always the last component, whether or not it is in `existing`.
    # `_types_and_configs` relies on this method to produce the names used as
    # keys into `self.worlds`, so confirm the intent before changing this loop.
    for i in range(1, 100):
        n = ".".join(n.split(".")[-i:])
        if n not in existing:
            break
    return n
[docs]
def add(
    self,
    types: tuple[AgentType, ...] | list[AgentType] | AgentType,
    params: list[dict[str, Any]] | dict[str, Any] | None = None,
    name: str | None = None,
) -> list[WorldInfo]:
    """Used to add an agent type collection to the pool of tested agent types. Does not run the worlds

    Args:
        types: In general, you can pass a collection of types here. Note that this collection
               will be used in every simulation. This means that the placeholder_types in the
               context used to create the runner must accept enough such agents. The usual use-case
               is that the placeholder_types in the context contain a single type and in this case,
               you can either pass a singleton collection of a single type or just pass the type directly.
        params: Parameters accepted by the constructor of the types passed. A single dict is
                copied once per type.
        name: The name to call this type(s). If not given, it is built from the type names.

    Returns:
        One `WorldInfo` per created world (``n_repetitions`` worlds per configuration), each
        holding the world and the agents corresponding to the passed type(s).

    Raises:
        AssertionError: If the number of types does not match the number of placeholder
            types, or if the agents found in a created world are not of the expected types.
    """
    worlds: list[WorldInfo] = []
    # a single type is repeated to fill every placeholder
    if not isinstance(types, Iterable):
        types = tuple([types] * len(self.generator.placeholder_types))
    assert len(types) == len(self.generator.placeholder_types), (
        f"Cannot pass {len(types)} types to a generator with "
        f"{len(self.generator.placeholder_types)} placeholders"
    )
    if params is None:
        params = dict()
    if isinstance(params, dict):
        # independent copies so that agents never share a params dict
        params = [deepcopy(params) for _ in range(len(types))]
    if not name:
        name = ";".join(tuple(self.get_type_name(_) for _ in types))
    self.type_names.add(name)
    types = tuple(types)
    control_all_agents = self.control_all_agents
    for base_config, cid in zip(self.configs, self.config_names):
        config_worlds: list[WorldInfo] = []
        # positions of the agents that will be replaced by the tested type(s)
        indices = (
            [
                i
                for i, p in enumerate(base_config["agent_params"])
                if p["controller_type"] in (self.generator.placeholder_types)
            ]
            if not control_all_agents
            else list(range(len(base_config["agent_params"])))
        )
        if control_all_agents:
            existing_types = [
                p["controller_type"] for p in base_config["agent_params"]
            ]
        else:
            existing_types = self.generator.placeholder_types
        # cycle through the given types/params to cover every replaced type
        new_types = tuple(
            [types[i % len(types)] for i in range(len(existing_types))]
        )
        new_params = tuple(
            [params[i % len(types)] for i in range(len(existing_types))]
        )
        config = self.generator.world_type.replace_agents(
            base_config,
            existing_types,
            new_types,
            new_params,
        )
        for trial in range(self.n_repetitions):
            config["name"] = f"{cid}_{name}_{trial}"
            world = self.generator.world_type(
                **(self.generator.world_params | config),
            )
            ids = [world.non_system_agent_ids[i] for i in indices]
            agents = [world.agents[i] for i in ids]
            # the message reads `_obj`, the same attribute the check itself uses,
            # so a mismatch is reported as an AssertionError
            assert all(
                isinstance(a._obj, t) for a, t in zip(agents, types)
            ), f"Found {[type(_._obj) for _ in agents]} but expected {types} at {indices=} with {ids=}"
            config_worlds.append(
                WorldInfo(world, indices, name, trial, cid, agents)
            )
        worlds += config_worlds
        if self.save_worlds:
            self.worlds[(name, cid)] = config_worlds
    return worlds
[docs]
def runall(self, progress: bool = False) -> None:
    """Run every world currently held by the runner."""
    pending = self.all_world_infos
    return self._runall(pending, progress)
[docs]
def _runall(self, world_infos: list[WorldInfo], progress: bool = False) -> None:
    """Process the given worlds one by one, optionally showing a progress bar."""
    if progress:
        from rich.progress import track  # type: ignore
    else:
        # stand-in with the same call shape that simply yields the items

        def track(x, **kwargs):
            _ = kwargs
            return x

    for info in track(
        world_infos, total=len(world_infos), description="Running ... "
    ):
        self.process(info)
[docs]
def __call__(
    self,
    types: tuple[AgentType, ...] | list[AgentType] | AgentType,
    params: list[dict[str, Any]] | dict[str, Any] | None = None,
    name: str | None = None,
    progress: bool = False,
) -> list[WorldInfo]:
    """Add an agent type collection to the pool of tested types and run its worlds.

    Args:
        types: A single type or a collection of types, forwarded to `add`.
        params: Constructor parameters for the types, forwarded to `add`.
        name: The name to call this type(s), forwarded to `add`.
        progress: Whether to show a progress bar while the worlds run.

    Returns:
        The world infos returned by `add`, after their worlds have been run.
    """
    created = self.add(types, params, name=name)
    self._runall(created, progress)
    return created
[docs]
def _types_and_configs(
    self, type: AgentType | str | None = None, config: str | None = None
) -> tuple[list[str], list[str]]:
    """Expand an optional type/config filter into lists of type and config names.

    A missing config selects every configuration name; a missing type selects
    every type name that already has scores.
    """
    configs = self.config_names if config is None else [config]
    if type is None:
        types = self.existing_type_names
    else:
        types = [self.get_type_name(type)]
    return types, configs
[docs]
def world_infos_of(
    self, type: AgentType | str | None = None, config: str | None = None
) -> list[WorldInfo]:
    """
    Returns the information of all worlds of the given type and config.

    Remarks:
        Note that you can also pass the name used when running the WorldRunner with the
        given type.
    """
    types, configs = self._types_and_configs(type, config)
    collected = []
    for key in itertools.product(types, configs):
        # pairs that have no saved worlds simply contribute nothing
        collected.extend(self.worlds.get(key, []))
    return collected
[docs]
def worlds_of(
    self, type: AgentType | str | None = None, config: str | None = None
) -> list[SCMLBaseWorld]:
    """
    Returns the worlds of the given type and config.

    Remarks:
        Note that you can also pass the name used when running the WorldRunner with the
        given type.
    """
    types, configs = self._types_and_configs(type, config)
    collected = []
    for key in itertools.product(types, configs):
        for info in self.worlds.get(key, []):
            # entries without a world are skipped
            if info.world is not None:
                collected.append(info.world)
    return collected
[docs]
def agents_per_world_of(
    self, type: AgentType | str | None = None, config: str | None = None
) -> dict[str, list[AgentType]]:
    """
    Returns the agents representing the type for each world of the given type and config.

    Returns:
        A mapping from world ID to the evaluated agents of that world. Every saved
        world of every matching (type, config) pair is included, and pairs
        without saved worlds are skipped.

    Remarks:
        Note that you can also pass the name used when running the WorldRunner with the
        given type.
    """
    types, configs = self._types_and_configs(type, config)
    per_world = {}
    for key in itertools.product(types, configs):
        # walk every saved world of the pair, so repetitions are all reported
        # and a pair with nothing saved contributes no entry
        for info in self.worlds.get(key, []):
            if info and info.world is not None:
                per_world[info.world.id] = info.agents
    return per_world  # type: ignore
[docs]
def agents_of(
    self, type: AgentType | str | None = None, config: str | None = None
) -> list[AgentType]:
    """
    Returns the agents representing the type and config.

    Returns:
        A flat list with the evaluated agents of every saved world of every
        matching (type, config) pair.

    Remarks:
        Note that you can also pass the name used when running the WorldRunner with the
        given type.
    """
    results = []
    types, configs = self._types_and_configs(type, config)
    for key in itertools.product(types, configs):
        # `.get` (as in the sibling accessors) so that a query never adds
        # empty entries to `self.worlds`
        for info in self.worlds.get(key, []):
            results += info.agents
    return results
@property
[docs]
def stats(self) -> pd.DataFrame:
    """The per-step statistics collected so far.

    Remarks:
        Statistics are only collected when the runner saves common and/or
        agent statistics.
    """
    collected = self._stats
    return collected
@property
[docs]
def all_world_infos(self) -> list[WorldInfo]:
    """Information of all saved worlds (including the agents evaluated for each world)"""
    return [info for infos in self.worlds.values() for info in infos]
@property
[docs]
def all_worlds(self) -> list[SCMLBaseWorld]:
    """The world objects of all saved worlds."""
    return [info.world for infos in self.worlds.values() for info in infos]
@property
[docs]
def scores(self) -> pd.DataFrame:
    """
    The scores of all evaluated agents in all evaluated worlds

    Remarks:
        Besides the score, each row carries details about the agent such as
        its production level and its numbers of suppliers, consumers and
        competitors.
    """
    recorded = self._scores
    return recorded
[docs]
def score_summary(
    self,
    percentiles=None,
    include=None,
    exclude=None,
    by: str | tuple[str, ...] = "type",
    order_by: str | None = "score",
    ascending: bool = False,
) -> pd.DataFrame:
    """A summary of comparative scores of all agent types tested so far.

    Args:
        percentiles: passed to `describe`
        include: passed to `describe`
        exclude: passed to `describe`
        by: Column name (or tuple of column names) of the scores to group by.
        order_by: The summary column to sort by: `score` (the truncated mean) or any
                  column produced by `describe` (e.g. mean, min, max, 50%).
                  `median` is accepted as an alias of `50%`. Pass `None` to skip sorting.
        ascending: Ascending or descending scores.

    Returns:
        A dataframe that describes the scores of all evaluated types.
    """
    if order_by == "median":
        order_by = "50%"
    # pandas reads a tuple as one (tuple-valued) label, so several grouping
    # columns must be handed over as a list
    keys = list(by) if isinstance(by, tuple) else by
    grouped = self._scores.groupby(keys)["score"]
    truncated = grouped.apply(truncated_mean)
    described = grouped.describe(
        percentiles=percentiles, include=include, exclude=exclude
    )
    df = pd.concat((truncated, described), axis=1, ignore_index=False)
    df = df.reset_index()
    if order_by:
        df = df.sort_values(order_by, ascending=ascending)
    return df
[docs]
def plot_stats(
    self,
    stats: tuple[str, ...] | str | None = None,
    by: tuple[str, ...] | str = "type",
    agg: bool = True,
    legend: bool = True,
    ylegend: float = 1.8,
    legend_ncols=3,
    title: bool = True,
    order_by: str | None = "score",
    ascending: bool = False,
    **kwargs,
):
    """Plots saved statistics (the runner must be saving statistics)

    Args:
        stats: The stats to be displayed. If not given, a default set will be chosen. Any statistic
               saved by the `World` or `SCMLBaseWorld` can be used.
        by: Group the data by the given attribute (used as hue if agg is False and as x if agg is True).
        agg: Whether to aggregate over simulation steps or not.
        legend: Whether or not to show a legend (only used if agg is False)
        legend_ncols: How many columns to use in the legend. Pass zero to disable the legend
        ylegend: The y-coordinate of the legend to control where it appears.
        title: Show stat names as title (instead of ylabel)
        order_by: The statistic to order with; it must be a column of `score_summary`.
                  `median` is accepted as an alias of `50%`.
        ascending: If true, order ascendingly
        **kwargs: Any extra parameters to pass to the underlying seaborn method
                  (lineplot in case agg=False and boxplot in case agg=True)

    Returns:
        The figure and axes used, or `None` when there is nothing to plot.
    """
    if order_by and order_by == "median":
        order_by = "50%"
    import matplotlib.pyplot as plt
    import seaborn as sns

    if legend_ncols < 1:
        legend = False
    if stats is None:
        # default statistics; they depend on whether this is a standard world
        s_ = []
        if self.save_agent_stats:
            s_ += [
                "shortfall_penalty",
                "score",
                "storage_cost"
                if issubclass(self.generator.world_type, StdWorld)
                else "disposal_cost",
                "productivity",
            ]
            if issubclass(self.generator.world_type, StdWorld):
                s_ += ["inventory_penalized", "inventory_input"]
        stats = tuple(s_)
    elif isinstance(stats, str):
        stats = (stats,)
    # if self.save_common_stats:
    #     stats += ["sold_quantity", "trading_price"]
    if not stats:
        return
    # print(self.score_summary(by=by))
    # print(self.score_summary(by=by).sort_values(order_by)[by])
    # Groups are ordered by their summarized score.
    # NOTE(review): `[by].tolist()` and the seaborn `x=`/`hue=` arguments below
    # look like they assume `by` is a single column name -- confirm whether
    # tuple values of `by` are really supported here.
    order = (
        self.score_summary(by=by)
        .sort_values(order_by, ascending=ascending)[by]
        .tolist()
        if order_by
        else None
    )
    # one axis for a single stat, otherwise a two-column grid with enough rows
    if len(stats) == 1:
        ncols = nrows = 1
    else:
        ncols = 2
        nrows = len(stats) // ncols
        if nrows * ncols < len(stats):
            nrows += 1
    fig, axs = plt.subplots(nrows, ncols, sharex=True, squeeze=True)
    # with squeeze=True a 1x1 grid yields a bare Axes instead of an array
    if nrows == ncols == 1:
        axs = (axs,)
    else:
        axs = axs.flatten()
    for i, (ax, stat) in enumerate(zip(axs, stats)):
        # the legend is only drawn once, on the first line plot
        show_legend = not agg and legend and (i == 0)
        if agg:
            df = self.stats
            # scores live in their own frame
            if stat == "score":
                df = self.scores
            g = sns.boxplot(
                data=df,
                x=by,
                y=stat,
                ax=ax,
                order=order,
                **kwargs,  # type: ignore
            )
            g.tick_params(axis="x", rotation=90)
            g.set(xlabel=None)
        else:
            # caller-supplied kwargs win over these defaults; `kwargs` is rebound
            # here, so the defaults stay in it for later iterations
            kwargs = (
                dict(
                    err_style="bars",
                    errorbar=("se", 1),
                )
                | kwargs
            )
            g = sns.lineplot(
                data=self.stats,
                hue=by,
                x="step",
                y=stat,
                ax=ax,
                legend=show_legend,
                hue_order=order,
                **kwargs,  # type: ignore
            )
        if title:
            g.set(title=stat, ylabel=None)
        if not show_legend:
            continue
        sns.move_legend(
            g,
            loc="upper left",
            bbox_to_anchor=(-0.02, ylegend),
            ncol=legend_ncols,
            fancybox=True,
            shadow=True,
        )
        # handles, labels = g.get_legend_handles_labels()
        # g.legend(handles=handles[1:], labels=labels[1:])
    return fig, axs
[docs]
def draw_worlds_of(
    self,
    type: AgentType | str | None,
    config: str | None = None,
    what=("contracts-signed",),
    n: int | None = 4,
    randomize: bool = False,
):
    """Draws the given set of worlds

    Args:
        type: The type to filter by
        config: The config to filter by
        what: what stat to draw. See `negmas.situated.World.draw_world` for all options available
        n: Number of worlds to draw. If None, all of them will be drawn
        randomize: If given the worlds will be shuffled before display but only if not all worlds will be displayed

    Returns:
        figure and axes used. When no world matches, an empty figure and an
        empty list of axes are returned.
    """
    import math

    import matplotlib.pyplot as plt

    worlds = self.worlds_of(type, config)
    # without shuffling, keep only the first repetition of each configuration
    if self.n_repetitions > 1 and n is not None and not randomize:
        worlds = worlds[:: self.n_repetitions]
    if n is not None and len(worlds) > n:
        if randomize:
            random.shuffle(worlds)
        worlds = worlds[:n]
    n_trials = len(worlds)
    if n_trials == 0:
        # nothing to draw: the grid computation below would divide by zero
        return plt.figure(), []
    # at most two worlds per row
    mx = min(n_trials, 2)
    fig = plt.figure(figsize=(11 * mx, 8))
    axs = fig.subplots(int(math.ceil(n_trials / mx)), mx)
    # a single subplot comes back as a bare Axes rather than an array
    if isinstance(axs, Iterable):
        axs = axs.flatten()
    for ax, world in zip(axs if n_trials > 1 else [axs], worlds):
        world.draw(
            what=what,
            steps=(0, world.n_steps - 1),
            together=True,
            ncols=1,
            axs=ax,
        )
    return fig, axs
[docs]
def process(self: WorldRunner, info: WorldInfo):
    """Run one world and record the scores and statistics of its evaluated agents.

    Args:
        info: The world to run together with the agents being evaluated in it.

    Remarks:
        - One row per evaluated agent is appended to the scores.
        - The selected statistics are appended to the stats (one row per row of
          the world's `stats_df`). Nothing is appended when no statistic is selected.
    """
    world = info.world
    name = info.type_name
    trial = info.trial
    cid = info.config_name
    agents = info.agents
    ids = [a.id for a in agents]
    # IDs of every agent in the world, captured before the run
    all_ids = list(world.agents.keys())
    world.run()
    wscores = world.scores()
    scores = [wscores[id] for id in ids]
    # one record per evaluated agent
    df = pd.DataFrame.from_records(
        [
            dict(
                config=cid,
                world=world.id,
                type=name,
                trial=trial,
                score=score,
                agent=aid,
                level=agent.awi.profile.level,
                n_suppliers=len(agent.awi.my_suppliers),
                n_consumers=len(agent.awi.my_consumers),
                n_competitors=len(
                    agent.awi.all_consumers[agent.awi.my_input_product]
                ),
            )
            for aid, score, agent in zip(ids, scores, agents)
        ]
    )
    # NOTE(review): unlike the stats concat at the end, this one does not pass
    # ignore_index, so row labels repeat across worlds -- confirm that is intended.
    self._scores = pd.concat((self._scores, df))
    # keep stats of the evaluated agents and/or stats not tied to any agent
    stat_names = [
        k
        for k in world._stats.keys()
        if (is_agent_stat(k, ids) and self.save_agent_stats)
        or (self.save_common_stats and is_common_stat(k, all_ids))
    ]
    if not stat_names:
        return
    df = world.stats_df[stat_names]
    # dropping the agent IDs makes the columns of different agents share a name
    df.columns = [remove_agent_id(_, ids) for _ in df.columns]
    # Merge same-named columns: average them when the first value is numeric,
    # otherwise keep the first column of the group.
    # NOTE(review): recent pandas releases deprecate `axis=1` in `groupby` --
    # confirm the supported pandas versions.
    df = df.groupby(by=df.columns, axis=1).apply(
        lambda g: g.mean(axis=1)
        if isinstance(g.iloc[0, 0], numbers.Number)
        else g.iloc[:, 0]
    )
    df["config"] = cid
    df["world"] = world.id
    df["type"] = name
    df["trial"] = trial
    # rows are numbered from zero and exposed as the step
    df["step"] = list(range(len(df)))
    df = df.reset_index(drop=True)
    self._stats = self._stats.reset_index(drop=True)
    self._stats = pd.concat((self._stats, df), axis="index", ignore_index=True)