From 32ae6e42597bb3f64523d42255c116bcbc1524ab Mon Sep 17 00:00:00 2001 From: Young Date: Thu, 8 Jul 2021 05:54:36 +0000 Subject: [PATCH] fix calculating base_price --- qlib/backtest/account.py | 12 ++- qlib/backtest/exchange.py | 20 ++--- qlib/backtest/order.py | 5 +- qlib/backtest/report.py | 151 +++++++++++++++++++++++++++----------- 4 files changed, 130 insertions(+), 58 deletions(-) diff --git a/qlib/backtest/account.py b/qlib/backtest/account.py index b394d5823..67f7b056a 100644 --- a/qlib/backtest/account.py +++ b/qlib/backtest/account.py @@ -3,6 +3,7 @@ import copy +from typing import Dict, List from qlib.utils import init_instance_by_config import warnings import pandas as pd @@ -248,7 +249,7 @@ class Account: atomic: bool, outer_trade_decision: BaseTradeDecision, trade_info: list = None, - inner_order_indicators: Indicator = None, + inner_order_indicators: List[Dict[str, pd.Series]] = None, indicator_config: dict = {}, ): """update account at each trading bar step @@ -292,10 +293,15 @@ class Account: self.indicator.clear() if atomic: - self.indicator.update_order_indicators(trade_start_time, trade_end_time, trade_info, trade_exchange) + self.indicator.update_order_indicators(trade_info) else: self.indicator.agg_order_indicators( - inner_order_indicators, indicator_config=indicator_config, outer_trade_decision=outer_trade_decision + trade_start_time, + trade_end_time, + inner_order_indicators, + outer_trade_decision=outer_trade_decision, + trade_exchange=trade_exchange, + indicator_config=indicator_config, ) self.indicator.cal_trade_indicators(trade_start_time, self.freq, indicator_config) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 26fae378f..3794651dc 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -281,27 +281,27 @@ class Exchange: return trade_val, trade_cost, trade_price - def get_quote_info(self, stock_id, start_time, end_time): - return resam_ts_data(self.quote[stock_id], start_time, end_time, method=ts_data_last) + def get_quote_info(self, stock_id, start_time, end_time, method=ts_data_last): + return resam_ts_data(self.quote[stock_id], start_time, end_time, method=method) - def get_close(self, stock_id, start_time, end_time): - return resam_ts_data(self.quote[stock_id]["$close"], start_time, end_time, method=ts_data_last) + def get_close(self, stock_id, start_time, end_time, method=ts_data_last): + return resam_ts_data(self.quote[stock_id]["$close"], start_time, end_time, method=method) - def get_volume(self, stock_id, start_time, end_time): - return resam_ts_data(self.quote[stock_id]["$volume"], start_time, end_time, method="sum") + def get_volume(self, stock_id, start_time, end_time, method="sum"): + return resam_ts_data(self.quote[stock_id]["$volume"], start_time, end_time, method=method) - def get_deal_price(self, stock_id, start_time, end_time, direction: OrderDir): + def get_deal_price(self, stock_id, start_time, end_time, direction: OrderDir, method=ts_data_last): if direction == OrderDir.SELL: pstr = self.sell_price elif direction == OrderDir.BUY: pstr = self.buy_price else: raise NotImplementedError(f"This type of input is not supported") - deal_price = resam_ts_data(self.quote[stock_id][pstr], start_time, end_time, method=ts_data_last) - if np.isclose(deal_price, 0.0) or np.isnan(deal_price): + deal_price = resam_ts_data(self.quote[stock_id][pstr], start_time, end_time, method=method) + if method is not None and (np.isclose(deal_price, 0.0) or np.isnan(deal_price)): self.logger.warning(f"(stock_id:{stock_id}, trade_time:{(start_time, end_time)}, {pstr}): {deal_price}!!!") self.logger.warning(f"setting deal_price to close price") - deal_price = self.get_close(stock_id, start_time, end_time) + deal_price = self.get_close(stock_id, start_time, end_time, method) return deal_price def get_factor(self, stock_id, start_time, end_time) -> Union[float, None]: diff --git a/qlib/backtest/order.py b/qlib/backtest/order.py index 1953426fd..20c97aa90 100644 --- a/qlib/backtest/order.py +++ b/qlib/backtest/order.py @@ -93,7 +93,10 @@ class Order: if isinstance(direction, OrderDir): return direction elif isinstance(direction, (int, float, np.integer, np.floating)): - return OrderDir(int(direction)) + if direction > 0: + return Order.BUY + else: + return Order.SELL elif isinstance(direction, str): dl = direction.lower() if dl.strip() == "sell": diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index ce2812bd0..43a6a455b 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -4,9 +4,11 @@ from collections import OrderedDict from logging import warning -from typing import List -from qlib.backtest.order import BaseTradeDecision, Order +from qlib.backtest.exchange import Exchange +from typing import Dict, List +from qlib.backtest.order import BaseTradeDecision, Order, OrderDir import pandas as pd +import numpy as np import pathlib import warnings from pandas.core import groupby @@ -221,6 +223,33 @@ class Report: class Indicator: + """ + `Indicator` is implemented in a aggregate way. + All the metrics are calculated aggregately. + All the metrics are calculated for a seperated stock and in a specific step on a specific level. + + | indicator | desc. | + |--------------+--------------------------------------------------------------| + | amount | the *target* amount given by the outer strategy | + | inner_amount | the total *target* amount of inner strategy | + | trade_price | the average deal price | + | trade_value | the total trade value | + | trade_cost | the total trade cost (base price need drection) | + | trade_dir | the trading direction | + | ffr | full fill rate | + | pa | price advantage | + | pos | win rate | + | base_price | the price of baseline | + | base_volume | the volume of baseline (for weighted aggregating base_price) | + + **NOTE**: + The `base_price` and `base_volume` can't be NaN when there are not trading on that step. Otherwise + aggregating get wrong results. + + So `base_price` will not be calculated in a aggregate way!! + + """ + def __init__(self): self.order_indicator_his = OrderedDict() self.order_indicator = OrderedDict() @@ -241,6 +270,7 @@ class Indicator: trade_price = dict() trade_value = dict() trade_cost = dict() + trade_dir = dict() for order, _trade_val, _trade_cost, _trade_price in trade_info: amount[order.stock_id] = order.amount_delta @@ -248,36 +278,32 @@ class Indicator: trade_price[order.stock_id] = _trade_price trade_value[order.stock_id] = _trade_val * order.sign trade_cost[order.stock_id] = _trade_cost + trade_dir[order.stock_id] = order.direction self.order_indicator["amount"] = self.order_indicator["inner_amount"] = pd.Series(amount) self.order_indicator["deal_amount"] = pd.Series(deal_amount) + # NOTE: trade_price and baseline price will be same on the lowest-level self.order_indicator["trade_price"] = pd.Series(trade_price) self.order_indicator["trade_value"] = pd.Series(trade_value) self.order_indicator["trade_cost"] = pd.Series(trade_cost) + self.order_indicator["trade_dir"] = pd.Series(trade_dir) def _update_order_fulfill_rate(self): self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"] - def _update_order_price_advantage(self, trade_exchange, trade_start_time, trade_end_time): - self.order_indicator["base_price"] = self.order_indicator["trade_price"] - instruments = list(self.order_indicator["base_price"].index) - self.order_indicator["volume"] = pd.Series( - [ - trade_exchange.get_volume(stock_id=inst, start_time=trade_start_time, end_time=trade_end_time) - for inst in instruments - ], - index=instruments, - ) - self.order_indicator["pa"] = ( - self.order_indicator["trade_price"] - self.order_indicator["base_price"] - ) / self.order_indicator["base_price"] + def _update_order_price_advantage(self): + # NOTE: + # trade_price and baseline price will be same on the lowest-level + # So Pa should be 0 + self.order_indicator["pa"] = 0 - def _agg_order_trade_info(self, inner_order_indicators): + def _agg_order_trade_info(self, inner_order_indicators: List[Dict[str, pd.Series]]): inner_amount = pd.Series() deal_amount = pd.Series() trade_price = pd.Series() trade_value = pd.Series() trade_cost = pd.Series() + trade_dir = pd.Series() for _order_indicator in inner_order_indicators: inner_amount = inner_amount.add(_order_indicator["inner_amount"], fill_value=0) deal_amount = deal_amount.add(_order_indicator["deal_amount"], fill_value=0) @@ -286,6 +312,9 @@ class Indicator: ) trade_value = trade_value.add(_order_indicator["trade_value"], fill_value=0) trade_cost = trade_cost.add(_order_indicator["trade_cost"], fill_value=0) + trade_dir = trade_dir.add(_order_indicator["trade_dir"]) + + trade_dir = trade_dir.apply(Order.parse_dir) self.order_indicator["inner_amount"] = inner_amount self.order_indicator["deal_amount"] = deal_amount @@ -293,6 +322,7 @@ class Indicator: self.order_indicator["trade_price"] = trade_price self.order_indicator["trade_value"] = trade_value self.order_indicator["trade_cost"] = trade_cost + self.order_indicator["trade_dir"] = trade_dir def _update_trade_amount(self, outer_trade_decision: BaseTradeDecision): # NOTE: these indicator is designed for order execution, so the @@ -305,34 +335,59 @@ class Indicator: def _agg_order_fulfill_rate(self): self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"] - def _agg_order_price_advantage(self, inner_order_indicators, base_price="twap"): - base_price = base_price.lower() - volume = pd.Series() - for _order_indicator in inner_order_indicators: - volume = volume.add(_order_indicator["volume"], fill_value=0) - self.order_indicator["volume"] = volume + def _agg_order_price_advantage( + self, + inner_order_indicators: List[Dict[str, pd.Series]], + trade_start_time: pd.Timestamp, + trade_end_time: pd.Timestamp, + trade_exchange: Exchange, + pa_config: dict = {}, + ): + """ - if base_price == "twap": - base_price = pd.Series() - price_count = pd.Series() - for _order_indicator in inner_order_indicators: - base_price = base_price.add(_order_indicator["base_price"], fill_value=0) - price_count = price_count.add(pd.Series(1, index=_order_indicator["base_price"].index), fill_value=0) - base_price /= price_count - self.order_indicator["base_price"] = base_price + Parameters + ---------- + inner_order_indicators : List[Dict[str, pd.Series]] + the indicators of account of inner executor + trade_start_time : pd.Timestamp + the start_time of the trade period, for slicing + trade_end_time : pd.Timestamp + the end_time of the trade period, for slicing (so it may include more time at the end) + trade_exchange : Exchange + for retrieving trading price + pa_config : dict + For example + { + "agg": "twap", # "vwap" + "price": "$close", # TODO: this is not supported now!!!!! + # default to use deal price of the exchange + } + """ - elif base_price == "vwap": - base_price = pd.Series() - for _order_indicator in inner_order_indicators: - base_price = base_price.add(_order_indicator["base_price"] * _order_indicator["volume"], fill_value=0) - base_price /= self.order_indicator["volume"] - self.order_indicator["base_price"] = base_price + agg = pa_config.get("agg", "twap").lower() + price = pa_config.get("price", "deal_price").lower() - else: - raise ValueError(f"base_price {base_price} is not supported!") + base_price = {} + for inst, dir in self.order_indicator["trade_dir"].items(): - self.order_indicator["pa"] = self.order_indicator["trade_price"] / self.order_indicator["base_price"] - 1 - # print("trade_price", self.order_indicator["trade_price"], "base_price", self.order_indicator["base_price"], "pa", self.order_indicator["pa"]* (2 * (self.order_indicator["amount"] < 0).astype(int) - 1)) + if price == "deal_price": + price_s = trade_exchange.get_deal_price(inst, trade_start_time, trade_end_time, dir, method=None) + else: + raise NotImplementedError(f"This type of input is not supported") + + # there are some zeros in the trading price. These cases are known meaningless + price_s = price_s.mask(np.isclose(price_s, 0)) + + if agg == "vwap": + volume_s = trade_exchange.get_volume(inst, trade_start_time, trade_end_time, method=None) + base_price[inst] = ((price_s * volume_s).sum() / volume_s.sum()).item() + elif agg == "twap": + base_price[inst] = price_s.mean().item() + + base_price = pd.Series(base_price) + + # update PA + self.order_indicator["pa"] = self.order_indicator["trade_price"] / base_price - 1 def _cal_trade_fulfill_rate(self, method="mean"): if method == "mean": @@ -372,19 +427,27 @@ class Indicator: def _cal_trade_order_count(self): return self.order_indicator["amount"].count() - def update_order_indicators(self, trade_start_time, trade_end_time, trade_info, trade_exchange): + def update_order_indicators(self, trade_info: list): self._update_order_trade_info(trade_info=trade_info) self._update_order_fulfill_rate() - self._update_order_price_advantage(trade_exchange, trade_start_time, trade_end_time) + self._update_order_price_advantage() def agg_order_indicators( - self, inner_order_indicators, outer_trade_decision: BaseTradeDecision, indicator_config={} + self, + trade_start_time, + trade_end_time, + inner_order_indicators: List[Dict[str, pd.Series]], + outer_trade_decision: BaseTradeDecision, + trade_exchange: Exchange, + indicator_config={}, ): self._agg_order_trade_info(inner_order_indicators) self._update_trade_amount(outer_trade_decision) self._agg_order_fulfill_rate() pa_config = indicator_config.get("pa_config", {}) - self._agg_order_price_advantage(inner_order_indicators, base_price=pa_config.get("base_price", "twap")) + self._agg_order_price_advantage( + inner_order_indicators, trade_start_time, trade_end_time, trade_exchange, pa_config=pa_config + ) def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}): show_indicator = indicator_config.get("show_indicator", False)