diff --git a/examples/highfreq/highfreq_ops.py b/examples/highfreq/highfreq_ops.py index ef784b34c..175f4f66b 100644 --- a/examples/highfreq/highfreq_ops.py +++ b/examples/highfreq/highfreq_ops.py @@ -5,30 +5,7 @@ from qlib.data.ops import ElemOperator, PairOperator from qlib.config import C from qlib.data.cache import H from qlib.data.data import Cal - - -def get_calendar_day(freq="day", future=False): - """Load High-Freq Calendar Date Using Memcache. - - Parameters - ---------- - freq : str - frequency of read calendar file. - future : bool - whether including future trading day. - - Returns - ------- - _calendar: - array of date. - """ - flag = f"{freq}_future_{future}_day" - if flag in H["c"]: - _calendar = H["c"][flag] - else: - _calendar = np.array(list(map(lambda x: pd.Timestamp(x.date()), Cal.load_calendar(freq, future)))) - H["c"][flag] = _calendar - return _calendar +from qlib.contrib.ops.high_freq import get_calendar_day class DayLast(ElemOperator): diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index eae7bb4f6..eeee269bd 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. from __future__ import annotations +from collections import defaultdict from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -68,30 +69,37 @@ class Exchange: ) `False` value indicates the stock is tradable `True` value indicates the stock is limited and not tradable - - :param volume_threshold: Union[ + + :param volume_threshold: Union[ Dict[ - "all": Union(str, List[str], Tuple[str]), - "buy": Union(str, List[str], Tuple[str]), - "sell": Union(str, List[str], Tuple[str]), + "all": ("cum" or "current", limit_str), + "buy": ("cum" or "current", limit_str), + "sell":("cum" or "current", limit_str), ], - Union(str, List[str], Tuple[str], - ] - 1) str means one volume limit. In another words, each volume limit is a string. 
- There are two kinds of string to represent limit. - - the first kind of string is qlib data expression but it must starts with "$". - such as "$askV1", "$bidV1 * 0.8" - - the second kind of string is composed of special fields. Currently we only - supports #market and #dealed. #market is market volume so far that day. - !!!Note that if you use the #market field, you must register the DayCumsum operator - in qlib.contrib.ops.high_freq when initial the qlib. #dealed is dealed order num so far that day. - such as "0.8 * #market - #dealed", "0.6 * #market" - 2) "all" means the volume limits are both of buying and selling. + ("cum" or "current", limit_str), + ] + 1) ("cum" or "current", limit_str) denotes a single volume limit. + - limit_str is a qlib data expression, which may use your own custom Operator. + Please refer to qlib/contrib/ops/high_freq.py, which contains custom operators for high frequency, + such as DayCumsum. !!!NOTE: if you want to use a custom operator, you need to + register it in qlib_init. + - "cum" means that this is a cumulative value over time, such as cumulative market volume. + So when it is used as a volume limit, it is necessary to subtract the dealed amount. + - "current" means that this is a real-time value and will not accumulate over time, + so it can be directly used as a capacity limit. + e.g. ("cum", "0.2 * DayCumsum($volume, '9:45', '14:45')"), ("current", "$bidV1") + + 2) "all" means the volume limits apply to both buying and selling. "buy" means the volume limits of buying. "sell" means the volume limits of selling. Different volume limits will be aggregated with min(). If volume_threshold is only - Union(str, List[str], Tuple[str]) instead of a dict, the volume limits are for - both by deault. - 3) e.g. {"all": ("#market * 0.2 - #dealed"), "buy": ("$askV1"), "sell": ("$bidV1")} + ("cum" or "current", limit_str) instead of a dict, the volume limits are for + both by default. 
In other words, it is the same as {"all": ("cum" or "current", limit_str)}. + + 3) e.g. "volume_threshold": { + "all": ("cum", "0.2 * DayCumsum($volume, '9:45', '14:45')"), + "buy": ("current", "$askV1"), + "sell": ("current", "$bidV1"), + } :param open_cost: cost rate for open, default 0.0015 :param close_cost: cost rate for close, default 0.0025 @@ -277,9 +285,9 @@ class Exchange: ------- fields: set the fields need to get from qlib. - buy_vol_limit: List[str] + buy_vol_limit: List[Tuple[str]] all volume limits of buying. - sell_vol_limit: List[str] + sell_vol_limit: List[Tuple[str]] all volume limits of selling. Raises @@ -293,27 +301,19 @@ class Exchange: fields = set() buy_vol_limit = [] sell_vol_limit = [] - if isinstance(volume_threshold, (str, tuple, list)): + if isinstance(volume_threshold, tuple): volume_threshold = {"all": volume_threshold} + assert type(volume_threshold) == dict for key in volume_threshold: - vol_limits = volume_threshold[key] - if isinstance(vol_limits, str): - vol_limits = [vol_limits] - for vol_lt in vol_limits: - # the str is qlib data expression when the first character is "$". 
- if vol_lt[0] == "$": - fields.add(vol_lt) - # the str is composed of special_fields - elif "#market" in vol_lt: - fields.add("DayCumsum($volume)") - else: - raise ValueError(f"volume limit string must be qlib expression or special_fields") + vol_limit = volume_threshold[key] + assert type(vol_limit) == tuple + fields.add(vol_limit[1]) - if key in ("buy", "all"): - buy_vol_limit.append(vol_lt) - if key in ("sell", "all"): - sell_vol_limit.append(vol_lt) + if key in ("buy", "all"): + buy_vol_limit.append(vol_limit) + if key in ("sell", "all"): + sell_vol_limit.append(vol_limit) return buy_vol_limit, sell_vol_limit, fields @@ -366,7 +366,11 @@ class Exchange: return True def deal_order( - self, order, trade_account: Account = None, position: BasePosition = None, deal_order_num: dict = None + self, + order, + trade_account: Account = None, + position: BasePosition = None, + dealed_order_amount: defaultdict = defaultdict(float), ): """ Deal order when the actual transaction @@ -376,7 +380,7 @@ class Exchange: :param order: Deal the order. :param trade_account: Trade account to be updated after dealing the order. :param position: position to be updated after dealing the order. - :param deal_order_num: the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}} + :param dealed_order_amount: the dealed order amount dict with the format of {stock_id: float} :return: trade_val, trade_cost, trade_price """ # check order first. @@ -391,7 +395,7 @@ class Exchange: trade_price = self.get_deal_price(order.stock_id, order.start_time, order.end_time, order.direction) # NOTE: order will be changed in this function trade_val, trade_cost = self._calc_trade_info_by_order( - order, trade_account.current if trade_account else position, deal_order_num + order, trade_account.current if trade_account else position, dealed_order_amount ) if order.deal_amount > 1e-5: # If the order can only be deal 0 amount. 
Nothing to be updated @@ -655,64 +659,59 @@ class Exchange: return (deal_amount * factor + 0.1) // self.trade_unit * self.trade_unit / factor return deal_amount - def _get_amount_by_volume(self, order: Order, deal_order_num: dict) -> int: - """parse the capacity limit string and return the actual number of orders that can be executed. + def _get_amount_by_volume(self, order: Order, dealed_order_amount: dict) -> int: + """parse the capacity limit string and return the actual amount of orders that can be executed. Parameters ---------- order : Order the order to be executed. - deal_order_num : dict - the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}} + dealed_order_amount : dict + the dealed order amount dict with the format of {stock_id: float} Returns ------- int - the actual number of orders that can be executed, due to the volume limit. - """ + the actual amount of orders that can be executed, due to the volume limit. 
+ """ if order.direction == Order.BUY: vol_limit = self.buy_vol_limit - deal_order_num = deal_order_num["buy"] elif order.direction == Order.SELL: vol_limit = self.sell_vol_limit - deal_order_num = deal_order_num["sell"] if vol_limit is None: return order.deal_amount vol_limit_num = [] for limit in vol_limit: - assert isinstance(limit, str) - if limit[0] == "$": + assert isinstance(limit, tuple) + if limit[0] == "current": vol_limit_num.append( - str( - self.quote.get_data( - order.stock_id, - order.start_time, - order.end_time, - fields=limit, - method=ts_data_last, - ) - ) - ) - else: - if "#market in limit": - market_limit = self.quote.get_data( + self.quote.get_data( order.stock_id, order.start_time, order.end_time, - fields="DayCumsum($volume)", + fields=limit[1], method=ts_data_last, ) - limit_tmp = limit.replace("#market", f"{market_limit}") - if "#dealed in limit": - limit_tmp = limit_tmp.replace("#dealed", f"{deal_order_num[order.stock_id]}") - vol_limit_num.append(limit_tmp) - - vol_limit_num = min([eval(i) for i in vol_limit_num]) + ) + elif limit[0] == "cum": + vol_limit_num.append( + self.quote.get_data( + order.stock_id, + order.start_time, + order.end_time, + fields=limit[1], + method=ts_data_last, + ) + - dealed_order_amount[order.stock_id] + ) + else: + raise ValueError(f"{limit[0]} is not supported") + vol_limit_num = min(vol_limit_num) return max(min(vol_limit_num, order.deal_amount), 0) - def _calc_trade_info_by_order(self, order, position: Position, deal_order_num): + def _calc_trade_info_by_order(self, order, position: Position, dealed_order_amount): """ Calculation of trade info @@ -720,7 +719,7 @@ class Exchange: :param order: :param position: Position - :param deal_order_num: the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}} + :param dealed_order_amount: the dealed order amount dict with the format of {stock_id: float} :return: trade_val, trade_cost """ @@ -744,7 +743,7 @@ class Exchange: # We 
choose to sell all order.deal_amount = order.amount - order.deal_amount = self._get_amount_by_volume(order, deal_order_num) + order.deal_amount = self._get_amount_by_volume(order, dealed_order_amount) trade_val = order.deal_amount * trade_price trade_cost = max(trade_val * self.close_cost, self.min_cost) elif order.direction == Order.BUY: @@ -764,7 +763,7 @@ class Exchange: # Unknown amount of money. Just round the amount order.deal_amount = self.round_amount_by_trade_unit(order.amount, order.factor) - order.deal_amount = self._get_amount_by_volume(order, deal_order_num) + order.deal_amount = self._get_amount_by_volume(order, dealed_order_amount) trade_val = order.deal_amount * trade_price trade_cost = max(trade_val * self.open_cost, self.min_cost) else: diff --git a/qlib/backtest/executor.py b/qlib/backtest/executor.py index b79de011a..6b44bd1b7 100644 --- a/qlib/backtest/executor.py +++ b/qlib/backtest/executor.py @@ -114,6 +114,10 @@ class BaseExecutor: if common_infra is None: get_module_logger("BaseExecutor").warning(f"`common_infra` is not set for {self}") + # record deal order amount in one day + self.dealed_order_amount = defaultdict(float) + self.deal_day = None + def reset_common_infra(self, common_infra): """ reset infrastructure for trading @@ -467,10 +471,6 @@ class SimulatorExecutor(BaseExecutor): self.trade_type = trade_type - # record deal order num in one day - self.deal_order_num = {"buy": defaultdict(int), "sell": defaultdict(int)} - self.deal_day = None - def _get_order_iterator(self, trade_decision: BaseTradeDecision) -> List[Order]: """ @@ -500,21 +500,14 @@ class SimulatorExecutor(BaseExecutor): raise NotImplementedError(f"This type of input is not supported") return order_it - def _update_order_num(self, order): - """update date and dealed order num in the day.""" + def _update_dealed_order_amount(self, order): + """update date and dealed order amount in the day.""" - now_deal_day = order.start_time.floor(freq="D") - if self.deal_day is 
None: + now_deal_day = self.trade_calendar.get_step_time()[0].floor(freq="D") + if self.deal_day is None or now_deal_day > self.deal_day: + self.dealed_order_amount = defaultdict(float) self.deal_day = now_deal_day - if now_deal_day > self.deal_day: - self.deal_order_num = {"buy": defaultdict(int), "sell": defaultdict(int)} - self.deal_day = now_deal_day - if order.direction == Order.BUY: - self.deal_order_num["buy"][order.stock_id] += order.deal_amount - elif order.direction == Order.SELL: - self.deal_order_num["sell"][order.stock_id] += order.deal_amount - else: - raise NotImplementedError(f"order type {order.type} error") + self.dealed_order_amount[order.stock_id] += order.deal_amount def _collect_data(self, trade_decision: BaseTradeDecision, level: int = 0): @@ -527,10 +520,10 @@ class SimulatorExecutor(BaseExecutor): trade_val, trade_cost, trade_price = self.trade_exchange.deal_order( order, trade_account=self.trade_account, - deal_order_num=self.deal_order_num, + dealed_order_amount=self.dealed_order_amount, ) execute_result.append((order, trade_val, trade_cost, trade_price)) - self._update_order_num(order) + self._update_dealed_order_amount(order) if self.verbose: print( "[I {:%Y-%m-%d %H:%M:%S}]: {} {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}, cash {:.2f}.".format( diff --git a/qlib/contrib/ops/high_freq.py b/qlib/contrib/ops/high_freq.py index eee28c275..6f03b71cf 100644 --- a/qlib/contrib/ops/high_freq.py +++ b/qlib/contrib/ops/high_freq.py @@ -3,6 +3,7 @@ from pathlib import Path import numpy as np import pandas as pd +from datetime import datetime import qlib from qlib.data import D @@ -12,7 +13,9 @@ from qlib.data.ops import ElemOperator def get_calendar_day(freq="1min", future=False): - """Load High-Freq Calendar Date Using Memcache. + """ + Load High-Freq Calendar Date Using Memcache. + !!!NOTE: Loading the calendar is quite slow. So loading calendar before start multiprocessing will make it faster. 
Parameters ---------- @@ -36,20 +39,57 @@ def get_calendar_day(freq="1min", future=False): class DayCumsum(ElemOperator): - """DayLast Operator + """DayCumsum Operator between start time and end time. Parameters ---------- feature : Expression feature instance + start : str + the start time of backtest in one day. + !!!NOTE: "9:30" means the time period of (9:30, 9:31) is in transaction. + end : str + the end time of backtest in one day. + !!!NOTE: "14:59" means the time period of (14:59, 15:00) is in transaction, + but (15:00, 15:01) is not. + So start="9:30" and end="14:59" means trading all day. Returns ---------- feature: - a series of that each value equals the last value of its day + a series in which each value is the cumulative sum within the day between start time and end time. + Otherwise, the value is zero. """ + def __init__(self, feature, start: str = "9:30", end: str = "14:59"): + self.feature = feature + self.start = datetime.strptime(start, "%H:%M") + self.end = datetime.strptime(end, "%H:%M") + + self.morning_open = datetime.strptime("9:30", "%H:%M") + self.morning_close = datetime.strptime("11:30", "%H:%M") + self.noon_open = datetime.strptime("13:00", "%H:%M") + self.noon_close = datetime.strptime("15:00", "%H:%M") + + self.start_id = self.time_to_index(self.start) + self.end_id = self.time_to_index(self.end) + + def time_to_index(self, t): + if t >= self.morning_open and t < self.morning_close: + return int((t - self.morning_open).total_seconds() / 60) + elif t >= self.noon_open and t < self.noon_close: + return int((t - self.noon_open).total_seconds() / 60) + 120 + else: + raise ValueError(f"{t} is not the opening time of the stock market") + + def period_cusum(self, df): + assert len(df) == 240 + df.iloc[0 : self.start_id] = 0 + df = df.cumsum() + df.iloc[self.end_id + 1 : 240] = 0 + return df + def _load_internal(self, instrument, start_index, end_index, freq): _calendar = get_calendar_day(freq=freq) series = self.feature.load(instrument, start_index, 
end_index, freq) - return series.groupby(_calendar[series.index]).cumsum() + return series.groupby(_calendar[series.index]).transform(self.period_cusum)