From 8eb7a1fddcd2c771208fae7f50b0cc3a8a186d8b Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Tue, 17 Aug 2021 07:15:45 +0000 Subject: [PATCH] numpy_order_indicator --- qlib/backtest/high_performance_ds.py | 267 +++++++++++++++++++++++++-- qlib/backtest/report.py | 9 +- 2 files changed, 255 insertions(+), 21 deletions(-) diff --git a/qlib/backtest/high_performance_ds.py b/qlib/backtest/high_performance_ds.py index f2bdbe651..0f47c7df1 100644 --- a/qlib/backtest/high_performance_ds.py +++ b/qlib/backtest/high_performance_ds.py @@ -3,8 +3,9 @@ import logging +from pandas._config.config import is_instance_factory from qlib.data.base import Feature -from typing import List, Text, Tuple, Union, Callable, Iterable, Dict +from typing import List, Text, Tuple, Union, Callable, Iterable, Dict, ValuesView from collections import OrderedDict import inspect @@ -135,8 +136,8 @@ class NumpyQuote(BaseQuote): the init dataframe from qlib. Variables - self.data: Dict[stock_id, np.array] - each stock has one two-dimensional np.array to represent data. + self.data: Dict[stock_id, np.ndarray] + each stock has one two-dimensional np.ndarray to represent data. self.columns: Dict[str, int] map column name to column id in self.data. self.dates: Dict[stock_id, Dict[pd.Timestap, int]] @@ -144,6 +145,7 @@ class NumpyQuote(BaseQuote): self.dates_list: Dict[stock_id, List[pd.Timestap]] the dates of each stock for searching. """ + super().__init__(quote_df=quote_df) # init data columns = quote_df.columns.values @@ -156,6 +158,7 @@ class NumpyQuote(BaseQuote): def _to_numpy(self, quote_df): """convert dataframe to numpy.""" + quote_dict = {} date_dict = {} date_list = {} @@ -201,12 +204,13 @@ class NumpyQuote(BaseQuote): # result lru if len(self.muti_lru) >= self.max_lru_len: - self.muti_lru = self.muti_lru[64:] + self.muti_lru.clear() self.muti_lru[(stock_id, start_time, end_time, fields, method)] = agg_stock_data return agg_stock_data def _agg_data(self, data, method): """Agg data by specific method.""" + if method == "sum": return data.sum() if method == "mean": @@ -238,6 +242,7 @@ class NumpyQuote(BaseQuote): bool True means one piece of data to obtaine. """ + if end_time - start_time < np.timedelta64(1, "m"): return True if start_time.hour == 11 and start_time.minute == 29 and start_time.second == 0: @@ -393,23 +398,21 @@ class BaseOrderIndicator: @staticmethod def sum_all_indicators( - indicators: list, metrics: Union[str, List[str]], fill_value: float = None - ) -> Dict[str, BaseSingleMetric]: + cls, indicators: list, metrics: Union[str, List[str]], fill_value: float = None + ): """sum indicators with the same metrics. + and assign to the cls(BaseOrderIndicator). Parameters ---------- + cls : BaseOrderIndicator + the order indicator to assign. indicators : List[BaseOrderIndicator] the list of all inner indicators. metrics : Union[str, List[str]] all metrics needs ot be sumed. fill_value : float, optional fill np.NaN with value. By default None. - - Return - ---------- - Dict[str: PandasSingleMetric] - a dict of metric name and data. """ pass @@ -567,17 +570,249 @@ class PandasOrderIndicator(BaseOrderIndicator): @staticmethod def sum_all_indicators( - indicators: list, metrics: Union[str, List[str]], fill_value=None - ) -> Dict[str, PandasSingleMetric]: - metric_dict = {} + cls, indicators: list, metrics: Union[str, List[str]], fill_value=None + ): if isinstance(metrics, str): metrics = [metrics] for metric in metrics: tmp_metric = PandasSingleMetric({}) for indicator in indicators: tmp_metric = tmp_metric.add(indicator.data[metric], fill_value) - metric_dict[metric] = tmp_metric.metric - return metric_dict + cls.assign(metric, tmp_metric.metric) def to_series(self): return {k: v.metric for k, v in self.data.items()} + + +class NumpySingleMetric(BaseSingleMetric): + def __init__(self, metric: np.ndarray): + self.metric = metric + + def __add__(self, other): + if isinstance(other, (int, float)): + return NumpySingleMetric(self.metric + other) + elif isinstance(other, NumpySingleMetric): + return NumpySingleMetric(self.metric + other.metric) + else: + return NotImplemented + + def __sub__(self, other): + if isinstance(other, (int, float)): + return NumpySingleMetric(self.metric - other) + elif isinstance(other, NumpySingleMetric): + return NumpySingleMetric(self.metric - other.metric) + else: + return NotImplemented + + def __rsub__(self, other): + if isinstance(other, (int, float)): + return NumpySingleMetric(other - self.metric) + elif isinstance(other, NumpySingleMetric): + return NumpySingleMetric(other.metric - self.metric) + else: + return NotImplemented + + def __mul__(self, other): + if isinstance(other, (int, float)): + return NumpySingleMetric(self.metric * other) + elif isinstance(other, NumpySingleMetric): + return NumpySingleMetric(self.metric * other.metric) + else: + return NotImplemented + + def __truediv__(self, other): + if isinstance(other, (int, float)): + return NumpySingleMetric(self.metric / other) + elif isinstance(other, NumpySingleMetric): + return NumpySingleMetric(self.metric / other.metric) + else: + return NotImplemented + + def __eq__(self, other): + if isinstance(other, (int, float)): + return NumpySingleMetric(self.metric == other) + elif isinstance(other, NumpySingleMetric): + return NumpySingleMetric(self.metric == other.metric) + else: + return NotImplemented + + def __gt__(self, other): + if isinstance(other, (int, float)): + return NumpySingleMetric(self.metric > other) + elif isinstance(other, NumpySingleMetric): + return NumpySingleMetric(self.metric > other.metric) + else: + return NotImplemented + + def __lt__(self, other): + if isinstance(other, (int, float)): + return NumpySingleMetric(self.metric < other) + elif isinstance(other, NumpySingleMetric): + return NumpySingleMetric(self.metric < other.metric) + else: + return NotImplemented + + def __len__(self): + return len(self.metric) + + def sum(self): + return self.metric.sum() + + def mean(self): + return self.metric.mean() + + def count(self): + return len(self.metric[~np.isnan(self.metric)]) + + def abs(self): + return NumpySingleMetric(np.absolute(self.metric)) + + def astype(self, type): + return NumpySingleMetric(self.metric.astype(type)) + + @property + def empty(self): + return len(self.metric) == 0 + + def replace(self, replace_dict: dict): + tmp_metric = self.metric.copy() + for num in replace_dict: + tmp_metric[tmp_metric == num] = replace_dict[num] + return NumpySingleMetric(tmp_metric) + + def apply(self, func: Callable): + tmp_metric = self.metric.copy() + for i in range(len(tmp_metric)): + tmp_metric[i] = func(tmp_metric[i]) + return NumpySingleMetric(tmp_metric) + + +class NumpyOrderIndicator(BaseOrderIndicator): + # all metrics + ROW = [ + "amount", + "deal_amount", + "inner_amount", + "trade_price", + "trade_value", + "trade_cost", + "trade_dir", + "ffr", + "pa", + "pos", + "base_price", + "base_volume", + ] + ROW_MAP = dict(zip(ROW, range(len(ROW)))) + + def __init__(self): + self.row_tag = [0 for tag in range(len(NumpyOrderIndicator.ROW))] + self.data = None + + def assign(self, col: str, metric: Union[dict, np.ndarray, pd.Series]): + if col not in NumpyOrderIndicator.ROW: + raise ValueError(f"{col} metric is not supoorted") + if not isinstance(metric, (dict, np.ndarray, pd.Series)): + raise ValueError(f"metric must be dict, pd.Series or np.ndarray") + if isinstance(metric, (pd.Series, np.ndarray)) and self.data is None: + raise ValueError(f"data can not be None when metric is np.ndarray or pd.Series") + + # if data is None, init numpy ndarray + if self.data is None: + self.data = np.zeros((len(NumpyOrderIndicator.ROW), len(metric))) + self.column = list(metric.keys()) + self.column_map = dict(zip(self.column, range(len(self.column)))) + + metric_column = list(metric.keys()) + if self.column != metric_column: + assert len(set(self.column) - set(metric_column)) == 0 + # modify the order + tmp_metric = {} + for column in self.column: + tmp_metric[column] = metric[column] + metric = tmp_metric + + # assign data + self.row_tag[NumpyOrderIndicator.ROW_MAP[col]] = 1 + if isinstance(metric, dict): + self.data[NumpyOrderIndicator.ROW_MAP[col]] = list(metric.values()) + elif isinstance(metric, np.ndarray): + self.data[NumpyOrderIndicator.ROW_MAP[col]] = metric + elif isinstance(metric, pd.Series): + self.data[NumpyOrderIndicator.ROW_MAP[col]] = metric.values + + def transfer(self, func: Callable, new_col: str = None) -> Union[None, NumpySingleMetric]: + func_sig = inspect.signature(func).parameters.keys() + func_kwargs = {} + for sig in func_sig: + if self._if_valid_metric(sig): + func_kwargs[sig] = NumpySingleMetric(self.data[NumpyOrderIndicator.ROW_MAP[sig]]) + else: + print(f"{sig} is not assigned") + func_kwargs[sig] = NumpySingleMetric(np.array([])) + tmp_metric = func(**func_kwargs) + if new_col is not None: + self.row_tag[NumpyOrderIndicator.ROW_MAP[new_col]] = 1 + self.data[NumpyOrderIndicator.ROW_MAP[new_col]] = tmp_metric.metric + else: + return tmp_metric + + def get_metric_series(self, metric: str) -> Union[pd.Series]: + if self._if_valid_metric(metric): + return pd.Series(self.data[NumpyOrderIndicator.ROW_MAP[metric]], index=self.column) + else: + return pd.Series() + + def to_series(self) -> Dict[str, pd.Series]: + tmp_metric_dict = {} + for metric in NumpyOrderIndicator.ROW: + tmp_metric_dict[metric] = self.get_metric_series(metric) + return tmp_metric_dict + + def _if_valid_metric(self, metric): + if metric in NumpyOrderIndicator.ROW and self.row_tag[NumpyOrderIndicator.ROW_MAP[metric]] == 1: + return True + else: + return False + + @staticmethod + def sum_all_indicators( + cls, indicators: list, metrics: Union[str, List[str]], fill_value=None + ) -> Dict[str, NumpySingleMetric]: + # metrics is all metrics to add + # metrics_id means the index in the NumpyOrderIndicator.ROW for metrics. + if isinstance(metrics, str): + metrics = [metrics] + metrics_id = [NumpyOrderIndicator.ROW_MAP[metric] for metric in metrics] + + # get all stock_id and all metric data + stocks = set() + indicator_metrics = [] + for indicator in indicators: + stocks = stocks | set(indicator.column) + indicator_metrics.append(indicator.data[metrics_id, :].copy()) + stocks = list(stocks) + stocks_map = dict(zip(stocks, range(len(stocks)))) + + # fill value + if fill_value is not None: + base_metrics = fill_value * np.ones((len(metrics), len(stocks))) + for i in range(len(indicators)): + tmp_netrics = base_metrics.copy() + stocks_index = [stocks_map[stock] for stock in indicators[i].column] + tmp_netrics[:, stocks_index] = indicator_metrics[i] + indicator_metrics[i] = tmp_netrics + else: + raise ValueError(f"fill value can not be None in NumpyOrderIndicator") + + # add metric and assign to cls + metric_sum = sum(indicator_metrics) + if cls.data is not None: + raise ValueError(f"this function must assign to an empty order indicator") + cls.data = np.zeros((len(NumpyOrderIndicator.ROW), len(stocks))) + cls.column = stocks + cls.column_map = dict(zip(stocks, range(len(stocks)))) + for i in range(len(metrics)): + cls.row_tag[NumpyOrderIndicator.ROW_MAP[metrics[i]]] = 1 + cls.data[NumpyOrderIndicator.ROW_MAP[metrics[i]]] = metric_sum[i] + diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index dea72e46a..21e5986ea 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -16,7 +16,7 @@ from qlib.backtest.exchange import Exchange from qlib.backtest.order import BaseTradeDecision, Order, OrderDir from qlib.backtest.utils import TradeCalendarManager -from .high_performance_ds import PandasOrderIndicator +from .high_performance_ds import PandasOrderIndicator, NumpyOrderIndicator from ..data import D from ..tests.config import CSI300_BENCH from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data @@ -236,6 +236,7 @@ class Indicator: | indicator | desc. | |--------------+--------------------------------------------------------------| | amount | the *target* amount given by the outer strategy | + | deal_amount | the real deal amount | | inner_amount | the total *target* amount of inner strategy | | trade_price | the average deal price | | trade_value | the total trade value | @@ -255,7 +256,7 @@ class Indicator: """ - def __init__(self, order_indicator_cls=PandasOrderIndicator): + def __init__(self, order_indicator_cls=NumpyOrderIndicator): self.order_indicator_cls = order_indicator_cls # order indicator is metrics for a single order for a specific step @@ -329,9 +330,7 @@ class Indicator: # sum inner order indicators with same metric. all_metric = ["inner_amount", "deal_amount", "trade_price", "trade_value", "trade_cost", "trade_dir"] - metric_dict = self.order_indicator_cls.sum_all_indicators(inner_order_indicators, all_metric, fill_value=0) - for metric in metric_dict: - self.order_indicator.assign(metric, metric_dict[metric]) + self.order_indicator_cls.sum_all_indicators(self.order_indicator, inner_order_indicators, all_metric, fill_value=0) def func(trade_price, deal_amount): # trade_price is np.NaN instead of inf when deal_amount is zero.