Source code for scml.oneshot.rl.helpers

"""Helpers for the observation and action managers."""

import numpy as np
from collections import defaultdict
from typing import Mapping, TypeVar
from scml.oneshot.awi import OneShotAWI
from negmas.outcomes import Outcome
from scml.oneshot.common import QUANTITY, TIME, UNIT_PRICE, OneShotState
from scml.oneshot.rl.common import group_partners


# Names exported by a star-import of this module. `encode_given_offers` and
# `normalize_offers_with_time` are defined in this module but are not listed here.
__all__ = [
    "recover_offers",
    "read_offers",
    "encode_offers_with_time",
    "encode_offers_no_time",
    "decode_offers_no_time",
    "unnormalize_offers",
    "normalize_offers_no_time",
    "clip_normal",
    "clip",
    "discretize_and_clip",
    "normalize_and_clip",
]


def recover_offers(
    encoded: np.ndarray,
    awi: OneShotState | OneShotAWI,
    n_suppliers: int,
    n_consumers: int,
    max_group_size: int,
    continuous: bool,
    n_prices: int,
) -> dict[str, Outcome | None]:
    """
    Decodes an encoded array of offers using the limits found in `awi`.

    The partner groups, the current step and the unit-price/quantity limits
    of the input and output outcome spaces are read from `awi` and passed,
    together with `encoded`, to `decode_offers_no_time`.

    Args:
        encoded: The encoded offers to decode
        awi: The AWI (or state) supplying partners, step and outcome spaces
        n_suppliers: Number of supplier groups
        n_consumers: Number of consumer groups
        max_group_size: Maximum number of partners per group
        continuous: Forwarded to `decode_offers_no_time`
        n_prices: Forwarded to `decode_offers_no_time`

    Return:
        The mapping returned by `decode_offers_no_time` for these inputs.
    """
    supplier_groups = group_partners(awi.my_suppliers, n_suppliers, max_group_size)
    consumer_groups = group_partners(awi.my_consumers, n_consumers, max_group_size)
    buy_issues = awi.current_input_outcome_space.issues
    sell_issues = awi.current_output_outcome_space.issues
    return decode_offers_no_time(
        encoded=encoded,
        n_suppliers=n_suppliers,
        n_consumers=n_consumers,
        suppliers=supplier_groups,
        consumers=consumer_groups,
        step=awi.current_step,
        continuous=continuous,
        min_buy_price=buy_issues[UNIT_PRICE].min_value,
        min_sell_price=sell_issues[UNIT_PRICE].min_value,
        max_buy_price=buy_issues[UNIT_PRICE].max_value,
        max_sell_price=sell_issues[UNIT_PRICE].max_value,
        max_buy_quantity=buy_issues[QUANTITY].max_value,
        max_sell_quantity=sell_issues[QUANTITY].max_value,
        n_prices=n_prices,
    )
def encode_given_offers(
    offers: dict[str, Outcome | None],
    state: OneShotAWI | OneShotState,
    n_suppliers: int,
    n_consumers: int,
    max_group_size: int,
    continuous: bool,
) -> list[tuple[int, int]] | list[tuple[float, float]]:
    """
    Encodes the given offers as one (quantity, unit-price) tuple per partner group.

    Supplier groups are encoded against the input outcome space and consumer
    groups against the output outcome space. Supplier tuples come first in
    the returned list, followed by consumer tuples.

    Args:
        offers: Offers keyed by partner ID (`None` means no offer)
        state: The AWI (or state) supplying partners and outcome spaces
        n_suppliers: Number of supplier groups
        n_consumers: Number of consumer groups
        max_group_size: Maximum number of partners per group
        continuous: If given, the encoded values are additionally passed
                    through `normalize_offers_no_time`

    Return:
        The encoded supplier offers followed by the encoded consumer offers.
    """
    suppliers = group_partners(state.my_suppliers, n_suppliers, max_group_size)
    consumers = group_partners(state.my_consumers, n_consumers, max_group_size)

    encoded = []
    # Both market sides go through the same code path so that each side is
    # always encoded with the limits of its *own* outcome space.
    for groups, outcome_space in (
        (suppliers, state.current_input_outcome_space),
        (consumers, state.current_output_outcome_space),
    ):
        min_price = outcome_space.issues[UNIT_PRICE].min_value
        max_price = outcome_space.issues[UNIT_PRICE].max_value
        max_quantity = outcome_space.issues[QUANTITY].max_value
        side = encode_offers_no_time(offers, groups, min_price, max_price)
        if continuous:
            # The encoder already subtracted the minimum price.
            side = normalize_offers_no_time(
                side, min_price, max_price, 0, max_quantity, subtract_min_price=False
            )
        encoded.extend(side)
    return encoded
