1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-02 02:21:18 +08:00
Files
qlib/qlib/data/ops.py
Pengrong Zhu c276de4040 Fix backtest (#719)
* modify FileStorage to support multiple freqs

* modify backtest's sample documentation

* change the logging level of read data exception from error to debug

* fix the backtest exception when volume is 0 or np.nan

* fix test_storage.py

* add backtest_daily

* modify backtest_daily's docstring

* add __repr__/__str__ to Position

* fix the bug of nested_decision_execution example

Co-authored-by: Young <afe.young@gmail.com>
Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>
2021-12-07 19:04:23 +08:00

1584 lines
40 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import sys
import abc
import numpy as np
import pandas as pd
from typing import Union, List, Type
from scipy.stats import percentileofscore
from .base import Expression, ExpressionOps, Feature
from ..config import C
from ..log import get_module_logger
from ..utils import get_callable_kwargs
try:
from ._libs.rolling import rolling_slope, rolling_rsquare, rolling_resi
from ._libs.expanding import expanding_slope, expanding_rsquare, expanding_resi
except ImportError:
print(
"#### Do not import qlib package in the repository directory in case of importing qlib from . without compiling #####"
)
raise
# Ask NumPy to ignore "invalid" floating-point errors (e.g. operations that
# produce NaN) instead of warning; this runs once when the module is imported.
np.seterr(invalid="ignore")
#################### Element-Wise Operator ####################
class ElemOperator(ExpressionOps):
    """Base class for operators applied element by element to one feature.

    Parameters
    ----------
    feature : Expression
        the wrapped feature instance

    Returns
    ----------
    Expression
        the output of the feature operation
    """

    def __init__(self, feature):
        self.feature = feature

    def __str__(self):
        # Rendered as ``ClassName(feature)`` using the concrete subclass name.
        return f"{type(self).__name__}({self.feature})"

    def get_longest_back_rolling(self):
        # Delegates to the wrapped feature.
        wrapped = self.feature
        return wrapped.get_longest_back_rolling()

    def get_extended_window_size(self):
        # Delegates to the wrapped feature.
        wrapped = self.feature
        return wrapped.get_extended_window_size()
class NpElemOperator(ElemOperator):
    """Element-wise operator backed by a NumPy function looked up by name.

    Parameters
    ----------
    feature : Expression
        the wrapped feature instance
    func : str
        name of the attribute of ``numpy`` to apply to the loaded data

    Returns
    ----------
    Expression
        the output of the feature operation
    """

    def __init__(self, feature, func):
        self.func = func
        super().__init__(feature)

    def _load_internal(self, instrument, start_index, end_index, freq):
        # Load the input first, then resolve ``np.<func>`` and apply it.
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        np_func = getattr(np, self.func)
        return np_func(loaded)
class Abs(NpElemOperator):
    """Absolute value of a feature (applies ``np.abs``).

    Parameters
    ----------
    feature : Expression
        the input feature instance

    Returns
    ----------
    Expression
        a feature instance holding the absolute values
    """

    def __init__(self, feature):
        super().__init__(feature, "abs")
class Sign(NpElemOperator):
    """Sign of a feature (applies ``np.sign``).

    Parameters
    ----------
    feature : Expression
        the input feature instance

    Returns
    ----------
    Expression
        a feature instance holding the sign of each element
    """

    def __init__(self, feature):
        super().__init__(feature, "sign")

    def _load_internal(self, instrument, start_index, end_index, freq):
        """
        Cast the loaded data to float32 before taking the sign, so that
        bool-typed input does not raise an error.
        """
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        # TODO: More precision types should be configurable
        as_float = loaded.astype(np.float32)
        np_func = getattr(np, self.func)
        return np_func(as_float)
class Log(NpElemOperator):
    """Logarithm of a feature (applies ``np.log``).

    Parameters
    ----------
    feature : Expression
        the input feature instance

    Returns
    ----------
    Expression
        a feature instance holding the log of each element
    """

    def __init__(self, feature):
        super().__init__(feature, "log")
class Power(NpElemOperator):
    """Raise a feature to a power (applies ``np.power``).

    Parameters
    ----------
    feature : Expression
        the input feature instance
    exponent
        the exponent passed as the second argument of ``np.power``

    Returns
    ----------
    Expression
        a feature instance holding ``feature ** exponent``
    """

    def __init__(self, feature, exponent):
        super().__init__(feature, "power")
        self.exponent = exponent

    def __str__(self):
        return f"{type(self).__name__}({self.feature},{self.exponent})"

    def _load_internal(self, instrument, start_index, end_index, freq):
        base = self.feature.load(instrument, start_index, end_index, freq)
        np_func = getattr(np, self.func)
        return np_func(base, self.exponent)
class Mask(NpElemOperator):
    """Load a feature for a fixed instrument, whatever instrument is requested.

    Parameters
    ----------
    feature : Expression
        the input feature instance
    instrument : str
        the instrument whose data is always loaded

    Returns
    ----------
    Expression
        a feature instance bound to the masked instrument
    """

    def __init__(self, feature, instrument):
        super().__init__(feature, "mask")
        self.instrument = instrument

    def __str__(self):
        # The instrument is shown lower-cased in the string form.
        masked = self.instrument.lower()
        return f"{type(self).__name__}({self.feature},{masked})"

    def _load_internal(self, instrument, start_index, end_index, freq):
        # The ``instrument`` argument is ignored in favour of ``self.instrument``.
        return self.feature.load(self.instrument, start_index, end_index, freq)
class Not(NpElemOperator):
    """Element-wise NOT of a feature (applies ``np.bitwise_not``).

    Parameters
    ----------
    feature : Expression
        the input feature instance

    Returns
    ----------
    Feature:
        the element-wise not of the input feature
    """

    def __init__(self, feature):
        super().__init__(feature, "bitwise_not")
#################### Pair-Wise Operator ####################
class PairOperator(ExpressionOps):
    """Base class for operators combining two operands.

    Parameters
    ----------
    feature_left : Expression
        feature instance or numeric value
    feature_right : Expression
        feature instance or numeric value

    Returns
    ----------
    Feature:
        the output of the operation on the two operands
    """

    def __init__(self, feature_left, feature_right):
        self.feature_left = feature_left
        self.feature_right = feature_right

    def __str__(self):
        return f"{type(self).__name__}({self.feature_left},{self.feature_right})"

    def get_longest_back_rolling(self):
        # Operands that are not Expression instances contribute 0.
        spans = [
            operand.get_longest_back_rolling() if isinstance(operand, Expression) else 0
            for operand in (self.feature_left, self.feature_right)
        ]
        return max(spans[0], spans[1])

    def get_extended_window_size(self):
        # Operands that are not Expression instances contribute (0, 0).
        windows = [
            operand.get_extended_window_size() if isinstance(operand, Expression) else (0, 0)
            for operand in (self.feature_left, self.feature_right)
        ]
        (ll, lr), (rl, rr) = windows
        return max(ll, rl), max(lr, rr)
class NpPairOperator(PairOperator):
    """Pair-wise operator backed by a NumPy function looked up by name.

    Parameters
    ----------
    feature_left : Expression
        feature instance or numeric value
    feature_right : Expression
        feature instance or numeric value
    func : str
        name of the attribute of ``numpy`` applied to the two operands

    Returns
    ----------
    Feature:
        the output of the operation on the two operands
    """

    def __init__(self, feature_left, feature_right, func):
        self.func = func
        super(NpPairOperator, self).__init__(feature_left, feature_right)

    def _load_internal(self, instrument, start_index, end_index, freq):
        """Load both operands and combine them with ``np.<func>``.

        Raises
        ------
        AssertionError
            if neither operand is an Expression instance
        ValueError
            if the NumPy function raises ValueError; the message is extended
            with the description of the two operands
        """
        # FIX: the previous check put the bare `self.feature_right` and the
        # `Expression` class into `any([...])`; the class object is always
        # truthy, so the assertion could never fail.
        assert isinstance(self.feature_left, Expression) or isinstance(
            self.feature_right, Expression
        ), "at least one of two inputs is Expression instance"

        if isinstance(self.feature_left, Expression):
            series_left = self.feature_left.load(instrument, start_index, end_index, freq)
        else:
            series_left = self.feature_left  # numeric value
        if isinstance(self.feature_right, Expression):
            series_right = self.feature_right.load(instrument, start_index, end_index, freq)
        else:
            series_right = self.feature_right

        # Lengths can only be compared when both operands are array-like.
        check_length = isinstance(series_left, (np.ndarray, pd.Series)) and isinstance(
            series_right, (np.ndarray, pd.Series)
        )
        if check_length:
            warning_info = (
                f"Loading {instrument}: {str(self)}; np.{self.func}(series_left, series_right), "
                f"The length of series_left and series_right is different: ({len(series_left)}, {len(series_right)}), "
                f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_right)}. Please check the data"
            )
        else:
            warning_info = (
                f"Loading {instrument}: {str(self)}; np.{self.func}(series_left, series_right), "
                f"series_left is {str(self.feature_left)}, series_right is {str(self.feature_right)}. Please check the data"
            )

        try:
            res = getattr(np, self.func)(series_left, series_right)
        except ValueError as e:
            get_module_logger("ops").debug(warning_info)
            # Chain the original error so its traceback is kept.
            raise ValueError(f"{str(e)}. \n\t{warning_info}") from e
        else:
            # The operation succeeded, but a length mismatch is still worth a debug log.
            if check_length and len(series_left) != len(series_right):
                get_module_logger("ops").debug(warning_info)
        return res
