1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-01 10:01:19 +08:00

numpy_order_indicator

This commit is contained in:
wangwenxi.handsome
2021-08-17 07:15:45 +00:00
committed by you-n-g
parent 222c2fd21a
commit 8eb7a1fddc
2 changed files with 255 additions and 21 deletions

View File

@@ -3,8 +3,9 @@
import logging
from pandas._config.config import is_instance_factory
from qlib.data.base import Feature
from typing import List, Text, Tuple, Union, Callable, Iterable, Dict
from typing import List, Text, Tuple, Union, Callable, Iterable, Dict, ValuesView
from collections import OrderedDict
import inspect
@@ -135,8 +136,8 @@ class NumpyQuote(BaseQuote):
the init dataframe from qlib.
Variables
self.data: Dict[stock_id, np.array]
each stock has one two-dimensional np.array to represent data.
self.data: Dict[stock_id, np.ndarray]
each stock has one two-dimensional np.ndarray to represent data.
self.columns: Dict[str, int]
map column name to column id in self.data.
self.dates: Dict[stock_id, Dict[pd.Timestamp, int]]
@@ -144,6 +145,7 @@ class NumpyQuote(BaseQuote):
self.dates_list: Dict[stock_id, List[pd.Timestamp]]
the dates of each stock for searching.
"""
super().__init__(quote_df=quote_df)
# init data
columns = quote_df.columns.values
@@ -156,6 +158,7 @@ class NumpyQuote(BaseQuote):
def _to_numpy(self, quote_df):
"""convert dataframe to numpy."""
quote_dict = {}
date_dict = {}
date_list = {}
@@ -201,12 +204,13 @@ class NumpyQuote(BaseQuote):
# result lru
if len(self.muti_lru) >= self.max_lru_len:
self.muti_lru = self.muti_lru[64:]
self.muti_lru.clear()
self.muti_lru[(stock_id, start_time, end_time, fields, method)] = agg_stock_data
return agg_stock_data
def _agg_data(self, data, method):
"""Agg data by specific method."""
if method == "sum":
return data.sum()
if method == "mean":
@@ -238,6 +242,7 @@ class NumpyQuote(BaseQuote):
bool
True means there is only one piece of data to obtain.
"""
if end_time - start_time < np.timedelta64(1, "m"):
return True
if start_time.hour == 11 and start_time.minute == 29 and start_time.second == 0:
@@ -393,23 +398,21 @@ class BaseOrderIndicator:
@staticmethod
def sum_all_indicators(
indicators: list, metrics: Union[str, List[str]], fill_value: float = None
) -> Dict[str, BaseSingleMetric]:
cls, indicators: list, metrics: Union[str, List[str]], fill_value: float = None
):
"""sum indicators with the same metrics.
and assign to the cls(BaseOrderIndicator).
Parameters
----------
cls : BaseOrderIndicator
the order indicator to assign.
indicators : List[BaseOrderIndicator]
the list of all inner indicators.
metrics : Union[str, List[str]]
all metrics that need to be summed.
fill_value : float, optional
fill np.NaN with value. By default None.
Return
----------
Dict[str: PandasSingleMetric]
a dict of metric name and data.
"""
pass
@@ -567,17 +570,249 @@ class PandasOrderIndicator(BaseOrderIndicator):
@staticmethod
def sum_all_indicators(
indicators: list, metrics: Union[str, List[str]], fill_value=None
) -> Dict[str, PandasSingleMetric]:
metric_dict = {}
cls, indicators: list, metrics: Union[str, List[str]], fill_value=None
):
if isinstance(metrics, str):
metrics = [metrics]
for metric in metrics:
tmp_metric = PandasSingleMetric({})
for indicator in indicators:
tmp_metric = tmp_metric.add(indicator.data[metric], fill_value)
metric_dict[metric] = tmp_metric.metric
return metric_dict
cls.assign(metric, tmp_metric.metric)
def to_series(self):
    """Return a dict mapping every key of ``self.data`` to that entry's ``.metric``."""
    series_by_name = {}
    for name, single_metric in self.data.items():
        series_by_name[name] = single_metric.metric
    return series_by_name
