Source code for scml.runner

from __future__ import annotations
from attr import define
import random
import numbers
from collections import defaultdict
from scml.oneshot.agent import OneShotAgent
import pandas as pd
import numpy as np
from typing import Any, Callable, Iterable

from scml.oneshot.context import BaseContext, RepeatingContext
from scml.oneshot.sysagents import DefaultOneShotAdapter
from scml.oneshot.world import SCMLBaseWorld
from scml.std.agent import StdAgent
from scml.std.world import StdWorld

from negmas.helpers import get_full_type_name
from negmas.negotiators.modular import itertools
from negmas.helpers.numeric import truncated_mean
from copy import deepcopy

__all__ = ["WorldRunner", "mean", "median", "truncated_mean"]

AgentType = type[OneShotAgent] | type[StdAgent]


@define
class WorldInfo:
    """Information about the worlds generated by the runner"""

    world: SCMLBaseWorld
    indices: list[int]
    type_name: str
    trial: int
    config_name: str
    agents: list[DefaultOneShotAdapter]

    def done(self):
        return self.world is not None and self.world.current_step >= self.world.n_steps


[docs] def mean(lst: list[float]) -> float: """Calculates the mean""" if not lst: return float("nan") return sum(lst) / len(lst)
[docs] def median(lst: list[float]) -> float: """Calculates the median""" if not lst: return float("nan") i1 = len(lst) // 2 i2 = (i1 + 1) if len(lst) % 2 == 1 else i1 lst = sorted(lst) return (lst[i1] + lst[i2]) / 2.0
def is_agent_stat(s: str, ids: list[str]): return any(s.endswith(f"_{_}") for _ in ids) def is_common_stat(s: str, ids: list[str]): return not any(s.endswith(f"_{_}") for _ in ids) def remove_agent_id(s: str, ids: list[str]) -> str: for id in ids: s = s.replace(f"_{id}", "") return s
[docs] class WorldRunner: """A utility class to run controlled simulations for different agent types. Args: generator: The `Context` used to generate world configurations n_configs: The number of configurations n_repetitions: Number of repetitions of each configuration one_offer_per_step: Whether one negotiation step is one offer. Pass as `True` when using RL save_worlds: If given worlds will be saved internally. You need this to use worlds_of and the like save_common_stats: Save general agent-independent statistics over time save_agent_stats: Save agent statistics over time combiner: How to combine scores and statistics if the number of agents of the tested type in a world is more than one control_all_agents: If given, the agent type(s) passed to __call__ will control the whole market. shorten_names: If given, shorter versions of type names will be used when compiling type-names progress: If given, a progress bar will be displayed for each world being run """ def __init__( self, generator: BaseContext, n_configs: int = 10, n_repetitions: int = 1, save_worlds: bool = True, save_common_stats: bool = True, save_agent_stats: bool = True, combiner: Callable[[list[float]], float] = np.mean, control_all_agents: bool = False, shorten_names: bool = True, progress: bool = False, ) -> None: self.n_configs = n_configs self.n_repetitions = n_repetitions self.generator = generator self.configs = [generator.make_config() for _ in range(n_configs)] self.worlds: dict[tuple[str, str], list[WorldInfo]] = defaultdict(list) self.save_common_stats = save_common_stats self.save_agent_stats = save_agent_stats self.save_worlds = save_worlds self.control_all_agents = control_all_agents self._stats = pd.DataFrame() self._scores = pd.DataFrame(columns=["world", "type", "trial", "score"]) self._combiner = combiner def _name(): return generator.name if generator.name else "c" self.config_names = [f"{_name()}{i}" for i in range(n_configs)] self.shorten_names = shorten_names self.progress = progress self.type_names: set[str] = set() @classmethod
[docs] def from_runner(cls, src: WorldRunner, **kwargs) -> WorldRunner: """ Creates a `WorldRunner` from another one. Remarks: - It simply copies the configurations created by the source runner. - You can override any parameters by passing them as keyword arguments """ kwargs = ( dict( n_repetitions=src.n_repetitions, save_common_stats=src.save_common_stats, save_agent_stats=src.save_agent_stats, combiner=src._combiner, control_all_agents=src.control_all_agents, shorten_names=src.shorten_names, progress=src.progress, ) | kwargs ) runner = WorldRunner(src.generator, len(src.configs), **kwargs) # type: ignore runner.configs = src.configs return runner
@classmethod
[docs] def from_configs( cls, world_type: type[SCMLBaseWorld], configs: tuple[dict[str, Any], ...], name: str | None = None, **kwargs, ) -> WorldRunner: """ Creates a `WorldRunner` from a collection of configs. Args: world_type: The world type to use. configs: The collection of configs to use. Remarks: - Uses a `RepeatingContext` internally. - You can override any parameters of the new runner by passing them as keyword arguments """ context = RepeatingContext(world_type=world_type, configs=configs, name=name) runner = WorldRunner(context, len(configs), **kwargs) runner.configs = configs context.configs = configs return runner
@property
[docs] def existing_config_names(self) -> list[str]: """Existing configuration names.""" if len(self._scores) < 1: return [] return self._scores["config"].unique().tolist()
@property
[docs] def existing_type_names(self) -> list[str]: """Existing type names tested so far.""" if len(self._scores) < 1: return [] return self._scores["type"].unique().tolist()
[docs] def get_type_name(self, type_: type[AgentType] | str) -> str: """Used to get a name for the given type.""" n = get_full_type_name(type_) if not self.shorten_names: return n existing = self.scores["type"].unique() for i in range(1, 100): n = ".".join(n.split(".")[-i:]) if n not in existing: break return n
[docs] def add( self, types: tuple[AgentType, ...] | list[AgentType] | AgentType, params: list[dict[str, Any]] | dict[str, Any] | None = None, name: str | None = None, ) -> list[WorldInfo]: """Used to add an agent type collection to the pool of tested agent types. Does not run the worlds Args: types: In general, you can pass a collection of types here. Note that this collection will be used in every simulation. This means that the placeholder_types in the context used to create the runner must accept enough such agents. The usual use-case is that the placeholder_types in the context contain a single type and in this case, you can either pass a singelton collection of a single type or just pass the type directly. params: Parameters accepted by the constructor of the types passed. name: The name to call this type(s). Returns: A list of pairs each containing a world and a list of agents representing the worlds that were run and the agents corresponding to the passed type(s). """ worlds: list[WorldInfo] = [] if not isinstance(types, Iterable): types = tuple([types] * len(self.generator.placeholder_types)) assert len(types) == len(self.generator.placeholder_types), ( f"Cannot pass {len(types)} types to a generator with " f"{len(self.generator.placeholder_types)} placeholders" ) if params is None: params = dict() if isinstance(params, dict): params = [deepcopy(params) for _ in range(len(types))] if not name: name = ";".join(tuple(self.get_type_name(_) for _ in types)) self.type_names.add(name) types = tuple(types) control_all_agents = self.control_all_agents for base_config, cid in zip(self.configs, self.config_names): config_worlds: list[WorldInfo] = [] indices = ( [ i for i, p in enumerate(base_config["agent_params"]) if p["controller_type"] in (self.generator.placeholder_types) ] if not control_all_agents else list(range(len(base_config["agent_params"]))) ) if control_all_agents: existing_types = [ p["controller_type"] for p in base_config["agent_params"] ] else: existing_types = self.generator.placeholder_types new_types = tuple( [types[i % len(types)] for i in range(len(existing_types))] ) new_params = tuple( [params[i % len(types)] for i in range(len(existing_types))] ) config = self.generator.world_type.replace_agents( base_config, existing_types, new_types, new_params, ) for trial in range(self.n_repetitions): config["name"] = f"{cid}_{name}_{trial}" world = self.generator.world_type( **(self.generator.world_params | config), ) ids = [world.non_system_agent_ids[i] for i in indices] agents = [world.agents[i] for i in ids] assert all( isinstance(a._obj, t) for a, t in zip(agents, types) ), f"Found {[type(_._ob) for _ in agents]} but expected {types} at {indices=} with {ids=}" config_worlds.append( WorldInfo(world, indices, name, trial, cid, agents) ) worlds += config_worlds if self.save_worlds: self.worlds[(name, cid)] = config_worlds return worlds
[docs] def runall(self, progress: bool = False) -> None: """Runs all worlds""" return self._runall(self.all_world_infos, progress)
[docs] def _runall(self, world_infos: list[WorldInfo], progress: bool = False) -> None: """Runs all worlds""" if not progress: def track(x, **kwargs): _ = kwargs return x else: from rich.progress import track # type: ignore for w in track(world_infos, total=len(world_infos), description="Running ... "): self.process(w)
[docs] def __call__( self, types: tuple[AgentType, ...] | list[AgentType] | AgentType, params: list[dict[str, Any]] | dict[str, Any] | None = None, name: str | None = None, progress: bool = False, ) -> list[WorldInfo]: """Used to add an agent type collection to the pool of tested agent types. Args: types: In general, you can pass a collection of types here. Note that this collection will be used in every simulation. This means that the placeholder_types in the context used to create the runner must accept enough such agents. The usual use-case is that the placeholder_types in the context contain a single type and in this case, you can either pass a singelton collection of a single type or just pass the type directly. params: Parameters accepted by the constructor of the types passed. name: The name to call this type(s). Returns: A list of pairs each containing a world and a list of agents representing the worlds that were run and the agents corresponding to the passed type(s). """ world_infos = self.add(types, params, name=name) self._runall(world_infos, progress) return world_infos
[docs] def _types_and_configs( self, type: AgentType | str | None = None, config: str | None = None ) -> tuple[list[str], list[str]]: if config is None: configs = self.config_names else: configs = [config] types = ( [self.get_type_name(type)] if type is not None else self.existing_type_names ) return types, configs
[docs] def world_infos_of( self, type: AgentType | str | None = None, config: str | None = None ) -> list[WorldInfo]: """ Returns the information of all worlds of the given type and config. Remarks: Note that you can also pass the name used when running the WorldRunner with the given type. """ types, configs = self._types_and_configs(type, config) return list( itertools.chain( *( [_ for _ in self.worlds.get((t, c), [])] for t, c in itertools.product(types, configs) ) ) )
[docs] def worlds_of( self, type: AgentType | str | None = None, config: str | None = None ) -> list[SCMLBaseWorld]: """ Returns the worlds of the given type and config. Remarks: Note that you can also pass the name used when running the WorldRunner with the given type. """ types, configs = self._types_and_configs(type, config) return list( itertools.chain( *( [ _.world for _ in self.worlds.get((t, c), []) if _.world is not None ] for t, c in itertools.product(types, configs) ) ) )
[docs] def agents_per_world_of( self, type: AgentType | str | None = None, config: str | None = None ) -> dict[str, list[AgentType]]: """ Returns the agents representing the type for each world of the given type and config. Remarks: Note that you can also pass the name used when running the WorldRunner with the given type. """ types, configs = self._types_and_configs(type, config) return dict( # type: ignore map( dict.popitem, [ { info.world.id: info.agents for info in self.worlds.get((t, c), []) if info and info.world is not None } for t, c in itertools.product(types, configs) ], ) )
[docs] def agents_of( self, type: AgentType | str | None = None, config: str | None = None ) -> list[AgentType]: """ Returns the agents representing the type and config. Remarks: Note that you can also pass the name used when running the WorldRunner with the given type. """ results = [] types, configs = self._types_and_configs(type, config) for x in itertools.product(types, configs): for info in self.worlds[x]: results += info.agents return results
@property
[docs] def stats(self) -> pd.DataFrame: """The statistics saved (must be constructed with `save_stats`)""" return self._stats
@property
[docs] def all_world_infos(self) -> list[WorldInfo]: """Information of all worlds simulated (including the agents evaluated for each world)""" return list(itertools.chain(*(self.worlds.values())))
@property
[docs] def all_worlds(self) -> list[SCMLBaseWorld]: """Information of all worlds simulated (including the agents evaluated for each world)""" return [_.world for _ in (itertools.chain(*(self.worlds.values())))]
@property
[docs] def scores(self) -> pd.DataFrame: """ The scores of all evaluated agents in all evaluated worlds Remarks: Other than the score, the returned data-frame will contain information about the production level, number of suppliers, number of consumers, number of competitors, etc about each agent. """ return self._scores
[docs] def score_summary( self, percentiles=None, include=None, exclude=None, by: str | tuple[str, ...] = "type", order_by: str | None = "score", ascending: bool = False, ) -> pd.DataFrame: """A summary of comparative scores of all agent types tested so far. Args: percentiles: passed to `groupby` include: passed to `groupby` exclud: passed to `groupby` by: passed to `groupby` order_by: The method for sorting resulting scores. Possibilities: score, mean, min, max, 20%, 50%, 75%, median ascending: Ascending or descending scores. Returns: A dataframe that describes the scores of all evaluated types. """ if order_by and order_by == "median": order_by = "50%" df1 = self._scores.groupby(by)["score"].apply(truncated_mean) df2 = self._scores.groupby(by)["score"].describe( percentiles=percentiles, include=include, exclude=exclude ) df = pd.concat((df1, df2), axis=1, ignore_index=False) df = df.reset_index() if order_by: df = df.sort_values(order_by, ascending=ascending) return df
[docs] def plot_stats( self, stats: tuple[str, ...] | str | None = None, by: tuple[str, ...] | str = "type", agg: bool = True, legend: bool = True, ylegend: float = 1.8, legend_ncols=3, title: bool = True, order_by: str | None = "score", ascending: bool = False, **kwargs, ): """Plots saves statistics (`save_stats` must be given) Args: stats: The stats to be displayed. If not given, a default set will be chosen. Any statistic saved by the `World` or `SCMLBaseWorld` can be used. There are over 42 such statistics. by: Group the data by the given attribute (used as hue if agg is False and as x if agg is True). agg: Whether to aggregate over simulation steps or not. legend:whether or not to show a legend (only used if agg is False) legend_ncols: How many columns to use in the legend. Pass zero to disable the legend ylegend: The y-coordinate of the legend to control where it appears. title: Show stat names as title (instead of ylabel) order_by: The statistic to order with. Possibilities are score, mean, 50%, max, min, 20%, 75%, median ascending: If true, order ascendingly **kwargs: Any extra paramters to pass to the underlying seaborn method (lineplot in case agg=False and boxplot in case agg=True) Returns: The figure and axes used. """ if order_by and order_by == "median": order_by = "50%" import matplotlib.pyplot as plt import seaborn as sns if legend_ncols < 1: legend = False if stats is None: s_ = [] if self.save_agent_stats: s_ += [ "shortfall_penalty", "score", "storage_cost" if issubclass(self.generator.world_type, StdWorld) else "disposal_cost", "productivity", ] if issubclass(self.generator.world_type, StdWorld): s_ += ["inventory_penalized", "inventory_input"] stats = tuple(s_) elif isinstance(stats, str): stats = (stats,) # if self.save_common_stats: # stats += ["sold_quantity", "trading_price"] if not stats: return # print(self.score_summary(by=by)) # print(self.score_summary(by=by).sort_values(order_by)[by]) order = ( self.score_summary(by=by) .sort_values(order_by, ascending=ascending)[by] .tolist() if order_by else None ) if len(stats) == 1: ncols = nrows = 1 else: ncols = 2 nrows = len(stats) // ncols if nrows * ncols < len(stats): nrows += 1 fig, axs = plt.subplots(nrows, ncols, sharex=True, squeeze=True) if nrows == ncols == 1: axs = (axs,) else: axs = axs.flatten() for i, (ax, stat) in enumerate(zip(axs, stats)): show_legend = not agg and legend and (i == 0) if agg: df = self.stats if stat == "score": df = self.scores g = sns.boxplot( data=df, x=by, y=stat, ax=ax, order=order, **kwargs, # type: ignore ) g.tick_params(axis="x", rotation=90) g.set(xlabel=None) else: kwargs = ( dict( err_style="bars", errorbar=("se", 1), ) | kwargs ) g = sns.lineplot( data=self.stats, hue=by, x="step", y=stat, ax=ax, legend=show_legend, hue_order=order, **kwargs, # type: ignore ) if title: g.set(title=stat, ylabel=None) if not show_legend: continue sns.move_legend( g, loc="upper left", bbox_to_anchor=(-0.02, ylegend), ncol=legend_ncols, fancybox=True, shadow=True, ) # handles, labels = g.get_legend_handles_labels() # g.legend(handles=handles[1:], labels=labels[1:]) return fig, axs
[docs] def draw_worlds_of( self, type: AgentType | str | None, config: str | None = None, what=("contracts-signed",), n: int | None = 4, randomize: bool = False, ): """Draws the given set of worlds Args: type: The type to filter by config: The config to filter by what: what stat to draw. See `negmas.situated.World.draw_world` for all options available n: Number of worlds to draw. If None, all of them will be drawn randomize: If given the worlds will be shuffled before display but only if not all worlds will be displayed Returns: figure and axes used. """ import matplotlib.pyplot as plt import math worlds = self.worlds_of(type, config) if self.n_repetitions > 1 and n is not None and not randomize: worlds = worlds[:: self.n_repetitions] if randomize and n is not None and len(worlds) > n: random.shuffle(worlds) if n is not None and len(worlds) > n: worlds = worlds[:n] n_trials = len(worlds) mx = min(n_trials, 2) fig = plt.figure(figsize=(11 * mx, 8)) axs = fig.subplots(int(math.ceil(n_trials / mx)), mx) if isinstance(axs, Iterable): axs = axs.flatten() for ax, world in zip(axs if n_trials > 1 else [axs], worlds): world.draw( what=what, steps=(0, world.n_steps - 1), together=True, ncols=1, axs=ax, ) return fig, axs
[docs] def process(self: WorldRunner, info: WorldInfo): world = info.world name = info.type_name trial = info.trial cid = info.config_name agents = info.agents ids = [a.id for a in agents] all_ids = list(world.agents.keys()) world.run() wscores = world.scores() scores = [wscores[id] for id in ids] df = pd.DataFrame.from_records( [ dict( config=cid, world=world.id, type=name, trial=trial, score=score, agent=aid, level=agent.awi.profile.level, n_suppliers=len(agent.awi.my_suppliers), n_consumers=len(agent.awi.my_consumers), n_competitors=len( agent.awi.all_consumers[agent.awi.my_input_product] ), ) for aid, score, agent in zip(ids, scores, agents) ] ) self._scores = pd.concat((self._scores, df)) stat_names = [ k for k in world._stats.keys() if (is_agent_stat(k, ids) and self.save_agent_stats) or (self.save_common_stats and is_common_stat(k, all_ids)) ] if not stat_names: return df = world.stats_df[stat_names] df.columns = [remove_agent_id(_, ids) for _ in df.columns] df = df.groupby(by=df.columns, axis=1).apply( lambda g: g.mean(axis=1) if isinstance(g.iloc[0, 0], numbers.Number) else g.iloc[:, 0] ) df["config"] = cid df["world"] = world.id df["type"] = name df["trial"] = trial df["step"] = list(range(len(df))) df = df.reset_index(drop=True) self._stats = self._stats.reset_index(drop=True) self._stats = pd.concat((self._stats, df), axis="index", ignore_index=True)