class Add(NpPairOperator):
    """Addition of two operands (applies ``np.add``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        the sum of the two features
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "add")
class Sub(NpPairOperator):
    """Subtraction of two operands (applies ``np.subtract``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        the difference ``left - right``
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "subtract")
class Mul(NpPairOperator):
    """Multiplication of two operands (applies ``np.multiply``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        the product of the two features
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "multiply")
class Div(NpPairOperator):
    """Division of two operands (applies ``np.divide``).

    Parameters
    ----------
    feature_left : Expression
        left operand (numerator)
    feature_right : Expression
        right operand (denominator)

    Returns
    ----------
    Feature:
        the quotient ``left / right``
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "divide")
class Greater(NpPairOperator):
    """Element-wise maximum of two operands (applies ``np.maximum``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        the greater elements taken from the two input features
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "maximum")
class Less(NpPairOperator):
    """Element-wise minimum of two operands (applies ``np.minimum``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        the smaller elements taken from the two input features
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "minimum")
class Gt(NpPairOperator):
    """Greater-than comparison (applies ``np.greater``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        bool series indicating `left > right`
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "greater")
class Ge(NpPairOperator):
    """Greater-than-or-equal comparison (applies ``np.greater_equal``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        bool series indicating `left >= right`
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "greater_equal")
class Lt(NpPairOperator):
    """Less-than comparison (applies ``np.less``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        bool series indicating `left < right`
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "less")
class Le(NpPairOperator):
    """Less-than-or-equal comparison (applies ``np.less_equal``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        bool series indicating `left <= right`
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "less_equal")
class Eq(NpPairOperator):
    """Equality comparison (applies ``np.equal``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        bool series indicating `left == right`
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "equal")
class Ne(NpPairOperator):
    """Inequality comparison (applies ``np.not_equal``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        bool series indicating `left != right`
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "not_equal")
class And(NpPairOperator):
    """Element-wise AND of two operands (applies ``np.bitwise_and``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        the row-by-row `&` of the two features
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "bitwise_and")
class Or(NpPairOperator):
    """Element-wise OR of two operands (applies ``np.bitwise_or``).

    Parameters
    ----------
    feature_left : Expression
        left operand
    feature_right : Expression
        right operand

    Returns
    ----------
    Feature:
        the row-by-row `|` of the two features
    """

    def __init__(self, feature_left, feature_right):
        super().__init__(feature_left, feature_right, "bitwise_or")
#################### Triple-wise Operator ####################
class If(ExpressionOps):
    """Element-wise conditional selection between two operands.

    Parameters
    ----------
    condition : Expression
        feature instance with bool values as condition
    feature_left : Expression
        values used where the condition holds (feature instance or plain value)
    feature_right : Expression
        values used where the condition does not hold (feature instance or plain value)
    """

    def __init__(self, condition, feature_left, feature_right):
        self.condition = condition
        self.feature_left = feature_left
        self.feature_right = feature_right

    def __str__(self):
        return f"If({self.condition},{self.feature_left},{self.feature_right})"

    def _load_internal(self, instrument, start_index, end_index, freq):
        cond = self.condition.load(instrument, start_index, end_index, freq)
        # Operands that are not Expression instances are passed to np.where as they are.
        when_true = (
            self.feature_left.load(instrument, start_index, end_index, freq)
            if isinstance(self.feature_left, Expression)
            else self.feature_left
        )
        when_false = (
            self.feature_right.load(instrument, start_index, end_index, freq)
            if isinstance(self.feature_right, Expression)
            else self.feature_right
        )
        # The result is indexed like the condition series.
        chosen = np.where(cond, when_true, when_false)
        return pd.Series(chosen, index=cond.index)

    def get_longest_back_rolling(self):
        # Parts that are not Expression instances contribute 0.
        spans = [
            part.get_longest_back_rolling() if isinstance(part, Expression) else 0
            for part in (self.feature_left, self.feature_right, self.condition)
        ]
        return max(spans[0], spans[1], spans[2])

    def get_extended_window_size(self):
        # Parts that are not Expression instances contribute (0, 0).
        windows = [
            part.get_extended_window_size() if isinstance(part, Expression) else (0, 0)
            for part in (self.feature_left, self.feature_right, self.condition)
        ]
        (ll, lr), (rl, rr), (cl, cr) = windows
        return max(ll, rl, cl), max(lr, rr, cr)
#################### Rolling ####################
# NOTE: methods like `rolling.mean` are optimized with Cython
# and are much faster than `rolling.apply(np.mean)`
class Rolling(ExpressionOps):
    """Rolling Operator

    Rolling and expanding mean the same as in pandas.
    A window of 0 selects `expanding` behaviour, a window strictly between
    0 and 1 is used as the `alpha` of an exponentially weighted mean, and
    any other window selects `rolling` behaviour.

    Parameters
    ----------
    feature : Expression
        feature instance
    N : int
        rolling window size
    func : str
        name of the rolling/expanding method to call

    Returns
    ----------
    Expression
        rolling outputs
    """

    def __init__(self, feature, N, func):
        self.feature = feature
        self.N = N
        self.func = func

    def __str__(self):
        return f"{type(self).__name__}({self.feature},{self.N})"

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)
        # NOTE: no null handling is done here; it is the user's responsibility
        # to decide whether to use features on null days.
        if self.N == 0:
            window = data.expanding(min_periods=1)
            return getattr(window, self.func)()
        if 0 < self.N < 1:
            # Fractional N is passed to pandas as the ewm smoothing factor.
            return data.ewm(alpha=self.N, min_periods=1).mean()
        window = data.rolling(self.N, min_periods=1)
        return getattr(window, self.func)()

    def get_longest_back_rolling(self):
        if self.N == 0:
            return np.inf
        if 0 < self.N < 1:
            return int(np.log(1e-6) / np.log(1 - self.N))  # (1 - N)**window == 1e-6
        inner_span = self.feature.get_longest_back_rolling()
        return inner_span + self.N - 1

    def get_extended_window_size(self):
        if self.N == 0:
            # FIXME: How to make this accurate and efficiently? Or should we
            # remove such support for N == 0?
            get_module_logger(self.__class__.__name__).warning("The Rolling(ATTR, 0) will not be accurately calculated")
            return self.feature.get_extended_window_size()

        left, right = self.feature.get_extended_window_size()
        if 0 < self.N < 1:
            # Same 1e-6 cut-off as in get_longest_back_rolling.
            size = int(np.log(1e-6) / np.log(1 - self.N))
            extra = size - 1
        else:
            extra = self.N - 1
        left = max(left + extra, left)
        return left, right
class Ref(Rolling):
    """Feature Reference

    Parameters
    ----------
    feature : Expression
        feature instance
    N : int
        N = 0, retrieve the first data; N > 0, retrieve data of N periods ago; N < 0, future data

    Returns
    ----------
    Expression
        a feature instance with target reference
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "ref")

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)
        if data.empty:
            # Pandas bug, see: https://github.com/pandas-dev/pandas/issues/21049
            return data
        if self.N == 0:
            # N = 0: broadcast the first value over the whole index.
            return pd.Series(data.iloc[0], index=data.index)
        return data.shift(self.N)  # copy

    def get_longest_back_rolling(self):
        if self.N == 0:
            return np.inf
        inner_span = self.feature.get_longest_back_rolling()
        return inner_span + self.N

    def get_extended_window_size(self):
        if self.N == 0:
            get_module_logger(self.__class__.__name__).warning("The Ref(ATTR, 0) will not be accurately calculated")
            return self.feature.get_extended_window_size()

        left, right = self.feature.get_extended_window_size()
        # Each side can only grow: the shifted value is compared with the unshifted one.
        left = max(left + self.N, left)
        right = max(right - self.N, right)
        return left, right