class NumpySingleMetric(BaseSingleMetric):
    """Single metric whose values live in ``self.metric`` (an ``np.ndarray``).

    The arithmetic and comparison operators are applied to ``self.metric``
    with the usual numpy operators and always wrap the result in a new
    ``NumpySingleMetric``.  The other operand may be a plain ``int``/``float``
    or another ``NumpySingleMetric``; anything else yields ``NotImplemented``.
    """

    def __init__(self, metric: np.ndarray):
        self.metric = metric

    @staticmethod
    def _raw(other):
        """Return the value to combine with ``self.metric``, or ``NotImplemented``."""
        if isinstance(other, (int, float)):
            return other
        if isinstance(other, NumpySingleMetric):
            return other.metric
        return NotImplemented

    def __add__(self, other):
        rhs = self._raw(other)
        if rhs is NotImplemented:
            return NotImplemented
        return NumpySingleMetric(self.metric + rhs)

    def __sub__(self, other):
        rhs = self._raw(other)
        if rhs is NotImplemented:
            return NotImplemented
        return NumpySingleMetric(self.metric - rhs)

    def __rsub__(self, other):
        # reflected form: ``other`` is the left-hand operand here
        lhs = self._raw(other)
        if lhs is NotImplemented:
            return NotImplemented
        return NumpySingleMetric(lhs - self.metric)

    def __mul__(self, other):
        rhs = self._raw(other)
        if rhs is NotImplemented:
            return NotImplemented
        return NumpySingleMetric(self.metric * rhs)

    def __truediv__(self, other):
        rhs = self._raw(other)
        if rhs is NotImplemented:
            return NotImplemented
        return NumpySingleMetric(self.metric / rhs)

    def __eq__(self, other):
        # returns a wrapped boolean array rather than a single bool
        rhs = self._raw(other)
        if rhs is NotImplemented:
            return NotImplemented
        return NumpySingleMetric(self.metric == rhs)

    def __gt__(self, other):
        rhs = self._raw(other)
        if rhs is NotImplemented:
            return NotImplemented
        return NumpySingleMetric(self.metric > rhs)

    def __lt__(self, other):
        rhs = self._raw(other)
        if rhs is NotImplemented:
            return NotImplemented
        return NumpySingleMetric(self.metric < rhs)

    def __len__(self):
        return len(self.metric)

    def sum(self):
        """Return ``self.metric.sum()``."""
        return self.metric.sum()

    def mean(self):
        """Return ``self.metric.mean()``."""
        return self.metric.mean()

    def count(self):
        """Return how many entries of ``self.metric`` are not NaN."""
        not_nan = ~np.isnan(self.metric)
        return len(self.metric[not_nan])

    def abs(self):
        """Return a new metric holding the absolute values."""
        return NumpySingleMetric(np.absolute(self.metric))

    def astype(self, type):
        """Return a new metric whose array is cast with ``ndarray.astype(type)``."""
        return NumpySingleMetric(self.metric.astype(type))

    @property
    def empty(self):
        """True when ``self.metric`` has length zero."""
        return not len(self.metric)

    def replace(self, replace_dict: dict):
        """Return a copy in which every value equal to a key of ``replace_dict``
        is overwritten with the mapped value.

        Replacements are applied one key after another, in dict order, on the
        same working copy.
        """
        replaced = self.metric.copy()
        for old_value, new_value in replace_dict.items():
            replaced[replaced == old_value] = new_value
        return NumpySingleMetric(replaced)

    def apply(self, func: Callable):
        """Return a copy in which each entry ``x`` is replaced by ``func(x)``.

        Results are written back into a copy of the original array, so they
        are stored with that array's dtype.
        """
        applied = self.metric.copy()
        for position, value in enumerate(applied):
            applied[position] = func(value)
        return NumpySingleMetric(applied)
