1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-05 03:50:57 +08:00

Merge branch 'nested_decision_exe' of https://github.com/microsoft/qlib into rl-dummy

This commit is contained in:
v-mingzhehan
2021-07-15 08:31:39 +00:00
6 changed files with 169 additions and 206 deletions

View File

@@ -247,7 +247,7 @@ def collect_data(
"""initialize the strategy and executor, then collect the trade decision data for rl training
please refer to the docs of the backtest for the explanation of the parameters
Yields
-------
object

View File

@@ -26,7 +26,7 @@ class Exchange:
codes="all",
deal_price: Union[str, Tuple[str], List[str]] = None,
subscribe_fields=[],
limit_threshold=None,
limit_threshold: Union[Tuple[str, str], float, None] = None,
volume_threshold=None,
open_cost=0.0015,
close_cost=0.0025,
@@ -41,7 +41,7 @@ class Exchange:
:param end_time: closed end time for backtest
:param codes: list stock_id list or a string of instruments(i.e. all, csi500, sse50)
:param deal_price: Union[str, Tuple[str], List[str]]
:param deal_price: Union[str, Tuple[str, str], List[str]]
The `deal_price` supports following two types of input
- <deal_price> : str
- (<buy_price>, <sell_price>): Tuple[str] or List[str]
@@ -51,8 +51,16 @@ class Exchange:
- for example '$close', '$open', '$vwap' ("close" is OK. `Exchange` will help to prepend
"$" to the expression)
:param subscribe_fields: list, subscribe fields
:param limit_threshold: float, 0.1 for example, default None
:param subscribe_fields: list, subscribe fields. This expressions will be added to the query and `self.quote`.
It is useful when users want more fields to be queried
:param limit_threshold: Union[Tuple[str, str], float, None]
1) `None`: no limitation
2) float, 0.1 for example, default None
3) Tuple[str, str]: (<the expression for buying stock limitation>,
<the expression for sell stock limitation>)
`False` value indicates the stock is tradable
`True` value indicates the stock is limited and not tradable
:param volume_threshold: float, 0.1 for example, default None
:param open_cost: cost rate for open, default 0.0015
:param close_cost: cost rate for close, default 0.0025
@@ -97,7 +105,7 @@ class Exchange:
if limit_threshold is None:
if C.region == REG_CN:
self.logger.warning(f"limit_threshold not set. The stocks hit the limit may be bought/sold")
elif abs(limit_threshold) > 0.1:
elif self._get_limit_type(limit_threshold) == self.LT_FLT and abs(limit_threshold) > 0.1:
if C.region == REG_CN:
self.logger.warning(f"limit_threshold may not be set to a reasonable value")
@@ -119,13 +127,17 @@ class Exchange:
# $change is for calculating the limit of the stock
necessary_fields = {self.buy_price, self.sell_price, "$close", "$change", "$factor", "$volume"}
if self._get_limit_type(limit_threshold) == self.LT_TP_EXP:
for exp in limit_threshold:
necessary_fields.add(exp)
subscribe_fields = list(necessary_fields | set(subscribe_fields))
all_fields = list(necessary_fields | set(subscribe_fields))
self.all_fields = all_fields
self.open_cost = open_cost
self.close_cost = close_cost
self.min_cost = min_cost
self.limit_threshold = limit_threshold
self.limit_threshold: Union[Tuple[str, str], float, None] = limit_threshold
self.volume_threshold = volume_threshold
self.extra_quote = extra_quote
self.set_quote(codes, start_time, end_time)
@@ -133,6 +145,7 @@ class Exchange:
def set_quote(self, codes, start_time, end_time):
if len(codes) == 0:
codes = D.instruments()
self.quote = D.features(codes, self.all_fields, start_time, end_time, freq=self.freq, disk_cache=True).dropna(
subset=["$close"]
)
@@ -157,13 +170,7 @@ class Exchange:
self.trade_w_adj_price = False
# update limit
# check limit_threshold
if self.limit_threshold is None:
self.quote["limit_buy"] = False
self.quote["limit_sell"] = False
else:
# set limit
self._update_limit(buy_limit=self.limit_threshold, sell_limit=self.limit_threshold)
self._update_limit()
quote_df = self.quote
if self.extra_quote is not None:
@@ -194,9 +201,33 @@ class Exchange:
self.quote = quote_dict
def _update_limit(self, buy_limit, sell_limit):
self.quote["limit_buy"] = self.quote["$change"].ge(buy_limit)
self.quote["limit_sell"] = self.quote["$change"].le(-sell_limit)
LT_TP_EXP = "(exp)" # Tuple[str, str]
LT_FLT = "float" # float
LT_NONE = "none" # none
def _get_limit_type(self, limit_threshold):
if isinstance(limit_threshold, Tuple):
return self.LT_TP_EXP
elif isinstance(limit_threshold, float):
return self.LT_FLT
elif limit_threshold is None:
return self.LT_NONE
else:
raise NotImplementedError(f"This type of `limit_threshold` is not supported")
def _update_limit(self):
# check limit_threshold
lt_type = self._get_limit_type(self.limit_threshold)
if lt_type == self.LT_NONE:
self.quote["limit_buy"] = False
self.quote["limit_sell"] = False
elif lt_type == self.LT_TP_EXP:
# set limit
self.quote["limit_buy"] = self.quote[self.limit_threshold[0]]
self.quote["limit_sell"] = self.quote[self.limit_threshold[1]]
elif lt_type == self.LT_FLT:
self.quote["limit_buy"] = self.quote["$change"].ge(self.limit_threshold)
self.quote["limit_sell"] = self.quote["$change"].le(-self.limit_threshold) # pylint: disable=E1130
def check_stock_limit(self, stock_id, start_time, end_time, direction=None):
"""

View File

@@ -296,7 +296,7 @@ class NestedExecutor(BaseExecutor):
- The decisions may be updated by steps
- The inner executor may not follow the decisions from the outer strategy
align_range_limit: bool
force to align the index_range decision
force to align the trade_range decision
It is only for nested executor, because range_limit is given by outer strategy
"""
self.inner_executor: BaseExecutor = init_instance_by_config(

View File

@@ -166,7 +166,60 @@ class OrderHelper:
)
class IndexRangeByTime:
class TradeRange:
def __call__(self, trade_calendar: TradeCalendarManager) -> Tuple[int, int]:
"""
This method will be call with following way
The outer strategy give a decision with with `TradeRange`
The decision will be checked by the inner decision.
inner decision will pass its trade_calendar as parameter when getting the trading range
- The framework's step is integer-index based.
Parameters
----------
trade_calendar : TradeCalendarManager
the trade_calendar is from inner strategy
Returns
-------
Tuple[int, int]:
the start index and end index which are tradable
Raises
------
NotImplementedError:
Exceptions are raised when no range limitation
"""
raise NotImplementedError(f"Please implement the `__call__` method")
def clip_time_range(self, start_time: pd.Timestamp, end_time: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
"""
Parameters
----------
start_time : pd.Timestamp
end_time : pd.Timestamp
Both sides (start_time, end_time) are closed
Returns
-------
Tuple[pd.Timestamp, pd.Timestamp]:
The tradable time range.
- It is intersection of [start_time, end_time] and the rule of TradeRange itself
"""
raise NotImplementedError(f"Please implement the `clip_time_range` method")
class IdxTradeRange(TradeRange):
def __init__(self, start_idx: int, end_idx: int):
self._start_idx = start_idx
self._end_idx = end_idx
def __call__(self, trade_calendar: TradeCalendarManager = None) -> Tuple[int, int]:
return self._start_idx, self._end_idx
class TradeRangeByTime(TradeRange):
"""This is a helper function for make decisions"""
def __init__(self, start_time: str, end_time: str):
@@ -186,14 +239,24 @@ class IndexRangeByTime:
"""
self.start_time = pd.Timestamp(start_time).time()
self.end_time = pd.Timestamp(end_time).time()
assert self.start_time < self.end_time
def __call__(self, trade_calendar: TradeCalendarManager) -> Tuple[int, int]:
def __call__(self, trade_calendar: TradeCalendarManager = None) -> Tuple[int, int]:
if trade_calendar is None:
raise NotImplementedError("trade_calendar is necessary for getting TradeRangeByTime.")
start = trade_calendar.start_time
val_start, val_end = concat_date_time(start.date(), self.start_time), concat_date_time(
start.date(), self.end_time
)
return trade_calendar.get_range_idx(val_start, val_end)
def clip_time_range(self, start_time: pd.Timestamp, end_time: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]:
start_date = start_time.date()
val_start, val_end = concat_date_time(start_date, self.start_time), concat_date_time(start_date, self.end_time)
# NOTE: `end_date` should not be used. Because the `end_date` is for slicing. It may be in the next day
# Assumption: start_time and end_time is for intraday trading. So it is OK for only using start_date
return max(val_start, start_time), min(val_end, end_time)
class BaseTradeDecision:
"""
@@ -211,54 +274,29 @@ class BaseTradeDecision:
2. Same as `case 1.3`
"""
def __init__(self, strategy: BaseStrategy, idx_range: Union[Tuple[int, int], Callable] = None):
def __init__(self, strategy: BaseStrategy, trade_range: Union[Tuple[int, int], TradeRange] = None):
"""
Parameters
----------
strategy : BaseStrategy
The strategy who make the decision
idx_range: Union[Tuple[int, int], Callable] (optional)
trade_range: Union[Tuple[int, int], Callable] (optional)
The index range for underlying strategy.
Here are two examples of idx_range for each type
Here are two examples of trade_range for each type
1) Tuple[int, int]
start_index and end_index of the underlying factor(both sides are closed)
start_index and end_index of the underlying strategy(both sides are closed)
2) Callable
.. code-block:: python
def idx_range(time_per_step: str) -> Tuple[int, int]:
# time_per_step is the strategy's time_per_step (not inner strategy. It's the `self` strategy in
# `self._idx_range` )
# e.g.
# For example, strategy A with 30min each step and strategy B with 1min each step
# strategy A's will use "30min" when calling `idx_range`.
2) TradeRange
"""
self.strategy = strategy
self.total_step = None # upper strategy has no knowledge about the sub executor before `_init_sub_trading`
self._idx_range = idx_range
@staticmethod
def _calc_idx_range(
idx_range: Union[Tuple[int, int], Callable], inner_calendar: TradeCalendarManager = None
) -> Tuple[int, int]:
"""calculate index range for `idx_range` in different cases"""
if idx_range is None:
# not set, return nothing
return None, None
elif isinstance(idx_range, tuple):
return idx_range
elif isinstance(idx_range, Callable):
if inner_calendar is None:
# time_per_step is a required parameter for `def idx_range`
return None, None
else:
return idx_range(inner_calendar)
else:
raise NotImplementedError(f"This type of input is not supported")
if isinstance(trade_range, Tuple):
# for Tuple[int, int]
trade_range = IdxTradeRange(**trade_range)
self.trade_range: TradeRange = trade_range
def get_decision(self) -> List[object]:
"""
@@ -303,12 +341,18 @@ class BaseTradeDecision:
# purpose 2)
return self.strategy.update_trade_decision(self, trade_calendar)
def _get_range_limit(self, **kwargs) -> Tuple[int, int]:
if self.trade_range is not None:
return self.trade_range(trade_calendar=kwargs.get("inner_calendar"))
else:
raise NotImplementedError("The decision didn't provide an index range")
def get_range_limit(self, **kwargs) -> Tuple[int, int]:
"""
return the expected step range for limiting the decision execution time
Both left and right are **closed**
if no available _idx_range, `default_value` will be returned
if no available trade_range, `default_value` will be returned
It is only used in `NestedExecutor`
- The outmost strategy will not follow any range limit (but it may give range_limit)
@@ -328,7 +372,7 @@ class BaseTradeDecision:
"default_value": <default_value>, # using dict is for distinguish no value provided or None provided
"inner_calendar": <trade calendar of inner strategy>
# because the range limit will control the step range of inner strategy, inner calendar will be a
# important parameter when _idx_range is callable
# important parameter when trade_range is callable
}
Returns
@@ -342,29 +386,25 @@ class BaseTradeDecision:
1) the decision can't provide a unified start and end
2) default_value is not provided
"""
# get index
_start_idx, _end_idx = self._calc_idx_range(self._idx_range, inner_calendar=kwargs.get("inner_calendar"))
if _start_idx is None or _end_idx is None:
# handle case without decision
# TODO: time range in the order should be checked.
# _start_idx and _end_idx should be used instead of _idx_range
# because it is possible that no limitation when _idx_range is callable and return None
try:
_start_idx, _end_idx = self._get_range_limit(**kwargs)
except NotImplementedError:
if "default_value" in kwargs:
return kwargs["default_value"]
else:
# Default to get full index
raise NotImplementedError(f"The decision didn't provide an index range")
else:
# clip index
if getattr(self, "total_step", None) is not None:
# if `self.update` is called.
# Then the _start_idx, _end_idx should be clipped
if _start_idx < 0 or _end_idx >= self.total_step:
logger = get_module_logger("decision")
logger.warning(f"{self._idx_range} go beyoud the total_step({self.total_step}), it will be clipped")
_start_idx, _end_idx = max(0, _start_idx), min(self.total_step - 1, _end_idx)
# clip index
if getattr(self, "total_step", None) is not None:
# if `self.update` is called.
# Then the _start_idx, _end_idx should be clipped
if _start_idx < 0 or _end_idx >= self.total_step:
logger = get_module_logger("decision")
logger.warning(
f"[{_start_idx},{_end_idx}] go beyoud the total_step({self.total_step}), it will be clipped"
)
_start_idx, _end_idx = max(0, _start_idx), min(self.total_step - 1, _end_idx)
return _start_idx, _end_idx
def empty(self) -> bool:
@@ -394,9 +434,9 @@ class BaseTradeDecision:
inner_trade_decision : BaseTradeDecision
"""
# base class provide a default behaviour to modify inner_trade_decision
# callable _idx_range should be propagated when inner _idx_range is not set
if isinstance(self._idx_range, Callable) and inner_trade_decision._idx_range is None:
inner_trade_decision._idx_range = self._idx_range
# trade_range should be propagated when inner trade_range is not set
if inner_trade_decision.trade_range is None:
inner_trade_decision.trade_range = self.trade_range
class EmptyTradeDecision(BaseTradeDecision):
@@ -410,106 +450,12 @@ class TradeDecisionWO(BaseTradeDecision):
Besides, the time_range is also included.
"""
def __init__(self, order_list: List[Order], strategy: BaseStrategy, idx_range: Tuple[int, int] = None):
super().__init__(strategy, idx_range=idx_range)
def __init__(self, order_list: List[Order], strategy: BaseStrategy, trade_range: Tuple[int, int] = None):
super().__init__(strategy, trade_range=trade_range)
self.order_list = order_list
def get_decision(self) -> List[object]:
return self.order_list
def __repr__(self) -> str:
return f"strategy: {self.strategy}; idx_range: {self._idx_range}; order_list[{len(self.order_list)}]"
# TODO: the orders below need to be discussed ------------------------------------
# - The classes below are designed for Case 1
# - However, Case 1 can't take `order_pool` as the an argument as the constructor function
class TradeDecisionWithOrderPool:
"""trade decision that made by strategy"""
def __init__(self, strategy, order_pool):
"""
Parameters
----------
strategy : BaseStrategy
the original strategy that make the decision
order_pool : list, optional
the candinate order pool for generate trade decision
"""
super(TradeDecisionWithOrderPool, self).__init__(strategy)
self.order_pool = order_pool
self.order_list = []
def pop_order_pool(self, pop_len):
if pop_len > len(self.order_pool):
warnings.warn(
f"pop len {pop_len} is too much length than order pool, cut it as pool length {len(self.order_pool)}"
)
pop_len = len(self.order_pool)
res = self.order_pool[:pop_len]
del self.order_pool[:pop_len]
return res
def push_order_list(self, order_list):
self.order_list.extend(order_list)
def get_decision(self):
"""get the order list
Parameters
----------
only_enable : bool, optional
wether to ignore disabled order, by default False
only_disable : bool, optional
wether to ignore enabled order, by default False
Returns
-------
List[Order]
the order list
"""
return self.order_list
def update(self, trade_calendar):
"""make the original strategy update the enabled status of orders."""
self.ori_strategy.update_trade_decision(self, trade_calendar)
class BaseDecisionUpdater:
def update_decision(self, decision, trade_calendar) -> BaseTradeDecision:
"""
Parameters
----------
decision : BaseTradeDecision
the trade decision to be updated
trade_calendar : BaseTradeCalendar
the trade calendar of inner execution
Returns
-------
BaseTradeDecision
the updated decision
"""
raise NotImplementedError(f"This method is not implemented")
class DecisionUpdaterWithOrderPool:
def __init__(self, plan_config=None):
"""
Parameters
----------
plan_config : Dict[Tuple(int, float)], optional
the plan config, by default None
"""
if plan_config is None:
self.plan_config = [(0, 1)]
else:
self.plan_config = plan_config
def update_decision(self, decision, trade_calendar) -> BaseTradeDecision:
# get the number of trading step finished, trade_step can be [0, 1, 2, ..., trade_len - 1]
trade_step = self.trade_calendar.get_trade_step()
for _index, _ratio in self.plan_config:
if trade_step == _index:
pop_len = len(decision.order_pool) * _ratio
pop_order_list = decision.pop_order_pool(pop_len)
decision.push_order_list(pop_order_list)
return f"strategy: {self.strategy}; trade_range: {self.trade_range}; order_list[{len(self.order_list)}]"

View File

@@ -364,6 +364,11 @@ class Indicator:
agg = pa_config.get("agg", "twap").lower()
price = pa_config.get("price", "deal_price").lower()
# NOTE: IndexTradeRange is not supported!!!!! Because inner index is not available
trade_start_time, trade_end_time = decision.trade_range.clip_time_range(
start_time=trade_start_time, end_time=trade_end_time
)
if price == "deal_price":
price_s = trade_exchange.get_deal_price(
inst, trade_start_time, trade_end_time, direction=direction, method=None
@@ -386,21 +391,6 @@ class Indicator:
else:
raise NotImplementedError(f"This type of input is not supported")
# no sub executor on the lowest level
# So range_limit an total step will all be None
total_step = decision.total_step
if total_step is None:
total_step = 1
range_limit = decision.get_range_limit(default_value=(0, total_step - 1))
assert volume_s.shape[0] % total_step == 0, "The price series can't be divided by step length"
factor = volume_s.shape[0] // total_step
slc = slice(range_limit[0] * factor, (range_limit[1] + 1) * factor)
volume_s = volume_s.iloc[slc]
price_s = price_s.iloc[slc]
base_volume = volume_s.sum().item()
base_price = ((price_s * volume_s).sum() / base_volume).item()

View File

@@ -10,7 +10,7 @@ from qlib.utils import lazy_sort_index
from ...utils.resam import resam_ts_data, ts_data_last
from ...data.data import D
from ...strategy.base import BaseStrategy
from ...backtest.order import BaseTradeDecision, Order, TradeDecisionWO
from ...backtest.order import BaseTradeDecision, Order, TradeDecisionWO, TradeRange
from ...backtest.exchange import Exchange, OrderHelper
from ...backtest.utils import CommonInfrastructure, LevelInfrastructure
from qlib.utils.file import get_io_object
@@ -625,7 +625,7 @@ class ACStrategy(BaseStrategy):
class RandomOrderStrategy(BaseStrategy):
def __init__(
self,
index_range: Tuple[int, int], # The range is closed on both left and right.
trade_range: Union[Tuple[int, int], TradeRange], # The range is closed on both left and right.
sample_ratio: float = 1.0,
volume_ratio: float = 0.01,
market: str = "all",
@@ -636,13 +636,8 @@ class RandomOrderStrategy(BaseStrategy):
"""
Parameters
----------
index_range : Tuple
the intra day time index range of the orders
the left and right is closed.
If you want to get the index_range in intra-day
- `qlib/utils/time.py:def get_day_min_idx_range` can help you create the index range easier
# TODO: this is a index_range level limitation. We'll implement a more detailed limitation later.
trade_range : Tuple
please refer to the `trade_range` parameter of BaseStrategy
sample_ratio : float
the ratio of all orders are sampled
volume_ratio : float
@@ -653,7 +648,6 @@ class RandomOrderStrategy(BaseStrategy):
"""
super().__init__(*args, **kwargs)
self.index_range = index_range
self.sample_ratio = sample_ratio
self.volume_ratio = volume_ratio
self.market = market
@@ -664,6 +658,7 @@ class RandomOrderStrategy(BaseStrategy):
D.instruments(market), ["Mean(Ref($volume, 1), 10)"], start_time=exch.start_time, end_time=exch.end_time
)
self.volume_df = self.volume.iloc[:, 0].unstack()
self.trade_range = trade_range
def generate_trade_decision(self, execute_result=None):
trade_step = self.trade_calendar.get_trade_step()
@@ -683,7 +678,7 @@ class RandomOrderStrategy(BaseStrategy):
direction=self.direction,
)
)
return TradeDecisionWO(order_list, self, self.index_range)
return TradeDecisionWO(order_list, self, self.trade_range)
class FileOrderStrategy(BaseStrategy):
@@ -692,7 +687,8 @@ class FileOrderStrategy(BaseStrategy):
- This class provides an interface for user to read orders from csv files.
"""
def __init__(self, file: Union[IO, str, Path, pd.DataFrame], index_range: Tuple[int, int] = None, *args, **kwargs):
def __init__(self, file: Union[IO, str, Path, pd.DataFrame],
trade_range: Union[Tuple[int, int], TradeRange]= None, *args, **kwargs):
"""
Parameters
@@ -709,13 +705,13 @@ class FileOrderStrategy(BaseStrategy):
20200103, SH600519, 1000, buy
20200106, SH600519, 1000, sell
index_range : Tuple[int, int]
trade_range : Tuple[int, int]
the intra day time index range of the orders
the left and right is closed.
If you want to get the index_range in intra-day
If you want to get the trade_range in intra-day
- `qlib/utils/time.py:def get_day_min_idx_range` can help you create the index range easier
# TODO: this is a index_range level limitation. We'll implement a more detailed limitation later.
# TODO: this is a trade_range level limitation. We'll implement a more detailed limitation later.
"""
super().__init__(*args, **kwargs)
@@ -730,7 +726,7 @@ class FileOrderStrategy(BaseStrategy):
# make sure the datetime is the first level for fast indexing
self.order_df = lazy_sort_index(convert_index_format(self.order_df, level="datetime"))
self.index_range = index_range
self.trade_range = trade_range
def generate_trade_decision(self, execute_result=None) -> TradeDecisionWO:
"""
@@ -760,4 +756,4 @@ class FileOrderStrategy(BaseStrategy):
end_time=end,
)
)
return TradeDecisionWO(order_list, self, self.index_range)
return TradeDecisionWO(order_list, self, self.trade_range)