class Mean(Rolling):
    """Rolling Mean (MA)

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling average
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "mean")
class Sum(Rolling):
    """Rolling Sum

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling sum
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "sum")
class Std(Rolling):
    """Rolling Standard Deviation

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling std
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "std")
class Var(Rolling):
    """Rolling Variance

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling variance
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "var")
class Skew(Rolling):
    """Rolling Skewness

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size; 0 or a value of at least 3

    Returns
    ----------
    Expression
        a feature instance holding the rolling skewness
    """

    def __init__(self, feature, N):
        # N == 0 is allowed; any other window below 3 is rejected.
        window_too_small = N != 0 and N < 3
        if window_too_small:
            raise ValueError("The rolling window size of Skewness operation should >= 3")
        super().__init__(feature, N, "skew")
class Kurt(Rolling):
    """Rolling Kurtosis

    Parameters
    ----------
    feature : Expression
        feature instance
    N : int
        rolling window size; 0 or a value of at least 4

    Returns
    ----------
    Expression
        a feature instance with rolling kurtosis

    Raises
    ------
    ValueError
        if N is non-zero and smaller than 4
    """

    def __init__(self, feature, N):
        # N == 0 is allowed; any other window below 4 is rejected.
        if N != 0 and N < 4:
            # FIX: the message used to say ">= 5" although the check accepts N == 4.
            raise ValueError("The rolling window size of Kurtosis operation should >= 4")
        super(Kurt, self).__init__(feature, N, "kurt")
