1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-04 03:21:00 +08:00

Add more friendly index range by timing

This commit is contained in:
Young
2021-07-13 14:46:53 +00:00
parent 4c4b30ebec
commit 9b38e62f21
5 changed files with 189 additions and 32 deletions

View File

@@ -368,11 +368,17 @@ class NestedExecutor(BaseExecutor):
break
sub_cal: TradeCalendarManager = self.inner_executor.trade_calendar
# NOTE: make sure get_start_end_idx is after `self._update_trade_decision`
start_idx, end_idx = get_start_end_idx(sub_cal, trade_decision)
if not self._align_range_limit or start_idx <= sub_cal.get_trade_step() <= end_idx:
# if force align the range limit, skip the steps outside the decision range limit
_inner_trade_decision = self.inner_strategy.generate_trade_decision(_inner_execute_result)
_inner_trade_decision: BaseTradeDecision = self.inner_strategy.generate_trade_decision(
_inner_execute_result
)
trade_decision.mod_inner_decision(_inner_trade_decision) # propagate part of decision information
# NOTE sub_cal.get_cur_step_time() must be called before collect_data in case of step shifting
decision_list.append((_inner_trade_decision, *sub_cal.get_cur_step_time()))

View File

@@ -3,10 +3,11 @@
# TODO: rename it with decision.py
from __future__ import annotations
from enum import IntEnum
from qlib.utils.time import concat_date_time
from qlib.log import get_module_logger
# try to fix circular imports when enabling type hints
from typing import TYPE_CHECKING
from typing import Callable, TYPE_CHECKING
if TYPE_CHECKING:
from qlib.strategy.base import BaseStrategy
@@ -164,6 +165,35 @@ class OrderHelper:
)
class IndexRangeByTime:
"""This is a helper function for make decisions"""
def __init__(self, start_time: str, end_time: str):
"""
This is a callable class.
**NOTE**:
- It is designed for minute-bar for intraday trading!!!!!
- Both start_time and end_time are **closed** in the range
Parameters
----------
start_time : str
e.g. "9:30"
end_time : str
e.g. "14:30"
"""
self.start_time = pd.Timestamp(start_time).time()
self.end_time = pd.Timestamp(end_time).time()
def __call__(self, trade_calendar: TradeCalendarManager) -> Tuple[int, int]:
start = trade_calendar.start_time
val_start, val_end = concat_date_time(start.date(), self.start_time), concat_date_time(
start.date(), self.end_time
)
return trade_calendar.get_range_idx(val_start, val_end)
class BaseTradeDecision:
"""
Trade decisions ara made by strategy and executed by exeuter
@@ -180,16 +210,54 @@ class BaseTradeDecision:
2. Same as `case 1.3`
"""
def __init__(self, strategy: BaseStrategy, idx_range: Tuple[int, int] = None):
def __init__(self, strategy: BaseStrategy, idx_range: Union[Tuple[int, int], Callable] = None):
"""
Parameters
----------
strategy : BaseStrategy
The strategy who make the decision
idx_range: Union[Tuple[int, int], Callable] (optional)
The index range for underlying strategy.
Here are two examples of idx_range for each type
1) Tuple[int, int]
start_index and end_index of the underlying factor(both sides are closed)
2) Callable
.. code-block:: python
def idx_range(time_per_step: str) -> Tuple[int, int]:
# time_per_step is the strategy's time_per_step (not inner strategy. It's the `self` strategy in
# `self._idx_range` )
# e.g.
# For example, strategy A with 30min each step and strategy B with 1min each step
# strategy A's will use "30min" when calling `idx_range`.
"""
self.strategy = strategy
self.total_step = None # upper strategy has no knowledge about the sub executor before `_init_sub_trading`
self.idx_range = idx_range
self._idx_range = idx_range
@staticmethod
def _calc_idx_range(
idx_range: Union[Tuple[int, int], Callable], inner_calendar: TradeCalendarManager = None
) -> Tuple[int, int]:
"""calculate index range for `idx_range` in different cases"""
if idx_range is None:
# not set, return nothing
return None, None
elif isinstance(idx_range, tuple):
return idx_range
elif isinstance(idx_range, Callable):
if inner_calendar is None:
# time_per_step is a required parameter for `def idx_range`
return None, None
else:
return idx_range(inner_calendar)
else:
raise NotImplementedError(f"This type of input is not supported")
def get_decision(self) -> List[object]:
"""
@@ -212,7 +280,7 @@ class BaseTradeDecision:
"""
Be called at the **start** of each step.
This function is designn for following purpose
This function is design for following purpose
1) Leave a hook for the strategy who make `self` decision to update the decision itself
2) Update some information from the inner executor calendar
@@ -230,12 +298,6 @@ class BaseTradeDecision:
"""
# purpose 1)
self.total_step = trade_calendar.get_trade_len()
if self.idx_range is not None:
logger = get_module_logger("decision")
start_idx, end_idx = self.idx_range
if start_idx < 0 or end_idx >= self.total_step:
logger.warning(f"{self.idx_range} go beyound the total_step({self.total_step}), it will be clipped")
self.idx_range = max(0, start_idx), min(self.total_step - 1, end_idx)
# purpose 2)
return self.strategy.update_trade_decision(self, trade_calendar)
@@ -245,9 +307,28 @@ class BaseTradeDecision:
return the expected step range for limiting the decision execution time
Both left and right are **closed**
if no available _idx_range, `default_value` will be returned
It is only used in `NestedExecutor`
- The outmost strategy will not follow any range limit (but it may give range_limit)
- The inner most strategy's range_limit will be useless due to atomic executors don't have such
features.
**NOTE**:
1) This function must be called after `self.update` in following cases(ensured by NestedExecutor):
- user relies on the auto-clip feature of `self.update`
2) This function will be called after _init_sub_trading in NestedExecutor.
Parameters
----------
**kwargs:
{"default_value": <default_value>}
# using dict is for distinguish no value provided or None provided
{
"default_value": <default_value>, # using dict is for distinguish no value provided or None provided
"inner_calendar": <trade calendar of inner strategy>
# because the range limit will control the step range of inner strategy, inner calendar will be a
# important parameter when _idx_range is callable
}
Returns
-------
@@ -258,15 +339,32 @@ class BaseTradeDecision:
NotImplementedError:
If the following criteria meet
1) the decision can't provide a unified start and end
2) default_value is None
2) default_value is not provided
"""
if self.idx_range is None:
# get index
_start_idx, _end_idx = self._calc_idx_range(self._idx_range, inner_calendar=kwargs.get("inner_calendar"))
if _start_idx is None or _end_idx is None:
# handle case without decision
# TODO: time range in the order should be checked.
# _start_idx and _end_idx should be used instead of _idx_range
# because it is possible that no limitation when _idx_range is callable and return None
if "default_value" in kwargs:
return kwargs["default_value"]
else:
# Default to get full index
raise NotImplementedError(f"The decision didn't provide an index range")
return self.idx_range
else:
# clip index
if getattr(self, "total_step", None) is not None:
# if `self.update` is called.
# Then the _start_idx, _end_idx should be clipped
if _start_idx < 0 or _end_idx >= self.total_step:
logger = get_module_logger("decision")
logger.warning(f"{self._idx_range} go beyoud the total_step({self.total_step}), it will be clipped")
_start_idx, _end_idx = max(0, _start_idx), min(self.total_step - 1, _end_idx)
return _start_idx, _end_idx
def empty(self) -> bool:
for obj in self.get_decision():
@@ -278,6 +376,27 @@ class BaseTradeDecision:
return True
return True
def mod_inner_decision(self, inner_trade_decision: BaseTradeDecision):
"""
This method will be called on the inner_trade_decision after it is generated.
`inner_trade_decision` will be changed **inplaced**.
Motivation of the `mod_inner_decision`
- Leave a hook for outer decision to affact the decision generated by the inner strategy
- e.g. the outmost strategy generate a time range for trading. But the upper layer can only affact the
nearest layer in the original design. With `mod_inner_decision`, the decision can passed through multiple
layers
Parameters
----------
inner_trade_decision : BaseTradeDecision
"""
# base class provide a default behaviour to modify inner_trade_decision
# callable _idx_range should be propagated when inner _idx_range is not set
if isinstance(self._idx_range, Callable) and inner_trade_decision._idx_range is None:
inner_trade_decision._idx_range = self._idx_range
class EmptyTradeDecision(BaseTradeDecision):
def empty(self) -> bool:
@@ -298,7 +417,7 @@ class TradeDecisionWO(BaseTradeDecision):
return self.order_list
def __repr__(self) -> str:
return f"strategy: {self.strategy}; idx_range: {self.idx_range}; order_list[{len(self.order_list)}]"
return f"strategy: {self.strategy}; idx_range: {self._idx_range}; order_list[{len(self.order_list)}]"
# TODO: the orders below need to be discussed ------------------------------------

