import random
from collections import defaultdict
from typing import (
Any,
Callable,
Dict,
List,
Optional,
Tuple,
Type,
Union,
TYPE_CHECKING,
)
import numpy as np
from negmas import (
AgentWorldInterface,
ControlledNegotiator,
LinearUtilityFunction,
MechanismState,
Outcome,
ResponseType,
make_issue,
outcome_is_valid,
Preferences,
UtilityFunction,
)
from negmas.common import NegotiatorMechanismInterface
from negmas.events import Notification, Notifier
from negmas.helpers import instantiate
from negmas.sao import (
SAOController,
SAONegotiator,
SAOResponse,
SAOState,
SAOSyncController,
)
if TYPE_CHECKING:
from scml.scml2020.components.trading import PredictionBasedTradingStrategy
from scml.scml2020.common import QUANTITY, TIME, UNIT_PRICE
__all__ = ["StepController", "SyncController"]
class StepController(SAOController, Notifier):
    """A controller for managing a set of negotiations about selling or buying (but not both) starting/ending at some
    specific time-step.

    Args:

        target_quantity: The quantity to be secured
        is_seller: Is this a seller or a buyer
        parent_name: Name of the parent
        horizon: How many steps in the future to allow negotiations for selling to go for.
        step: The simulation step that this controller is responsible about
        urange: The range of unit prices used for negotiation
        product: The product that this controller negotiates about
        partners: A list of partners to negotiate with
        negotiator_type: The type of the single negotiator used for all negotiations.
        negotiator_params: The parameters of the negotiator used for all negotiations
        max_retries: How many times can the controller try negotiating with each partner.
        negotiations_concluded_callback: A method to be called with the step of this controller and whether is it a
                                         seller when all negotiations are concluded
        awi: The agent-world interface used for logging, reading the current step and requesting negotiations
        *args: Position arguments passed to the base Controller constructor
        **kwargs: Keyword arguments passed to the base Controller constructor

    Remarks:

        - It uses whatever negotiator type on all of its negotiations and it assumes that the ufun will never change
        - Once it accumulates the required quantity, it ends all remaining negotiations
        - It assumes that all ufuns are identical so there is no need to keep a separate negotiator for each one and it
          instantiates a single negotiator that dynamically changes the AMI but always uses the same ufun.
    """

    def __init__(
        self,
        *args,
        target_quantity: int,
        is_seller: bool,
        step: int,
        urange: Tuple[int, int],
        product: int,
        partners: List[str],
        negotiator_type: SAONegotiator,
        horizon: int,
        awi: AgentWorldInterface,
        parent_name: str,
        negotiations_concluded_callback: Callable[[int, bool], None],
        negotiator_params: Optional[Dict[str, Any]] = None,
        max_retries: int = 2,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.parent_name = parent_name
        # The methods below read awi, horizon, urange, product, step and
        # secured from the instance, so they must be stored here.
        self.awi = awi
        self.horizon = horizon
        self.negotiations_concluded_callback = negotiations_concluded_callback
        self.is_seller = is_seller
        self.target = target_quantity
        self.urange = urange
        self.partners = partners
        self.product = product
        self.step = step
        # Quantity accumulated so far through agreements.
        self.secured = 0
        # Work on a private copy so that injecting "ufun" below does not
        # modify the dictionary owned by the caller.
        negotiator_params = (
            dict(negotiator_params) if negotiator_params is not None else dict()
        )
        issues = [
            make_issue((1, max(int(target_quantity), 1)), "quantity"),
            make_issue((step, step), "time"),
            make_issue(
                (int(urange[0]), max(int(urange[0]), int(urange[1]))), "unit_price"
            ),
        ]
        # Weights are given in the same order as ``issues``
        # (quantity, time, unit_price); buyers negate time and price.
        if is_seller:
            self.ufun = LinearUtilityFunction(weights=(1.0, 1.0, 10.0), issues=issues)
        else:
            self.ufun = LinearUtilityFunction(weights=(1.0, -1.0, -10.0), issues=issues)
        negotiator_params["ufun"] = self.ufun
        # A single shared negotiator serves every negotiation; its NMI is
        # swapped right before each propose/respond call.
        self.__negotiator = instantiate(negotiator_type, **negotiator_params)
        # Maps negotiator ID -> whether its negotiation has ended.
        self.completed = defaultdict(bool)
        # Maps partner ID -> number of negotiations requested with it so far.
        self.retries: Dict[str, int] = defaultdict(int)
        self.max_retries = max_retries

    def join(
        self,
        negotiator_id: str,
        nmi: NegotiatorMechanismInterface,
        state: MechanismState,
        *,
        preferences: Optional["Preferences"] = None,
        ufun: Optional["UtilityFunction"] = None,
        role: str = "agent",
    ) -> bool:
        """Joins a negotiation and, on success, marks it as not yet completed.

        Returns:
            Whatever the base class ``join`` returns.
        """
        joined = super().join(
            negotiator_id, nmi, state, preferences=preferences, ufun=ufun, role=role
        )
        if joined:
            self.completed[negotiator_id] = False
        return joined

    def propose(self, negotiator_id: str, state: MechanismState) -> Optional["Outcome"]:
        """Proposes on behalf of ``negotiator_id`` using the shared negotiator.

        Returns:
            ``None`` if the negotiator is unknown to this controller, otherwise
            the proposal of the shared negotiator.
        """
        if negotiator_id not in self.negotiators.keys():
            return None
        # Point the shared negotiator at the mechanism of this negotiation;
        # best-effort: fall back to no NMI if it cannot be read.
        try:
            self.__negotiator._nmi = self.negotiators[negotiator_id][0].nmi
        except Exception:
            self.__negotiator._nmi = None
        return self.__negotiator.propose(state)

    def respond(
        self,
        negotiator_id: str,
        state: MechanismState,
        source: str = "",
    ) -> ResponseType:
        """Responds on behalf of ``negotiator_id`` using the shared negotiator.

        Returns:
            ``END_NEGOTIATION`` if the negotiator is unknown or the target
            quantity is already secured, otherwise the shared negotiator's response.
        """
        if negotiator_id not in self.negotiators.keys():
            return ResponseType.END_NEGOTIATION
        if self.secured >= self.target:
            return ResponseType.END_NEGOTIATION
        self.__negotiator._nmi = self.negotiators[negotiator_id][0].nmi
        # Some negotiators do not accept a ``source`` argument.
        try:
            return self.__negotiator.respond(state=state, source=source)
        except TypeError:
            return self.__negotiator.respond(state=state)

    def __str__(self):
        n_completed = sum(1 for done in self.completed.values() if done)
        return (
            f"{'selling' if self.is_seller else 'buying'} p{self.product} [{self.step}] "
            f"secured {self.secured} of {self.target} for {self.parent_name} "
            f"({n_completed} completed of {len(self.completed)} negotiators)"
        )

    def create_negotiator(
        self,
        negotiator_type: Union[str, Type[ControlledNegotiator]] = None,
        name: str = None,
        cntxt: Any = None,
        **kwargs,
    ) -> ControlledNegotiator:
        """Creates a negotiator through the base class and registers it as not completed."""
        neg = super().create_negotiator(negotiator_type, name, cntxt, **kwargs)
        self.completed[neg.id] = False
        return neg

    def time_range(self, step, is_seller):
        """Returns the ``(min, max)`` delivery-time range to negotiate about.

        Sellers get ``max(step, current_step + 1)`` up to ``step + horizon``
        capped at the last simulation step; buyers get ``current_step + 1`` up
        to ``step - 1``. The range may be empty (min > max).
        """
        if is_seller:
            return (
                max(step, self.awi.current_step + 1),
                min(step + self.horizon, self.awi.n_steps - 1),
            )
        return self.awi.current_step + 1, step - 1

    def on_negotiation_end(self, negotiator_id: str, state: MechanismState) -> None:
        """Updates the secured quantity and either finishes or retries.

        After recording the result of the ended negotiation, the controller
        ends all other running negotiations once the target is reached. When
        all negotiations are completed it either calls
        ``negotiations_concluded_callback`` (target reached) or requests a new
        negotiation with a partner that has not exceeded ``max_retries``.
        """
        super().on_negotiation_end(negotiator_id, state)
        agreement = state.agreement
        # mark this negotiation as completed
        self.completed[negotiator_id] = True
        # if there is an agreement increase the secured amount and check if we are done.
        if agreement is not None:
            self.secured += agreement[QUANTITY]
            if self.secured >= self.target:
                self.awi.loginfo(f"Ending all negotiations on controller {str(self)}")
                # If we are done, end all other negotiations. Iterate over a
                # snapshot because ending a negotiation may remove entries
                # from ``self.negotiators`` while we are looping.
                for k, entry in list(self.negotiators.items()):
                    if self.completed[k]:
                        continue
                    self.notify(entry[0], Notification("end_negotiation", None))
        self.kill_negotiator(negotiator_id, force=True)
        if all(self.completed.values()):
            # If we secured everything, just return control to the agent
            if self.secured >= self.target:
                self.awi.loginfo(f"Secured Everything: {str(self)}")
                self.negotiations_concluded_callback(self.step, self.is_seller)
                return
            # If we did not secure everything we need yet and time allows it, create new negotiations
            tmin, tmax = self.time_range(self.step, self.is_seller)
            if self.awi.current_step < tmax + 1 and tmin <= tmax:
                # get a good partner: one that was not retried too much
                random.shuffle(self.partners)
                for other in self.partners:
                    if self.retries[other] <= self.max_retries:
                        partner = other
                        break
                else:
                    # every partner exhausted its retries
                    return
                self.retries[partner] += 1
                neg = self.create_negotiator()
                self.completed[neg.id] = False
                self.awi.loginfo(
                    f"{str(self)} negotiating with {partner} on u={self.urange}"
                    f", q=(1,{self.target-self.secured}), t=({tmin}, {tmax})"
                )
                if (
                    self.target > self.secured
                    and self.urange[0] <= self.urange[1]
                    and tmin <= tmax
                ):
                    self.awi.request_negotiation(
                        not self.is_seller,
                        product=self.product,
                        quantity=(1, self.target - self.secured),
                        unit_price=self.urange,
                        time=(tmin, tmax),
                        partner=partner,
                        negotiator=neg,
                        extra=dict(
                            controller_index=self.step, is_seller=self.is_seller
                        ),
                    )
# our controller
class SyncController(SAOSyncController):
    """
    Will try to get the best deal which is defined as being nearest to the agent needs and with lowest price

    Args:
        is_seller: Whether this controller negotiates sales (True) or purchases (False)
        parent: The object whose ``inputs_needed``/``inputs_secured``,
                ``outputs_needed``/``outputs_secured`` and ``awi`` are read
                when evaluating offers
        price_weight: Weight of the price term in the utility (the remaining
                      ``1 - price_weight`` weighs the quantity term)
        utility_threshold: Fraction of the best achievable utility with a
                           partner above which an offer is accepted
        time_threshold: Relative time after which the best offer is accepted
                        provided its utility is not negative
        *args: Position arguments passed to the base constructor
        **kwargs: Keyword arguments passed to the base constructor
    """

    def __init__(
        self,
        *args,
        is_seller: bool,
        parent: "PredictionBasedTradingStrategy",
        price_weight=0.7,
        utility_threshold=0.9,
        time_threshold=0.9,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self._is_seller = is_seller
        # ``utility`` reads the needs/secured lists and the AWI from the
        # parent, so it must be stored on the instance.
        self.__parent = parent
        self._time_threshold = time_threshold
        self._price_weight = price_weight
        self._utility_threshold = utility_threshold
        # Maps negotiator ID -> utility of the best outcome found by ``best_proposal``.
        self._best_utils: Dict[str, float] = {}

    def utility(self, offer: Tuple[int, int, int], max_price: int) -> float:
        """A simple utility function

        Args:
            offer: The outcome to evaluate (indexed by QUANTITY, TIME, UNIT_PRICE) or None
            max_price: The maximum unit price; used to turn a price into a gain for buyers

        Remarks:
             - If the offer is None, the time is invalid or the offer would
               exceed what is still needed at the given time, return -1000
             - Otherwise use the price-weight to calculate a linear combination of
               the price and the how much of the needs is satisfied by this contract

        """
        # find out my needs and the amount secured lists
        if self._is_seller:
            _needed, _secured = (
                self.__parent.outputs_needed,
                self.__parent.outputs_secured,
            )
        else:
            _needed, _secured = (
                self.__parent.inputs_needed,
                self.__parent.inputs_secured,
            )
        if offer is None:
            return -1000.0
        t = offer[TIME]
        if t < self.__parent.awi.current_step or t > self.__parent.awi.n_steps - 1:
            return -1000.0
        # what would remain needed at time t if this offer is accepted
        q = _needed[t] - (offer[QUANTITY] + _secured[t])
        if q < 0:
            return -1000.0
        if self._is_seller:
            price = offer[UNIT_PRICE]
        else:
            price = max_price - offer[UNIT_PRICE]
        return self._price_weight * price + (1 - self._price_weight) * q

    def is_valid(self, negotiator_id: str, offer: "Outcome") -> bool:
        """Checks that ``offer`` is a valid outcome for the issues of the given negotiation."""
        issues = self.negotiators[negotiator_id][0].nmi.issues
        return outcome_is_valid(offer, issues)

    def counter_all(
        self, offers: Dict[str, "Outcome"], states: Dict[str, SAOState]
    ) -> Dict[str, SAOResponse]:
        """Calculate a response to all offers from all negotiators (negotiator ID is the key).

        Args:
            offers: Maps negotiator IDs to offers
            states: Maps negotiator IDs to states AT the time the offers were made.

        Returns:
            A mapping from negotiator ID to the response to send it (empty if
            there are no offers to respond to).

        Remarks:
            - The response type CANNOT be WAIT.

        """
        # nothing to respond to (np.argmax would fail on an empty array)
        if not offers:
            return {}
        # find the best offer
        negotiator_ids = list(offers.keys())
        utils = np.array(
            [
                self.utility(
                    o, self.negotiators[nid][0].nmi.issues[UNIT_PRICE].max_value
                )
                for nid, o in offers.items()
            ]
        )
        best_index = int(np.argmax(utils))
        best_utility = utils[best_index]
        best_partner = negotiator_ids[best_index]
        best_offer = offers[best_partner]
        # find my best proposal for each negotiation (also refreshes self._best_utils)
        best_proposals = self.first_proposals()
        # if the best offer is still so bad just reject everything
        if best_utility < 0:
            return {
                k: SAOResponse(ResponseType.REJECT_OFFER, best_proposals[k])
                for k in offers.keys()
                if k in self.negotiators.keys()
            }
        relative_time = min(_.relative_time for _ in states.values())
        # accept if this is good enough or the negotiation is about to end
        accept_best = (
            best_utility >= self._utility_threshold * self._best_utils[best_partner]
            or relative_time > self._time_threshold
        )
        # send the best offer to everyone else (falling back to my best
        # proposal wherever the best offer is not valid for that negotiation)
        responses = {
            k: SAOResponse(
                ResponseType.REJECT_OFFER,
                best_offer if self.is_valid(k, best_offer) else best_proposals[k],
            )
            for k in offers.keys()
        }
        if accept_best:
            responses[best_partner] = SAOResponse(ResponseType.ACCEPT_OFFER, None)
        else:
            # try to improve on the best offer with its own partner
            responses[best_partner] = SAOResponse(
                ResponseType.REJECT_OFFER, best_proposals[best_partner]
            )
        return responses

    # NOTE: on_negotiation_end is not overridden here. An earlier override that
    # added agreed quantities to the parent's inputs_secured/outputs_secured is
    # disabled; presumably the parent tracks secured quantities itself (TODO confirm).

    def best_proposal(self, nid: str) -> Tuple[Optional[Outcome], float]:
        """
        Finds the best proposal for the given negotiation

        Args:
            nid: Negotiator ID

        Returns:
            The outcome with highest utility and the corresponding utility.
            The outcome is None if the negotiator has no NMI or if even the
            best outcome has negative utility.
        """
        negotiator = self.negotiators[nid][0]
        if negotiator.nmi is None:
            return None, -1000
        outcomes = negotiator.nmi.discrete_outcomes()
        utils = np.array(
            [
                self.utility(_, negotiator.nmi.issues[UNIT_PRICE].max_value)
                for _ in outcomes
            ]
        )
        best_indx = np.argmax(utils)
        # remember the best achievable utility; counter_all compares offers to it
        self._best_utils[nid] = utils[best_indx]
        if utils[best_indx] < 0:
            return None, utils[best_indx]
        return outcomes[best_indx], utils[best_indx]

    def first_proposals(self) -> Dict[str, "Outcome"]:
        """Gets a set of proposals to use for initializing the negotiation."""
        return {nid: self.best_proposal(nid)[0] for nid in self.negotiators.keys()}