class Max(Rolling):
    """Rolling Max

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling max
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "max")
class IdxMax(Rolling):
    """Rolling Max Index

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the 1-based position of the max within each window
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "idxmax")

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)
        # N == 0 selects an expanding window, anything else a rolling one.
        if self.N == 0:
            window = data.expanding(min_periods=1)
        else:
            window = data.rolling(self.N, min_periods=1)
        return window.apply(lambda x: x.argmax() + 1, raw=True)
class Min(Rolling):
    """Rolling Min

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling min
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "min")
class IdxMin(Rolling):
    """Rolling Min Index

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the 1-based position of the min within each window
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "idxmin")

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)
        # N == 0 selects an expanding window, anything else a rolling one.
        if self.N == 0:
            window = data.expanding(min_periods=1)
        else:
            window = data.rolling(self.N, min_periods=1)
        return window.apply(lambda x: x.argmin() + 1, raw=True)
class Quantile(Rolling):
    """Rolling Quantile

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size
    qscore
        the quantile passed to pandas' ``quantile``

    Returns
    ----------
    Expression
        a feature instance holding the rolling quantile
    """

    def __init__(self, feature, N, qscore):
        super().__init__(feature, N, "quantile")
        self.qscore = qscore

    def __str__(self):
        return f"{type(self).__name__}({self.feature},{self.N},{self.qscore})"

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)
        # N == 0 selects an expanding window, anything else a rolling one.
        if self.N == 0:
            window = data.expanding(min_periods=1)
        else:
            window = data.rolling(self.N, min_periods=1)
        return window.quantile(self.qscore)