def read_offers(
    state: OneShotAWI | OneShotState,
    n_suppliers: int,
    n_consumers: int,
    max_group_size: int,
    continuous: bool,
) -> list[tuple[int, int]] | list[tuple[float, float]]:
    """
    Encodes `state.current_offers` by delegating to `encode_given_offers`.

    Args:
        state: The AWI (or state) whose `current_offers` are encoded
        n_suppliers: Number of supplier groups
        n_consumers: Number of consumer groups
        max_group_size: Maximum number of partners per group
        continuous: Forwarded to `encode_given_offers`

    Return:
        The list returned by `encode_given_offers`.
    """
    current = state.current_offers  # type: ignore
    return encode_given_offers(
        current, state, n_suppliers, n_consumers, max_group_size, continuous
    )
def encode_offers_with_time(
    offers: Mapping[str, Outcome | None],
    partner_groups: list[list[str]],
    min_price: int,
    max_price: int,
) -> list[tuple[int, int, int]]:
    """
    Encodes offers from the given partner groups into `n_partners` tuples of
    quantity, time, unit-price values.

    Args:
        offers: All received offers. Keys are sources. Sources not in the
                `partner_groups` will be ignored
        partner_groups: A list of lists of partner IDs each defining a group
                        to be considered together
        min_price: Minimum allowed price
        max_price: Maximum allowed price

    Return:
        A list of (quantity, time, unit-price) tuples of length
        `len(partner_groups)`.

    Remarks:
        - A group with no offers is encoded as `(0, 0, 0)`.
        - Quantity is the total offered quantity of the group, time is the
          largest time found in the group's offers and unit-price is the
          quantity-weighted average price minus `min_price`.
        - If offers were received but their total quantity is zero, the
          price is encoded as `max_price - min_price` (same convention as
          `encode_offers_no_time`).
    """
    offer_list: list[tuple[int, int, int]] = []
    for partners in partner_groups:
        n_read = 0
        total_quantity = 0
        total_value = 0
        latest_time = 0
        for partner in partners:
            outcome = offers.get(partner, None)
            if outcome is None:
                continue
            quantity = outcome[QUANTITY]
            total_quantity += quantity
            total_value += outcome[UNIT_PRICE] * quantity
            # One tuple per group cannot carry several times; keep the latest.
            latest_time = (
                outcome[TIME] if not n_read else max(latest_time, outcome[TIME])
            )
            n_read += 1
        if not n_read:
            offer_list.append((0, 0, 0))
        elif total_quantity:
            offer_list.append(
                (total_quantity, latest_time, total_value / total_quantity - min_price)
            )
        else:
            offer_list.append((0, latest_time, max_price - min_price))
    return offer_list
def encode_offers_no_time(
    offers: Mapping[str, Outcome | None],
    partner_groups: list[list[str]],
    min_price: int,
    max_price: int,
) -> list[tuple[int, int]]:
    """
    Encodes the offers of every partner group as a single (quantity,
    unit-price) tuple.

    Args:
        offers: All received offers keyed by source. Sources that do not
                appear in `partner_groups` are ignored
        partner_groups: Lists of partner IDs; each list is one group whose
                        offers are combined
        min_price: Minimum allowed price
        max_price: Maximum allowed price

    Return:
        One tuple per group, in the order of `partner_groups`.

    Remarks:
        - A group without offers is encoded as `(0, 0)`.
        - Otherwise quantity is the sum of offered quantities and unit-price
          is the quantity-weighted average price minus `min_price`.
        - If the summed quantity is zero the tuple is
          `(0, max_price - min_price)`.
    """
    encoded: list[tuple[int, int]] = []
    for group in partner_groups:
        candidates = (offers.get(pid, None) for pid in group)
        received = [outcome for outcome in candidates if outcome is not None]
        if not received:
            encoded.append((0, 0))
            continue
        total_quantity = sum(outcome[QUANTITY] for outcome in received)
        total_value = sum(
            outcome[UNIT_PRICE] * outcome[QUANTITY] for outcome in received
        )
        if total_quantity:
            encoded.append((total_quantity, total_value / total_quantity - min_price))
        else:
            encoded.append((0, max_price - min_price))
    return encoded
