"""Simulators module implementing factory simulation"""
import math
import sys
from abc import ABC, abstractmethod
from collections import defaultdict
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import numpy as np
from .common import NO_PRODUCTION, Factory, Job, ManufacturingProfile
__all__ = [
"FactorySimulator",
"SlowFactorySimulator",
"FastFactorySimulator",
"transaction",
"temporary_transaction",
]
def storage_as_array(storage: Dict[int, int], n_products: int) -> np.array:
"""
Converts storage to an array
Args:
storage: A dictionary giving quantity for each product index
n_products: number of products (size of the resulting array)
Returns:
"""
a = np.zeros(n_products)
for k, v in storage.items():
a[k] = v
return a
[docs]
class FactorySimulator(ABC):
"""Simulates a factory allowing for prediction of storage/balance in the future.
Args:
initial_wallet: The initial amount of cash in the wallet
initial_storage: initial inventory
n_steps: number of simulation steps
n_products: number of products in the world
profiles: all profiles that the factory being simulated can run
max_storage: maximum available storage space.
"""
def __init__(
self,
initial_wallet: float,
initial_storage: Dict[int, int],
n_steps: int,
n_products: int,
profiles: List[ManufacturingProfile],
max_storage: Optional[int] = None,
):
self._n_steps = n_steps
self._max_storage = max_storage if max_storage is not None else sys.maxsize
self._initial_wallet = initial_wallet
self._initial_storage = np.zeros(n_products)
for k, v in initial_storage.items():
self._initial_storage[k] = v
self._profiles = profiles
self._n_products = n_products
self._reserved_storage = np.zeros(shape=(n_products, n_steps))
[docs]
def _as_array(self, storage: Dict[int, int]):
return storage_as_array(storage=storage, n_products=self._n_products)
# -----------------
# FIXED PROPERTIES
# -----------------
@property
[docs]
def max_storage(self) -> Optional[int]:
"""Maximum storage available"""
return self._max_storage
@property
[docs]
def n_steps(self) -> int:
"""Number of steps to predict ahead."""
return self._n_steps
@property
[docs]
def initial_wallet(self) -> float:
"""Initial cash in wallet"""
return self._initial_wallet
@property
[docs]
def initial_storage(self) -> np.array:
"""Initial inventory"""
return self._initial_storage
@property
@abstractmethod
[docs]
def n_lines(self):
"""Number of lines"""
@property
@abstractmethod
[docs]
def final_balance(self) -> float:
"""Final balance given everything scheduled so-far"""
# -------------------------------
# DYNAMIC PROPERTIES (READ STATE)
# -------------------------------
@abstractmethod
[docs]
def wallet_to(self, t: int) -> np.array:
"""
Returns the cash in wallet up to and including time t.
Args:
t: Time
Returns:
"""
[docs]
def wallet_at(self, t: int) -> float:
"""
Returns the cash in wallet *at* a given timestep (given all simulated actions)
Args:
t:
Returns:
"""
return self.wallet_to(t)[-1]
@abstractmethod
[docs]
def storage_to(self, t: int) -> np.array:
"""
Returns the storage of all products *up to* time t
Args:
t: Time
Returns:
An array of size `n_products` * `t` giving the quantity of each product in storage at every step up to `t`.
"""
[docs]
def storage_at(self, t: int) -> np.array:
"""
Returns the storage of all products *at* time t
Args:
t: Time
Returns:
An array of size `n_products` giving the quantity of each product in storage at time-step `t`.
See Also:
`storage_to` `wallet_at`
"""
return self.storage_to(t)[:, -1]
@abstractmethod
[docs]
def line_schedules_to(self, t: int) -> np.array:
"""
Returns the schedule of each line up to a given timestep
Args:
t: time
Returns:
An array of `n_lines` * `t` values giving the schedule up to `t`.
Remarks:
- A `NO_PRODUCTION` value means no production, otherwise the index of the process being run
"""
[docs]
def line_schedules_at(self, t: int) -> np.array:
"""
Returns the schedule of each line at a given timestep
Args:
t: time
Returns:
An array of `n_lines` values giving the schedule up at `t`.
Remarks:
- A `NO_PRODUCTION` value means no production, otherwise the index of the process being run
"""
return self.line_schedules_to(t)[:, -1]
[docs]
def total_storage_to(self, t: int) -> np.array:
"""
The total storage *up to* a given time
Args:
t: time
Returns:
an array of size `t` giving the total quantity of stored products in the inventory up to timestep `t`
See Also:
`total_storage_at` `storage_to`
"""
return self.storage_to(t).sum(axis=0)
[docs]
def total_storage_at(self, t: int) -> int:
"""
The total storage *at* a given time
Args:
t: time
Returns:
an integer giving the total quantity of stored products in the inventory at timestep `t`
See Also:
`total_storage_to` `storage_at`
"""
return self.total_storage_to(t)[-1]
[docs]
def reserved_storage_to(self, t: int) -> np.array:
"""
Returns the *reserved* storage of all products *up to* time t
Args:
t: Time
Returns:
An array of size `n_products` * `t` giving the quantity of each product reserved at every step up to `t`.
Remarks:
- Reserved storage *is counted* in calls to `storage_at` , `total_storage_at` , `storage_to`
, `total_storage_to`
- Reserving quantities of products is a tool that can be used to avoid double counting availability of given
products in the inventory for multiple contracts.
See Also:
`total_storage_at` `storage_at` `reserved_storage_at`
"""
return self._reserved_storage[:, : t + 1]
[docs]
def reserved_storage_at(self, t: int) -> np.array:
"""
Returns the *reserved* storage of all products *at* time t
Args:
t: Time
Returns:
An array of size `n_products` giving the quantity of each product reserved at time-step `t`.
Remarks:
- Reserved storage *is counted* in calls to `storage_at` , `total_storage_at` , `storage_to`
, `total_storage_to`
- Reserving quantities of products is a tool that can be used to avoid double counting availability of given
products in the inventory for multiple contracts.
See Also:
`total_storage_to` `storage_to` `reserved_storage_at`
"""
return self._reserved_storage[:, t]
[docs]
def available_storage_to(self, t: int) -> np.array:
"""
Returns the *available* storage of all products *up to* time t.
Args:
t: Time
Returns:
An array of size `n_products` * `t` giving the quantity of each product available at every step up to `t`.
Remarks:
- Available storage is defined as the difference between storage and reserved storage.
- Reserved storage *is counted* in calls to `storage_at` , `total_storage_at` , `storage_to`
, `total_storage_to`
- Reserving quantities of products is a tool that can be used to avoid double counting availability of given
products in the inventory for multiple contracts.
See Also:
`total_storage_to` `storage_to` `reserved_storage_to`
"""
return self.storage_to(t) - self.reserved_storage_to(t)
[docs]
def available_storage_at(self, t: int) -> np.array:
"""
Returns the *available* storage of all products *at* time t
Args:
t: Time
Returns:
An array of size `n_products` giving the quantity of each product available at time-step `t`.
Remarks:
- Available storage is defined as the difference between storage and reserved storage.
- Reserved storage *is counted* in calls to `storage_at` , `total_storage_at` , `storage_to`
, `total_storage_to`
- Reserving quantities of products is a tool that can be used to avoid double counting availability of given
products in the inventory for multiple contracts.
See Also:
`total_storage_to` `storage_to` `reserved_storage_at`
"""
return self.storage_at(t) - self.reserved_storage_at(t)
@abstractmethod
[docs]
def loans_to(self, t: int) -> np.array:
"""
Returns loans up to time t
Args:
t: time
Returns:
An array of `t` real numbers giving the loans registered at time-steps up to `t`
"""
[docs]
def loans_at(self, t: int) -> float:
"""
Returns loans at time t
Args:
t: time
"""
return self.loans_to(t)[-1]
[docs]
def balance_at(self, t: int) -> float:
"""
Returns the balance fo the factory at time t.
Args:
t: time
Remarks:
- The balance is defined as the cash in wallet minus loans
See Also:
`loans_at` `wallet_at`
"""
return self.wallet_at(t) - self.loans_at(t)
[docs]
def balance_to(self, t: int) -> np.array:
"""
Returns the balance fo the factory *up to* time t.
Args:
t: time
Remarks:
- The balance is defined as the cash in wallet minus loans
See Also:
`loans_to` `wallet_to`
"""
return self.wallet_to(t) - self.loans_to(t)
@property
@abstractmethod
[docs]
def fixed_before(self):
"""Gives the time before which the schedule is fixed.
See Also:
`fix_before`
"""
# -------------------------
# OPERATIONS (UPDATE STATE)
# -------------------------
@abstractmethod
[docs]
def set_state(
self,
t: int,
storage: np.array,
wallet: float,
loans: float,
line_schedules: np.array,
) -> None:
"""
Sets the current state at the given time-step. It implicitly causes a fix_before(t + 1)
Args:
t: Time step to set the state at
storage: quantity of every product (array of integers of size `n_products`)
wallet: Cash in wallet
loans: Loans
line_schedules: Line schedules (array of process numbers/NO_PRODUCTION of size `n_lines`)
"""
@abstractmethod
[docs]
def add_loan(self, total: float, t: int) -> bool:
"""
Adds a loan at the given time
Args:
total: Total amount of the loan
t: time step to take the loan
Returns:
Success or failure
Remarks:
- Taking a loan is simulated as reception of money. Payment back of the loan is not simulated in this call.
To simulate paying back the loan, use `pay` at the times of installment payments.
"""
[docs]
def receive(self, payment: float, t: int) -> bool:
"""
Simulates receiving payment at time t
Args:
payment: Amount received
t: time
Returns:
Success or failure
"""
return self.pay(-payment, t)
@abstractmethod
[docs]
def pay(self, payment: float, t: int, ignore_money_shortage: bool = True) -> bool:
"""
Simulate payment at time t
Args:
payment: Amount payed
t: time
ignore_money_shortage: If True, shortage in money will be ignored and the wallet can go negative
Returns:
Success or failure
"""
@abstractmethod
[docs]
def transport_to(
self,
product: int,
quantity: int,
t: int,
ignore_inventory_shortage: bool = True,
ignore_space_shortage: bool = True,
) -> bool:
"""
Simulates transporting products to/from storage at time t
Args:
product: product ID (index)
quantity: quantity to transport
t: time
ignore_inventory_shortage: Ignore shortage in the `product` which may lead to negative storage[product]
ignore_space_shortage: Ignore the limit on total storage which may lead to total_storage > max_storage
Returns:
Success or failure
"""
@abstractmethod
[docs]
def buy(
self,
product: int,
quantity: int,
price: int,
t: int,
ignore_money_shortage: bool = True,
ignore_space_shortage: bool = True,
) -> bool:
"""
Buy a given quantity of a product for a given price at some time t
Args:
product: Product to buy (ID/index)
quantity: quantity to buy
price: unit price
t: time
ignore_money_shortage: If True, shortage in money will be ignored and the wallet can go negative
ignore_space_shortage: Ignore the limit on total storage which may lead to total_storage > max_storage
Returns:
Success or failure
Remarks:
- buy cannot ever have inventory shortage
See Also:
`sell`
"""
@abstractmethod
[docs]
def sell(
self,
product: int,
quantity: int,
price: int,
t: int,
ignore_money_shortage: bool = True,
ignore_inventory_shortage: bool = True,
) -> bool:
"""
sell a given quantity of a product for a given price at some time t
Args:
product: Index/ID of the product to be sold
quantity: quantity to be sold
price: unit price
t: time
ignore_money_shortage: If True, shortage in money will be ignored and the wallet can go negative
ignore_inventory_shortage: Ignore shortage in the `product` which may lead to negative storage[product]
Returns:
Success or failure
Remarks:
- sell cannot ever have space shortage
See Also:
`buy`
"""
@abstractmethod
[docs]
def schedule(
self,
job: Job,
ignore_inventory_shortage=True,
ignore_money_shortage=True,
ignore_space_shortage=True,
override=True,
) -> bool:
"""
Simulates scheduling the given job at its `time` and `line` optionally overriding whatever was already scheduled
Args:
job: Production job
ignore_inventory_shortage: If true shortages in inputs will be ignored
ignore_money_shortage: If true, shortage in money will be ignored
ignore_space_shortage: If true, shortage in space will be ignored
override: Whether the job should override any already registered job at its time-step
Returns:
Success/failure
"""
[docs]
def reserve(self, product: int, quantity: int, t: int) -> bool:
"""
Simulates reserving the given quantity of the given product at times >= t.
Args:
product: Index/ID of the product being reserved
quantity: quantity being reserved
t: time
Returns:
Success/failure
Remarks:
- Reserved products show in calls to `storage_at` , `total_storage_at` etc.
- Reserving a product does nothing more than mark some quantity as reserved for calls to
`reserved_storage_at` and `available_storage_at`.
- This feature can be used to simulate inventory hiding commands in the real factory and to avoid
double counting of inventory when calculating needs for future contracts.
"""
self._reserved_storage[product, t] += quantity
return True
# ------------------
# HISTORY MANAGEMENT
# ------------------
@abstractmethod
[docs]
def fix_before(self, t: int) -> bool:
"""
Fix the history before this point
Args:
t: time
Returns:
Success/failure
Remarks:
- After this function is called at any time-step `t`, there is no way to change any component of the factory
state at any timestep before `t`.
- This function is useful for *fixing* any difference between the simulator and the real state (in
conjunction with `set_state`).
See Also:
`set_state` `fixed_before`
"""
@abstractmethod
[docs]
def bookmark(self) -> int:
"""Sets a bookmark to the current location
Returns:
bookmark ID
Remarks:
- Bookmarks can be used to implement transactions.
See Also:
`delete_bookmark` `rollback` `transaction` `temporary_transaction`
"""
@abstractmethod
[docs]
def rollback(self, bookmark_id: int) -> bool:
"""Rolls back to the given bookmark ID
Args:
bookmark_id The bookmark ID returned from bookmark
Remarks:
- You can only rollback in the reverse order of bookmarks. If the bookmark ID given here is not the one
at the top of the bookmarks stack, the rollback will fail (return False)
See Also:
`delete_bookmark` `rollback` `transaction` `temporary_transaction`
"""
@abstractmethod
[docs]
def delete_bookmark(self, bookmark_id: int) -> bool:
"""Commits everything since the bookmark so it cannot be rolled back
Args:
bookmark_id The bookmark ID returned from bookmark
Returns:
Success/failure
Remarks:
- You can delete bookmarks in the reverse order of their creation only. If the bookmark ID given here is
not the one at the top of the bookmarks stack, the deletion will fail (return False).
See Also:
`delete_bookmark` `rollback` `transaction` `temporary_transaction`
"""
@dataclass
class _Bookmark:
id: int
jobs: Dict[int, List[int]] = field(
default_factory=lambda: defaultdict(list), init=False
)
buy_contracts: Dict[int, List[int]] = field(
default_factory=lambda: defaultdict(list), init=False
)
sell_contracts: Dict[int, List[int]] = field(
default_factory=lambda: defaultdict(list), init=False
)
payment_updates: Dict[int, float] = field(
default_factory=lambda: defaultdict(float), init=False
)
loans_updates: Dict[int, float] = field(
default_factory=lambda: defaultdict(float), init=False
)
storage_updates: Dict[int, Dict[int, int]] = field(
default_factory=lambda: defaultdict(lambda: defaultdict(int)), init=False
)
@dataclass
class _State:
t: int
storage: np.array
wallet: float
loans: float
line_schedules: np.array
[docs]
class SlowFactorySimulator(FactorySimulator):
"""A slow factory simulator that runs an internal factory to find-out what will happen in the future
Remarks:
- It is *much* faster to always access the properties/methods of this class in ascending time. If that is not
the case, each time reversal will cause a complete reset.
- It is recommended to call `fix_before` () to fix the past once a production step is completed. That will speed
up operations
"""
[docs]
def set_state(
self,
t: int,
storage: np.array,
wallet: float,
loans: float,
line_schedules: np.array,
) -> None:
for i, s in enumerate(storage):
d = s - self.storage_at(t)[i]
if d == 0.0:
continue
self._storage_updates[t][i] += d
d = self.wallet_at(t) - wallet
if d != 0.0:
self._payment_updates[t] -= d
d = self.loans_at(t) - loans
if d != 0.0:
self._loans_updates[t] -= d
expected_schedules = self.line_schedules_at(t)
for i in range(self.n_lines):
expected, actual = expected_schedules[i], line_schedules[i]
if expected == actual:
continue
if expected == NO_PRODUCTION:
raise ValueError(
f"Expected no production at time {t} on line {i} but actually process "
f"{actual} is running"
)
if expected != actual and actual != NO_PRODUCTION:
raise ValueError(
f"Expected process {expected} at time {t} on line {i} but actually process "
f"{actual} is running"
)
self._line_schedules[i, t] = actual
self.fix_before(t + 1)
self._saved_states[t].append(
_State(
t=t,
storage=storage.copy(),
wallet=wallet,
loans=loans,
line_schedules=line_schedules.copy(),
)
)
[docs]
def delete_bookmark(self, bookmark_id: int) -> bool:
if self._active_bookmark is None or self._active_bookmark.id != bookmark_id:
raise ValueError("there is no active bookmark to delete")
self._bookmarks, self._bookmarked_at = (
self._bookmarks[:-1],
self._bookmarked_at[:-1],
)
self._active_bookmark = (
self._bookmarks[-1] if len(self._bookmarks) > 0 else None
)
self._active_bookmarked_at = (
self._bookmarked_at[-1] if len(self._bookmarked_at) > 0 else -1
)
return True
[docs]
def bookmark(self) -> int:
bookmark = _Bookmark(id=len(self._bookmarks))
self._bookmarks.append(bookmark)
self._bookmarked_at.append(self._factory.next_step)
self._active_bookmark = bookmark
self._active_bookmarked_at = self._bookmarked_at[-1]
return bookmark.id
[docs]
def rollback(self, bookmark_id: int) -> bool:
if self._active_bookmark is None or self._active_bookmark.id != bookmark_id:
raise ValueError("there is no active bookmark to rollback")
for t, payment in self._active_bookmark.payment_updates.items():
self._payment_updates[t] += payment
for t, payment in self._active_bookmark.loans_updates.items():
self._loans_updates[t] += payment
for t, storage in self._active_bookmark.storage_updates.items():
s = self._storage_updates[t]
for k, v in storage:
s[k] -= v
for t, rolled_indices in self._active_bookmark.jobs.items():
self._jobs[t] = [
_ for i, _ in enumerate(self._jobs[t]) if i not in rolled_indices
]
for t, rolled_indices in self._active_bookmark.buy_contracts.items():
self._buy_contracts[t] = [
_
for i, _ in enumerate(self._buy_contracts[t])
if i not in rolled_indices
]
for t, rolled_indices in self._active_bookmark.sell_contracts.items():
self._sell_contracts[t] = [
_
for i, _ in enumerate(self._sell_contracts[t])
if i not in rolled_indices
]
if self._factory.next_step != self._bookmarked_at:
self.goto(self._active_bookmarked_at)
return True
@property
[docs]
def final_balance(self) -> float:
self.goto(self.n_steps - 1)
return self.balance_at(self.n_steps - 1)
@property
[docs]
def n_lines(self):
return self._factory.n_lines
[docs]
def fix_before(self, t: int) -> bool:
self.goto(t)
self._fixed_before = t
invalid = [i for i, bt in enumerate(self._bookmarked_at) if bt < t]
self._bookmarks = [_ for i, _ in enumerate(self._bookmarks) if i not in invalid]
self._bookmarked_at = [
_ for i, _ in enumerate(self._bookmarked_at) if i not in invalid
]
return True
def __init__(
self,
initial_wallet: float,
initial_storage: Dict[int, int],
n_steps: int,
n_products: int,
profiles: List[ManufacturingProfile],
max_storage: Optional[int],
):
super().__init__(
initial_wallet=initial_wallet,
initial_storage=initial_storage,
n_steps=n_steps,
n_products=n_products,
profiles=profiles,
max_storage=max_storage,
)
self._factory = Factory(
initial_storage=initial_storage,
initial_wallet=initial_wallet,
profiles=profiles,
max_storage=max_storage,
)
self._jobs: Dict[int, List[(Job, bool, bool, bool, bool)]] = defaultdict(list)
self._buy_contracts: Dict[int, List[(int, int, float)]] = defaultdict(list)
self._sell_contracts: Dict[int, List[(int, int, float)]] = defaultdict(list)
self._payment_updates: Dict[int, float] = defaultdict(float)
self._loans_updates: Dict[int, float] = defaultdict(float)
self._storage_updates: Dict[int, Dict[int, int]] = defaultdict(
lambda: defaultdict(int)
)
self._wallet = np.zeros(n_steps)
self._loans = np.zeros(n_steps)
self._storage = np.zeros(shape=(n_products, n_steps))
self._line_schedules = np.zeros(shape=(self._factory.n_lines, self._n_steps))
self._fixed_before = 0
self._bookmarks: List[_Bookmark] = []
self._active_bookmark: Optional[_Bookmark] = None
self._active_bookmarked_at: int = -1
self._bookmarked_at: List[int] = []
self._saved_states: Dict[int, List[_State]] = defaultdict(list)
[docs]
def _update_state(self) -> None:
t = self._factory.next_step - 1
if t < 0:
return
self._wallet[t] = self._factory.wallet
self._loans[t] = self._factory.loans
self._storage[:, t] = self._as_array(self._factory.storage)
self._line_schedules[:, t] = np.array(
list(
NO_PRODUCTION if command.is_none else command.profile.process.id
for command in self._factory.commands
)
)
[docs]
def reset_to(self, t: int) -> None:
self._factory = Factory(
initial_storage={
i: v for i, v in enumerate(self._initial_storage) if v != 0
},
initial_wallet=self._initial_wallet,
profiles=self._profiles,
)
for step in range(t + 1):
self._factory.receive(payment=self._payment_updates.get(step, 0.0))
self._factory.add_loan(total=self._loans_updates.get(step, 0.0))
jobs = self._jobs.get(t, [])
for job, override, ignore_storage, ignore_money, ignore_space in jobs:
# @todo use ignore* here
try:
self._factory.schedule(job=job, override=override)
except ValueError as err:
print(err)
contracts = self._buy_contracts.get(t, [])
for product, quantity, price in contracts:
try:
self._factory.buy(product=product, quantity=quantity, price=price)
except ValueError as err:
print(err)
contracts = self._sell_contracts.get(t, [])
for product, quantity, price in contracts:
try:
self._factory.sell(product=product, quantity=quantity, price=price)
except ValueError as err:
print(err)
inventory = self._storage_updates.get(step, {})
for product, quantity in inventory.items():
try:
self._factory.transport_to(product, quantity)
except ValueError as err:
print(err)
self._update_state()
[docs]
def goto(self, t: int) -> None:
"""
Steps the factory to the end of step t
Args:
t: time
Returns:
"""
if t > self.n_steps - 1:
t = self.n_steps - 1
if self._factory.next_step > t + 1:
if t < self._fixed_before:
return
self.reset_to(t)
while self._factory.next_step <= t:
step = self._factory.next_step
loan = self._loans_updates.get(step, None)
if loan is not None:
self._factory.add_loan(loan)
payment = self._payment_updates.get(step, None)
if payment is not None:
self._factory.pay(payment)
jobs = self._jobs.get(step, [])
for job, override, ignore_storage, ignore_money, ignore_space in jobs:
# @todo implement ignore_input_shortage inside the factory to use ignore_input_shortage here
self._factory.schedule(job=job, override=override)
self._factory.step()
inventory = self._storage_updates.get(step, None)
if inventory is not None:
for product, quantity in inventory.items():
self._factory.transport_to(product, quantity)
self._update_state()
[docs]
def wallet_to(self, t: int) -> np.array:
if t < self._fixed_before:
return self._wallet[: t + 1]
self.goto(t)
return self._wallet[: t + 1]
[docs]
def line_schedules_to(self, t: int) -> np.array:
if t < self._fixed_before:
return self._storage[:, : t + 1]
self.goto(t)
return self._line_schedules[:, : t + 1]
[docs]
def storage_to(self, t: int) -> np.array:
if t < self._fixed_before:
return self._storage[:, : t + 1]
self.goto(t)
return self._storage[:, : t + 1]
[docs]
def loans_to(self, t: int) -> float:
if t < self._fixed_before:
return self._loans[: t + 1]
self.goto(t)
return self._loans[: t + 1]
[docs]
def add_loan(self, total: float, t: int) -> bool:
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
self._loans_updates[t] += total
if self._active_bookmark:
self._active_bookmark.loans_updates[t] += total
return True
[docs]
def pay(self, payment: float, t: int, ignore_money_shortage: bool = True) -> bool:
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
self._payment_updates[t] += payment
if self._active_bookmark:
self._active_bookmark.payment_updates[t] += payment
return True
[docs]
def transport_to(
self,
product: int,
quantity: int,
t: int,
ignore_inventory_shortage: bool = True,
ignore_space_shortage: bool = True,
) -> bool:
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
s = self._storage_updates[t]
s[product] += quantity
if self._active_bookmark:
s = self._active_bookmark.storage_updates[t]
s[product] += quantity
return True
[docs]
def schedule(
self,
job: Job,
ignore_inventory_shortage=True,
ignore_money_shortage=True,
ignore_space_shortage=True,
override=True,
) -> bool:
t = job.time
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
self._jobs[t].append(
(
job,
override,
ignore_inventory_shortage,
ignore_money_shortage,
ignore_space_shortage,
)
)
if self._active_bookmark:
self._active_bookmark.jobs[t].append(len(self._jobs[t]))
return True
[docs]
def buy(
self,
product: int,
quantity: int,
price: int,
t: int,
ignore_money_shortage: bool = True,
ignore_space_shortage: bool = True,
) -> bool:
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
self._buy_contracts[t].append((product, quantity, price))
if self._active_bookmark:
self._active_bookmark.buy_contracts[t].append(len(self._buy_contracts[t]))
return True
[docs]
def sell(
self,
product: int,
quantity: int,
price: int,
t: int,
ignore_money_shortage: bool = True,
ignore_inventory_shortage: bool = True,
) -> bool:
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
self._sell_contracts[t].append((product, quantity, price))
if self._active_bookmark:
self._active_bookmark.sell_contracts[t].append(len(self._sell_contracts[t]))
return True
@property
[docs]
def fixed_before(self):
return self._fixed_before
@dataclass
class _FullBookmark:
id: int
wallet: np.array
loans: np.array
storage: np.array
line_schedules: np.array
has_jobs: np.array
[docs]
class FastFactorySimulator(FactorySimulator):
"""
A faster implementation of the `FactorySimulator` interface (compared with `SlowFactorySimulator`.
"""
[docs]
def _as_array(self, storage: Dict[int, int]) -> np.array:
a = np.zeros(self._n_products)
for k, v in storage.items():
a[k] = v
return a
def __init__(
self,
initial_wallet: float,
initial_storage: Dict[int, int],
n_steps: int,
n_products: int,
profiles: List[ManufacturingProfile],
max_storage: Optional[int],
):
super().__init__(
initial_wallet=initial_wallet,
initial_storage=initial_storage,
n_steps=n_steps,
n_products=n_products,
profiles=profiles,
max_storage=max_storage,
)
self._wallet = np.ones(n_steps) * initial_wallet
self._loans = np.zeros(n_steps)
self._storage = np.repeat(
self._as_array(initial_storage).reshape((n_products, 1)), n_steps, axis=1
)
self._total_storage = self._storage.sum(axis=0)
factory = Factory(
initial_storage=initial_storage,
initial_wallet=initial_wallet,
profiles=profiles,
max_storage=max_storage,
)
self._profiles = factory.profiles
self._n_lines = factory.n_lines
self._line_schedules = (
np.ones(shape=(self._n_lines, self._n_steps)) * NO_PRODUCTION
)
self._has_jobs = np.zeros(shape=(self._n_lines, self._n_steps), dtype=bool)
self._fixed_before = 0
self._bookmarks: List[_FullBookmark] = []
self._active_bookmark: Optional[_FullBookmark] = None
[docs]
def init(self, *args, **kwargs):
self.__init__(*args, **kwargs)
@property
[docs]
def fixed_before(self):
return self._fixed_before
@property
[docs]
def n_lines(self):
return self._n_lines
@property
[docs]
def final_balance(self) -> float:
return self._wallet[-1] - self._loans[-1]
[docs]
def wallet_to(self, t: int) -> np.array:
return self._wallet[: t + 1]
[docs]
def storage_to(self, t: int) -> np.array:
return self._storage[:, : t + 1]
[docs]
def line_schedules_to(self, t: int) -> np.array:
return self._line_schedules[:, : t + 1]
[docs]
def loans_to(self, t: int) -> np.array:
return self._loans[: t + 1]
[docs]
def add_loan(self, total: float, t: int) -> bool:
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
self._loans[t:] += total
return True
[docs]
def pay(self, payment: float, t: int, ignore_money_shortage: bool = True) -> bool:
# @todo add minimum balance
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
b = self._wallet[t:]
if len(b) < 1:
return False
b -= payment
if b.min() < 0:
b += payment
return False
return True
[docs]
def transport_to(
self,
product: int,
quantity: int,
t: int,
ignore_inventory_shortage: bool = True,
ignore_space_shortage: bool = True,
) -> bool:
# @todo add minimum storage
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
s, total = self._storage[product, t:].view(), self._total_storage[t:]
if len(total) < 1:
return False
s += quantity
total += quantity
if s.min() < 0 or total.max() > self.max_storage:
s -= quantity
total -= quantity
return False
return True
[docs]
def buy(
self,
product: int,
quantity: int,
price: int,
t: int,
ignore_money_shortage: bool = True,
ignore_space_shortage: bool = True,
) -> bool:
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
s, total = self._storage[product, t:].view(), self._total_storage[t:]
if len(total) < 1:
return False
s += quantity
total += quantity
b = self._wallet[t:]
b -= price
if total.max() > self.max_storage or b.min() < 0:
s -= quantity
total -= quantity
b += price
return False
return True
[docs]
def sell(
self,
product: int,
quantity: int,
price: int,
t: int,
ignore_money_shortage: bool = True,
ignore_inventory_shortage: bool = True,
) -> bool:
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
s, total = self._storage[product, t:].view(), self._total_storage[t:]
if len(total) < 1:
return False
s -= quantity
total -= quantity
b = self._wallet[t:]
b += price
if s.min() < 0:
s += quantity
total += quantity
b -= price
return False
return True
[docs]
def schedule(
self,
job: Job,
ignore_inventory_shortage=True,
ignore_money_shortage=True,
ignore_space_shortage=True,
override=True,
) -> bool:
t, job_override = job.time, job.override
if t < self._fixed_before:
raise ValueError(
f"Cannot run operations in the past (t={t}, fixed before {self._fixed_before})"
)
if job_override:
raise NotImplementedError(
f"{self.__class__.__name__} does not support scheduling jobs with overriding"
)
# job_line = job.line # only useful for stop/pause/resume that are not supported
profile = self._profiles[job.profile]
inputs, outputs, length, cost = (
profile.process.inputs,
profile.process.outputs,
profile.n_steps,
profile.cost,
)
line = profile.line
# confirm that there is no other jobs already scheduled at this exact time:
if self._has_jobs[line, t]:
if override:
raise NotImplementedError(
f"{self.__class__.__name__} does not support scheduling more than a single "
f"job at any time-step/line"
)
return False
# confirm that the line is not busy. If it was busy, and we are not overriding, fail.
if not job_override and np.any(
self._line_schedules[line, t : t + length] != NO_PRODUCTION
):
return False
# confirm that there is enough money to start production
if (not ignore_money_shortage) and np.any(self._wallet[t:] < cost):
return False
# bookmark to be able to rollback at any error
if job.action == "run":
with transaction(self) as bookmark:
if not self.pay(cost, t):
self.rollback(bookmark)
return False
self._line_schedules[line, t : t + length] = profile.process.id
for i in inputs:
it = int(math.floor(i.step * length) + t)
p, q = i.product, i.quantity
if (not ignore_inventory_shortage) and np.any(
self._storage[p, it:] < q
):
self.rollback(bookmark)
return False
s, total = self._storage[p, it:].view(), self._total_storage[it:]
s -= q
total -= q
for o in outputs:
ot = int(math.ceil(o.step * length) + t)
p, q = o.product, o.quantity
if (not ignore_space_shortage) and np.any(
self._total_storage[ot:] + q > self.max_storage
):
self.rollback(bookmark)
return False
s, total = self._storage[p, ot:].view(), self._total_storage[ot:]
s += q
total += q
return True
raise NotImplementedError(
f"{self.__class__.__name__} does not support scheduling {job.action} jobs"
)
[docs]
def fix_before(self, t: int) -> bool:
self._fixed_before = t
return True
[docs]
def delete_bookmark(self, bookmark_id: int) -> bool:
if self._active_bookmark is None or self._active_bookmark.id != bookmark_id:
raise ValueError("there is no active bookmark to delete")
self._bookmarks = self._bookmarks[:-1]
self._active_bookmark = (
self._bookmarks[-1] if len(self._bookmarks) > 0 else None
)
return True
[docs]
def bookmark(self) -> int:
bookmark = _FullBookmark(
id=len(self._bookmarks),
wallet=self._wallet.copy(),
loans=self._loans.copy(),
storage=self._storage.copy(),
line_schedules=self._line_schedules.copy(),
has_jobs=self._has_jobs.copy(),
)
self._bookmarks.append(bookmark)
self._active_bookmark = bookmark
return bookmark.id
[docs]
def rollback(self, bookmark_id: int) -> bool:
if self._active_bookmark is None or self._active_bookmark.id != bookmark_id:
raise ValueError("there is no active bookmark to rollback")
b = self._active_bookmark
self._wallet, self._loans, self._storage = b.wallet, b.loans, b.storage
self._line_schedules, self._has_jobs = b.line_schedules, b.has_jobs
self._total_storage = self._storage.sum(axis=0)
return True
[docs]
def set_state(
self,
t: int,
storage: np.array,
wallet: float,
loans: float,
line_schedules: np.array,
) -> None:
self._storage[:, t:] += storage.reshape(self._n_products, 1) - self._storage[
:, t
].reshape(self._n_products, 1)
self._wallet[t:] += wallet - self._wallet[t]
self._loans[t:] += loans - self._loans[t]
self._line_schedules[:, t] = line_schedules
# @todo enable this again to confirm that simulation is correct. may be I set_state before the job is run on the simulator
# expected_schedules = self._line_schedules[:, t]
# for i in range(self.n_lines):
# expected, actual = expected_schedules[i], line_schedules[i]
# if expected == actual:
# continue
# if expected == NO_PRODUCTION:
# raise ValueError(f'Expected no production at time {t} on line {i} but actually process '
# f'{actual} is running')
# if expected != actual and actual != NO_PRODUCTION:
# raise ValueError(f'Expected process {expected} at time {t} on line {i} but actually process '
# f'{actual} is running')
# self._line_schedules[i, t] = actual
self.fix_before(t)
@contextmanager
[docs]
def transaction(simulator):
"""Runs the simulated actions then confirms them if they are not rolled back"""
_bookmark = simulator.bookmark()
yield _bookmark
simulator.delete_bookmark(_bookmark)
@contextmanager
[docs]
def temporary_transaction(simulator):
"""Runs the simulated actions then rolls them back"""
_bookmark = simulator.bookmark()
yield _bookmark
simulator.rollback(_bookmark)
simulator.delete_bookmark(_bookmark)