class Med(Rolling):
    """Rolling Median

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling median
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "median")
class Mad(Rolling):
    """Rolling Mean Absolute Deviation

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling mean absolute deviation
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "mad")

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)

        # TODO: implement in Cython
        def mad(x):
            # NaNs are dropped before measuring the deviation from the mean.
            valid = x[~np.isnan(x)]
            deviations = np.abs(valid - valid.mean())
            return np.mean(deviations)

        # N == 0 selects an expanding window, anything else a rolling one.
        if self.N == 0:
            window = data.expanding(min_periods=1)
        else:
            window = data.rolling(self.N, min_periods=1)
        return window.apply(mad, raw=True)
class Rank(Rolling):
    """Rolling Rank (Percentile)

    Parameters
    ----------
    feature : Expression
        feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding, for each window, the percentile rank of
        the window's last value among its non-NaN values, as a fraction
    """

    def __init__(self, feature, N):
        super(Rank, self).__init__(feature, N, "rank")

    def _load_internal(self, instrument, start_index, end_index, freq):
        series = self.feature.load(instrument, start_index, end_index, freq)

        # TODO: implement in Cython
        def rank(x):
            # A NaN as the latest value has no rank.
            if np.isnan(x[-1]):
                return np.nan
            x1 = x[~np.isnan(x)]
            if x1.shape[0] == 0:
                return np.nan
            # FIX: percentileofscore returns a value on a 0-100 scale; the
            # previous code divided by len(x1), which made the scale depend on
            # the window size. Dividing by 100 gives a plain fraction.
            return percentileofscore(x1, x1[-1]) / 100

        # N == 0 selects an expanding window, anything else a rolling one.
        if self.N == 0:
            series = series.expanding(min_periods=1).apply(rank, raw=True)
        else:
            series = series.rolling(self.N, min_periods=1).apply(rank, raw=True)
        return series
