From 571d27cba7949c65efdfa6b5f48fee8a9c1759e5 Mon Sep 17 00:00:00 2001 From: Young Date: Wed, 14 Jul 2021 13:05:36 +0000 Subject: [PATCH 1/2] exchange support expression buy sell limit --- qlib/backtest/exchange.py | 63 +++++++++++++++++++++++++++++---------- 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 3794651dc..58f57ed73 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -26,7 +26,7 @@ class Exchange: codes="all", deal_price: Union[str, Tuple[str], List[str]] = None, subscribe_fields=[], - limit_threshold=None, + limit_threshold: Union[Tuple[str, str], float, None] = None, volume_threshold=None, open_cost=0.0015, close_cost=0.0025, @@ -41,7 +41,7 @@ class Exchange: :param end_time: closed end time for backtest :param codes: list stock_id list or a string of instruments(i.e. all, csi500, sse50) - :param deal_price: Union[str, Tuple[str], List[str]] + :param deal_price: Union[str, Tuple[str, str], List[str]] The `deal_price` supports following two types of input - : str - (, ): Tuple[str] or List[str] @@ -51,8 +51,16 @@ class Exchange: - for example '$close', '$open', '$vwap' ("close" is OK. `Exchange` will help to prepend "$" to the expression) - :param subscribe_fields: list, subscribe fields - :param limit_threshold: float, 0.1 for example, default None + :param subscribe_fields: list, subscribe fields. This expressions will be added to the query and `self.quote`. + It is useful when users want more fields to be queried + + :param limit_threshold: Union[Tuple[str, str], float, None] + 1) `None`: no limitation + 2) float, 0.1 for example, default None + 3) Tuple[str, str]: (, + ) + `False` value indicates the stock is tradable + `True` value indicates the stock is limited and not tradable :param volume_threshold: float, 0.1 for example, default None :param open_cost: cost rate for open, default 0.0015 :param close_cost: cost rate for close, default 0.0025 @@ -97,7 +105,7 @@ class Exchange: if limit_threshold is None: if C.region == REG_CN: self.logger.warning(f"limit_threshold not set. The stocks hit the limit may be bought/sold") - elif abs(limit_threshold) > 0.1: + elif self._get_limit_type(limit_threshold) == self.LT_FLT and abs(limit_threshold) > 0.1: if C.region == REG_CN: self.logger.warning(f"limit_threshold may not be set to a reasonable value") @@ -119,13 +127,17 @@ class Exchange: # $change is for calculating the limit of the stock necessary_fields = {self.buy_price, self.sell_price, "$close", "$change", "$factor", "$volume"} + if self._get_limit_type(limit_threshold) == self.LT_TP_EXP: + for exp in limit_threshold: + necessary_fields.add(exp) subscribe_fields = list(necessary_fields | set(subscribe_fields)) all_fields = list(necessary_fields | set(subscribe_fields)) + self.all_fields = all_fields self.open_cost = open_cost self.close_cost = close_cost self.min_cost = min_cost - self.limit_threshold = limit_threshold + self.limit_threshold: Union[Tuple[str, str], float, None] = limit_threshold self.volume_threshold = volume_threshold self.extra_quote = extra_quote self.set_quote(codes, start_time, end_time) @@ -133,6 +145,7 @@ class Exchange: def set_quote(self, codes, start_time, end_time): if len(codes) == 0: codes = D.instruments() + self.quote = D.features(codes, self.all_fields, start_time, end_time, freq=self.freq, disk_cache=True).dropna( subset=["$close"] ) @@ -157,13 +170,7 @@ class Exchange: self.trade_w_adj_price = False # update limit - # check limit_threshold - if self.limit_threshold is None: - self.quote["limit_buy"] = False - self.quote["limit_sell"] = False - else: - # set limit - self._update_limit(buy_limit=self.limit_threshold, sell_limit=self.limit_threshold) + self._update_limit() quote_df = self.quote if self.extra_quote is not None: @@ -194,9 +201,33 @@ class Exchange: self.quote = quote_dict - def _update_limit(self, buy_limit, sell_limit): - self.quote["limit_buy"] = self.quote["$change"].ge(buy_limit) - self.quote["limit_sell"] = self.quote["$change"].le(-sell_limit) + LT_TP_EXP = "(exp)" # Tuple[str, str] + LT_FLT = "float" # float + LT_NONE = "none" # none + + def _get_limit_type(self, limit_threshold): + if isinstance(limit_threshold, Tuple): + return self.LT_TP_EXP + elif isinstance(limit_threshold, float): + return self.LT_FLT + elif limit_threshold is None: + return self.LT_NONE + else: + raise NotImplementedError(f"This type of `limit_threshold` is not supported") + + def _update_limit(self): + # check limit_threshold + lt_type = self._get_limit_type(self.limit_threshold) + if lt_type == self.LT_NONE: + self.quote["limit_buy"] = False + self.quote["limit_sell"] = False + elif lt_type == self.LT_TP_EXP: + # set limit + self.quote["limit_buy"] = self.quote[self.limit_threshold[0]] + self.quote["limit_sell"] = self.quote[self.limit_threshold[1]] + elif lt_type == self.LT_FLT: + self.quote["limit_buy"] = self.quote["$change"].ge(self.limit_threshold) + self.quote["limit_sell"] = self.quote["$change"].le(-self.limit_threshold) # pylint: disable=E1130 def check_stock_limit(self, stock_id, start_time, end_time, direction=None): """ From 94b456714de74917b629f7bc042527b662363432 Mon Sep 17 00:00:00 2001 From: Young Date: Thu, 15 Jul 2021 07:54:27 +0000 Subject: [PATCH 2/2] refactor index_range to trade_range --- qlib/backtest/__init__.py | 2 +- qlib/backtest/executor.py | 2 +- qlib/backtest/order.py | 258 ++++++++++--------------- qlib/backtest/report.py | 20 +- qlib/contrib/strategy/rule_strategy.py | 29 ++- 5 files changed, 121 insertions(+), 190 deletions(-) diff --git a/qlib/backtest/__init__.py b/qlib/backtest/__init__.py index b4c3e32b8..23b8ec9c5 100644 --- a/qlib/backtest/__init__.py +++ b/qlib/backtest/__init__.py @@ -247,7 +247,7 @@ def collect_data( """initialize the strategy and executor, then collect the trade decision data for rl training please refer to the docs of the backtest for the explanation of the parameters - + Yields ------- object diff --git a/qlib/backtest/executor.py b/qlib/backtest/executor.py index 009e3300f..78cdbe5e0 100644 --- a/qlib/backtest/executor.py +++ b/qlib/backtest/executor.py @@ -296,7 +296,7 @@ class NestedExecutor(BaseExecutor): - The decisions may be updated by steps - The inner executor may not follow the decisions from the outer strategy align_range_limit: bool - force to align the index_range decision + force to align the trade_range decision It is only for nested executor, because range_limit is given by outer strategy """ self.inner_executor: BaseExecutor = init_instance_by_config( diff --git a/qlib/backtest/order.py b/qlib/backtest/order.py index eee9bd8f2..6bf1d5ad9 100644 --- a/qlib/backtest/order.py +++ b/qlib/backtest/order.py @@ -165,7 +165,60 @@ class OrderHelper: ) -class IndexRangeByTime: +class TradeRange: + def __call__(self, trade_calendar: TradeCalendarManager) -> Tuple[int, int]: + """ + This method will be call with following way + + The outer strategy give a decision with with `TradeRange` + The decision will be checked by the inner decision. + inner decision will pass its trade_calendar as parameter when getting the trading range + - The framework's step is integer-index based. + + Parameters + ---------- + trade_calendar : TradeCalendarManager + the trade_calendar is from inner strategy + + Returns + ------- + Tuple[int, int]: + the start index and end index which are tradable + + Raises + ------ + NotImplementedError: + Exceptions are raised when no range limitation + """ + raise NotImplementedError(f"Please implement the `__call__` method") + + def clip_time_range(self, start_time: pd.Timestamp, end_time: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]: + """ + Parameters + ---------- + start_time : pd.Timestamp + end_time : pd.Timestamp + Both sides (start_time, end_time) are closed + + Returns + ------- + Tuple[pd.Timestamp, pd.Timestamp]: + The tradable time range. + - It is intersection of [start_time, end_time] and the rule of TradeRange itself + """ + raise NotImplementedError(f"Please implement the `clip_time_range` method") + + +class IdxTradeRange(TradeRange): + def __init__(self, start_idx: int, end_idx: int): + self._start_idx = start_idx + self._end_idx = end_idx + + def __call__(self, trade_calendar: TradeCalendarManager = None) -> Tuple[int, int]: + return self._start_idx, self._end_idx + + +class TradeRangeByTime(TradeRange): """This is a helper function for make decisions""" def __init__(self, start_time: str, end_time: str): @@ -185,14 +238,24 @@ class IndexRangeByTime: """ self.start_time = pd.Timestamp(start_time).time() self.end_time = pd.Timestamp(end_time).time() + assert self.start_time < self.end_time - def __call__(self, trade_calendar: TradeCalendarManager) -> Tuple[int, int]: + def __call__(self, trade_calendar: TradeCalendarManager = None) -> Tuple[int, int]: + if trade_calendar is None: + raise NotImplementedError("trade_calendar is necessary for getting TradeRangeByTime.") start = trade_calendar.start_time val_start, val_end = concat_date_time(start.date(), self.start_time), concat_date_time( start.date(), self.end_time ) return trade_calendar.get_range_idx(val_start, val_end) + def clip_time_range(self, start_time: pd.Timestamp, end_time: pd.Timestamp) -> Tuple[pd.Timestamp, pd.Timestamp]: + start_date = start_time.date() + val_start, val_end = concat_date_time(start_date, self.start_time), concat_date_time(start_date, self.end_time) + # NOTE: `end_date` should not be used. Because the `end_date` is for slicing. It may be in the next day + # Assumption: start_time and end_time is for intraday trading. So it is OK for only using start_date + return max(val_start, start_time), min(val_end, end_time) + class BaseTradeDecision: """ @@ -210,54 +273,29 @@ class BaseTradeDecision: 2. Same as `case 1.3` """ - def __init__(self, strategy: BaseStrategy, idx_range: Union[Tuple[int, int], Callable] = None): + def __init__(self, strategy: BaseStrategy, trade_range: Union[Tuple[int, int], TradeRange] = None): """ Parameters ---------- strategy : BaseStrategy The strategy who make the decision - idx_range: Union[Tuple[int, int], Callable] (optional) + trade_range: Union[Tuple[int, int], Callable] (optional) The index range for underlying strategy. - Here are two examples of idx_range for each type + Here are two examples of trade_range for each type 1) Tuple[int, int] - start_index and end_index of the underlying factor(both sides are closed) + start_index and end_index of the underlying strategy(both sides are closed) - - 2) Callable - - .. code-block:: python - def idx_range(time_per_step: str) -> Tuple[int, int]: - # time_per_step is the strategy's time_per_step (not inner strategy. It's the `self` strategy in - # `self._idx_range` ) - # e.g. - # For example, strategy A with 30min each step and strategy B with 1min each step - # strategy A's will use "30min" when calling `idx_range`. + 2) TradeRange """ self.strategy = strategy self.total_step = None # upper strategy has no knowledge about the sub executor before `_init_sub_trading` - self._idx_range = idx_range - - @staticmethod - def _calc_idx_range( - idx_range: Union[Tuple[int, int], Callable], inner_calendar: TradeCalendarManager = None - ) -> Tuple[int, int]: - """calculate index range for `idx_range` in different cases""" - if idx_range is None: - # not set, return nothing - return None, None - elif isinstance(idx_range, tuple): - return idx_range - elif isinstance(idx_range, Callable): - if inner_calendar is None: - # time_per_step is a required parameter for `def idx_range` - return None, None - else: - return idx_range(inner_calendar) - else: - raise NotImplementedError(f"This type of input is not supported") + if isinstance(trade_range, Tuple): + # for Tuple[int, int] + trade_range = IdxTradeRange(**trade_range) + self.trade_range: TradeRange = trade_range def get_decision(self) -> List[object]: """ @@ -302,12 +340,18 @@ class BaseTradeDecision: # purpose 2) return self.strategy.update_trade_decision(self, trade_calendar) + def _get_range_limit(self, **kwargs) -> Tuple[int, int]: + if self.trade_range is not None: + return self.trade_range(trade_calendar=kwargs.get("inner_calendar")) + else: + raise NotImplementedError("The decision didn't provide an index range") + def get_range_limit(self, **kwargs) -> Tuple[int, int]: """ return the expected step range for limiting the decision execution time Both left and right are **closed** - if no available _idx_range, `default_value` will be returned + if no available trade_range, `default_value` will be returned It is only used in `NestedExecutor` - The outmost strategy will not follow any range limit (but it may give range_limit) @@ -327,7 +371,7 @@ class BaseTradeDecision: "default_value": , # using dict is for distinguish no value provided or None provided "inner_calendar": # because the range limit will control the step range of inner strategy, inner calendar will be a - # important parameter when _idx_range is callable + # important parameter when trade_range is callable } Returns @@ -341,29 +385,25 @@ class BaseTradeDecision: 1) the decision can't provide a unified start and end 2) default_value is not provided """ - - # get index - _start_idx, _end_idx = self._calc_idx_range(self._idx_range, inner_calendar=kwargs.get("inner_calendar")) - if _start_idx is None or _end_idx is None: - # handle case without decision - # TODO: time range in the order should be checked. - - # _start_idx and _end_idx should be used instead of _idx_range - # because it is possible that no limitation when _idx_range is callable and return None + try: + _start_idx, _end_idx = self._get_range_limit(**kwargs) + except NotImplementedError: if "default_value" in kwargs: return kwargs["default_value"] else: # Default to get full index raise NotImplementedError(f"The decision didn't provide an index range") - else: - # clip index - if getattr(self, "total_step", None) is not None: - # if `self.update` is called. - # Then the _start_idx, _end_idx should be clipped - if _start_idx < 0 or _end_idx >= self.total_step: - logger = get_module_logger("decision") - logger.warning(f"{self._idx_range} go beyoud the total_step({self.total_step}), it will be clipped") - _start_idx, _end_idx = max(0, _start_idx), min(self.total_step - 1, _end_idx) + + # clip index + if getattr(self, "total_step", None) is not None: + # if `self.update` is called. + # Then the _start_idx, _end_idx should be clipped + if _start_idx < 0 or _end_idx >= self.total_step: + logger = get_module_logger("decision") + logger.warning( + f"[{_start_idx},{_end_idx}] go beyoud the total_step({self.total_step}), it will be clipped" + ) + _start_idx, _end_idx = max(0, _start_idx), min(self.total_step - 1, _end_idx) return _start_idx, _end_idx def empty(self) -> bool: @@ -393,9 +433,9 @@ class BaseTradeDecision: inner_trade_decision : BaseTradeDecision """ # base class provide a default behaviour to modify inner_trade_decision - # callable _idx_range should be propagated when inner _idx_range is not set - if isinstance(self._idx_range, Callable) and inner_trade_decision._idx_range is None: - inner_trade_decision._idx_range = self._idx_range + # trade_range should be propagated when inner trade_range is not set + if inner_trade_decision.trade_range is None: + inner_trade_decision.trade_range = self.trade_range class EmptyTradeDecision(BaseTradeDecision): @@ -409,106 +449,12 @@ class TradeDecisionWO(BaseTradeDecision): Besides, the time_range is also included. """ - def __init__(self, order_list: List[Order], strategy: BaseStrategy, idx_range: Tuple[int, int] = None): - super().__init__(strategy, idx_range=idx_range) + def __init__(self, order_list: List[Order], strategy: BaseStrategy, trade_range: Tuple[int, int] = None): + super().__init__(strategy, trade_range=trade_range) self.order_list = order_list def get_decision(self) -> List[object]: return self.order_list def __repr__(self) -> str: - return f"strategy: {self.strategy}; idx_range: {self._idx_range}; order_list[{len(self.order_list)}]" - - -# TODO: the orders below need to be discussed ------------------------------------ -# - The classes below are designed for Case 1 -# - However, Case 1 can't take `order_pool` as the an argument as the constructor function -class TradeDecisionWithOrderPool: - """trade decision that made by strategy""" - - def __init__(self, strategy, order_pool): - """ - Parameters - ---------- - strategy : BaseStrategy - the original strategy that make the decision - order_pool : list, optional - the candinate order pool for generate trade decision - """ - super(TradeDecisionWithOrderPool, self).__init__(strategy) - self.order_pool = order_pool - self.order_list = [] - - def pop_order_pool(self, pop_len): - if pop_len > len(self.order_pool): - warnings.warn( - f"pop len {pop_len} is too much length than order pool, cut it as pool length {len(self.order_pool)}" - ) - pop_len = len(self.order_pool) - res = self.order_pool[:pop_len] - del self.order_pool[:pop_len] - return res - - def push_order_list(self, order_list): - self.order_list.extend(order_list) - - def get_decision(self): - """get the order list - - Parameters - ---------- - only_enable : bool, optional - wether to ignore disabled order, by default False - only_disable : bool, optional - wether to ignore enabled order, by default False - Returns - ------- - List[Order] - the order list - """ - return self.order_list - - def update(self, trade_calendar): - """make the original strategy update the enabled status of orders.""" - self.ori_strategy.update_trade_decision(self, trade_calendar) - - -class BaseDecisionUpdater: - def update_decision(self, decision, trade_calendar) -> BaseTradeDecision: - """ - Parameters - ---------- - decision : BaseTradeDecision - the trade decision to be updated - trade_calendar : BaseTradeCalendar - the trade calendar of inner execution - - Returns - ------- - BaseTradeDecision - the updated decision - """ - raise NotImplementedError(f"This method is not implemented") - - -class DecisionUpdaterWithOrderPool: - def __init__(self, plan_config=None): - """ - Parameters - ---------- - plan_config : Dict[Tuple(int, float)], optional - the plan config, by default None - """ - if plan_config is None: - self.plan_config = [(0, 1)] - else: - self.plan_config = plan_config - - def update_decision(self, decision, trade_calendar) -> BaseTradeDecision: - # get the number of trading step finished, trade_step can be [0, 1, 2, ..., trade_len - 1] - trade_step = self.trade_calendar.get_trade_step() - for _index, _ratio in self.plan_config: - if trade_step == _index: - pop_len = len(decision.order_pool) * _ratio - pop_order_list = decision.pop_order_pool(pop_len) - decision.push_order_list(pop_order_list) + return f"strategy: {self.strategy}; trade_range: {self.trade_range}; order_list[{len(self.order_list)}]" diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index cb650beb7..0d4d3f0d7 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -364,6 +364,11 @@ class Indicator: agg = pa_config.get("agg", "twap").lower() price = pa_config.get("price", "deal_price").lower() + # NOTE: IndexTradeRange is not supported!!!!! Because inner index is not available + trade_start_time, trade_end_time = decision.trade_range.clip_time_range( + start_time=trade_start_time, end_time=trade_end_time + ) + if price == "deal_price": price_s = trade_exchange.get_deal_price( inst, trade_start_time, trade_end_time, direction=direction, method=None @@ -386,21 +391,6 @@ class Indicator: else: raise NotImplementedError(f"This type of input is not supported") - # no sub executor on the lowest level - # So range_limit an total step will all be None - total_step = decision.total_step - if total_step is None: - total_step = 1 - range_limit = decision.get_range_limit(default_value=(0, total_step - 1)) - - assert volume_s.shape[0] % total_step == 0, "The price series can't be divided by step length" - factor = volume_s.shape[0] // total_step - - slc = slice(range_limit[0] * factor, (range_limit[1] + 1) * factor) - - volume_s = volume_s.iloc[slc] - price_s = price_s.iloc[slc] - base_volume = volume_s.sum().item() base_price = ((price_s * volume_s).sum() / base_volume).item() diff --git a/qlib/contrib/strategy/rule_strategy.py b/qlib/contrib/strategy/rule_strategy.py index f689b4003..56884cd48 100644 --- a/qlib/contrib/strategy/rule_strategy.py +++ b/qlib/contrib/strategy/rule_strategy.py @@ -10,7 +10,7 @@ from qlib.utils import lazy_sort_index from ...utils.resam import resam_ts_data, ts_data_last from ...data.data import D from ...strategy.base import BaseStrategy -from ...backtest.order import BaseTradeDecision, Order, TradeDecisionWO +from ...backtest.order import BaseTradeDecision, Order, TradeDecisionWO, TradeRange from ...backtest.exchange import Exchange, OrderHelper from ...backtest.utils import CommonInfrastructure, LevelInfrastructure from qlib.utils.file import get_io_object @@ -625,7 +625,7 @@ class ACStrategy(BaseStrategy): class RandomOrderStrategy(BaseStrategy): def __init__( self, - index_range: Tuple[int, int], # The range is closed on both left and right. + trade_range: Union[Tuple[int, int], TradeRange], # The range is closed on both left and right. sample_ratio: float = 1.0, volume_ratio: float = 0.01, market: str = "all", @@ -636,13 +636,8 @@ class RandomOrderStrategy(BaseStrategy): """ Parameters ---------- - index_range : Tuple - the intra day time index range of the orders - the left and right is closed. - - If you want to get the index_range in intra-day - - `qlib/utils/time.py:def get_day_min_idx_range` can help you create the index range easier - # TODO: this is a index_range level limitation. We'll implement a more detailed limitation later. + trade_range : Tuple + please refer to the `trade_range` parameter of BaseStrategy sample_ratio : float the ratio of all orders are sampled volume_ratio : float @@ -653,7 +648,6 @@ class RandomOrderStrategy(BaseStrategy): """ super().__init__(*args, **kwargs) - self.index_range = index_range self.sample_ratio = sample_ratio self.volume_ratio = volume_ratio self.market = market @@ -664,6 +658,7 @@ class RandomOrderStrategy(BaseStrategy): D.instruments(market), ["Mean(Ref($volume, 1), 10)"], start_time=exch.start_time, end_time=exch.end_time ) self.volume_df = self.volume.iloc[:, 0].unstack() + self.trade_range = trade_range def generate_trade_decision(self, execute_result=None): trade_step = self.trade_calendar.get_trade_step() @@ -683,7 +678,7 @@ class RandomOrderStrategy(BaseStrategy): direction=self.direction, ) ) - return TradeDecisionWO(order_list, self, self.index_range) + return TradeDecisionWO(order_list, self, self.trade_range) class FileOrderStrategy(BaseStrategy): @@ -692,7 +687,7 @@ class FileOrderStrategy(BaseStrategy): - This class provides an interface for user to read orders from csv files. """ - def __init__(self, file: Union[IO, str, Path], index_range: Tuple[int, int] = None, *args, **kwargs): + def __init__(self, file: Union[IO, str, Path], trade_range: Union[Tuple[int, int], TradeRange]= None, *args, **kwargs): """ Parameters @@ -709,13 +704,13 @@ class FileOrderStrategy(BaseStrategy): 20200103, SH600519, 1000, buy 20200106, SH600519, 1000, sell - index_range : Tuple[int, int] + trade_range : Tuple[int, int] the intra day time index range of the orders the left and right is closed. - If you want to get the index_range in intra-day + If you want to get the trade_range in intra-day - `qlib/utils/time.py:def get_day_min_idx_range` can help you create the index range easier - # TODO: this is a index_range level limitation. We'll implement a more detailed limitation later. + # TODO: this is a trade_range level limitation. We'll implement a more detailed limitation later. """ super().__init__(*args, **kwargs) @@ -727,7 +722,7 @@ class FileOrderStrategy(BaseStrategy): # make sure the datetime is the first level for fast indexing self.order_df = lazy_sort_index(convert_index_format(self.order_df, level="datetime")) - self.index_range = index_range + self.trade_range = trade_range def generate_trade_decision(self, execute_result=None) -> TradeDecisionWO: """ @@ -757,4 +752,4 @@ class FileOrderStrategy(BaseStrategy): end_time=end, ) ) - return TradeDecisionWO(order_list, self, self.index_range) + return TradeDecisionWO(order_list, self, self.trade_range)