From 5c2ddac7f0041dadce8eba7f3c6985eedabced12 Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Sat, 31 Jul 2021 09:31:01 +0000 Subject: [PATCH] volume limit --- qlib/backtest/exchange.py | 167 ++++++++++++++++++++++++++++++---- qlib/backtest/executor.py | 28 +++++- qlib/contrib/ops/high_freq.py | 55 +++++++++++ 3 files changed, 232 insertions(+), 18 deletions(-) create mode 100644 qlib/contrib/ops/high_freq.py diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 9044179e0..eae7bb4f6 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -68,7 +68,31 @@ class Exchange: ) `False` value indicates the stock is tradable `True` value indicates the stock is limited and not tradable - :param volume_threshold: float, 0.1 for example, default None + + :param volume_threshold: Union[ + Dict[ + "all": Union(str, List[str], Tuple[str]), + "buy": Union(str, List[str], Tuple[str]), + "sell": Union(str, List[str], Tuple[str]), + ], + Union(str, List[str], Tuple[str], + ] + 1) str means one volume limit. In another words, each volume limit is a string. + There are two kinds of string to represent limit. + - the first kind of string is qlib data expression but it must starts with "$". + such as "$askV1", "$bidV1 * 0.8" + - the second kind of string is composed of special fields. Currently we only + supports #market and #dealed. #market is market volume so far that day. + !!!Note that if you use the #market field, you must register the DayCumsum operator + in qlib.contrib.ops.high_freq when initial the qlib. #dealed is dealed order num so far that day. + such as "0.8 * #market - #dealed", "0.6 * #market" + 2) "all" means the volume limits are both of buying and selling. + "buy" means the volume limits of buying. "sell" means the volume limits of selling. + Different volume limits will be aggregated with min(). If volume_threshold is only + Union(str, List[str], Tuple[str]) instead of a dict, the volume limits are for + both by deault. + 3) e.g. {"all": ("#market * 0.2 - #dealed"), "buy": ("$askV1"), "sell": ("$bidV1")} + :param open_cost: cost rate for open, default 0.0015 :param close_cost: cost rate for close, default 0.0025 :param trade_unit: trade unit, 100 for China A market. @@ -134,11 +158,15 @@ class Exchange: # $factor is for rounding to the trading unit # $change is for calculating the limit of the stock + #  get volume limit from kwargs + self.buy_vol_limit, self.sell_vol_limit, vol_lt_fields = self._get_vol_limit(volume_threshold) + necessary_fields = {self.buy_price, self.sell_price, "$close", "$change", "$factor", "$volume"} if self.limit_type == self.LT_TP_EXP: for exp in limit_threshold: necessary_fields.add(exp) - all_fields = list(necessary_fields | set(subscribe_fields)) + all_fields = necessary_fields | vol_lt_fields + all_fields = list(all_fields | set(subscribe_fields)) self.all_fields = all_fields self.open_cost = open_cost @@ -234,6 +262,61 @@ class Exchange: self.quote_df["limit_buy"] = self.quote_df["$change"].ge(limit_threshold) self.quote_df["limit_sell"] = self.quote_df["$change"].le(-limit_threshold) # pylint: disable=E1130 + def _get_vol_limit(self, volume_threshold): + """ + preproccess the volume limit. + get the fields need to get from qlib. + get the volume limit list of buying and selling which is composed of all limits. + + Parameters + ---------- + volume_threshold : + please refer to the doc of exchange. + + Returns + ------- + fields: set + the fields need to get from qlib. + buy_vol_limit: List[str] + all volume limits of buying. + sell_vol_limit: List[str] + all volume limits of selling. + + Raises + ------ + ValueError + the format of volume_threshold is not supported. + """ + if volume_threshold is None: + return None, None, set() + + fields = set() + buy_vol_limit = [] + sell_vol_limit = [] + if isinstance(volume_threshold, (str, tuple, list)): + volume_threshold = {"all": volume_threshold} + + for key in volume_threshold: + vol_limits = volume_threshold[key] + if isinstance(vol_limits, str): + vol_limits = [vol_limits] + for vol_lt in vol_limits: + # the str is qlib data expression when the first character is "$". + if vol_lt[0] == "$": + fields.add(vol_lt) + # the str is composed of special_fields + elif "#market" in vol_lt: + fields.add("DayCumsum($volume)") + else: + raise ValueError(f"volume limit string must be qlib expression or special_fields") + + if key in ("buy", "all"): + buy_vol_limit.append(vol_lt) + if key in ("sell", "all"): + sell_vol_limit.append(vol_lt) + + return buy_vol_limit, sell_vol_limit, fields + def check_stock_limit(self, stock_id, start_time, end_time, direction=None): """ Parameters @@ -282,7 +365,9 @@ class Exchange: else: return True - def deal_order(self, order, trade_account: Account = None, position: BasePosition = None): + def deal_order( + self, order, trade_account: Account = None, position: BasePosition = None, deal_order_num: dict = None + ): """ Deal order when the actual transaction @@ -291,6 +376,7 @@ class Exchange: :param order: Deal the order. :param trade_account: Trade account to be updated after dealing the order. :param position: position to be updated after dealing the order. + :param deal_order_num: the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}} :return: trade_val, trade_cost, trade_price """ # check order first. @@ -305,7 +391,7 @@ class Exchange: trade_price = self.get_deal_price(order.stock_id, order.start_time, order.end_time, order.direction) # NOTE: order will be changed in this function trade_val, trade_cost = self._calc_trade_info_by_order( - order, trade_account.current if trade_account else position + order, trade_account.current if trade_account else position, deal_order_num ) if order.deal_amount > 1e-5: # If the order can only be deal 0 amount. Nothing to be updated @@ -569,14 +655,64 @@ class Exchange: return (deal_amount * factor + 0.1) // self.trade_unit * self.trade_unit / factor return deal_amount - def _get_amount_by_volume(self, stock_id, trade_start_time, trade_end_time, deal_amount): - if self.volume_threshold is not None: - tradable_amount = self.get_volume(stock_id, trade_start_time, trade_end_time) * self.volume_threshold - return max(min(tradable_amount, deal_amount), 0) - else: - return deal_amount + def _get_amount_by_volume(self, order: Order, deal_order_num: dict) -> int: + """parse the capacity limit string and return the actual number of orders that can be executed. - def _calc_trade_info_by_order(self, order, position: Position): + Parameters + ---------- + order : Order + the order to be executed. + deal_order_num : dict + the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}} + + Returns + ------- + int + the actual number of orders that can be executed, due to the volume limit. + """ + if order.direction == Order.BUY: + vol_limit = self.buy_vol_limit + deal_order_num = deal_order_num["buy"] + elif order.direction == Order.SELL: + vol_limit = self.sell_vol_limit + deal_order_num = deal_order_num["sell"] + + if vol_limit is None: + return order.deal_amount + + vol_limit_num = [] + for limit in vol_limit: + assert isinstance(limit, str) + if limit[0] == "$": + vol_limit_num.append( + str( + self.quote.get_data( + order.stock_id, + order.start_time, + order.end_time, + fields=limit, + method=ts_data_last, + ) + ) + ) + else: + if "#market in limit": + market_limit = self.quote.get_data( + order.stock_id, + order.start_time, + order.end_time, + fields="DayCumsum($volume)", + method=ts_data_last, + ) + limit_tmp = limit.replace("#market", f"{market_limit}") + if "#dealed in limit": + limit_tmp = limit_tmp.replace("#dealed", f"{deal_order_num[order.stock_id]}") + vol_limit_num.append(limit_tmp) + + vol_limit_num = min([eval(i) for i in vol_limit_num]) + return max(min(vol_limit_num, order.deal_amount), 0) + + def _calc_trade_info_by_order(self, order, position: Position, deal_order_num): """ Calculation of trade info @@ -584,6 +720,7 @@ class Exchange: :param order: :param position: Position + :param deal_order_num: the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}} :return: trade_val, trade_cost """ @@ -607,9 +744,7 @@ class Exchange: # We choose to sell all order.deal_amount = order.amount - order.deal_amount = self._get_amount_by_volume( - order.stock_id, order.start_time, order.end_time, order.deal_amount - ) + order.deal_amount = self._get_amount_by_volume(order, deal_order_num) trade_val = order.deal_amount * trade_price trade_cost = max(trade_val * self.close_cost, self.min_cost) elif order.direction == Order.BUY: @@ -629,9 +764,7 @@ class Exchange: # Unknown amount of money. Just round the amount order.deal_amount = self.round_amount_by_trade_unit(order.amount, order.factor) - order.deal_amount = self._get_amount_by_volume( - order.stock_id, order.start_time, order.end_time, order.deal_amount - ) + order.deal_amount = self._get_amount_by_volume(order, deal_order_num) trade_val = order.deal_amount * trade_price trade_cost = max(trade_val * self.open_cost, self.min_cost) else: diff --git a/qlib/backtest/executor.py b/qlib/backtest/executor.py index 0121a904e..b79de011a 100644 --- a/qlib/backtest/executor.py +++ b/qlib/backtest/executor.py @@ -7,6 +7,7 @@ from qlib.backtest.account import Account import warnings import pandas as pd from typing import List, Tuple, Union +from collections import defaultdict from qlib.backtest.report import Indicator @@ -466,6 +467,10 @@ class SimulatorExecutor(BaseExecutor): self.trade_type = trade_type + # record deal order num in one day + self.deal_order_num = {"buy": defaultdict(int), "sell": defaultdict(int)} + self.deal_day = None + def _get_order_iterator(self, trade_decision: BaseTradeDecision) -> List[Order]: """ @@ -495,6 +500,22 @@ class SimulatorExecutor(BaseExecutor): raise NotImplementedError(f"This type of input is not supported") return order_it + def _update_order_num(self, order): + """update date and dealed order num in the day.""" + + now_deal_day = order.start_time.floor(freq="D") + if self.deal_day is None: + self.deal_day = now_deal_day + if now_deal_day > self.deal_day: + self.deal_order_num = {"buy": defaultdict(int), "sell": defaultdict(int)} + self.deal_day = now_deal_day + if order.direction == Order.BUY: + self.deal_order_num["buy"][order.stock_id] += order.deal_amount + elif order.direction == Order.SELL: + self.deal_order_num["sell"][order.stock_id] += order.deal_amount + else: + raise NotImplementedError(f"order type {order.type} error") + def _collect_data(self, trade_decision: BaseTradeDecision, level: int = 0): trade_start_time, _ = self.trade_calendar.get_step_time() @@ -503,8 +524,13 @@ class SimulatorExecutor(BaseExecutor): for order in self._get_order_iterator(trade_decision): # execute the order. # NOTE: The trade_account will be changed in this function - trade_val, trade_cost, trade_price = self.trade_exchange.deal_order(order, trade_account=self.trade_account) + trade_val, trade_cost, trade_price = self.trade_exchange.deal_order( + order, + trade_account=self.trade_account, + deal_order_num=self.deal_order_num, + ) execute_result.append((order, trade_val, trade_cost, trade_price)) + self._update_order_num(order) if self.verbose: print( "[I {:%Y-%m-%d %H:%M:%S}]: {} {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}, cash {:.2f}.".format( diff --git a/qlib/contrib/ops/high_freq.py b/qlib/contrib/ops/high_freq.py new file mode 100644 index 000000000..eee28c275 --- /dev/null +++ b/qlib/contrib/ops/high_freq.py @@ -0,0 +1,55 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +from pathlib import Path +import numpy as np +import pandas as pd + +import qlib +from qlib.data import D +from qlib.data.cache import H +from qlib.data.data import Cal +from qlib.data.ops import ElemOperator + + +def get_calendar_day(freq="1min", future=False): + """Load High-Freq Calendar Date Using Memcache. + + Parameters + ---------- + freq : str + frequency of read calendar file. + future : bool + whether including future trading day. + + Returns + ------- + _calendar: + array of date. + """ + flag = f"{freq}_future_{future}_day" + if flag in H["c"]: + _calendar = H["c"][flag] + else: + _calendar = np.array(list(map(lambda x: x.date(), Cal.load_calendar(freq, future)))) + H["c"][flag] = _calendar + return _calendar + + +class DayCumsum(ElemOperator): + """DayLast Operator + + Parameters + ---------- + feature : Expression + feature instance + + Returns + ---------- + feature: + a series of that each value equals the last value of its day + """ + + def _load_internal(self, instrument, start_index, end_index, freq): + _calendar = get_calendar_day(freq=freq) + series = self.feature.load(instrument, start_index, end_index, freq) + return series.groupby(_calendar[series.index]).cumsum()