class Count(Rolling):
    """Rolling Count

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling count of non-NaN elements
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "count")
class Delta(Rolling):
    """Rolling Delta

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the end minus the start of each window
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "delta")

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)
        # N == 0 measures the change against the first value of the series;
        # otherwise the baseline is the value N periods earlier.
        baseline = data.iloc[0] if self.N == 0 else data.shift(self.N)
        return data - baseline
# TODO:
# support pair-wise rolling like `Slope(A, B, N)`
class Slope(Rolling):
    """Rolling Slope

    Computes the slope between `idx` and `feature`
    (e.g. [<feature_t1>, <feature_t2>, <feature_t3>] and [1, 2, 3]).

    Usage Example:
    - "Slope($close, %d)/$close"

    # TODO:
    # Some users may want pair-wise rolling like `Slope(A, B, N)`

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the linear regression slope of each window
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "slope")

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)
        # N == 0 uses the expanding helper, anything else the rolling one.
        if self.N == 0:
            slopes = expanding_slope(data.values)
        else:
            slopes = rolling_slope(data.values, self.N)
        return pd.Series(slopes, index=data.index)
class Rsquare(Rolling):
    """Rolling R-value Square

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the linear regression r-value square of each window
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "rsquare")

    def _load_internal(self, instrument, start_index, end_index, freq):
        raw = self.feature.load(instrument, start_index, end_index, freq)
        if self.N == 0:
            return pd.Series(expanding_rsquare(raw.values), index=raw.index)

        result = pd.Series(rolling_rsquare(raw.values, self.N), index=raw.index)
        # Windows whose std is (almost) zero get NaN. This masking belongs to
        # the rolling branch only: `rolling(0, ...)` is not a valid window.
        window_std = raw.rolling(self.N, min_periods=1).std()
        flat = np.isclose(window_std, 0, atol=2e-05)
        result.loc[flat] = np.nan
        return result