[docs] def decode_offers_no_time( encoded: np.ndarray | list[tuple[int, int]] | list[tuple[float, float]], n_suppliers: int, n_consumers: int, suppliers: list[list[str]], consumers: list[list[str]], step: int, continuous: bool, min_buy_price: int, min_sell_price: int, max_buy_price: int = -1, max_sell_price: int = -1, max_buy_quantity: int = -1, max_sell_quantity: int = -1, n_prices: int | None = None, ) -> dict[str, Outcome | None]: """ Inverts `encode_offers_no_time` Remarks: - max_* are only needed if continuous is True """ n_partners = n_suppliers + n_consumers encoded = np.asarray(encoded).flatten()[: n_partners * 2] e = np.asarray(encoded).reshape((n_partners, 2)) encodedl = e.tolist() supplier_offers = encodedl[:n_suppliers] consumer_offers = encodedl[n_suppliers:] if continuous: supplier_offers = unnormalize_offers( supplier_offers, min_buy_price, max_buy_price, 0, max_buy_quantity, add_min_price=False, ) consumer_offers = unnormalize_offers( consumer_offers, min_sell_price, max_sell_price, 0, max_sell_quantity, add_min_price=False, ) responses: dict[str, Outcome | None] = defaultdict(lambda: (0, 0, 0)) def update_respones(plst, w, is_supplier): p = "+".join(plst) minprice = min_buy_price if is_supplier else min_sell_price maxprice = max_buy_price if is_supplier else max_sell_price if w[0] == w[1] == 0: responses[p] = None return price = w[1] + minprice if n_prices: price *= n_prices / (maxprice - minprice + 1) outcome = (int(w[0] + 0.5), step, price) r = responses[p] if r is None: responses[p] = outcome else: responses[p] = ( r[0] + outcome[0], max( outcome[1], r[1] ), # we use the largest step here as all steps should be equal anyway r[-1] + outcome[-1], ) if len(suppliers) != len(supplier_offers) or len(consumers) != len(consumer_offers): raise AssertionError("fdsdf") for plst, w in zip(suppliers, supplier_offers, strict=True): update_respones(plst, w, True) for plst, w in zip(consumers, consumer_offers, strict=True): update_respones(plst, w, False) 
result = { k: None if v is not None and v[0] == 0 and v[1] == 0 else v for k, v in responses.items() } return result
def normalize_offers_with_time(
    offers: list[tuple[int, int, int]],
    min_price: int,
    max_price: int,
    min_quantity: int,
    max_quantity: int,
) -> list[tuple[float, float, float]]:
    """
    Scales the quantity and unit price of every offer, leaving time untouched.

    Args:
        offers: Tuples whose first item is quantity, second item is time and
                last item is unit price
        min_price: Minimum allowed price
        max_price: Maximum allowed price
        min_quantity: Minimum allowed quantity
        max_quantity: Maximum allowed quantity

    Return:
        For each offer: quantity minus `min_quantity` divided by the quantity
        range, the time as given, and the price divided by the price range.

    Remarks:
        - A range of zero is replaced by 1 to avoid dividing by zero.
        - `min_price` is not subtracted from the price.
    """
    price_range = (max_price - min_price) or 1
    quantity_range = (max_quantity - min_quantity) or 1
    normalized = []
    for offer in offers:
        quantity = float(offer[0] - min_quantity) / quantity_range
        price = float(offer[-1]) / price_range
        normalized.append((quantity, offer[1], price))
    return normalized
def normalize_offers_no_time(
    offers: list[tuple[int, int]],
    min_price: int,
    max_price: int,
    min_quantity: int,
    max_quantity: int,
    subtract_min_price: bool = False,
) -> list[tuple[float, float]]:
    """
    Normalize the quantity and unit price of each offer by their ranges

    Args:
        offers: (quantity, unit-price) tuples
        min_price: Minimum allowed price
        max_price: Maximum allowed price
        min_quantity: Minimum allowed quantity
        max_quantity: Maximum allowed quantity
        subtract_min_price: If given, `min_price` is subtracted from each
                            price before dividing by the price range. Leave
                            it off when prices are already offsets from the
                            minimum price.

    Return:
        For each offer: quantity minus `min_quantity` divided by the quantity
        range, and price (optionally minus `min_price`) divided by the price
        range.

    Remarks:
        - A range of zero is replaced by 1 to avoid dividing by zero.
    """
    # Ranges use the real minimum price even when it is not subtracted below.
    price_range = (max_price - min_price) or 1
    quantity_range = (max_quantity - min_quantity) or 1
    price_offset = min_price if subtract_min_price else 0
    return [
        (
            float(offer[0] - min_quantity) / quantity_range,
            float(offer[1] - price_offset) / price_range,
        )
        for offer in offers
    ]
def unnormalize_offers(
    offers: list[tuple[float, float]],
    min_price: int,
    max_price: int,
    min_quantity: int,
    max_quantity: int,
    add_min_price: bool = False,
) -> list[tuple[int, int]]:
    """
    Reverses `normalize_offers_no_time`, turning scaled quantities and prices
    back into integers

    Args:
        offers: (quantity, unit-price) tuples of scaled values
        min_price: Minimum allowed price
        max_price: Maximum allowed price
        min_quantity: Minimum allowed quantity
        max_quantity: Maximum allowed quantity
        add_min_price: If given, `min_price` is added back to each price

    Return:
        (quantity, unit-price) tuples of integers.

    Remarks:
        - A range of zero is replaced by 1.
        - Values are converted with `int(value + 0.5)`.
    """
    price_span = (max_price - min_price) or 1
    quantity_span = (max_quantity - min_quantity) or 1
    price_base = min_price if add_min_price else 0
    restored = []
    for offer in offers:
        quantity = int(offer[0] * quantity_span + min_quantity + 0.5)
        price = int(offer[1] * price_span + price_base + 0.5)
        restored.append((quantity, price))
    return restored
[docs] def clip_normal( x: float, mu: float, sigma: float, n_sigmas: float | int = 3, eps: float = 1e-6, ) -> float: """ Normalizes x between 0 and 1 given that it is sampled from a normal (mu, sigma). This is actually a very stupid way to do it. """ mn = mu - n_sigmas * sigma mx = mu + n_sigmas * sigma if abs(mn - mx) < eps: return 1.0 return max(0.0, min(1.0, (x - mn) / (mx - mn)))
T = TypeVar("T", bound=int | float)
[docs] def clip(x: T, mn: T = 0, mx: T = 1) -> T: return max(mn, min(mx, x))
def discretize_and_clip(x: float, n_bins: int) -> int:
    """
    Converts x to a bin index limited to the range 0 to `n_bins - 1`.

    Args:
        x: The value to discretize
        n_bins: Number of bins

    Return:
        `int(0.5 + (n_bins - 1) * x)` limited to at least 0 and at most
        `n_bins - 1`.
    """
    last_bin = n_bins - 1
    nearest = int(0.5 + last_bin * x)
    return min(last_bin, max(0, nearest))
def normalize_and_clip(x: int, mn: T, mx: T, eps=1e-6) -> float:
    """
    Maps x linearly from the range `mn`..`mx` onto 0..1 and clips the result.

    Args:
        x: The value to normalize
        mn: The value mapped to 0
        mx: The value mapped to 1
        eps: Ranges smaller than this are treated as degenerate

    Return:
        `(x - mn) / (mx - mn)` clipped to 0..1, or `float(mx)` if the range
        is smaller than `eps`.
    """
    span = mx - mn
    if span < eps:
        # NOTE(review): the degenerate case returns mx itself, which need
        # not lie between 0 and 1 -- confirm callers expect this.
        return float(mx)
    scaled = (x - mn) / span
    return clip(scaled, 0.0, 1.0)