class NumpyOrderIndicator(BaseOrderIndicator):
    """Order indicator that keeps every metric as one row of a 2-D ``np.ndarray``.

    Attributes
    ----------
    row_tag : List[int]
        ``row_tag[i] == 1`` once the metric ``ROW[i]`` has been assigned.
    data : np.ndarray or None
        ``None`` until the first assignment; afterwards an array of shape
        ``(len(ROW), len(column))`` whose row ``ROW_MAP[name]`` holds metric ``name``.
    column : list
        the keys of the first assigned metric, in order; one per column of ``data``.
    column_map : dict
        maps each entry of ``column`` to its column index.
    """

    # all metrics
    ROW = [
        "amount",
        "deal_amount",
        "inner_amount",
        "trade_price",
        "trade_value",
        "trade_cost",
        "trade_dir",
        "ffr",
        "pa",
        "pos",
        "base_price",
        "base_volume",
    ]
    # metric name -> row index in ``data``
    ROW_MAP = {name: idx for idx, name in enumerate(ROW)}

    def __init__(self):
        self.row_tag = [0] * len(NumpyOrderIndicator.ROW)
        self.data = None
        # Defined up front so an indicator that never received data can still
        # be inspected or summed without raising AttributeError.
        self.column = []
        self.column_map = {}

    def assign(self, col: str, metric: Union[dict, np.ndarray, pd.Series]):
        """Store ``metric`` as the row of metric ``col``.

        Parameters
        ----------
        col : str
            the metric name; must be one of ``ROW``.
        metric : Union[dict, np.ndarray, pd.Series]
            the values.  The first assignment must be a dict, whose keys fix the
            column layout.  A later dict / pd.Series is re-ordered to that layout
            by key; a np.ndarray carries no keys and is written positionally.

        Raises
        ------
        ValueError
            if ``col`` is unknown, ``metric`` has an unsupported type, the first
            assignment is not a dict, or a keyed metric lacks one of the
            existing columns.
        """
        if col not in NumpyOrderIndicator.ROW:
            raise ValueError(f"{col} metric is not supoorted")
        if not isinstance(metric, (dict, np.ndarray, pd.Series)):
            raise ValueError("metric must be dict, pd.Series or np.ndarray")
        if isinstance(metric, (pd.Series, np.ndarray)) and self.data is None:
            raise ValueError("data can not be None when metric is np.ndarray or pd.Series")

        # if data is None, init numpy ndarray (metric is a dict at this point)
        if self.data is None:
            self.column = list(metric.keys())
            self.column_map = {key: idx for idx, key in enumerate(self.column)}
            self.data = np.zeros((len(NumpyOrderIndicator.ROW), len(self.column)))

        if isinstance(metric, np.ndarray):
            # An ndarray has no ``keys()``; there is nothing to align, so the
            # values are taken positionally (numpy rejects incompatible shapes).
            values = metric
        else:
            metric_column = list(metric.keys())
            if self.column != metric_column:
                missing = set(self.column) - set(metric_column)
                if missing:
                    raise ValueError(f"metric is missing values for columns: {sorted(missing, key=str)}")
                # modify the order; keys not present in self.column are ignored
                values = [metric[key] for key in self.column]
            elif isinstance(metric, dict):
                values = list(metric.values())
            else:
                values = metric.values

        # assign data, and tag the row only after the write succeeded
        row = NumpyOrderIndicator.ROW_MAP[col]
        self.data[row] = values
        self.row_tag[row] = 1

    def transfer(self, func: Callable, new_col: str = None) -> Union[None, NumpySingleMetric]:
        """Call ``func`` with the metrics named by its parameters.

        Every parameter name of ``func`` is looked up as a metric.  An assigned
        metric is passed as a ``NumpySingleMetric`` wrapping its row of
        ``self.data``; an unassigned (or unknown) one is reported on stdout and
        passed as an empty ``NumpySingleMetric``.

        Parameters
        ----------
        func : Callable
            the function to evaluate; expected to return a ``NumpySingleMetric``
            when ``new_col`` is given (its ``.metric`` is stored).
        new_col : str, optional
            if given, the result is stored as this metric and nothing is returned.

        Return
        ----------
        Union[None, NumpySingleMetric]
            the result of ``func`` when ``new_col`` is None, otherwise None.
        """
        func_kwargs = {}
        for sig in inspect.signature(func).parameters:
            if self._if_valid_metric(sig):
                func_kwargs[sig] = NumpySingleMetric(self.data[NumpyOrderIndicator.ROW_MAP[sig]])
            else:
                print(f"{sig} is not assigned")
                func_kwargs[sig] = NumpySingleMetric(np.array([]))
        tmp_metric = func(**func_kwargs)
        if new_col is None:
            return tmp_metric
        row = NumpyOrderIndicator.ROW_MAP[new_col]
        self.data[row] = tmp_metric.metric
        self.row_tag[row] = 1
        return None

    def get_metric_series(self, metric: str) -> pd.Series:
        """Return metric ``metric`` as a ``pd.Series`` indexed by ``self.column``.

        An unassigned or unknown metric gives an empty float series.
        """
        if self._if_valid_metric(metric):
            return pd.Series(self.data[NumpyOrderIndicator.ROW_MAP[metric]], index=self.column)
        # Explicit dtype: a bare ``pd.Series()`` warns on pandas 1.x and changes
        # its default dtype in pandas 2; float matches the rows of ``self.data``.
        return pd.Series(dtype=float)

    def to_series(self) -> Dict[str, pd.Series]:
        """Return ``{metric name: series}`` for every metric in ``ROW``."""
        return {metric: self.get_metric_series(metric) for metric in NumpyOrderIndicator.ROW}

    def _if_valid_metric(self, metric):
        """Return True if ``metric`` is a known metric that has been assigned."""
        row = NumpyOrderIndicator.ROW_MAP.get(metric)
        return row is not None and self.row_tag[row] == 1

    @staticmethod
    def sum_all_indicators(cls, indicators: list, metrics: Union[str, List[str]], fill_value=None) -> None:
        """Sum ``metrics`` over ``indicators`` and store the result in ``cls``.

        Parameters
        ----------
        cls : NumpyOrderIndicator
            the *instance* that receives the sums (this is a staticmethod, so
            the first argument is passed explicitly); it must still be empty.
        indicators : List[NumpyOrderIndicator]
            the list of all inner indicators.
        metrics : Union[str, List[str]]
            the metric name(s) to sum.
        fill_value : float
            value used for a stock that an indicator has no column for.
            Must not be None.

        Raises
        ------
        ValueError
            if ``fill_value`` is None or ``cls`` already holds data.
        """
        # cheap argument checks first, before any data is copied
        if fill_value is None:
            raise ValueError("fill value can not be None in NumpyOrderIndicator")
        if cls.data is not None:
            raise ValueError("this function must assign to an empty order indicator")

        # metrics is all metrics to add
        # metrics_id means the index in the NumpyOrderIndicator.ROW for metrics.
        if isinstance(metrics, str):
            metrics = [metrics]
        metrics_id = [NumpyOrderIndicator.ROW_MAP[metric] for metric in metrics]

        # Union of all stock ids in first-seen order (a set would make the
        # column order vary between runs).  Indicators that never received
        # data contribute no stocks.
        stocks = list(
            dict.fromkeys(
                stock for indicator in indicators if indicator.data is not None for stock in indicator.column
            )
        )
        stocks_map = {stock: idx for idx, stock in enumerate(stocks)}

        # Accumulate into an array so an empty ``indicators`` list gives zeros
        # instead of the int 0 that ``sum([])`` returns.
        shape = (len(metrics_id), len(stocks))
        metric_sum = np.zeros(shape)
        for indicator in indicators:
            # start from fill_value, then overwrite the stocks this indicator has
            aligned = np.full(shape, fill_value, dtype=float)
            if indicator.data is not None:
                stocks_index = [stocks_map[stock] for stock in indicator.column]
                aligned[:, stocks_index] = indicator.data[metrics_id, :]
            metric_sum += aligned

        # assign to cls
        cls.data = np.zeros((len(NumpyOrderIndicator.ROW), len(stocks)))
        cls.column = stocks
        cls.column_map = stocks_map
        for position, row in enumerate(metrics_id):
            cls.data[row] = metric_sum[position]
            cls.row_tag[row] = 1

