mirror of
https://github.com/microsoft/qlib.git
synced 2026-07-01 01:51:18 +08:00
high_performance_data_structure
This commit is contained in:
@@ -4,7 +4,7 @@
|
||||
|
||||
import random
|
||||
import logging
|
||||
from typing import List, Tuple, Union
|
||||
from typing import List, Tuple, Union, Callable, Iterable
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
@@ -15,6 +15,7 @@ from ..config import C, REG_CN
|
||||
from ..utils.resam import resam_ts_data, ts_data_last
|
||||
from ..log import get_module_logger
|
||||
from .order import Order, OrderDir, OrderHelper
|
||||
from .high_performane_ds import PandasQuote
|
||||
|
||||
|
||||
class Exchange:
|
||||
@@ -32,6 +33,7 @@ class Exchange:
|
||||
close_cost=0.0025,
|
||||
min_cost=5,
|
||||
extra_quote=None,
|
||||
quote_cls=PandasQuote,
|
||||
**kwargs,
|
||||
):
|
||||
"""__init__
|
||||
@@ -143,7 +145,8 @@ class Exchange:
|
||||
self.get_quote_from_qlib()
|
||||
|
||||
# init quote by quote_df
|
||||
self.quote = PandasQuote(self.quote_df)
|
||||
self.quote_cls = quote_cls
|
||||
self.quote = self.quote_cls(self.quote_df)
|
||||
|
||||
def get_quote_from_qlib(self):
|
||||
# get stock data from qlib
|
||||
@@ -593,102 +596,3 @@ class Exchange:
|
||||
# cache to avoid recreate the same instance
|
||||
self._order_helper = OrderHelper(self)
|
||||
return self._order_helper
|
||||
|
||||
|
||||
class BaseQuote:
|
||||
def __init__(self, quote_df: pd.DataFrame):
|
||||
self.logger = get_module_logger("online operator", level=logging.INFO)
|
||||
|
||||
def get_all_stock(self):
|
||||
"""return all stock codes
|
||||
|
||||
Return
|
||||
------
|
||||
Union[list, Dict.keys(), set, tuple]
|
||||
all stock codes
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `get_all_stock` method")
|
||||
|
||||
def get_data(
|
||||
self,
|
||||
stock_id: Union[str, list],
|
||||
start_time: Union[pd.Timestamp, str],
|
||||
end_time: Union[pd.Timestamp, str],
|
||||
fields: Union[str, list] = None,
|
||||
method: Union[str, "Callable"] = None,
|
||||
):
|
||||
"""get the specific fields of stock data during start time and end_time,
|
||||
and apply method to the data.
|
||||
|
||||
Example:
|
||||
.. code-block::
|
||||
$close $volume
|
||||
instrument datetime
|
||||
SH600000 2010-01-04 86.778313 16162960.0
|
||||
2010-01-05 87.433578 28117442.0
|
||||
2010-01-06 85.713585 23632884.0
|
||||
2010-01-07 83.788803 20813402.0
|
||||
2010-01-08 84.730675 16044853.0
|
||||
|
||||
SH600655 2010-01-04 2699.567383 158193.328125
|
||||
2010-01-08 2612.359619 77501.406250
|
||||
2010-01-11 2712.982422 160852.390625
|
||||
2010-01-12 2788.688232 164587.937500
|
||||
2010-01-13 2790.604004 145460.453125
|
||||
|
||||
print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last"))
|
||||
|
||||
$close $volume
|
||||
instrument
|
||||
SH600000 87.433578 28117442.0
|
||||
SH600655 2699.567383 158193.328125
|
||||
|
||||
print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last"))
|
||||
|
||||
$close 87.433578
|
||||
$volume 28117442.0
|
||||
|
||||
print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last"))
|
||||
|
||||
87.433578
|
||||
|
||||
Parameters
|
||||
----------
|
||||
stock_id: Union[str, list]
|
||||
start_time : Union[pd.Timestamp, str]
|
||||
closed start time for backtest
|
||||
end_time : Union[pd.Timestamp, str]
|
||||
closed end time for backtest
|
||||
fields : Union[str, List]
|
||||
the columns of data to fetch
|
||||
method : Union[str, Callable]
|
||||
the method apply to data.
|
||||
e.g ["None", "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last]
|
||||
|
||||
Return
|
||||
----------
|
||||
Union[None, float, pd.Series, pd.DataFrame]
|
||||
The resampled DataFrame/Series/value, return None when the resampled data is empty.
|
||||
"""
|
||||
|
||||
raise NotImplementedError(f"Please implement the `get_data` method")
|
||||
|
||||
|
||||
class PandasQuote(BaseQuote):
|
||||
def __init__(self, quote_df: pd.DataFrame):
|
||||
super().__init__(quote_df=quote_df)
|
||||
quote_dict = {}
|
||||
for stock_id, stock_val in quote_df.groupby(level="instrument"):
|
||||
quote_dict[stock_id] = stock_val.droplevel(level="instrument")
|
||||
self.data = quote_dict
|
||||
|
||||
def get_all_stock(self):
|
||||
return self.data.keys()
|
||||
|
||||
def get_data(self, stock_id, start_time, end_time, fields=None, method=None):
|
||||
if fields is None:
|
||||
return resam_ts_data(self.data[stock_id], start_time, end_time, method=method)
|
||||
elif isinstance(fields, (str, list)):
|
||||
return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method)
|
||||
else:
|
||||
raise ValueError(f"fields must be None, str or list")
|
||||
|
||||
414
qlib/backtest/high_performance_ds.py
Normal file
414
qlib/backtest/high_performance_ds.py
Normal file
@@ -0,0 +1,414 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
|
||||
import logging
|
||||
from typing import List, Tuple, Union, Callable, Iterable, Dict
|
||||
from collections import OrderedDict
|
||||
|
||||
import inspect
|
||||
import pandas as pd
|
||||
|
||||
from ..utils.resam import resam_ts_data
|
||||
from ..log import get_module_logger
|
||||
|
||||
|
||||
class BaseQuote:
|
||||
def __init__(self, quote_df: pd.DataFrame):
|
||||
self.logger = get_module_logger("online operator", level=logging.INFO)
|
||||
|
||||
def get_all_stock(self) -> Iterable:
|
||||
"""return all stock codes
|
||||
|
||||
Return
|
||||
------
|
||||
Iterable
|
||||
all stock codes
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `get_all_stock` method")
|
||||
|
||||
def get_data(
|
||||
self,
|
||||
stock_id: Union[str, list],
|
||||
start_time: Union[pd.Timestamp, str],
|
||||
end_time: Union[pd.Timestamp, str],
|
||||
fields: Union[str, list] = None,
|
||||
method: Union[str, Callable] = None,
|
||||
) -> Union[None, float, pd.Series, pd.DataFrame]:
|
||||
"""get the specific fields of stock data during start time and end_time,
|
||||
and apply method to the data.
|
||||
|
||||
Example:
|
||||
.. code-block::
|
||||
$close $volume
|
||||
instrument datetime
|
||||
SH600000 2010-01-04 86.778313 16162960.0
|
||||
2010-01-05 87.433578 28117442.0
|
||||
2010-01-06 85.713585 23632884.0
|
||||
2010-01-07 83.788803 20813402.0
|
||||
2010-01-08 84.730675 16044853.0
|
||||
|
||||
SH600655 2010-01-04 2699.567383 158193.328125
|
||||
2010-01-08 2612.359619 77501.406250
|
||||
2010-01-11 2712.982422 160852.390625
|
||||
2010-01-12 2788.688232 164587.937500
|
||||
2010-01-13 2790.604004 145460.453125
|
||||
|
||||
print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last"))
|
||||
|
||||
$close $volume
|
||||
instrument
|
||||
SH600000 87.433578 28117442.0
|
||||
SH600655 2699.567383 158193.328125
|
||||
|
||||
print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last"))
|
||||
|
||||
$close 87.433578
|
||||
$volume 28117442.0
|
||||
|
||||
print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last"))
|
||||
|
||||
87.433578
|
||||
|
||||
Parameters
|
||||
----------
|
||||
stock_id: Union[str, list]
|
||||
start_time : Union[pd.Timestamp, str]
|
||||
closed start time for backtest
|
||||
end_time : Union[pd.Timestamp, str]
|
||||
closed end time for backtest
|
||||
fields : Union[str, List]
|
||||
the columns of data to fetch
|
||||
method : Union[str, Callable]
|
||||
the method apply to data.
|
||||
e.g ["None", "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last]
|
||||
|
||||
Return
|
||||
----------
|
||||
Union[None, float, pd.Series, pd.DataFrame]
|
||||
The resampled DataFrame/Series/value, return None when the resampled data is empty.
|
||||
"""
|
||||
|
||||
raise NotImplementedError(f"Please implement the `get_data` method")
|
||||
|
||||
|
||||
class PandasQuote(BaseQuote):
|
||||
def __init__(self, quote_df: pd.DataFrame):
|
||||
super().__init__(quote_df=quote_df)
|
||||
quote_dict = {}
|
||||
for stock_id, stock_val in quote_df.groupby(level="instrument"):
|
||||
quote_dict[stock_id] = stock_val.droplevel(level="instrument")
|
||||
self.data = quote_dict
|
||||
|
||||
def get_all_stock(self):
|
||||
return self.data.keys()
|
||||
|
||||
def get_data(self, stock_id, start_time, end_time, fields=None, method=None):
|
||||
if fields is None:
|
||||
return resam_ts_data(self.data[stock_id], start_time, end_time, method=method)
|
||||
elif isinstance(fields, (str, list)):
|
||||
return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method)
|
||||
else:
|
||||
raise ValueError(f"fields must be None, str or list")
|
||||
|
||||
|
||||
class BaseSingleMetric:
|
||||
"""
|
||||
The data structure of the single metric.
|
||||
The following methods are used for computing metrics in one indicator.
|
||||
"""
|
||||
|
||||
def __init__(self, metric: Union[dict, pd.Series]):
|
||||
pass
|
||||
|
||||
def __add__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def __radd__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
return self + other
|
||||
|
||||
def __sub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def __rsub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def __mul__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def __truediv__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def __eq__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def __gt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def __lt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def __len__(self) -> int:
|
||||
pass
|
||||
|
||||
def sum(self) -> float:
|
||||
pass
|
||||
|
||||
def mean(self) -> float:
|
||||
pass
|
||||
|
||||
def count(self) -> int:
|
||||
pass
|
||||
|
||||
def abs(self) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
def astype(self, type: type) -> "BaseSingleMetric":
|
||||
pass
|
||||
|
||||
@property
|
||||
def empty(self) -> bool:
|
||||
"""If metric is empyt, return True."""
|
||||
pass
|
||||
|
||||
def add(self, other: "BaseSingleMetric", fill_value: float = None) -> "BaseSingleMetric":
|
||||
"""Replace np.NaN with fill_value in two metrics and add them."""
|
||||
pass
|
||||
|
||||
def apply(self, map_dict: dict) -> "BaseSingleMetric":
|
||||
"""Replace the value of metric according to map_dict."""
|
||||
pass
|
||||
|
||||
|
||||
class BaseOrderIndicator:
|
||||
"""
|
||||
The data structure of order indicator.
|
||||
!!!NOTE: There are two ways to organize the data structure. Please choose a better way.
|
||||
1. one way is use BaseSingleMetric to represent each metric. For example, the data
|
||||
structure of PandasOrderIndicator is Dict[str: PandasSingleMetric]. It uses
|
||||
PandasSingleMetric based on pd.Series to represent each metric.
|
||||
2. the another way doesn't BaseSingleMetric to represent each metric. The data
|
||||
structure of PandasOrderIndicator is a whole matrix.
|
||||
"""
|
||||
|
||||
def assign(self, col: str, metric: Union[dict, pd.Series]):
|
||||
"""assign one metric.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
col : str
|
||||
the metric name of one metric.
|
||||
metric : Union[dict, pd.Series]
|
||||
the metric data.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
def transfer(self, func: Callable, new_col: str = None) -> Union[None, BaseSingleMetric]:
|
||||
"""compute new metric with existing metrics.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
func : Callable
|
||||
the func of computing new metric.
|
||||
the kwargs of func will be replaced with metric data by name in this function.
|
||||
e.g.
|
||||
def func(pa):
|
||||
return (pa > 0).astype(int).sum() / pa.count()
|
||||
new_col : str, optional
|
||||
New metric will be assigned in the data if new_col is not None, by default None.
|
||||
|
||||
Return
|
||||
----------
|
||||
BaseSingleMetric
|
||||
new metric.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
def get_metric_series(self, metric: str) -> pd.Series:
|
||||
"""return the single metric with pd.Series format.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
metric : str
|
||||
the metric name.
|
||||
|
||||
Return
|
||||
----------
|
||||
pd.Series
|
||||
the single metric.
|
||||
If there is no metric name in the data, return pd.Series().
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def sum_all_indicators(
|
||||
indicators: list, metrics: Union[str, List[str]], fill_value: float = None
|
||||
) -> Dict[str, BaseSingleMetric]:
|
||||
"""sum indicators with the same metrics.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
indicators : List[BaseOrderIndicator]
|
||||
the list of all inner indicators.
|
||||
metrics : Union[str, List[str]]
|
||||
all metrics needs ot be sumed.
|
||||
fill_value : float, optional
|
||||
fill np.NaN with value. By default None.
|
||||
|
||||
Return
|
||||
----------
|
||||
Dict[str: PandasSingleMetric]
|
||||
a dict of metric name and data.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class PandasSingleMetric:
|
||||
"""Each SingleMetric is based on pd.Series."""
|
||||
|
||||
def __init__(self, metric: Union[dict, pd.Series]):
|
||||
if isinstance(metric, dict):
|
||||
self.metric = pd.Series(metric)
|
||||
elif isinstance(metric, pd.Series):
|
||||
self.metric = metric
|
||||
else:
|
||||
raise ValueError(f"metric must be dict or pd.Series")
|
||||
|
||||
def __add__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasSingleMetric(self.metric + other)
|
||||
elif isinstance(other, PandasSingleMetric):
|
||||
return PandasSingleMetric(self.metric + other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __sub__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasSingleMetric(self.metric - other)
|
||||
elif isinstance(other, PandasSingleMetric):
|
||||
return PandasSingleMetric(self.metric - other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __rsub__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasSingleMetric(other - self.metric)
|
||||
elif isinstance(other, PandasSingleMetric):
|
||||
return PandasSingleMetric(other.metric - self.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __mul__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasSingleMetric(self.metric * other)
|
||||
elif isinstance(other, PandasSingleMetric):
|
||||
return PandasSingleMetric(self.metric * other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __truediv__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasSingleMetric(self.metric / other)
|
||||
elif isinstance(other, PandasSingleMetric):
|
||||
return PandasSingleMetric(self.metric / other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasSingleMetric(self.metric == other)
|
||||
elif isinstance(other, PandasSingleMetric):
|
||||
return PandasSingleMetric(self.metric == other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __gt__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasSingleMetric(self.metric < other)
|
||||
elif isinstance(other, PandasSingleMetric):
|
||||
return PandasSingleMetric(self.metric < other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __lt__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasSingleMetric(self.metric > other)
|
||||
elif isinstance(other, PandasSingleMetric):
|
||||
return PandasSingleMetric(self.metric > other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __len__(self):
|
||||
return len(self.metric)
|
||||
|
||||
def sum(self):
|
||||
return self.metric.sum()
|
||||
|
||||
def mean(self):
|
||||
return self.metric.mean()
|
||||
|
||||
def count(self):
|
||||
return self.metric.count()
|
||||
|
||||
def abs(self):
|
||||
return PandasSingleMetric(self.metric.abs())
|
||||
|
||||
def astype(self, type):
|
||||
return PandasSingleMetric(self.metric.astype(type))
|
||||
|
||||
@property
|
||||
def empty(self):
|
||||
return self.metric.empty
|
||||
|
||||
def add(self, other, fill_value=None):
|
||||
return PandasSingleMetric(self.metric.add(other.metric, fill_value=fill_value))
|
||||
|
||||
def apply(self, map_dict: dict):
|
||||
return PandasSingleMetric(self.metric.apply(map_dict))
|
||||
|
||||
|
||||
class PandasOrderIndicator(BaseOrderIndicator):
|
||||
"""
|
||||
The data structure is OrderedDict(str: PandasSingleMetric).
|
||||
Each PandasSingleMetric based on pd.Series is one metric.
|
||||
Str is the name of metric.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.data: Dict[str, PandasSingleMetric] = OrderedDict()
|
||||
|
||||
def assign(self, col: str, metric: Union[dict, pd.Series]):
|
||||
self.data[col] = PandasSingleMetric(metric)
|
||||
|
||||
def transfer(self, func: Callable, new_col: str = None) -> Union[None, PandasSingleMetric]:
|
||||
func_sig = inspect.signature(func).parameters.keys()
|
||||
func_kwargs = {sig: self.data[sig] for sig in func_sig}
|
||||
tmp_metric = func(**func_kwargs)
|
||||
if new_col is not None:
|
||||
self.data[new_col] = tmp_metric
|
||||
else:
|
||||
return tmp_metric
|
||||
|
||||
def get_metric_series(self, metric: str) -> Union[pd.Series]:
|
||||
if metric in self.data:
|
||||
return self.data[metric].metric
|
||||
else:
|
||||
return pd.Series()
|
||||
|
||||
@staticmethod
|
||||
def sum_all_indicators(
|
||||
indicators: list, metrics: Union[str, List[str]], fill_value=None
|
||||
) -> Dict[str, PandasSingleMetric]:
|
||||
metric_dict = {}
|
||||
if isinstance(metrics, str):
|
||||
metrics = [metrics]
|
||||
for metric in metrics:
|
||||
tmp_metric = PandasSingleMetric({})
|
||||
for indicator in indicators:
|
||||
tmp_metric = tmp_metric.add(indicator.data[metric], fill_value)
|
||||
metric_dict[metric] = tmp_metric.metric
|
||||
return metric_dict
|
||||
@@ -5,7 +5,7 @@
|
||||
from collections import OrderedDict
|
||||
from logging import warning
|
||||
import pathlib
|
||||
from typing import Dict, List, Tuple, Union
|
||||
from typing import Dict, List, Tuple, Union, Callable
|
||||
import warnings
|
||||
import inspect
|
||||
|
||||
@@ -18,6 +18,7 @@ from qlib.backtest.exchange import Exchange
|
||||
from qlib.backtest.order import BaseTradeDecision, Order, OrderDir
|
||||
from qlib.backtest.utils import TradeCalendarManager
|
||||
|
||||
from .high_performane_ds import PandasOrderIndicator
|
||||
from ..data import D
|
||||
from ..tests.config import CSI300_BENCH
|
||||
from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data
|
||||
@@ -254,10 +255,12 @@ class Indicator:
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, order_indicator_cls=PandasOrderIndicator):
|
||||
self.order_indicator_cls = order_indicator_cls
|
||||
|
||||
# order indicator is metrics for a single order for a specific step
|
||||
self.order_indicator_his = OrderedDict()
|
||||
self.order_indicator = PandasOrderIndicator()
|
||||
self.order_indicator = self.order_indicator_cls()
|
||||
|
||||
# trade indicator is metrics for all orders for a specific step
|
||||
self.trade_indicator_his = OrderedDict()
|
||||
@@ -267,7 +270,7 @@ class Indicator:
|
||||
|
||||
# def reset(self, trade_calendar: TradeCalendarManager):
|
||||
def reset(self):
|
||||
self.order_indicator = PandasOrderIndicator()
|
||||
self.order_indicator = self.order_indicator_cls()
|
||||
self.trade_indicator = OrderedDict()
|
||||
# self._trade_calendar = trade_calendar
|
||||
|
||||
@@ -291,6 +294,7 @@ class Indicator:
|
||||
trade_value[order.stock_id] = _trade_val * order.sign
|
||||
trade_cost[order.stock_id] = _trade_cost
|
||||
trade_dir[order.stock_id] = order.direction
|
||||
# The PA in the innermost layer is meanless
|
||||
pa[order.stock_id] = 0
|
||||
|
||||
self.order_indicator.assign("amount", amount)
|
||||
@@ -306,32 +310,33 @@ class Indicator:
|
||||
def _update_order_fulfill_rate(self):
|
||||
def func(deal_amount, amount):
|
||||
return deal_amount / amount
|
||||
|
||||
self.order_indicator.transfer(func, "ffr")
|
||||
|
||||
def update_order_indicators(self, trade_info: list):
|
||||
self._update_order_trade_info(trade_info=trade_info)
|
||||
self._update_order_fulfill_rate()
|
||||
# self._update_order_price_advantage()
|
||||
|
||||
def _agg_order_trade_info(self, inner_order_indicators: List[Dict[str, pd.Series]]):
|
||||
all_metric = ["inner_amount", "deal_amount", "trade_price",
|
||||
"trade_value", "trade_cost", "trade_dir"]
|
||||
metric_dict = PandasOrderIndicator.agg_all_indicators(inner_order_indicators, all_metric, fill_value=0)
|
||||
all_metric = ["inner_amount", "deal_amount", "trade_price", "trade_value", "trade_cost", "trade_dir"]
|
||||
metric_dict = self.order_indicator_cls.sum_all_indicators(inner_order_indicators, all_metric, fill_value=0)
|
||||
for metric in metric_dict:
|
||||
self.order_indicator.assign(metric, metric_dict[metric])
|
||||
|
||||
def func(trade_price, deal_amount):
|
||||
return trade_price / deal_amount
|
||||
|
||||
self.order_indicator.transfer(func, "trade_price")
|
||||
|
||||
def func_apply(trade_dir):
|
||||
return trade_dir.apply(Order.parse_dir)
|
||||
|
||||
self.order_indicator.transfer(func_apply, "trade_dir")
|
||||
|
||||
def _update_trade_amount(self, outer_trade_decision: BaseTradeDecision):
|
||||
# NOTE: these indicator is designed for order execution, so the
|
||||
decision: List[Order] = outer_trade_decision.get_decision()
|
||||
if decision is None:
|
||||
if len(decision) == 0:
|
||||
self.order_indicator.assign("amount", {})
|
||||
else:
|
||||
self.order_indicator.assign("amount", {order.stock_id: order.amount_delta for order in decision})
|
||||
@@ -450,11 +455,14 @@ class Indicator:
|
||||
def _agg_order_price_advantage(self):
|
||||
def if_empty_func(trade_price):
|
||||
return trade_price.empty
|
||||
|
||||
if_empty = self.order_indicator.transfer(if_empty_func)
|
||||
if not if_empty:
|
||||
|
||||
def func(trade_dir, trade_price, base_price):
|
||||
sign = 1 - trade_dir * 2
|
||||
return sign * (trade_price / base_price - 1)
|
||||
|
||||
self.order_indicator.transfer(func, "pa")
|
||||
else:
|
||||
self.order_indicator.assign("pa", {})
|
||||
@@ -471,33 +479,45 @@ class Indicator:
|
||||
self._update_trade_amount(outer_trade_decision)
|
||||
self._update_order_fulfill_rate()
|
||||
pa_config = indicator_config.get("pa_config", {})
|
||||
self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO
|
||||
self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO
|
||||
self._agg_order_price_advantage()
|
||||
|
||||
def _cal_trade_fulfill_rate(self, method="mean"):
|
||||
if method == "mean":
|
||||
|
||||
def func(ffr):
|
||||
return ffr.mean()
|
||||
|
||||
elif method == "amount_weighted":
|
||||
|
||||
def func(ffr, deal_amount):
|
||||
return (ffr * deal_amount.abs()).sum() / (deal_amount.abs().sum())
|
||||
|
||||
elif method == "value_weighted":
|
||||
|
||||
def func(ffr, trade_value):
|
||||
return (ffr * trade_value.abs()).sum() / (trade_value.abs().sum())
|
||||
|
||||
else:
|
||||
raise ValueError(f"method {method} is not supported!")
|
||||
return self.order_indicator.transfer(func)
|
||||
|
||||
def _cal_trade_price_advantage(self, method="mean"):
|
||||
if method == "mean":
|
||||
|
||||
def func(pa):
|
||||
return pa.mean()
|
||||
|
||||
elif method == "amount_weighted":
|
||||
|
||||
def func(pa, deal_amount):
|
||||
return (pa * deal_amount.abs()).sum() / (deal_amount.abs().sum())
|
||||
|
||||
elif method == "value_weighted":
|
||||
|
||||
def func(pa, trade_value):
|
||||
return (pa * trade_value.abs()).sum() / (trade_value.abs().sum())
|
||||
|
||||
else:
|
||||
raise ValueError(f"method {method} is not supported!")
|
||||
return self.order_indicator.transfer(func)
|
||||
@@ -505,21 +525,25 @@ class Indicator:
|
||||
def _cal_trade_positive_rate(self):
|
||||
def func(pa):
|
||||
return (pa > 0).astype(int).sum() / pa.count()
|
||||
|
||||
return self.order_indicator.transfer(func)
|
||||
|
||||
def _cal_deal_amount(self):
|
||||
def func(deal_amount):
|
||||
return deal_amount.abs().sum()
|
||||
|
||||
return self.order_indicator.transfer(func)
|
||||
|
||||
def _cal_trade_value(self):
|
||||
def func(trade_value):
|
||||
return trade_value.abs().sum()
|
||||
|
||||
return self.order_indicator.transfer(func)
|
||||
|
||||
def _cal_trade_order_count(self):
|
||||
def func(amount):
|
||||
return amount.count()
|
||||
|
||||
return self.order_indicator.transfer(func)
|
||||
|
||||
def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}):
|
||||
@@ -553,236 +577,3 @@ class Indicator:
|
||||
|
||||
def generate_trade_indicators_dataframe(self):
|
||||
return pd.DataFrame.from_dict(self.trade_indicator_his, orient="index")
|
||||
|
||||
|
||||
class BaseOrderIndicator:
|
||||
"""The data structure of order indicator.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
def assign(self, col: str, metric: Union[dict, pd.Series]):
|
||||
"""assign one metric.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
col : str
|
||||
the metric name of one metric.
|
||||
metric : Union[dict, pd.Series]
|
||||
the metric data.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
def transfer(self, func: "Callable", new_col: str = None):
|
||||
"""compute new metric with existing.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
func : Callable
|
||||
the func of computing new metric.
|
||||
the kwargs of func will be replaced with metric data by name in this function.
|
||||
e.g.
|
||||
def func(pa):
|
||||
return (pa > 0).astype(int).sum() / pa.count()
|
||||
new_col : str, optional
|
||||
New metric will be assigned in the data if new_col is not None, by default None.
|
||||
|
||||
Return
|
||||
----------
|
||||
SingleMetric
|
||||
new metric.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
def get_metric_series(self, metric: str):
|
||||
"""return the single metric with pd.Series format
|
||||
|
||||
Parameters
|
||||
----------
|
||||
metric : str
|
||||
the metric name.
|
||||
|
||||
Return
|
||||
----------
|
||||
pd.Series
|
||||
the single metric.
|
||||
If there is no metric name in the data, return pd.Series().
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value: float = None):
|
||||
"""sum indicators with the same metrics.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
indicators : List[BaseOrderIndicator]
|
||||
the list of all inner indicators.
|
||||
metrics : Union[str, List[str]]
|
||||
all metrics needs ot be sumed.
|
||||
fill_value : float, optional
|
||||
fill np.NaN with value. By default None.
|
||||
|
||||
Return
|
||||
----------
|
||||
Dict[str: SingleMetric]
|
||||
a dict of metric name and data.
|
||||
"""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class PandasOrderIndicator(BaseOrderIndicator):
|
||||
"""The data structure is OrderedDict(str: SingleMetric).
|
||||
Each SingleMetric based on pd.Series is one metric.
|
||||
Str is the name of metric.
|
||||
"""
|
||||
|
||||
class SingleMetric:
|
||||
"""The data structure of the single metric.
|
||||
The following methods are used for computing metrics in one indicator.
|
||||
"""
|
||||
|
||||
def __init__(self, metric: Union[dict, pd.Series]):
|
||||
if isinstance(metric, dict):
|
||||
self.metric = pd.Series(metric)
|
||||
elif isinstance(metric, pd.Series):
|
||||
self.metric = metric
|
||||
else:
|
||||
raise ValueError(f"metric must be dict or pd.Series")
|
||||
|
||||
def __add__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric + other)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric + other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __radd__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(other + self.metric)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(other.metric + self.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __sub__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric - other)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric - other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __rsub__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(other - self.metric)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(other.metric - self.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __mul__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric * other)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric * other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __truediv__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric / other)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric / other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric == other)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric == other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __gt__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric < other)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric < other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __lt__(self, other):
|
||||
if isinstance(other, (int, float)):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric > other)
|
||||
elif isinstance(other, PandasOrderIndicator.SingleMetric):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric > other.metric)
|
||||
else:
|
||||
return NotImplemented
|
||||
|
||||
def __len__(self):
|
||||
return len(self.metric)
|
||||
|
||||
def sum(self):
|
||||
return self.metric.sum()
|
||||
|
||||
def mean(self):
|
||||
return self.metric.mean()
|
||||
|
||||
def count(self):
|
||||
return self.metric.count()
|
||||
|
||||
def abs(self):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric.abs())
|
||||
|
||||
def astype(self, type):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric.astype(type))
|
||||
|
||||
@property
|
||||
def empty(self):
|
||||
return self.metric.empty
|
||||
|
||||
def add(self, other, fill_value: None):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric.add(other.metric, fill_value = fill_value))
|
||||
|
||||
def apply(self, map_dict: dict):
|
||||
return PandasOrderIndicator.SingleMetric(self.metric.apply(map_dict))
|
||||
|
||||
def __init__(self):
|
||||
self.data: Dict[str, self.SingleMetric] = OrderedDict()
|
||||
|
||||
def assign(self, col: str, metric: Union[dict, pd.Series]):
|
||||
self.data[col] = self.SingleMetric(metric)
|
||||
|
||||
def transfer(self, func: "Callable", new_col: str = None):
|
||||
func_sig = inspect.signature(func).parameters.keys()
|
||||
func_kwargs = {sig: self.data[sig] for sig in func_sig}
|
||||
tmp_metric = func(**func_kwargs)
|
||||
if(new_col is not None):
|
||||
self.data[new_col] = tmp_metric
|
||||
return tmp_metric
|
||||
|
||||
def get_metric_series(self, metric: str):
|
||||
if(metric in self.data):
|
||||
return self.data[metric].metric
|
||||
else:
|
||||
return pd.Series()
|
||||
|
||||
@staticmethod
|
||||
def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value = None):
|
||||
metric_dict = {}
|
||||
if isinstance(metrics, str):
|
||||
metrics = [metrics]
|
||||
for metric in metrics:
|
||||
tmp_metric = PandasOrderIndicator.SingleMetric({})
|
||||
for indicator in indicators:
|
||||
tmp_metric = tmp_metric.add(indicator.data[metric], fill_value)
|
||||
metric_dict[metric] = tmp_metric.metric
|
||||
return metric_dict
|
||||
Reference in New Issue
Block a user