Source code for scml.std.agents.rand

import random

from negmas import Outcome, ResponseType
from negmas.negotiators.modular import itertools
from negmas.sao import SAOResponse, SAOState

from scml.common import distribute
from scml.oneshot.agents import SyncRandomOneShotAgent
from scml.oneshot.common import QUANTITY, TIME, UNIT_PRICE
from scml.std.agent import StdAgent, StdSyncAgent

__all__ = ["SyncRandomStdAgent", "SyncRandomOneShotAgent", "RandomStdAgent"]

PROB_END = 0.005

[docs] class RandomStdAgent(StdAgent): """A naive random agent""" def __init__( self, owner=None, ufun=None, name=None, p_accept=PROB_ACCEPTANCE, p_end=PROB_END ): self.p_accept, self.p_end = p_accept, p_end super().__init__(owner, ufun, name)
[docs] def propose(self, negotiator_id: str, state: SAOState) -> Outcome | None: # type: ignore nmi = self.get_nmi(negotiator_id) if not nmi: return None return nmi.random_outcome()
[docs] def respond(self, negotiator_id, state, source=None): if random.random() < self.p_end: return ResponseType.END_NEGOTIATION if random.random() < self.p_accept: return ResponseType.ACCEPT_OFFER return ResponseType.REJECT_OFFER
[docs] class SyncRandomStdAgent(StdSyncAgent): """An agent that distributes its needs over its partners randomly.""" def __init__( self, *args, today_target_productivity=0.3, future_target_productivity=0.3, today_concentration=0.25, future_concentration=0.75, today_concession_exp=2.0, future_concession_exp=4.0, future_min_price=0.25, prioritize_near_future: bool = False, prioritize_far_future: bool = False, pfuture=0.15, **kwargs, ): """A simply agent that distributes its offers between today's needs and some future needs Args: today_target_productivity: Estimated productivity today used to set needed supply and demand for agents in the middle of the production graph future_target_productivity: Estimated productivity in the future used to limit estimates of needed future supplies and sales. future_concentration: How concentrated should our offers for future supplies/sales. This is the fraction of future supply/sale distributions that will use the minimum possible number of partners. today_concentration: How concentrated should our offers for today's supplies/sales. This is the fraction of today's supply/sale distributions that will use the minimum possible number of partners. today_concession_exp: The concession exponent to use for prices today future_concession_exp:The concession exponent to use for prices in offers regarding the future pfuture: Fraction of available offers to always use for future supplies/sales. future_min_price: Fraction of the price range not to go under/over for future sales/supplies prioritize_near_future: Prioritize near-future when distributing future needs prioritize_far_future: Prioritize far-future when distributing future needs """ super().__init__(*args, **kwargs) self.ptoday = 1.0 - pfuture self.today_exp = today_concession_exp self.future_exp = future_concession_exp self.fmin = future_min_price self.today_productivity = today_target_productivity self.future_productivity = future_target_productivity self.near = prioritize_near_future self.far = prioritize_far_future self.future_concentration = future_concentration self.today_concentration = today_concentration
[docs] def first_proposals(self): # type: ignore # just randomly distribute my needs over my partners (with best price for me). # remaining partners get random future offers distribution = self.distribute_todays_needs() future_suppliers = {k for k, v in distribution.items() if v <= 0} unneeded = ( None if not self.awi.allow_zero_quantity else (0, self.awi.current_step, 0) ) offers = { k: ((q, self.awi.current_step, self.best_price(k)) if q > 0 else unneeded) for k, q in distribution.items() } | self.distribute_future_offers(list(future_suppliers)) return offers
[docs] def counter_all(self, offers, states): max_sell = self.awi.current_output_issues[UNIT_PRICE].max_value min_sell = max( self.awi.current_output_issues[UNIT_PRICE].min_value, self.awi.current_input_issues[UNIT_PRICE].max_value, ) min_buy = self.awi.current_input_issues[UNIT_PRICE].min_value max_buy = min( self.awi.current_input_issues[UNIT_PRICE].max_value, self.awi.current_output_issues[UNIT_PRICE].min_value, ) # find everything I need from now to the end of time needed_supplies, needed_sales = self.estimate_future_needs() needed_sales[self.awi.current_step] = self.awi.needed_sales needed_supplies[self.awi.current_step] = self.awi.needed_supplies if self.awi.is_middle_level: needed_sales[self.awi.current_step] = max( needed_sales[self.awi.current_step], int(self.awi.n_lines * self.today_productivity), ) needed_supplies[self.awi.current_step] = max( needed_supplies[self.awi.current_step], int(self.awi.n_lines * self.today_productivity), ) # accept all offers I seem to need if they have good price responses = dict() c = self.awi.current_step n = max(self.awi.n_steps - c, 1) for is_partner, needs, is_good_price, mn, mx in ( (self.is_supplier, needed_supplies, self.good2buy, min_buy, max_buy), (self.is_consumer, needed_sales, self.good2sell, min_sell, max_sell), ): if mn > mx: continue for partner, offer in offers.items(): if not is_partner(partner): continue if offer is None: continue q, t = offer[QUANTITY], offer[TIME] today = t == c r = states[partner].relative_time if today else (t - c) / n if not is_good_price(offer[UNIT_PRICE], r, mn, mx, today): continue if 0 < q < needs.get(t, 0): responses[partner] = SAOResponse(ResponseType.ACCEPT_OFFER, offer) needs[t] -= q remaining = {k for k in offers.keys() if k not in responses.keys()} # distribute today's needs over the partners with rejected offers distribution = self.distribute_todays_needs(partners=remaining) future_partners = {k for k, v in distribution.items() if v <= 0} unneeded = ( None if not self.awi.allow_zero_quantity else (0, self.awi.current_step, 0) ) # distribute my future needs over people I did not use today myoffers = { k: ( (q, self.awi.current_step, self.good_price(k, today=False)) if q > 0 else unneeded ) for k, q in distribution.items() } | self.distribute_future_offers(list(future_partners)) responses |= { k: SAOResponse(ResponseType.REJECT_OFFER, offer) for k, offer in myoffers.items() } return responses
[docs] def distribute_todays_needs(self, partners=None) -> dict[str, int]: """Distributes my needs randomly over all my partners""" ignored = [] if partners is None: partners = self.negotiators.keys() partners = list(partners) random.shuffle(partners) n = min(len(partners), max(1, int(self.ptoday * len(partners)))) ignored = partners[n:] partners = partners[:n] response = dict(zip(partners, itertools.repeat(0))) | dict( zip(ignored, itertools.repeat(0)) ) mxin = self.awi.current_input_issues[QUANTITY].max_value mxout = self.awi.current_output_issues[QUANTITY].max_value for is_partner, edge_needs, mxq in ( (self.is_supplier, self.awi.needed_supplies, mxin), (self.is_consumer, self.awi.needed_sales, mxout), ): needs = self.awi.n_lines if self.awi.is_middle_level else edge_needs # find my partners and the quantity I need active_partners = [_ for _ in partners if is_partner(_)] if not active_partners or needs < 1: continue random.shuffle(active_partners) n_partners = len(active_partners) # distribute my needs over my (remaining) partners. # we always allow zero quantity because these will be overriden # by future offers later response |= dict( zip( active_partners, distribute( needs, n_partners, allow_zero=True, concentrated=random.random() < self.today_concentration, equal=random.random() > 0.5, mx=mxq, ), ) ) return response
[docs] def estimate_future_needs(self): """Estimates how much I need to buy and sell for each future step""" current_step, n_steps = self.awi.current_step, self.awi.n_steps trange = ( max( self.awi.current_input_issues[TIME].min_value, self.awi.current_step + 1 ), min(self.awi.current_input_issues[TIME].max_value, self.awi.n_steps - 1), ) trange = ( min( trange[0], max( self.awi.current_input_issues[TIME].min_value, self.awi.current_step + 1, ), ), max( trange[1], min( self.awi.current_input_issues[TIME].max_value, self.awi.n_steps - 1 ), ), ) target_supplies, target_sales = dict(), dict() for t in range(trange[0], trange[1] + 1): secured_supplies = ( self.awi.total_supplies_until(t) + self.awi.current_inventory_input + self.awi.current_inventory_output ) secured_sales = self.awi.total_sales_from(t) secured_supplies += ( self.awi.current_exogenous_input_quantity * (t - current_step) if self.awi.is_first_level else 0 ) secured_sales += ( self.awi.current_exogenous_output_quantity * (n_steps - t) if self.awi.is_last_level else 0 ) secured_supplies = max( self.future_productivity * (t - current_step), secured_supplies ) secured_sales = max(self.future_productivity * (n_steps - t), secured_sales) target_supplies[t] = secured_sales - secured_supplies target_sales[t] = secured_supplies - secured_sales if self.awi.is_first_level: target_supplies[t] = 0 elif self.awi.is_last_level: target_sales[t] = 0 target_supplies = {k: int(v) for k, v in target_supplies.items() if v > 0} target_sales = {k: int(v) for k, v in target_sales.items() if v > 0} return target_supplies, target_sales
[docs] def distribute_future_offers( self, partners: list[str] ) -> dict[str, Outcome | None]: """Distribute future offers over the given partners""" if not partners: return dict() c = self.awi.current_step n = max((self.awi.n_steps - c), 1) # get minimum and maximum price and quantity according to current # negotiations. # - For prices make sure that the limits do not lead to loss # - We assume here that trading prices are not going to change much # - We know according to the rules that the range of quantities is the # same every day mxoutp = self.awi.current_output_issues[UNIT_PRICE].max_value mnoutp = max( self.awi.current_output_issues[UNIT_PRICE].min_value, self.awi.current_input_issues[UNIT_PRICE].max_value, ) mninp = self.awi.current_input_issues[UNIT_PRICE].min_value mxinp = min( self.awi.current_input_issues[UNIT_PRICE].max_value, self.awi.current_output_issues[UNIT_PRICE].min_value, ) mxinq = self.awi.current_input_issues[QUANTITY].max_value mxoutq = self.awi.current_output_issues[QUANTITY].max_value # estimate needed supplies up to and sales starting from each # time-step in the future needed_supplies, needed_sales = self.estimate_future_needs() # Separate suppliers and consumers suppliers = [_ for _ in partners if self.is_supplier(_)] consumers = [_ for _ in partners if self.is_consumer(_)] # prioritize which time to try to satisfy first if self.near or self.far: if needed_supplies: shffl = sorted(needed_supplies.keys(), reverse=self.far) needed_supplies = {k: needed_supplies[k] for k in shffl} if needed_sales: shffl = sorted(needed_sales.keys(), reverse=self.far) needed_sales = {k: needed_sales[k] for k in shffl} else: if needed_supplies: shffl = list(needed_supplies.keys()) random.shuffle(shffl) needed_supplies = {k: needed_supplies[k] for k in shffl} if needed_sales: shffl = list(needed_sales.keys()) random.shuffle(shffl) needed_sales = {k: needed_sales[k] for k in shffl} # initialize indicating that I do not need anything unneeded = ( None if not self.awi.allow_zero_quantity else (0, self.awi.current_step, 0) ) offers = dict(zip(partners, itertools.repeat(unneeded))) # loop over suppliers and consumers for plist, needs, mnp, mxp, mxq, price in ( ( suppliers, needed_supplies, mninp, mxinp, mxinq, self.buy_price, ), ( consumers, needed_sales, mnoutp, mxoutp, mxoutq, self.sell_price, ), ): # if there are no good prices, just do nothing if mnp > mxp: continue # try to satisfy my future needs in order for t, q in needs.items(): # if I have no partners, do nothing if not plist: continue # distribute the needs over the partners d = distribute( int(q), len(plist), mx=mxq, concentrated=random.random() < self.future_concentration, equal=random.random() > 0.5, allow_zero=self.awi.allow_zero_quantity, ) # find relative time to the end of simulation to estimate good prices # Notice that nearer times will entail higher concessions r = 1 - max(0, min(1, (t - c) / n)) offers |= { plist[i]: (q, t, price(r, mnp, mxp, today=t == c)) for i, q in enumerate(d) if q > 0 } plist = list(set(plist).difference(offers.keys())) return offers
[docs] def is_supplier(self, negotiator_id): return negotiator_id in self.awi.my_suppliers
[docs] def is_consumer(self, negotiator_id): return negotiator_id in self.awi.my_consumers
[docs] def best_price(self, partner_id): """Best price for a negotiation today""" issue = self.get_nmi(partner_id).issues[UNIT_PRICE] return issue.max_value if self.is_consumer(partner_id) else issue.min_value
[docs] def good_price(self, partner_id, today: bool): """A good price to use""" nmi = self.get_nmi(partner_id) mn = nmi.issues[UNIT_PRICE].min_value mx = nmi.issues[UNIT_PRICE].max_value if self.is_supplier(partner_id): return self.buy_price(nmi.state.relative_time, mn, mx, today=today) return self.sell_price( self.get_nmi(partner_id).state.relative_time, mn, mx, today=today )
[docs] def buy_price(self, t: float, mn: float, mx: float, today: bool) -> float: """Return a good price to buy at""" e = self.today_exp if today else self.future_exp return max(mn, min(mx, int(mn + (mx - mn) * (t**e) + 0.5)))
[docs] def sell_price(self, t: float, mn: float, mx: float, today: bool) -> float: """Return a good price to sell at""" e = self.today_exp if today else self.future_exp if not today: mn = mn + self.fmin * (mx - mn) return max(mn, min(mx, int(0.5 + mx - (mx - mn) * (t**e))))
[docs] def good2buy(self, p: float, t: float, mn, mx, today: bool): """Is p a good price to buy at?""" if not today: mx = mx - self.fmin * (mx - mn) return p - 0.0001 <= self.buy_price(t, mn, mx, today)
[docs] def good2sell(self, p: float, t: float, mn, mx, today: bool): """Is p a good price to sell at?""" return p + 0.0001 >= self.sell_price(t, mn, mx, today)