Source code for scml.std.agents.greedy

import random
from collections import defaultdict

from negmas import Outcome, ResponseType

from scml.oneshot.agents.greedy import GreedyOneShotAgent, GreedySyncAgent
from scml.std.agent import StdAgent
from scml.std.common import QUANTITY, TIME, UNIT_PRICE

__all__ = [
    "GreedyStdAgent",
    "GreedySyncAgent",
    "GreedyOneShotAgent",
]


[docs] class GreedyStdAgent(StdAgent): """ A greedy agent based on StdAgent Args: concession_exponent: A real number controlling how fast does the agent concede on price. acc_price_slack: The allowed slack in price limits compared with best prices I got so far step_price_slack: The allowed slack in price limits compared with best prices I got this step opp_price_slack: The allowed slack in price limits compared with best prices I got so far from a given opponent in this step opp_acc_price_slack: The allowed slack in price limits compared with best prices I got so far from a given opponent so far range_slack: Always consider prices above (1-`range_slack`) of the best possible prices *good enough*. production_target: Fraction of production capacity to be secured in advance Remarks: - A `concession_exponent` greater than one makes the agent concede super linearly and vice versa """ def __init__( self, *args, concession_exponent=None, acc_price_slack=float("inf"), step_price_slack=None, opp_price_slack=None, opp_acc_price_slack=None, range_slack=None, future_threshold=0.9, production_target=0.75, **kwargs, ): super().__init__(*args, **kwargs) if concession_exponent is None: concession_exponent = 0.2 + random.random() * 0.8 if step_price_slack is None: step_price_slack = random.random() * 0.1 + 0.05 if opp_price_slack is None: opp_price_slack = random.random() * 0.1 + 0.05 if opp_acc_price_slack is None: opp_acc_price_slack = random.random() * 0.1 + 0.05 if range_slack is None: range_slack = random.random() * 0.2 + 0.05 self._e = concession_exponent self._acc_price_slack = acc_price_slack self._step_price_slack = step_price_slack self._opp_price_slack = opp_price_slack self._opp_acc_price_slack = opp_acc_price_slack self._range_slack = range_slack self._sales = self._supplies = 0 self._production_target = production_target self._future_threshold = future_threshold
[docs] def init(self): """Initialize the quantities and best prices received so far""" self._best_acc_selling, self._best_acc_buying = 0.0, float("inf") self._best_opp_acc_selling = defaultdict(float) self._best_opp_acc_buying = defaultdict(lambda: float("inf"))
[docs] def before_step(self): """Initialize the quantities and best prices received for next step""" self._best_selling, self._best_buying = 0.0, float("inf") self._best_opp_selling = defaultdict(float) self._best_opp_buying = defaultdict(lambda: float("inf")) self._sales = self._supplies = 0
[docs] def on_negotiation_success(self, contract, mechanism): """Record sales/supplies secured""" super().on_negotiation_success(contract, mechanism) # update my current best price to use for limiting concession in other # negotiations up = contract.agreement["unit_price"] if contract.annotation["product"] == self.awi.my_output_product: partner = contract.annotation["buyer"] self._best_acc_selling = max(up, self._best_acc_selling) self._best_opp_acc_selling[partner] = max( up, self._best_opp_acc_selling[partner] ) else: partner = contract.annotation["seller"] self._best_acc_buying = min(up, self._best_acc_buying) self._best_opp_acc_buying[partner] = min( up, self._best_opp_acc_buying[partner] )
[docs] def propose(self, negotiator_id: str, state, source=None) -> Outcome | None: # find the absolute best offer for me. This will most likely has an # unrealistic price offer = self.best_offer(negotiator_id) # if there are no best offers, just return None to end the negotiation if not offer: return None # over-write the unit price in the best offer with a good-enough price offer = list(offer) offer[UNIT_PRICE] = self._find_good_price( self.get_nmi(negotiator_id), state, offer ) return tuple(offer)
[docs] def respond(self, negotiator_id, state, source=None) -> ResponseType: offer = state.current_offer # type: ignore assert offer is not None # find the quantity I still need and end negotiation if I need nothing more my_needs = self._needed(negotiator_id) # reject any offers with quantities above my needs response = ( ResponseType.ACCEPT_OFFER if (offer[QUANTITY] <= my_needs and offer[TIME] == self.awi.current_step) or ( offer[QUANTITY] < self._future_needs(negotiator_id, offer[TIME]) and offer[TIME] > self.awi.current_step ) else ResponseType.REJECT_OFFER ) if response != ResponseType.ACCEPT_OFFER: return response # reject offers with prices that are deemed NOT good-enough nmi = self.get_nmi(negotiator_id) response = ( response if self._is_good_price(nmi, state, offer) else ResponseType.REJECT_OFFER ) # If this response is about today, do not update internal stats if offer[TIME] != self.awi.current_step: return response # update my current best price to use for limiting concession in other # negotiations up = offer[UNIT_PRICE] if self._is_selling(nmi): self._best_selling = max(up, self._best_selling) partner = nmi.annotation["buyer"] self._best_opp_selling[partner] = self._best_selling else: self._best_buying = min(up, self._best_buying) partner = nmi.annotation["seller"] self._best_opp_buying[partner] = self._best_buying return response
[docs] def best_offer(self, negotiator_id): nmi = self.get_nmi(negotiator_id) if not nmi: return None my_needs = int(self._needed(negotiator_id)) if my_needs <= 0: # see if I can get something in the future time_issue = nmi.issues[TIME] times = list(time_issue.all) random.shuffle(times) for t in times: my_needs = self._future_needs(negotiator_id, t) if my_needs <= 0: continue offer = [-1] * 3 quantity_issue = nmi.issues[QUANTITY] unit_price_issue = nmi.issues[UNIT_PRICE] mx = max( min(my_needs, quantity_issue.max_value), quantity_issue.min_value ) # never contract offer more than production capacity mx = max(0, min(mx, self.awi.n_lines * (t - self.awi.current_step))) if mx < 1: continue mn_ = max(1, int(0.5 + mx * self.awi.current_step / self.awi.n_steps)) mx_ = int(mx) offer[QUANTITY] = random.randint(mn_, mx_) if mn_ < mx_ else mn_ offer[TIME] = t if self._is_selling(nmi): offer[UNIT_PRICE] = unit_price_issue.max_value else: offer[UNIT_PRICE] = unit_price_issue.min_value return tuple(offer) quantity_issue = nmi.issues[QUANTITY] unit_price_issue = nmi.issues[UNIT_PRICE] offer = [-1] * 3 mx = max(min(my_needs, quantity_issue.max_value), quantity_issue.min_value) # never contract offer more than production capacity mx = min(mx, self.awi.n_lines) offer[QUANTITY] = random.randint( max(1, int(0.5 + mx * self.awi.current_step / self.awi.n_steps)), int(mx) ) offer[TIME] = self.awi.current_step if self._is_selling(nmi): offer[UNIT_PRICE] = unit_price_issue.max_value else: offer[UNIT_PRICE] = unit_price_issue.min_value return tuple(offer)
[docs] def _future_needs(self, negotiator_id, t): return self._production_target * ( self.awi.n_lines - sum( ( self.awi.future_sales if negotiator_id in self.awi.my_consumers else self.awi.future_supplies ) .get(t, dict()) .values() ) )
[docs] def _needed(self, negotiator_id): if self.awi.is_middle_level: return self._production_target * self.awi.n_lines return ( self.awi.needed_sales if negotiator_id in self.awi.my_consumers else self.awi.needed_supplies )
[docs] def _is_selling(self, nmi): if not nmi: return None return nmi.annotation["product"] == self.awi.my_output_product
[docs] def _is_good_price(self, nmi, state, offer): """Checks if a given price is good enough at this stage""" price = offer[UNIT_PRICE] mn, mx = self._price_range(nmi, offer) th = ( self._th(state.step, nmi.n_steps) if offer[TIME] == self.awi.current_step else self._future_threshold ) # a good price is one better than the threshold if self._is_selling(nmi): return (price - mn) >= th * (mx - mn) else: return (mx - price) >= th * (mx - mn)
[docs] def _find_good_price(self, nmi, state, offer): """Finds a good-enough price conceding linearly over time""" mn, mx = self._price_range(nmi, offer) th = self._th(state.step, nmi.n_steps) # offer a price that is around th of your best possible price if self._is_selling(nmi): return int(mn + th * (mx - mn)) else: return int(mx - th * (mx - mn))
[docs] def _price_range(self, nmi, offer): """Limits the price by the best price received""" mn = nmi.issues[UNIT_PRICE].min_value mx = nmi.issues[UNIT_PRICE].max_value if offer[TIME] != self.awi.current_step: mn, mx = int(mx * self._future_threshold), mx if self._is_selling(nmi): partner = nmi.annotation["buyer"] mn = min( mx * (1 - self._range_slack), max( [mn] + [ p * (1 - slack) for p, slack in ( (self._best_selling, self._step_price_slack), (self._best_acc_selling, self._acc_price_slack), (self._best_opp_selling[partner], self._opp_price_slack), ( self._best_opp_acc_selling[partner], self._opp_acc_price_slack, ), ) ] ), ) else: partner = nmi.annotation["seller"] mx = max( mn * (1 + self._range_slack), min( [mx] + [ p * (1 + slack) for p, slack in ( (self._best_buying, self._step_price_slack), (self._best_acc_buying, self._acc_price_slack), (self._best_opp_buying[partner], self._opp_price_slack), ( self._best_opp_acc_buying[partner], self._opp_acc_price_slack, ), ) ] ), ) return int(mn), int(mx)
[docs] def _th(self, step, n_steps): """calculates a descending threshold (0 <= th <= 1)""" return ((n_steps - step - 1) / (n_steps - 1)) ** self._e