Source code for scml.oneshot.agents.greedy

import itertools
import random
import math
from collections import defaultdict

from negmas import Outcome, ResponseType, SAOResponse

from scml.oneshot.agent import (
    OneShotAgent,
    OneShotSingleAgreementAgent,
    OneShotSyncAgent,
)
from scml.oneshot.common import QUANTITY, TIME, UNIT_PRICE
from scml.oneshot.ufun import OneShotUFun

__all__ = ["GreedyOneShotAgent", "GreedySyncAgent", "GreedySingleAgreementAgent"]


[docs] class GreedyOneShotAgent(OneShotAgent): """ A greedy agent based on OneShotAgent Args: concession_exponent: A real number controlling how fast does the agent concede on price. acc_price_slack: The allowed slack in price limits compared with best prices I got so far step_price_slack: The allowed slack in price limits compared with best prices I got this step opp_price_slack: The allowed slack in price limits compared with best prices I got so far from a given opponent in this step opp_acc_price_slack: The allowed slack in price limits compared with best prices I got so far from a given opponent so far range_slack: Always consider prices above (1-`range_slack`) of the best possible prices *good enough*. Remarks: - A `concession_exponent` greater than one makes the agent concede super linearly and vice versa """ def __init__( self, *args, concession_exponent=None, acc_price_slack=float("inf"), step_price_slack=None, opp_price_slack=None, opp_acc_price_slack=None, range_slack=None, **kwargs, ): super().__init__(*args, **kwargs) if concession_exponent is None: concession_exponent = 0.2 + random.random() * 0.8 if step_price_slack is None: step_price_slack = random.random() * 0.1 + 0.05 if opp_price_slack is None: opp_price_slack = random.random() * 0.1 + 0.05 if opp_acc_price_slack is None: opp_acc_price_slack = random.random() * 0.1 + 0.05 if range_slack is None: range_slack = random.random() * 0.2 + 0.05 self._e = concession_exponent self._acc_price_slack = acc_price_slack self._step_price_slack = step_price_slack self._opp_price_slack = opp_price_slack self._opp_acc_price_slack = opp_acc_price_slack self._range_slack = range_slack self._sales = self._supplies = 0
[docs] def init(self): """Initialize the quantities and best prices received so far""" self._best_acc_selling, self._best_acc_buying = 0.0, float("inf") self._best_opp_acc_selling = defaultdict(float) self._best_opp_acc_buying = defaultdict(lambda: float("inf"))
[docs] def before_step(self): """Initialize the quantities and best prices received for next step""" self._best_selling, self._best_buying = 0.0, float("inf") self._best_opp_selling = defaultdict(float) self._best_opp_buying = defaultdict(lambda: float("inf")) self._sales = self._supplies = 0
[docs] def on_negotiation_success(self, contract, mechanism): """Record sales/supplies secured""" super().on_negotiation_success(contract, mechanism) # update my current best price to use for limiting concession in other # negotiations up = contract.agreement["unit_price"] if contract.annotation["product"] == self.awi.my_output_product: partner = contract.annotation["buyer"] self._best_acc_selling = max(up, self._best_acc_selling) self._best_opp_acc_selling[partner] = max( up, self._best_opp_acc_selling[partner] ) else: partner = contract.annotation["seller"] self._best_acc_buying = min(up, self._best_acc_buying) self._best_opp_acc_buying[partner] = min( up, self._best_opp_acc_buying[partner] )
[docs] def propose(self, negotiator_id: str, state, source=None) -> Outcome | None: # find the absolute best offer for me. This will most likely has an # unrealistic price offer = self.best_offer(negotiator_id) # if there are no best offers, just return None to end the negotiation if not offer: return None # over-write the unit price in the best offer with a good-enough price offer = list(offer) offer[UNIT_PRICE] = self._find_good_price(self.get_nmi(negotiator_id), state) return tuple(offer)
[docs] def respond(self, negotiator_id, state, source=None) -> ResponseType: offer = state.current_offer # type: ignore assert offer is not None # find the quantity I still need and end negotiation if I need nothing more my_needs = self._needed(negotiator_id) if my_needs <= 0: return ResponseType.END_NEGOTIATION # reject any offers with quantities above my needs response = ( ResponseType.ACCEPT_OFFER if offer[QUANTITY] <= my_needs else ResponseType.REJECT_OFFER ) if response != ResponseType.ACCEPT_OFFER: return response # reject offers with prices that are deemed NOT good-enough nmi = self.get_nmi(negotiator_id) response = ( response if self._is_good_price(nmi, state, offer[UNIT_PRICE]) else ResponseType.REJECT_OFFER ) # update my current best price to use for limiting concession in other # negotiations up = offer[UNIT_PRICE] if self._is_selling(nmi): self._best_selling = max(up, self._best_selling) partner = nmi.annotation["buyer"] self._best_opp_selling[partner] = self._best_selling else: self._best_buying = min(up, self._best_buying) partner = nmi.annotation["seller"] self._best_opp_buying[partner] = self._best_buying return response
[docs] def best_offer(self, negotiator_id): _need = self._needed(negotiator_id) my_needs = int(_need) if _need and not math.isinf(_need) else 0 if my_needs <= 0: return None nmi = self.get_nmi(negotiator_id) if not nmi: return None quantity_issue = nmi.issues[QUANTITY] unit_price_issue = nmi.issues[UNIT_PRICE] offer = [-1] * 3 mx = max(min(my_needs, quantity_issue.max_value), quantity_issue.min_value) offer[QUANTITY] = random.randint( max(1, int(0.5 + mx * self.awi.current_step / self.awi.n_steps)), mx ) offer[TIME] = self.awi.current_step if self._is_selling(nmi): offer[UNIT_PRICE] = unit_price_issue.max_value else: offer[UNIT_PRICE] = unit_price_issue.min_value return tuple(offer)
[docs] def _needed(self, negotiator_id): nmi = self.get_nmi(negotiator_id) if not nmi: return 0 summary = self.awi.exogenous_contract_summary secured = self._sales if self._is_selling(nmi) else self._supplies demand = min(summary[0][0], summary[-1][0]) / (self.awi.n_competitors + 1) return demand - secured
[docs] def _is_selling(self, nmi): if not nmi: return None return nmi.annotation["product"] == self.awi.my_output_product
[docs] def _is_good_price(self, nmi, state, price): """Checks if a given price is good enough at this stage""" mn, mx = self._price_range(nmi) th = self._th(state.step, nmi.n_steps) # a good price is one better than the threshold if self._is_selling(nmi): return (price - mn) >= th * (mx - mn) else: return (mx - price) >= th * (mx - mn)
[docs] def _find_good_price(self, nmi, state): """Finds a good-enough price conceding linearly over time""" mn, mx = self._price_range(nmi) th = self._th(state.step, nmi.n_steps) # offer a price that is around th of your best possible price if self._is_selling(nmi): return int(mn + th * (mx - mn)) else: return int(mx - th * (mx - mn))
[docs] def _price_range(self, nmi): """Limits the price by the best price received""" mn = nmi.issues[UNIT_PRICE].min_value mx = nmi.issues[UNIT_PRICE].max_value if self._is_selling(nmi): partner = nmi.annotation["buyer"] mn = min( mx * (1 - self._range_slack), max( [mn] + [ p * (1 - slack) for p, slack in ( (self._best_selling, self._step_price_slack), (self._best_acc_selling, self._acc_price_slack), (self._best_opp_selling[partner], self._opp_price_slack), ( self._best_opp_acc_selling[partner], self._opp_acc_price_slack, ), ) ] ), ) else: partner = nmi.annotation["seller"] mx = max( mn * (1 + self._range_slack), min( [mx] + [ p * (1 + slack) for p, slack in ( (self._best_buying, self._step_price_slack), (self._best_acc_buying, self._acc_price_slack), (self._best_opp_buying[partner], self._opp_price_slack), ( self._best_opp_acc_buying[partner], self._opp_acc_price_slack, ), ) ] ), ) return int(mn), int(mx)
[docs] def _th(self, step, n_steps): """calculates a descending threshold (0 <= th <= 1)""" return ((n_steps - step - 1) / (n_steps - 1)) ** self._e
[docs] class GreedySyncAgent(OneShotSyncAgent, GreedyOneShotAgent): # type: ignore """A greedy agent based on OneShotSyncAgent""" def __init__(self, *args, threshold=None, **kwargs): super().__init__(*args, **kwargs) if threshold is None: threshold = random.random() * 0.2 + 0.2 self._threshold = threshold self.ufun: OneShotUFun # type: ignore
[docs] def before_step(self): super().before_step() self.ufun.find_limit(True) self.ufun.find_limit(False)
[docs] def first_proposals(self): """Decide a first proposal on every negotiation. Returning None for a negotiation means ending it.""" return dict( zip( self.negotiators.keys(), (self.best_offer(_) for _ in self.negotiators.keys()), ) )
[docs] def counter_all(self, offers, states) -> dict: """Respond to a set of offers given the negotiation state of each.""" if self.ufun.max_utility < 0: return dict(zip(offers.keys(), itertools.repeat(None))) good_prices = { k: self._find_good_price(self.get_nmi(k), s) for k, s in states.items() } responses = { k: SAOResponse(ResponseType.REJECT_OFFER, None) for k in offers.keys() } my_input_needs, my_output_needs = self._needs() input_offers = { k: v for k, v in offers.items() if not self._is_selling(self.get_nmi(k)) } output_offers = { k: v for k, v in offers.items() if self._is_selling(self.get_nmi(k)) } def calc_responses(my_needs, offers, is_selling): nonlocal responses if len(offers) == 0: return 0 sorted_offers = sorted( offers.values(), key=lambda x: -x[UNIT_PRICE] if is_selling else x[UNIT_PRICE], ) secured, outputs, chosen = 0, [], dict() for i, k in enumerate(offers.keys()): offer = sorted_offers[i] secured += offer[QUANTITY] if secured >= my_needs: break chosen[k] = offer outputs.append(is_selling) if ( self.ufun.from_offers(tuple(chosen.values()), tuple(outputs)) >= self._th(self.awi.current_step, self.awi.n_steps) * self.ufun.max_utility ): for k in chosen.keys(): responses[k] = SAOResponse(ResponseType.ACCEPT_OFFER, None) return secured secured = calc_responses(my_input_needs, input_offers, False) secured += calc_responses(my_output_needs, output_offers, True) for k, v in responses.items(): if v.response != ResponseType.REJECT_OFFER: continue responses[k] = SAOResponse( ResponseType.REJECT_OFFER, ( max(1, my_input_needs + my_output_needs - secured), self.awi.current_step, good_prices[k], ), ) return responses
[docs] def _needs(self): """ Returns both input and output needs """ if self.awi.is_middle_level: summary = self.awi.exogenous_contract_summary n = min(summary[0][0], summary[-1][0]) return n - self._supplies, n - self._sales if self.awi.is_first_level: return 0, self.awi.current_exogenous_input_quantity - self._sales return self.awi.current_exogenous_output_quantity - self._supplies, 0
[docs] def propose(self, negotiator_id, state): # type: ignore return OneShotSyncAgent.propose(self, negotiator_id, state)
[docs] def respond(self, negotiator_id, state, source=""): # type: ignore return OneShotSyncAgent.respond(self, negotiator_id, state, source)
[docs] class GreedySingleAgreementAgent(OneShotSingleAgreementAgent): """A greedy agent based on `OneShotSingleAgreementAgent`""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.ufun: OneShotUFun # type: ignore
[docs] def before_step(self): self.ufun.find_limit(True) self.ufun.find_limit(False)
[docs] def is_acceptable(self, offer, source, state) -> bool: mx, mn = self.ufun.max_utility, self.ufun.min_utility u = (self.ufun(offer) - mn) / (mx - mn) return u >= (1 - state.relative_time)
[docs] def best_offer(self, offers): ufuns = [(self.ufun(_), i) for i, _ in enumerate(offers.values())] keys = list(offers.keys()) return keys[max(ufuns)[1]]
[docs] def is_better(self, a, b, negotiator, state): return self.ufun(a) > self.ufun(b)