1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-04 03:21:00 +08:00

Merge branch 'nested_decision_exe' of https://github.com/microsoft/qlib into rl-dummy

This commit is contained in:
v-mingzhehan
2021-07-14 09:12:54 +00:00
9 changed files with 267 additions and 44 deletions

View File

@@ -8,6 +8,7 @@ from .account import Account
if TYPE_CHECKING:
from ..strategy.base import BaseStrategy
from .position import Position
from .exchange import Exchange
from .executor import BaseExecutor
from .backtest import backtest_loop
@@ -95,7 +96,7 @@ def get_exchange(
def create_account_instance(
start_time, end_time, benchmark: str, account: float, pos_type: str = "Position"
start_time, end_time, benchmark: str, account: Union[float, int, Position], pos_type: str = "Position"
) -> Account:
"""
# TODO: is very strange pass benchmark_config in the account(maybe for report)
@@ -109,13 +110,23 @@ def create_account_instance(
end time of the benchmark
benchmark : str
the benchmark for reporting
account : Union[float, str]
account : Union[float, int, Position]
information for describing how to creating the account
For `float`
Using Account with a normal position
For `str`:
Using account with a specific Position
For `float` or `int`:
Using Account with only initial cash
For `Position`:
Using Account with a Position
"""
if isinstance(account, (int, float)):
pos_kwargs = {"init_cash": account}
elif isinstance(account, Position):
pos_kwargs = {
"init_cash": account.position["cash"],
"position_dict": account.position,
}
else:
raise ValueError("account must be in (int, float, Position)")
kwargs = {
"init_cash": account,
"benchmark_config": {
@@ -125,6 +136,7 @@ def create_account_instance(
},
"pos_type": pos_type,
}
kwargs.update(pos_kwargs)
return Account(**kwargs)
@@ -134,7 +146,7 @@ def get_strategy_executor(
strategy: BaseStrategy,
executor: BaseExecutor,
benchmark: str = "SH000300",
account: Union[float, str] = 1e9,
account: Union[float, int, Position] = 1e9,
exchange_kwargs: dict = {},
pos_type: str = "Position",
):
@@ -172,7 +184,41 @@ def backtest(
exchange_kwargs={},
pos_type: str = "Position",
):
"""initialize the strategy and executor, then backtest funciton for the interaction of the outermost strategy and executor in the nested decision execution
Parameters
----------
start_time : pd.Timestamp|str
closed start time for backtest
**NOTE**: This will be applied to the outmost executor's calendar.
end_time : pd.Timestamp|str
closed end time for backtest
**NOTE**: This will be applied to the outmost executor's calendar.
E.g. Executor[day](Executor[1min]), setting `end_time == 20XX0301` will include all the minutes on 20XX0301
strategy : Union[str, dict, BaseStrategy]
for initializing outermost portfolio strategy. Please refer to the docs of init_instance_by_config for more information.
executor : Union[str, dict, BaseExecutor]
for initializing the outermost executor.
benchmark: str
the benchmark for reporting.
account : Union[float, int, Position]
information for describing how to creating the account
For `float` or `int`:
Using Account with only initial cash
For `Position`:
Using Account with a Position
exchange_kwargs : dict
the kwargs for initializing Exchange
pos_type : str
the type of Position.
Returns
-------
report_dict: Report
it records the trading report information
indicator_dict: Indicator
it computes the trading indicator
"""
trade_strategy, trade_executor = get_strategy_executor(
start_time,
end_time,
@@ -198,7 +244,15 @@ def collect_data(
exchange_kwargs={},
pos_type: str = "Position",
):
"""initialize the strategy and executor, then collect the trade decision data for rl training
please refer to the docs of the backtest for the explanation of the parameters
Yields
-------
object
trade decision
"""
trade_strategy, trade_executor = get_strategy_executor(
start_time,
end_time,

View File

@@ -67,6 +67,7 @@ class Account:
def __init__(
self,
init_cash: float = 1e9,
position_dict: dict = {},
freq: str = "day",
benchmark_config: dict = {},
pos_type: str = "Position",
@@ -74,7 +75,7 @@ class Account:
):
self._pos_type = pos_type
self._port_metr_enabled = port_metr_enabled
self.init_vars(init_cash, freq, benchmark_config)
self.init_vars(init_cash, position_dict, freq, benchmark_config)
def is_port_metr_enabled(self):
"""
@@ -82,14 +83,17 @@ class Account:
"""
return self._port_metr_enabled and not self.current.skip_update()
def init_vars(self, init_cash, freq: str, benchmark_config: dict):
def init_vars(self, init_cash, position_dict, freq: str, benchmark_config: dict):
# init cash
self.init_cash = init_cash
self.current: BasePosition = init_instance_by_config(
{
"class": self._pos_type,
"kwargs": {"cash": init_cash},
"kwargs": {
"cash": init_cash,
"position_dict": position_dict,
},
"module_path": "qlib.backtest.position",
}
)

View File

@@ -21,6 +21,8 @@ def backtest_loop(start_time, end_time, trade_strategy: BaseStrategy, trade_exec
-------
report: Report
it records the trading report information
indicator: Indicator
it computes the trading indicator
"""
return_value = {}
for _decision in collect_data_loop(start_time, end_time, trade_strategy, trade_executor, return_value):

View File

@@ -215,7 +215,7 @@ class BaseExecutor:
execute_result : List[object]
the executed result for trade decision.
** NOTE!!!! **:
1) This is necessary, The return value of geenrator will be used in NestedExecutor
1) This is necessary, The return value of generator will be used in NestedExecutor
2) Please note the executed results are not merged.
Yields
@@ -368,11 +368,17 @@ class NestedExecutor(BaseExecutor):
break
sub_cal: TradeCalendarManager = self.inner_executor.trade_calendar
# NOTE: make sure get_start_end_idx is after `self._update_trade_decision`
start_idx, end_idx = get_start_end_idx(sub_cal, trade_decision)
if not self._align_range_limit or start_idx <= sub_cal.get_trade_step() <= end_idx:
# if force align the range limit, skip the steps outside the decision range limit
_inner_trade_decision = self.inner_strategy.generate_trade_decision(_inner_execute_result)
_inner_trade_decision: BaseTradeDecision = self.inner_strategy.generate_trade_decision(
_inner_execute_result
)
trade_decision.mod_inner_decision(_inner_trade_decision) # propagate part of decision information
# NOTE sub_cal.get_cur_step_time() must be called before collect_data in case of step shifting
decision_list.append((_inner_trade_decision, *sub_cal.get_cur_step_time()))

View File

@@ -3,10 +3,11 @@
# TODO: rename it with decision.py
from __future__ import annotations
from enum import IntEnum
from qlib.utils.time import concat_date_time
from qlib.log import get_module_logger
# try to fix circular imports when enabling type hints
from typing import TYPE_CHECKING
from typing import Callable, TYPE_CHECKING
if TYPE_CHECKING:
from qlib.strategy.base import BaseStrategy
@@ -165,6 +166,35 @@ class OrderHelper:
)
class IndexRangeByTime:
"""This is a helper function for make decisions"""
def __init__(self, start_time: str, end_time: str):
"""
This is a callable class.
**NOTE**:
- It is designed for minute-bar for intraday trading!!!!!
- Both start_time and end_time are **closed** in the range
Parameters
----------
start_time : str
e.g. "9:30"
end_time : str
e.g. "14:30"
"""
self.start_time = pd.Timestamp(start_time).time()
self.end_time = pd.Timestamp(end_time).time()
def __call__(self, trade_calendar: TradeCalendarManager) -> Tuple[int, int]:
start = trade_calendar.start_time
val_start, val_end = concat_date_time(start.date(), self.start_time), concat_date_time(
start.date(), self.end_time
)
return trade_calendar.get_range_idx(val_start, val_end)
class BaseTradeDecision:
"""
Trade decisions ara made by strategy and executed by exeuter
@@ -181,16 +211,54 @@ class BaseTradeDecision:
2. Same as `case 1.3`
"""
def __init__(self, strategy: BaseStrategy, idx_range: Tuple[int, int] = None):
def __init__(self, strategy: BaseStrategy, idx_range: Union[Tuple[int, int], Callable] = None):
"""
Parameters
----------
strategy : BaseStrategy
The strategy who make the decision
idx_range: Union[Tuple[int, int], Callable] (optional)
The index range for underlying strategy.
Here are two examples of idx_range for each type
1) Tuple[int, int]
start_index and end_index of the underlying factor(both sides are closed)
2) Callable
.. code-block:: python
def idx_range(time_per_step: str) -> Tuple[int, int]:
# time_per_step is the strategy's time_per_step (not inner strategy. It's the `self` strategy in
# `self._idx_range` )
# e.g.
# For example, strategy A with 30min each step and strategy B with 1min each step
# strategy A's will use "30min" when calling `idx_range`.
"""
self.strategy = strategy
self.total_step = None # upper strategy has no knowledge about the sub executor before `_init_sub_trading`
self.idx_range = idx_range
self._idx_range = idx_range
@staticmethod
def _calc_idx_range(
idx_range: Union[Tuple[int, int], Callable], inner_calendar: TradeCalendarManager = None
) -> Tuple[int, int]:
"""calculate index range for `idx_range` in different cases"""
if idx_range is None:
# not set, return nothing
return None, None
elif isinstance(idx_range, tuple):
return idx_range
elif isinstance(idx_range, Callable):
if inner_calendar is None:
# time_per_step is a required parameter for `def idx_range`
return None, None
else:
return idx_range(inner_calendar)
else:
raise NotImplementedError(f"This type of input is not supported")
def get_decision(self) -> List[object]:
"""
@@ -213,7 +281,7 @@ class BaseTradeDecision:
"""
Be called at the **start** of each step.
This function is designn for following purpose
This function is design for following purpose
1) Leave a hook for the strategy who make `self` decision to update the decision itself
2) Update some information from the inner executor calendar
@@ -231,12 +299,6 @@ class BaseTradeDecision:
"""
# purpose 1)
self.total_step = trade_calendar.get_trade_len()
if self.idx_range is not None:
logger = get_module_logger("decision")
start_idx, end_idx = self.idx_range
if start_idx < 0 or end_idx >= self.total_step:
logger.warning(f"{self.idx_range} go beyound the total_step({self.total_step}), it will be clipped")
self.idx_range = max(0, start_idx), min(self.total_step - 1, end_idx)
# purpose 2)
return self.strategy.update_trade_decision(self, trade_calendar)
@@ -246,9 +308,28 @@ class BaseTradeDecision:
return the expected step range for limiting the decision execution time
Both left and right are **closed**
if no available _idx_range, `default_value` will be returned
It is only used in `NestedExecutor`
- The outmost strategy will not follow any range limit (but it may give range_limit)
- The inner most strategy's range_limit will be useless due to atomic executors don't have such
features.
**NOTE**:
1) This function must be called after `self.update` in following cases(ensured by NestedExecutor):
- user relies on the auto-clip feature of `self.update`
2) This function will be called after _init_sub_trading in NestedExecutor.
Parameters
----------
**kwargs:
{"default_value": <default_value>}
# using dict is for distinguish no value provided or None provided
{
"default_value": <default_value>, # using dict is for distinguish no value provided or None provided
"inner_calendar": <trade calendar of inner strategy>
# because the range limit will control the step range of inner strategy, inner calendar will be a
# important parameter when _idx_range is callable
}
Returns
-------
@@ -259,15 +340,32 @@ class BaseTradeDecision:
NotImplementedError:
If the following criteria meet
1) the decision can't provide a unified start and end
2) default_value is None
2) default_value is not provided
"""
if self.idx_range is None:
# get index
_start_idx, _end_idx = self._calc_idx_range(self._idx_range, inner_calendar=kwargs.get("inner_calendar"))
if _start_idx is None or _end_idx is None:
# handle case without decision
# TODO: time range in the order should be checked.
# _start_idx and _end_idx should be used instead of _idx_range
# because it is possible that no limitation when _idx_range is callable and return None
if "default_value" in kwargs:
return kwargs["default_value"]
else:
# Default to get full index
raise NotImplementedError(f"The decision didn't provide an index range")
return self.idx_range
else:
# clip index
if getattr(self, "total_step", None) is not None:
# if `self.update` is called.
# Then the _start_idx, _end_idx should be clipped
if _start_idx < 0 or _end_idx >= self.total_step:
logger = get_module_logger("decision")
logger.warning(f"{self._idx_range} go beyoud the total_step({self.total_step}), it will be clipped")
_start_idx, _end_idx = max(0, _start_idx), min(self.total_step - 1, _end_idx)
return _start_idx, _end_idx
def empty(self) -> bool:
for obj in self.get_decision():
@@ -279,6 +377,27 @@ class BaseTradeDecision:
return True
return True
def mod_inner_decision(self, inner_trade_decision: BaseTradeDecision):
"""
This method will be called on the inner_trade_decision after it is generated.
`inner_trade_decision` will be changed **inplaced**.
Motivation of the `mod_inner_decision`
- Leave a hook for outer decision to affact the decision generated by the inner strategy
- e.g. the outmost strategy generate a time range for trading. But the upper layer can only affact the
nearest layer in the original design. With `mod_inner_decision`, the decision can passed through multiple
layers
Parameters
----------
inner_trade_decision : BaseTradeDecision
"""
# base class provide a default behaviour to modify inner_trade_decision
# callable _idx_range should be propagated when inner _idx_range is not set
if isinstance(self._idx_range, Callable) and inner_trade_decision._idx_range is None:
inner_trade_decision._idx_range = self._idx_range
class EmptyTradeDecision(BaseTradeDecision):
def empty(self) -> bool:
@@ -299,7 +418,7 @@ class TradeDecisionWO(BaseTradeDecision):
return self.order_list
def __repr__(self) -> str:
return f"strategy: {self.strategy}; idx_range: {self.idx_range}; order_list[{len(self.order_list)}]"
return f"strategy: {self.strategy}; idx_range: {self._idx_range}; order_list[{len(self.order_list)}]"
# TODO: the orders below need to be discussed ------------------------------------

View File

@@ -199,13 +199,13 @@ class Position(BasePosition):
}
"""
def __init__(self, cash=0, position_dict={}, now_account_value=0):
def __init__(self, cash=0, position_dict={}):
# NOTE: The position dict must be copied!!!
# Otherwise the initial value
self.init_cash = cash
self.position = position_dict.copy()
self.position["cash"] = cash
self.position["now_account_value"] = now_account_value
self.position["now_account_value"] = self.calculate_value()
def _init_stock(self, stock_id, amount, price=None):
"""

View File

@@ -371,6 +371,10 @@ class Indicator:
else:
raise NotImplementedError(f"This type of input is not supported")
# if there is no stock data during the time period
if price_s is None:
return None, None
# NOTE: there are some zeros in the trading price. These cases are known meaningless
# for aligning the previous logic, remove it.
# price_s = price_s.mask(np.isclose(price_s, 0))
@@ -443,7 +447,7 @@ class Indicator:
bp_new, bv_new = {}, {}
for pr, v, (inst, direction) in zip(bp_s.values, bv_s.values, trade_dir.items()):
if np.isnan(pr):
bp_new[inst], bv_new[inst] = self._get_base_vol_pri(
bp_tmp, bv_tmp = self._get_base_vol_pri(
inst,
start,
end,
@@ -452,6 +456,8 @@ class Indicator:
trade_exchange=trade_exchange,
pa_config=pa_config,
)
if (bp_tmp is not None) and (bv_tmp is not None):
bp_new[inst], bv_new[inst] = bp_tmp, bv_tmp
else:
bp_new[inst], bv_new[inst] = pr, v

View File

@@ -1,6 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import annotations
import bisect
from typing import Union, TYPE_CHECKING, Tuple, Union, List, Set
if TYPE_CHECKING:
@@ -118,6 +119,33 @@ class TradeCalendarManager:
"""Get the start_time and end_time for trading"""
return self.start_time, self.end_time
# helper functions
def get_range_idx(self, start_time: pd.Timestamp, end_time: pd.Timestamp) -> Tuple[int, int]:
"""
get the range index which involve start_time~end_time (both sides are closed)
Parameters
----------
start_time : pd.Timestamp
end_time : pd.Timestamp
Returns
-------
Tuple[int, int]:
the index of the range. **the left and right are closed**
"""
left, right = (
bisect.bisect_right(self._calendar, start_time) - 1,
bisect.bisect_right(self._calendar, end_time) - 1,
)
left -= self.start_index
right -= self.start_index
def clip(idx):
return min(max(0, idx), self.trade_len - 1)
return clip(left), clip(right)
def __repr__(self) -> str:
return f"{self.start_time}[{self.start_index}]~{self.end_time}[{self.end_index}]: [{self.trade_step}/{self.trade_len}]"
@@ -201,6 +229,6 @@ def get_start_end_idx(trade_calendar: TradeCalendarManager, outer_trade_decision
start index and end index
"""
try:
return outer_trade_decision.get_range_limit()
return outer_trade_decision.get_range_limit(inner_calendar=trade_calendar)
except NotImplementedError:
return 0, trade_calendar.get_trade_len() - 1