class Resi(Rolling):
    """Rolling Regression Residuals

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the regression residuals of each window
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "resi")

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)
        # N == 0 uses the expanding helper, anything else the rolling one.
        if self.N == 0:
            residuals = expanding_resi(data.values)
        else:
            residuals = rolling_resi(data.values, self.N)
        return pd.Series(residuals, index=data.index)
class WMA(Rolling):
    """Rolling WMA

    Parameters
    ----------
    feature : Expression
        feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance with weighted moving average output
    """

    def __init__(self, feature, N):
        super(WMA, self).__init__(feature, N, "wma")

    def _load_internal(self, instrument, start_index, end_index, freq):
        series = self.feature.load(instrument, start_index, end_index, freq)

        # TODO: implement in Cython
        def weighted_mean(x):
            # FIX: weights now run 1..len(x). Starting at 0 gave the oldest
            # sample zero weight and made a one-element window compute 0 / 0,
            # i.e. NaN, even though min_periods=1 allows such windows.
            w = np.arange(len(x)) + 1
            w = w / w.sum()
            # NOTE(review): the normalised products are averaged (nanmean)
            # rather than summed; kept as it was - confirm this is intended.
            return np.nanmean(w * x)

        # N == 0 selects an expanding window, anything else a rolling one.
        if self.N == 0:
            series = series.expanding(min_periods=1).apply(weighted_mean, raw=True)
        else:
            series = series.rolling(self.N, min_periods=1).apply(weighted_mean, raw=True)
        return series
class EMA(Rolling):
    """Rolling Exponential Mean (EMA)

    Parameters
    ----------
    feature : Expression
        the input feature instance
    N : int, float
        rolling window size; 0 selects an expanding window, a value strictly
        between 0 and 1 is used as the ewm `alpha`, any other value as the ewm `span`

    Returns
    ----------
    Expression
        a feature instance holding the exponentially weighted mean
    """

    def __init__(self, feature, N):
        super().__init__(feature, N, "ema")

    def _load_internal(self, instrument, start_index, end_index, freq):
        data = self.feature.load(instrument, start_index, end_index, freq)

        def exp_weighted_mean(x):
            # The decay factor is derived from the window length; the newest
            # sample gets exponent 0 and therefore the largest weight.
            size = len(x)
            decay = 1 - 2 / (1 + size)
            weights = decay ** np.arange(size)[::-1]
            weights = weights / weights.sum()
            return np.nansum(weights * x)

        if self.N == 0:
            return data.expanding(min_periods=1).apply(exp_weighted_mean, raw=True)
        if 0 < self.N < 1:
            return data.ewm(alpha=self.N, min_periods=1).mean()
        return data.ewm(span=self.N, min_periods=1).mean()
#################### Pair-Wise Rolling ####################
class PairRolling(ExpressionOps):
    """Pair Rolling Operator

    Parameters
    ----------
    feature_left : Expression
        feature instance
    feature_right : Expression
        feature instance
    N : int
        rolling window size; 0 selects an expanding window
    func : str
        name of the pandas rolling/expanding method called with the right series

    Returns
    ----------
    Expression
        a feature instance with rolling output of two input features
    """

    def __init__(self, feature_left, feature_right, N, func):
        self.feature_left = feature_left
        self.feature_right = feature_right
        self.N = N
        self.func = func

    def __str__(self):
        return "{}({},{},{})".format(type(self).__name__, self.feature_left, self.feature_right, self.N)

    def _load_internal(self, instrument, start_index, end_index, freq):
        series_left = self.feature_left.load(instrument, start_index, end_index, freq)
        series_right = self.feature_right.load(instrument, start_index, end_index, freq)
        # The window is built on the left series; the right one is the method's argument.
        if self.N == 0:
            series = getattr(series_left.expanding(min_periods=1), self.func)(series_right)
        else:
            series = getattr(series_left.rolling(self.N, min_periods=1), self.func)(series_right)
        return series

    def get_longest_back_rolling(self):
        if self.N == 0:
            return np.inf
        return (
            max(self.feature_left.get_longest_back_rolling(), self.feature_right.get_longest_back_rolling())
            + self.N
            - 1
        )

    def get_extended_window_size(self):
        if self.N == 0:
            get_module_logger(self.__class__.__name__).warning(
                "The PairRolling(ATTR, 0) will not be accurately calculated"
            )
            # FIX: this branch used `self.feature`, an attribute this class
            # never sets, so it raised AttributeError. It now combines the
            # windows of both operands, without the N-dependent extension.
            ll, lr = self.feature_left.get_extended_window_size()
            rl, rr = self.feature_right.get_extended_window_size()
            return max(ll, rl), max(lr, rr)
        else:
            ll, lr = self.feature_left.get_extended_window_size()
            rl, rr = self.feature_right.get_extended_window_size()
            return max(ll, rl) + self.N - 1, max(lr, rr)
class Corr(PairRolling):
    """Rolling Correlation

    Parameters
    ----------
    feature_left : Expression
        feature instance
    feature_right : Expression
        feature instance
    N : int
        rolling window size; 0 selects an expanding window

    Returns
    ----------
    Expression
        a feature instance with rolling correlation of two input features
    """

    def __init__(self, feature_left, feature_right, N):
        super(Corr, self).__init__(feature_left, feature_right, N, "corr")

    def _load_internal(self, instrument, start_index, end_index, freq):
        res: pd.Series = super(Corr, self)._load_internal(instrument, start_index, end_index, freq)

        # NOTE: Load uses MemCache, so calling load again will not cause performance degradation
        series_left = self.feature_left.load(instrument, start_index, end_index, freq)
        series_right = self.feature_right.load(instrument, start_index, end_index, freq)

        # FIX: with N == 0 the std mask used `rolling(0, min_periods=1)`, which
        # pandas rejects (min_periods must be <= window). Use an expanding std
        # in that case, matching the expanding correlation computed above.
        if self.N == 0:
            std_left = series_left.expanding(min_periods=1).std()
            std_right = series_right.expanding(min_periods=1).std()
        else:
            std_left = series_left.rolling(self.N, min_periods=1).std()
            std_right = series_right.rolling(self.N, min_periods=1).std()

        # Windows where either input has (almost) zero std get NaN.
        res.loc[np.isclose(std_left, 0, atol=2e-05) | np.isclose(std_right, 0, atol=2e-05)] = np.nan
        return res
class Cov(PairRolling):
    """Rolling Covariance

    Parameters
    ----------
    feature_left : Expression
        left input feature instance
    feature_right : Expression
        right input feature instance
    N : int
        rolling window size

    Returns
    ----------
    Expression
        a feature instance holding the rolling covariance of the two input features
    """

    def __init__(self, feature_left, feature_right, N):
        super().__init__(feature_left, feature_right, N, "cov")
# Default operator classes (plus `Feature`) that `register_all_ops` registers
# on the module-level `Operators` wrapper; each is looked up by its class name.
OpsList = [
Ref,
Max,
Min,
Sum,
Mean,
Std,
Var,
Skew,
Kurt,
Med,
Mad,
Slope,
Rsquare,
Resi,
Rank,
Quantile,
Count,
EMA,
WMA,
Corr,
Cov,
Delta,
Abs,
Sign,
Log,
Power,
Add,
Sub,
Mul,
Div,
Greater,
Less,
And,
Or,
Not,
Gt,
Ge,
Lt,
Le,
Eq,
Ne,
Mask,
IdxMax,
IdxMin,
If,
Feature,
]
class OpsWrapper:
    """Registry that exposes registered operator classes as attributes."""

    def __init__(self):
        # Maps operator class name -> operator class.
        self._ops = {}

    def reset(self):
        """Forget every registered operator."""
        self._ops = {}

    def register(self, ops_list: "List[Union[Type[ExpressionOps], dict]]"):
        """register operator

        Parameters
        ----------
        ops_list : List[Union[Type[ExpressionOps], dict]]
            - if type(ops_list) is List[Type[ExpressionOps]], each element of ops_list represents the operator class, which should be the subclass of `ExpressionOps`.
            - if type(ops_list) is List[dict], each element of ops_list represents the config of operator, which has the following format:
                {
                    "class": class_name,
                    "module_path": path,
                }
                Note: `class` should be the class name of operator, `module_path` should be a python module or path of file.

        Raises
        ------
        TypeError
            if a resolved operator class is not a subclass of `Expression`
        """
        for _operator in ops_list:
            # A dict entry is a config that is resolved to a class first.
            if isinstance(_operator, dict):
                _ops_class, _ = get_callable_kwargs(_operator)
            else:
                _ops_class = _operator

            if not issubclass(_ops_class, Expression):
                raise TypeError("operator must be subclass of ExpressionOps, not {}".format(_ops_class))

            # Re-registering a name replaces the earlier class, with a warning.
            if _ops_class.__name__ in self._ops:
                get_module_logger(self.__class__.__name__).warning(
                    "The custom operator [{}] will override the qlib default definition".format(_ops_class.__name__)
                )
            self._ops[_ops_class.__name__] = _ops_class

    def __getattr__(self, key):
        # FIX: read the registry through __dict__. Using `self._ops` here
        # re-entered __getattr__ whenever `_ops` was not set yet (e.g. while
        # copy/pickle rebuild an instance without calling __init__) and ended
        # in unbounded recursion instead of an AttributeError.
        registry = self.__dict__.get("_ops")
        if registry is None or key not in registry:
            raise AttributeError("The operator [{0}] is not registered".format(key))
        return registry[key]
# Module-level registry instance; `register_all_ops` resets and refills it.
Operators = OpsWrapper()
def register_all_ops(C):
    """Reset the operator registry, then register the defaults and any custom operators.

    Parameters
    ----------
    C
        configuration object; its optional ``custom_ops`` attribute, when
        present and not None, is registered after the default operators
    """
    ops_logger = get_module_logger("ops")

    Operators.reset()
    Operators.register(OpsList)

    # Nothing more to do when no custom operators are configured.
    if getattr(C, "custom_ops", None) is None:
        return

    Operators.register(C.custom_ops)
    ops_logger.debug("register custom operator {}".format(C.custom_ops))