View File

@@ -16,7 +16,7 @@ from qlib.backtest.exchange import Exchange
from qlib.backtest.order import BaseTradeDecision, Order, OrderDir
from qlib.backtest.utils import TradeCalendarManager
from .high_performance_ds import PandasOrderIndicator
from .high_performance_ds import PandasOrderIndicator, NumpyOrderIndicator
from ..data import D
from ..tests.config import CSI300_BENCH
from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data
@@ -236,6 +236,7 @@ class Indicator:
| indicator | desc. |
|--------------+--------------------------------------------------------------|
| amount | the *target* amount given by the outer strategy |
| deal_amount | the real deal amount |
| inner_amount | the total *target* amount of inner strategy |
| trade_price | the average deal price |
| trade_value | the total trade value |
@@ -255,7 +256,7 @@ class Indicator:
"""
def __init__(self, order_indicator_cls=PandasOrderIndicator):
def __init__(self, order_indicator_cls=NumpyOrderIndicator):
self.order_indicator_cls = order_indicator_cls
# order indicator is metrics for a single order for a specific step
@@ -329,9 +330,7 @@ class Indicator:
# sum inner order indicators with same metric.
all_metric = ["inner_amount", "deal_amount", "trade_price", "trade_value", "trade_cost", "trade_dir"]
metric_dict = self.order_indicator_cls.sum_all_indicators(inner_order_indicators, all_metric, fill_value=0)
for metric in metric_dict:
self.order_indicator.assign(metric, metric_dict[metric])
self.order_indicator_cls.sum_all_indicators(self.order_indicator, inner_order_indicators, all_metric, fill_value=0)
def func(trade_price, deal_amount):
# trade_price is np.NaN instead of inf when deal_amount is zero.