View File

@@ -4,7 +4,7 @@
Time related utils are compiled in this script
"""
import bisect
from datetime import datetime, time
from datetime import datetime, time, date
from typing import List, Tuple
import re
from numpy import append
@@ -122,6 +122,20 @@ def get_day_min_idx_range(start: str, end: str, freq: str) -> Tuple[int, int]:
return left_idx, right_idx
def concat_date_time(date_obj: date, time_obj: time) -> pd.Timestamp:
return pd.Timestamp(
datetime(
date_obj.year,
month=date_obj.month,
day=date_obj.day,
hour=time_obj.hour,
minute=time_obj.minute,
second=time_obj.second,
microsecond=time_obj.microsecond,
)
)
def cal_sam_minute(x: pd.Timestamp, sam_minutes: int) -> pd.Timestamp:
"""
align the minute-level data to a down sampled calendar
@@ -143,17 +157,7 @@ def cal_sam_minute(x: pd.Timestamp, sam_minutes: int) -> pd.Timestamp:
cal = get_min_cal(C.min_data_shift)[::sam_minutes]
idx = bisect.bisect_right(cal, x.time()) - 1
date, new_time = x.date(), cal[idx]
return pd.Timestamp(
datetime(
date.year,
month=date.month,
day=date.day,
hour=new_time.hour,
minute=new_time.minute,
second=new_time.second,
microsecond=new_time.microsecond,
)
)
return concat_date_time(date, new_time)
if __name__ == "__main__":