import os
import pandas as pd
from ddql_optimal_execution import State, Preprocessor
from ._exceptions import InvalidActionError, InvalidSwapError, EpisodeIndexError
class MarketEnvironnement:
"""
    This class represents the market environment in which the agent operates. It tracks the
    current time step, the agent's remaining inventory, and the market data observed at each
    period, and exposes ``step``/``reset``-style episode control.

Parameters
----------
    initial_inventory : int, optional
        The `initial_inventory` parameter is an integer that represents the initial inventory of
        the agent (e.g. the number of shares to liquidate). The default value is 100.
data_path : str, optional
The `data_path` parameter is a string that represents the path to the directory containing the
data files. The default value is "../data".
n_periods : int, optional
The `n_periods` parameter is an integer that represents the number of periods in the data files.
The default value is 5.
quadratic_penalty_coefficient : float, optional
The `quadratic_penalty_coefficient` parameter is a float that represents the coefficient of the
quadratic penalty term in the reward function. The default value is 0.01.
multi_episodes : bool, optional
The `multi_episodes` parameter is a boolean that indicates whether the agent is operating in
multi-episode mode. The default value is False.
Attributes
----------
    initial_inventory : int
        The `initial_inventory` attribute is an integer that represents the initial inventory of the agent.
n_periods : int
The `n_periods` attribute is an integer that represents the number of periods in the data files.
current_episode : int
The `current_episode` attribute is an integer that represents the current episode number.
multi_episodes : bool
The `multi_episodes` attribute is a boolean that indicates whether the agent is operating in
multi-episode mode.
quadratic_penalty_coefficient : float
The `quadratic_penalty_coefficient` attribute is a float that represents the coefficient of the
quadratic penalty term in the reward function.
    horizon : int
        The `horizon` attribute is an integer that represents the episode horizon; it is equal
        to `n_periods`.
preprocessor : Preprocessor
The `preprocessor` attribute is an instance of the `Preprocessor` class, which is used to
preprocess the data files.
historical_data_series : List[str]
The `historical_data_series` attribute is a list of strings that represents the paths to the
data files.
historical_data : pd.DataFrame
The `historical_data` attribute is a pandas DataFrame that contains the historical data.
    state : State
        The `state` attribute is an instance of the `State` class, which represents the current
        state of the environment: the current period, the agent's remaining inventory, and the
        market features observed at that period.
done : bool
The `done` attribute is a boolean that indicates whether the episode is over.
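
    Examples
    --------
    Illustrative only; assumes a CSV file at ``../data/historical_data.csv`` with the columns
    the ``Preprocessor`` expects (e.g. a ``Price`` column)::

        env = MarketEnvironnement(initial_inventory=100, n_periods=5)
        total_reward = 0.0
        while not env.done:
            # naive policy: sell half of the remaining inventory each period
            total_reward += env.step(env.state["inventory"] // 2)
        env.reset()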
"""
    def __init__(
self,
initial_inventory: int = 100,
data_path: str = "../data",
n_periods: int = 5,
quadratic_penalty_coefficient: float = 0.01,
multi_episodes: bool = False,
**kwargs,
) -> None:
self.initial_inventory = initial_inventory
self.n_periods = n_periods
self.current_episode = 0
self.multi_episodes = multi_episodes
self.quadratic_penalty_coefficient = quadratic_penalty_coefficient
self.horizon = n_periods
        # Optional kwargs toggle preprocessing features (all default to True).
        self.preprocessor = Preprocessor(
            n_periods=n_periods,
            QV=kwargs.get("QV", True),
            normalize_price=kwargs.get("normalize_price", True),
            volume=kwargs.get("volume", True),
        )
if multi_episodes:
self.historical_data_series = []
            # Sort for a deterministic episode ordering across platforms.
            for file in sorted(os.listdir(data_path)):
if file.endswith(".csv"):
self.historical_data_series.append(f"{data_path}/{file}")
self.historical_data = pd.read_csv(
self.historical_data_series[self.current_episode]
)
else:
self.historical_data = pd.read_csv(f"{data_path}/historical_data.csv")
self.__load_episode()
self.state_size = len(self.state)
def __initialize_state(self) -> None:
"""This function initializes the state of an object with historical data and an initial inventory."""
self.state = State(
dict(
zip(
[*self.historical_data.columns, "inventory"],
[*self.historical_data.iloc[0].values, self.initial_inventory],
)
)
)
def __load_episode(self) -> None:
"""This function loads an episode by preprocessing historical data, initializing the state, and setting
the "done" flag to False.
Parameters
----------
df : pd.DataFrame
The parameter `df` is a pandas DataFrame that is being passed as an argument to the
`__load_episode` method. However, it is not being used in the method and seems to be unnecessary.
"""
self.historical_data, self.raw_prices = self.preprocessor(self.historical_data)
self.__initialize_state()
self.done = False
self.pnl_for_episode = []
    def get_current_raw_price(self) -> pd.Series:
        """Return the raw (unnormalized) prices observed during the current period."""
return self.raw_prices[self.raw_prices["period"] == int(self.state["period"])][
"Price"
]
    def swap_episode(self, episode: int) -> None:
        """Swap the current episode in the environment and update the historical data,
        periods, and state accordingly.

        Parameters
        ----------
        episode : int
            The index of the episode to switch to. An `EpisodeIndexError` is raised if it is
            out of range, and an `InvalidSwapError` if the current episode is still in progress.
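
        Examples
        --------
        Illustrative only (assumes a multi-episode environment ``env`` whose current
        episode has finished)::

            env.swap_episode(2)  # jump to the third data file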
"""
if self.state["period"] >= 1 and not self.done:
raise InvalidSwapError
if episode >= len(self.historical_data_series):
raise EpisodeIndexError
self.current_episode = episode
self.historical_data = pd.read_csv(self.historical_data_series[episode])
self.__load_episode()
    def step(self, action: int) -> float:
        """Execute one time step within the environment: validate the action, compute the
        reward, and update the state.

        Parameters
        ----------
        action : int
            The quantity of inventory to sell during this period. An `InvalidActionError` is
            raised if it exceeds the remaining inventory.

        Returns
        -------
        float
            The reward obtained after executing the action in the environment.
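
        Examples
        --------
        Illustrative only (assumes an initialized environment ``env``)::

            reward = env.step(10)  # sell 10 units this period
            if env.done:
                env.reset()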
"""
# Execute one time step within the environment
if action > self.state["inventory"]:
raise InvalidActionError
reward = self.__compute_reward(action)
self.__update_state(action)
return reward
def __update_state(self, action: int) -> None:
"""This function updates the state of an inventory management environment based on a given action and
historical data.
Parameters
----------
        action : int
            The amount of inventory sold this period; it is subtracted from the current
            inventory level in `self.state`.
"""
self.state["inventory"] -= action
self.state["period"] = self.state["period"] + 1
self.done = (self.state["period"] == self.horizon - 1) or (
self.state["inventory"] == 0
)
if not self.done:
self.state.update_state(
**self.historical_data[
self.historical_data.period == self.state["period"]
]
.iloc[0]
.to_dict()
)
    def get_state(self, copy=True) -> State:
        """Return the current state, either as an independent copy (the default) or as a
        reference to the live state object.

Parameters
----------
        copy : bool, optional
            If True (the default), a copy of the state object is returned; otherwise the
            original state object itself is returned.

        Returns
        -------
        State
            A copy of the current state if `copy` is True, otherwise the state object itself.
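
        Examples
        --------
        Illustrative only (assumes an initialized environment ``env``)::

            snapshot = env.get_state()        # independent copy
            live = env.get_state(copy=False)  # mutations affect the environment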
"""
return self.state.copy() if copy else self.state
def __compute_reward(self, action: int) -> float:
"""This function computes the reward for a given action based on the current inventory and historical
price data.
Parameters
----------
        action : int
            The number of shares to sell during the current period; the sale is spread
            uniformly across the intra-period price observations.
Returns
-------
The function `__compute_reward` returns a float value which represents the reward calculated based
on the given action and the current state of the environment.
"""
inventory = self.state["inventory"]
intra_time_steps_prices = self.historical_data[
self.historical_data.period == self.state["period"]
].Price.values
len_ts = len(intra_time_steps_prices)
reward = 0
pnl_comp = 0
        # Note: the raw price series is zipped in but currently unused in the
        # computation; via `zip` it also bounds the number of iterations.
        for p1, p2, _true_p in zip(
            intra_time_steps_prices[:-1],
            intra_time_steps_prices[1:],
            self.get_current_raw_price(),
        ):
inventory -= action / len_ts
reward += (
inventory * (p2 - p1)
- self.quadratic_penalty_coefficient
* (action / ((self.initial_inventory + 1) * len_ts)) ** 2
)
pnl_comp += (
inventory * p2
- self.quadratic_penalty_coefficient
* (action / ((self.initial_inventory + 1) * len_ts)) ** 2
)
self.pnl_for_episode.append(pnl_comp)
return reward
    def reset(self) -> None:
        """Re-initialize the state, set the "done" flag to False, and clear the
        per-episode PnL record."""
        self.__initialize_state()
self.done = False
self.pnl_for_episode = []
    def __repr__(self) -> str:
        return (
            f"MarketEnvironnement(initial_inventory={self.initial_inventory}, "
            f"quadratic_penalty_coefficient={self.quadratic_penalty_coefficient}, "
            f"current_episode={self.current_episode}, "
            f"multi_episodes={self.multi_episodes})"
        )