View File

@@ -372,7 +372,7 @@ class Indicator:
raise NotImplementedError(f"This type of input is not supported")
# if there is no stock data during the time period
if(price_s is None):
if price_s is None:
return None, None
# NOTE: there are some zeros in the trading price. These cases are known meaningless
@@ -456,7 +456,7 @@ class Indicator:
trade_exchange=trade_exchange,
pa_config=pa_config,
)
if((bp_tmp is not None) and (bv_tmp is not None)):
if (bp_tmp is not None) and (bv_tmp is not None):
bp_new[inst], bv_new[inst] = bp_tmp, bv_tmp
else:
bp_new[inst], bv_new[inst] = pr, v

View File

@@ -1,6 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import annotations
import bisect
from typing import Union, TYPE_CHECKING, Tuple, Union, List, Set
if TYPE_CHECKING:
@@ -118,6 +119,33 @@ class TradeCalendarManager:
"""Get the start_time and end_time for trading"""
return self.start_time, self.end_time
# helper functions
def get_range_idx(self, start_time: pd.Timestamp, end_time: pd.Timestamp) -> Tuple[int, int]:
"""
get the range index which involve start_time~end_time (both sides are closed)
Parameters
----------
start_time : pd.Timestamp
end_time : pd.Timestamp
Returns
-------
Tuple[int, int]:
the index of the range. **the left and right are closed**
"""
left, right = (
bisect.bisect_right(self._calendar, start_time) - 1,
bisect.bisect_right(self._calendar, end_time) - 1,
)
left -= self.start_index
right -= self.start_index
def clip(idx):
return min(max(0, idx), self.trade_len - 1)
return clip(left), clip(right)
def __repr__(self) -> str:
return f"{self.start_time}[{self.start_index}]~{self.end_time}[{self.end_index}]: [{self.trade_step}/{self.trade_len}]"
@@ -201,6 +229,6 @@ def get_start_end_idx(trade_calendar: TradeCalendarManager, outer_trade_decision
start index and end index
"""
try:
return outer_trade_decision.get_range_limit()
return outer_trade_decision.get_range_limit(inner_calendar=trade_calendar)
except NotImplementedError:
return 0, trade_calendar.get_trade_len() - 1

View File

@@ -4,7 +4,7 @@
Time related utils are compiled in this script
"""
import bisect
from datetime import datetime, time
from datetime import datetime, time, date
from typing import List, Tuple
import re
from numpy import append
@@ -122,6 +122,20 @@ def get_day_min_idx_range(start: str, end: str, freq: str) -> Tuple[int, int]:
return left_idx, right_idx
def concat_date_time(date_obj: date, time_obj: time) -> pd.Timestamp:
return pd.Timestamp(
datetime(
date_obj.year,
month=date_obj.month,
day=date_obj.day,
hour=time_obj.hour,
minute=time_obj.minute,
second=time_obj.second,
microsecond=time_obj.microsecond,
)
)
def cal_sam_minute(x: pd.Timestamp, sam_minutes: int) -> pd.Timestamp:
"""
align the minute-level data to a down sampled calendar
@@ -143,17 +157,7 @@ def cal_sam_minute(x: pd.Timestamp, sam_minutes: int) -> pd.Timestamp:
cal = get_min_cal(C.min_data_shift)[::sam_minutes]
idx = bisect.bisect_right(cal, x.time()) - 1
date, new_time = x.date(), cal[idx]
return pd.Timestamp(
datetime(
date.year,
month=date.month,
day=date.day,
hour=new_time.hour,
minute=new_time.minute,
second=new_time.second,
microsecond=new_time.microsecond,
)
)
return concat_date_time(date, new_time)
if __name__ == "__main__":