1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 02:50:58 +08:00

fix calculating base_price

This commit is contained in:
Young
2021-07-08 05:54:36 +00:00
parent 5c5379e09d
commit 32ae6e4259
4 changed files with 130 additions and 58 deletions

View File

@@ -3,6 +3,7 @@
import copy
from typing import Dict, List
from qlib.utils import init_instance_by_config
import warnings
import pandas as pd
@@ -248,7 +249,7 @@ class Account:
atomic: bool,
outer_trade_decision: BaseTradeDecision,
trade_info: list = None,
inner_order_indicators: Indicator = None,
inner_order_indicators: List[Dict[str, pd.Series]] = None,
indicator_config: dict = {},
):
"""update account at each trading bar step
@@ -292,10 +293,15 @@ class Account:
self.indicator.clear()
if atomic:
self.indicator.update_order_indicators(trade_start_time, trade_end_time, trade_info, trade_exchange)
self.indicator.update_order_indicators(trade_info)
else:
self.indicator.agg_order_indicators(
inner_order_indicators, indicator_config=indicator_config, outer_trade_decision=outer_trade_decision
trade_start_time,
trade_end_time,
inner_order_indicators,
outer_trade_decision=outer_trade_decision,
trade_exchange=trade_exchange,
indicator_config=indicator_config,
)
self.indicator.cal_trade_indicators(trade_start_time, self.freq, indicator_config)

View File

@@ -281,27 +281,27 @@ class Exchange:
return trade_val, trade_cost, trade_price
def get_quote_info(self, stock_id, start_time, end_time):
return resam_ts_data(self.quote[stock_id], start_time, end_time, method=ts_data_last)
def get_quote_info(self, stock_id, start_time, end_time, method=ts_data_last):
return resam_ts_data(self.quote[stock_id], start_time, end_time, method=method)
def get_close(self, stock_id, start_time, end_time):
return resam_ts_data(self.quote[stock_id]["$close"], start_time, end_time, method=ts_data_last)
def get_close(self, stock_id, start_time, end_time, method=ts_data_last):
return resam_ts_data(self.quote[stock_id]["$close"], start_time, end_time, method=method)
def get_volume(self, stock_id, start_time, end_time):
return resam_ts_data(self.quote[stock_id]["$volume"], start_time, end_time, method="sum")
def get_volume(self, stock_id, start_time, end_time, method="sum"):
return resam_ts_data(self.quote[stock_id]["$volume"], start_time, end_time, method=method)
def get_deal_price(self, stock_id, start_time, end_time, direction: OrderDir):
def get_deal_price(self, stock_id, start_time, end_time, direction: OrderDir, method=ts_data_last):
if direction == OrderDir.SELL:
pstr = self.sell_price
elif direction == OrderDir.BUY:
pstr = self.buy_price
else:
raise NotImplementedError(f"This type of input is not supported")
deal_price = resam_ts_data(self.quote[stock_id][pstr], start_time, end_time, method=ts_data_last)
if np.isclose(deal_price, 0.0) or np.isnan(deal_price):
deal_price = resam_ts_data(self.quote[stock_id][pstr], start_time, end_time, method=method)
if method is not None and (np.isclose(deal_price, 0.0) or np.isnan(deal_price)):
self.logger.warning(f"(stock_id:{stock_id}, trade_time:{(start_time, end_time)}, {pstr}): {deal_price}!!!")
self.logger.warning(f"setting deal_price to close price")
deal_price = self.get_close(stock_id, start_time, end_time)
deal_price = self.get_close(stock_id, start_time, end_time, method)
return deal_price
def get_factor(self, stock_id, start_time, end_time) -> Union[float, None]:

View File

@@ -93,7 +93,10 @@ class Order:
if isinstance(direction, OrderDir):
return direction
elif isinstance(direction, (int, float, np.integer, np.floating)):
return OrderDir(int(direction))
if direction > 0:
return Order.BUY
else:
return Order.SELL
elif isinstance(direction, str):
dl = direction.lower()
if dl.strip() == "sell":

View File

@@ -4,9 +4,11 @@
from collections import OrderedDict
from logging import warning
from typing import List
from qlib.backtest.order import BaseTradeDecision, Order
from qlib.backtest.exchange import Exchange
from typing import Dict, List
from qlib.backtest.order import BaseTradeDecision, Order, OrderDir
import pandas as pd
import numpy as np
import pathlib
import warnings
from pandas.core import groupby
@@ -221,6 +223,33 @@ class Report:
class Indicator:
"""
`Indicator` is implemented in a aggregate way.
All the metrics are calculated aggregately.
All the metrics are calculated for a seperated stock and in a specific step on a specific level.
| indicator | desc. |
|--------------+--------------------------------------------------------------|
| amount | the *target* amount given by the outer strategy |
| inner_amount | the total *target* amount of inner strategy |
| trade_price | the average deal price |
| trade_value | the total trade value |
| trade_cost | the total trade cost (base price need drection) |
| trade_dir | the trading direction |
| ffr | full fill rate |
| pa | price advantage |
| pos | win rate |
| base_price | the price of baseline |
| base_volume | the volume of baseline (for weighted aggregating base_price) |
**NOTE**:
The `base_price` and `base_volume` can't be NaN when there are not trading on that step. Otherwise
aggregating get wrong results.
So `base_price` will not be calculated in a aggregate way!!
"""
def __init__(self):
self.order_indicator_his = OrderedDict()
self.order_indicator = OrderedDict()
@@ -241,6 +270,7 @@ class Indicator:
trade_price = dict()
trade_value = dict()
trade_cost = dict()
trade_dir = dict()
for order, _trade_val, _trade_cost, _trade_price in trade_info:
amount[order.stock_id] = order.amount_delta
@@ -248,36 +278,32 @@ class Indicator:
trade_price[order.stock_id] = _trade_price
trade_value[order.stock_id] = _trade_val * order.sign
trade_cost[order.stock_id] = _trade_cost
trade_dir[order.stock_id] = order.direction
self.order_indicator["amount"] = self.order_indicator["inner_amount"] = pd.Series(amount)
self.order_indicator["deal_amount"] = pd.Series(deal_amount)
# NOTE: trade_price and baseline price will be same on the lowest-level
self.order_indicator["trade_price"] = pd.Series(trade_price)
self.order_indicator["trade_value"] = pd.Series(trade_value)
self.order_indicator["trade_cost"] = pd.Series(trade_cost)
self.order_indicator["trade_dir"] = pd.Series(trade_dir)
def _update_order_fulfill_rate(self):
self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"]
def _update_order_price_advantage(self, trade_exchange, trade_start_time, trade_end_time):
self.order_indicator["base_price"] = self.order_indicator["trade_price"]
instruments = list(self.order_indicator["base_price"].index)
self.order_indicator["volume"] = pd.Series(
[
trade_exchange.get_volume(stock_id=inst, start_time=trade_start_time, end_time=trade_end_time)
for inst in instruments
],
index=instruments,
)
self.order_indicator["pa"] = (
self.order_indicator["trade_price"] - self.order_indicator["base_price"]
) / self.order_indicator["base_price"]
def _update_order_price_advantage(self):
# NOTE:
# trade_price and baseline price will be same on the lowest-level
# So Pa should be 0
self.order_indicator["pa"] = 0
def _agg_order_trade_info(self, inner_order_indicators):
def _agg_order_trade_info(self, inner_order_indicators: List[Dict[str, pd.Series]]):
inner_amount = pd.Series()
deal_amount = pd.Series()
trade_price = pd.Series()
trade_value = pd.Series()
trade_cost = pd.Series()
trade_dir = pd.Series()
for _order_indicator in inner_order_indicators:
inner_amount = inner_amount.add(_order_indicator["inner_amount"], fill_value=0)
deal_amount = deal_amount.add(_order_indicator["deal_amount"], fill_value=0)
@@ -286,6 +312,9 @@ class Indicator:
)
trade_value = trade_value.add(_order_indicator["trade_value"], fill_value=0)
trade_cost = trade_cost.add(_order_indicator["trade_cost"], fill_value=0)
trade_dir = trade_dir.add(_order_indicator["trade_dir"])
trade_dir = trade_dir.apply(Order.parse_dir)
self.order_indicator["inner_amount"] = inner_amount
self.order_indicator["deal_amount"] = deal_amount
@@ -293,6 +322,7 @@ class Indicator:
self.order_indicator["trade_price"] = trade_price
self.order_indicator["trade_value"] = trade_value
self.order_indicator["trade_cost"] = trade_cost
self.order_indicator["trade_dir"] = trade_dir
def _update_trade_amount(self, outer_trade_decision: BaseTradeDecision):
# NOTE: these indicator is designed for order execution, so the
@@ -305,34 +335,59 @@ class Indicator:
def _agg_order_fulfill_rate(self):
self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"]
def _agg_order_price_advantage(self, inner_order_indicators, base_price="twap"):
base_price = base_price.lower()
volume = pd.Series()
for _order_indicator in inner_order_indicators:
volume = volume.add(_order_indicator["volume"], fill_value=0)
self.order_indicator["volume"] = volume
def _agg_order_price_advantage(
self,
inner_order_indicators: List[Dict[str, pd.Series]],
trade_start_time: pd.Timestamp,
trade_end_time: pd.Timestamp,
trade_exchange: Exchange,
pa_config: dict = {},
):
"""
if base_price == "twap":
base_price = pd.Series()
price_count = pd.Series()
for _order_indicator in inner_order_indicators:
base_price = base_price.add(_order_indicator["base_price"], fill_value=0)
price_count = price_count.add(pd.Series(1, index=_order_indicator["base_price"].index), fill_value=0)
base_price /= price_count
self.order_indicator["base_price"] = base_price
Parameters
----------
inner_order_indicators : List[Dict[str, pd.Series]]
the indicators of account of inner executor
trade_start_time : pd.Timestamp
the start_time of the trade period, for slicing
trade_end_time : pd.Timestamp
the end_time of the trade period, for slicing (so it may include more time at the end)
trade_exchange : Exchange
for retrieving trading price
pa_config : dict
For example
{
"agg": "twap", # "vwap"
"price": "$close", # TODO: this is not supported now!!!!!
# default to use deal price of the exchange
}
"""
elif base_price == "vwap":
base_price = pd.Series()
for _order_indicator in inner_order_indicators:
base_price = base_price.add(_order_indicator["base_price"] * _order_indicator["volume"], fill_value=0)
base_price /= self.order_indicator["volume"]
self.order_indicator["base_price"] = base_price
agg = pa_config.get("agg", "twap").lower()
price = pa_config.get("price", "deal_price").lower()
else:
raise ValueError(f"base_price {base_price} is not supported!")
base_price = {}
for inst, dir in self.order_indicator["trade_dir"].items():
self.order_indicator["pa"] = self.order_indicator["trade_price"] / self.order_indicator["base_price"] - 1
# print("trade_price", self.order_indicator["trade_price"], "base_price", self.order_indicator["base_price"], "pa", self.order_indicator["pa"]* (2 * (self.order_indicator["amount"] < 0).astype(int) - 1))
if price == "deal_price":
price_s = trade_exchange.get_deal_price(inst, trade_start_time, trade_end_time, dir, method=None)
else:
raise NotImplementedError(f"This type of input is not supported")
# there are some zeros in the trading price. These cases are known meaningless
price_s = price_s.mask(np.isclose(price_s, 0))
if agg == "vwap":
volume_s = trade_exchange.get_volume(inst, trade_start_time, trade_end_time, method=None)
base_price[inst] = ((price_s * volume_s).sum() / volume_s.sum()).item()
elif agg == "twap":
base_price[inst] = price_s.mean().item()
base_price = pd.Series(base_price)
# update PA
self.order_indicator["pa"] = self.order_indicator["trade_price"] / base_price - 1
def _cal_trade_fulfill_rate(self, method="mean"):
if method == "mean":
@@ -372,19 +427,27 @@ class Indicator:
def _cal_trade_order_count(self):
return self.order_indicator["amount"].count()
def update_order_indicators(self, trade_start_time, trade_end_time, trade_info, trade_exchange):
def update_order_indicators(self, trade_info: list):
self._update_order_trade_info(trade_info=trade_info)
self._update_order_fulfill_rate()
self._update_order_price_advantage(trade_exchange, trade_start_time, trade_end_time)
self._update_order_price_advantage()
def agg_order_indicators(
self, inner_order_indicators, outer_trade_decision: BaseTradeDecision, indicator_config={}
self,
trade_start_time,
trade_end_time,
inner_order_indicators: List[Dict[str, pd.Series]],
outer_trade_decision: BaseTradeDecision,
trade_exchange: Exchange,
indicator_config={},
):
self._agg_order_trade_info(inner_order_indicators)
self._update_trade_amount(outer_trade_decision)
self._agg_order_fulfill_rate()
pa_config = indicator_config.get("pa_config", {})
self._agg_order_price_advantage(inner_order_indicators, base_price=pa_config.get("base_price", "twap"))
self._agg_order_price_advantage(
inner_order_indicators, trade_start_time, trade_end_time, trade_exchange, pa_config=pa_config
)
def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}):
show_indicator = indicator_config.get("show_indicator", False)