1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-01 18:11:18 +08:00
Files
qlib/qlib/data/ops.py
2020-10-10 10:13:51 +08:00

1416 lines
34 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import numpy as np
import pandas as pd
from .base import Expression, ExpressionOps
from ..log import get_module_logger
try:
from ._libs.rolling import rolling_slope, rolling_rsquare, rolling_resi
from ._libs.expanding import expanding_slope, expanding_rsquare, expanding_resi
except ImportError as err:
print(err)
print("Do not import qlib package in the repository directory")
exit(-1)
__all__ = (
"Ref",
"Max",
"Min",
"Sum",
"Mean",
"Std",
"Var",
"Skew",
"Kurt",
"Med",
"Mad",
"Slope",
"Rsquare",
"Resi",
"Rank",
"Quantile",
"Count",
"EMA",
"WMA",
"Corr",
"Cov",
"Delta",
"Abs",
"Sign",
"Log",
"Power",
"Add",
"Sub",
"Mul",
"Div",
"Greater",
"Less",
"And",
"Or",
"Not",
"Gt",
"Ge",
"Lt",
"Le",
"Eq",
"Ne",
"Mask",
"IdxMax",
"IdxMin",
"If",
)
np.seterr(invalid="ignore")
#################### Element-Wise Operator ####################
class ElemOperator(ExpressionOps):
"""Element-wise Operator
Parameters
----------
feature : Expression
feature instance
func : str
feature operation method
Returns
----------
Expression
feature operation output
"""
def __init__(self, feature, func):
self.feature = feature
self.func = func
def __str__(self):
return "{}({})".format(type(self).__name__, self.feature)
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
return getattr(np, self.func)(series)
def get_longest_back_rolling(self):
return self.feature.get_longest_back_rolling()
def get_extended_window_size(self):
return self.feature.get_extended_window_size()
class Abs(ElemOperator):
"""Feature Absolute Value
Parameters
----------
feature : Expression
feature instance
Returns
----------
Expression
a feature instance with absolute output
"""
def __init__(self, feature):
super(Abs, self).__init__(feature, "abs")
class Sign(ElemOperator):
"""Feature Sign
Parameters
----------
feature : Expression
feature instance
Returns
----------
Expression
a feature instance with sign
"""
def __init__(self, feature):
super(Sign, self).__init__(feature, "sign")
class Log(ElemOperator):
"""Feature Log
Parameters
----------
feature : Expression
feature instance
Returns
----------
Expression
a feature instance with log
"""
def __init__(self, feature):
super(Log, self).__init__(feature, "log")
class Power(ElemOperator):
"""Feature Power
Parameters
----------
feature : Expression
feature instance
Returns
----------
Expression
a feature instance with power
"""
def __init__(self, feature, exponent):
super(Power, self).__init__(feature, "power")
self.exponent = exponent
def __str__(self):
return "{}({},{})".format(type(self).__name__, self.feature, self.exponent)
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
return getattr(np, self.func)(series, self.exponent)
class Mask(ElemOperator):
"""Feature Mask
Parameters
----------
feature : Expression
feature instance
instrument : str
instrument mask
Returns
----------
Expression
a feature instance with masked instrument
"""
def __init__(self, feature, instrument):
super(Mask, self).__init__(feature, "mask")
self.instrument = instrument
def __str__(self):
return "{}({},{})".format(type(self).__name__, self.feature, self.instrument.lower())
def _load_internal(self, instrument, start_index, end_index, freq):
return self.feature.load(self.instrument, start_index, end_index, freq)
class Not(ElemOperator):
"""Not Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
feature elementwise not output
"""
def __init__(self, feature):
super(Not, self).__init__(feature, "bitwise_not")
#################### Pair-Wise Operator ####################
class PairOperator(ExpressionOps):
"""Pair-wise operator
Parameters
----------
feature_left : Expression
feature instance or numeric value
feature_right : Expression
feature instance or numeric value
func : str
operator function
Returns
----------
Feature:
two features' operation output
"""
def __init__(self, feature_left, feature_right, func):
self.feature_left = feature_left
self.feature_right = feature_right
self.func = func
def __str__(self):
return "{}({},{})".format(type(self).__name__, self.feature_left, self.feature_right)
def _load_internal(self, instrument, start_index, end_index, freq):
assert any(
[isinstance(self.feature_left, Expression), self.feature_right, Expression]
), "at least one of two inputs is Expression instance"
if isinstance(self.feature_left, Expression):
series_left = self.feature_left.load(instrument, start_index, end_index, freq)
else:
series_left = self.feature_left # numeric value
if isinstance(self.feature_right, Expression):
series_right = self.feature_right.load(instrument, start_index, end_index, freq)
else:
series_right = self.feature_right
return getattr(np, self.func)(series_left, series_right)
def get_longest_back_rolling(self):
if isinstance(self.feature_left, Expression):
left_br = self.feature_left.get_longest_back_rolling()
else:
left_br = 0
if isinstance(self.feature_right, Expression):
right_br = self.feature_right.get_longest_back_rolling()
else:
right_br = 0
return max(left_br, right_br)
def get_extended_window_size(self):
if isinstance(self.feature_left, Expression):
ll, lr = self.feature_left.get_extended_window_size()
else:
ll, lr = 0, 0
if isinstance(self.feature_right, Expression):
rl, rr = self.feature_right.get_extended_window_size()
else:
rl, rr = 0, 0
return max(ll, rl), max(lr, rr)
class Add(PairOperator):
"""Add Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
two features' sum
"""
def __init__(self, feature_left, feature_right):
super(Add, self).__init__(feature_left, feature_right, "add")
class Sub(PairOperator):
"""Subtract Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
two features' subtraction
"""
def __init__(self, feature_left, feature_right):
super(Sub, self).__init__(feature_left, feature_right, "subtract")
class Mul(PairOperator):
"""Multiply Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
two features' product
"""
def __init__(self, feature_left, feature_right):
super(Mul, self).__init__(feature_left, feature_right, "multiply")
class Div(PairOperator):
"""Division Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
two features' division
"""
def __init__(self, feature_left, feature_right):
super(Div, self).__init__(feature_left, feature_right, "divide")
class Greater(PairOperator):
"""Greater Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
greater elements taken from the input two features
"""
def __init__(self, feature_left, feature_right):
super(Greater, self).__init__(feature_left, feature_right, "maximum")
class Less(PairOperator):
"""Less Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
smaller elements taken from the input two features
"""
def __init__(self, feature_left, feature_right):
super(Less, self).__init__(feature_left, feature_right, "minimum")
class Gt(PairOperator):
"""Greater Than Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
bool series indicate `left > right`
"""
def __init__(self, feature_left, feature_right):
super(Gt, self).__init__(feature_left, feature_right, "greater")
class Ge(PairOperator):
"""Greater Equal Than Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
bool series indicate `left >= right`
"""
def __init__(self, feature_left, feature_right):
super(Ge, self).__init__(feature_left, feature_right, "greater_equal")
class Lt(PairOperator):
"""Less Than Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
bool series indicate `left < right`
"""
def __init__(self, feature_left, feature_right):
super(Lt, self).__init__(feature_left, feature_right, "less")
class Le(PairOperator):
"""Less Equal Than Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
bool series indicate `left <= right`
"""
def __init__(self, feature_left, feature_right):
super(Le, self).__init__(feature_left, feature_right, "less_equal")
class Eq(PairOperator):
"""Equal Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
bool series indicate `left == right`
"""
def __init__(self, feature_left, feature_right):
super(Eq, self).__init__(feature_left, feature_right, "equal")
class Ne(PairOperator):
"""Not Equal Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
bool series indicate `left != right`
"""
def __init__(self, feature_left, feature_right):
super(Ne, self).__init__(feature_left, feature_right, "not_equal")
class And(PairOperator):
"""And Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
two features' row by row & output
"""
def __init__(self, feature_left, feature_right):
super(And, self).__init__(feature_left, feature_right, "bitwise_and")
class Or(PairOperator):
"""Or Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
Returns
----------
Feature:
two features' row by row | outputs
"""
def __init__(self, feature_left, feature_right):
super(Or, self).__init__(feature_left, feature_right, "bitwise_or")
#################### Triple-wise Operator ####################
class If(ExpressionOps):
"""If Operator
Parameters
----------
condition : Expression
feature instance with bool values as condition
feature_left : Expression
feature instance
feature_right : Expression
feature instance
"""
def __init__(self, condition, feature_left, feature_right):
self.condition = condition
self.feature_left = feature_left
self.feature_right = feature_right
def __str__(self):
return "If({},{},{})".format(self.condition, self.feature_left, self.feature_right)
def _load_internal(self, instrument, start_index, end_index, freq):
series_cond = self.condition.load(instrument, start_index, end_index, freq)
if isinstance(self.feature_left, Expression):
series_left = self.feature_left.load(instrument, start_index, end_index, freq)
else:
series_left = self.feature_left
if isinstance(self.feature_right, Expression):
series_right = self.feature_right.load(instrument, start_index, end_index, freq)
else:
series_right = self.feature_right
series = pd.Series(np.where(series_cond, series_left, series_right), index=series_cond.index)
return series
def get_longest_back_rolling(self):
if isinstance(self.feature_left, Expression):
left_br = self.feature_left.get_longest_back_rolling()
else:
left_br = 0
if isinstance(self.feature_right, Expression):
right_br = self.feature_right.get_longest_back_rolling()
else:
right_br = 0
if isinstance(self.condition, Expression):
c_br = self.condition.get_longest_back_rolling()
else:
c_br = 0
return max(left_br, right_br, c_br)
def get_extended_window_size(self):
if isinstance(self.feature_left, Expression):
ll, lr = self.feature_left.get_extended_window_size()
else:
ll, lr = 0, 0
if isinstance(self.feature_right, Expression):
rl, rr = self.feature_right.get_extended_window_size()
else:
rl, rr = 0, 0
if isinstance(self.condition, Expression):
cl, cr = self.condition.get_extended_window_size()
else:
cl, cr = 0, 0
return max(ll, rl, cl), max(lr, rr, cr)
#################### Rolling ####################
# NOTE: methods like `rolling.mean` are optimized with cython,
# and are super faster than `rolling.apply(np.mean)`
class Rolling(ExpressionOps):
"""Rolling Operator
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
func : str
rolling method
Returns
----------
Expression
rolling outputs
"""
def __init__(self, feature, N, func):
self.feature = feature
self.N = N
self.func = func
def __str__(self):
return "{}({},{})".format(type(self).__name__, self.feature, self.N)
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
# NOTE: remove all null check,
# now it's user's responsibility to decide whether use features in null days
# isnull = series.isnull() # NOTE: isnull = NaN, inf is not null
if self.N == 0:
series = getattr(series.expanding(min_periods=1), self.func)()
else:
series = getattr(series.rolling(self.N, min_periods=1), self.func)()
# series.iloc[:self.N-1] = np.nan
# series[isnull] = np.nan
return series
def get_longest_back_rolling(self):
if self.N == 0:
return np.inf
return self.feature.get_longest_back_rolling() + self.N - 1
def get_extended_window_size(self):
if self.N == 0:
# FIXME: How to make this accurate and efficiently? Or should we
# remove such support for N == 0?
get_module_logger(self.__class__.__name__).warning("The Rolling(ATTR, 0) will not be accurately calculated")
return self.feature.get_extended_window_size()
else:
lft_etd, rght_etd = self.feature.get_extended_window_size()
lft_etd = max(lft_etd + self.N - 1, lft_etd)
return lft_etd, rght_etd
class Ref(Rolling):
"""Feature Reference
Parameters
----------
feature : Expression
feature instance
N : int
N = 0, retrieve the first data; N > 0, retrieve data of N periods ago; N < 0, future data
Returns
----------
Expression
a feature instance with target reference
"""
def __init__(self, feature, N):
super(Ref, self).__init__(feature, N, "ref")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
# N = 0, return first day
if series.empty:
return series # Pandas bug, see: https://github.com/pandas-dev/pandas/issues/21049
elif self.N == 0:
series = pd.Series(series.iloc[0], index=series.index)
else:
series = series.shift(self.N) # copy
return series
def get_longest_back_rolling(self):
if self.N == 0:
return np.inf
return self.feature.get_longest_back_rolling() + self.N
def get_extended_window_size(self):
if self.N == 0:
get_module_logger(self.__class__.__name__).warning("The Ref(ATTR, 0) will not be accurately calculated")
return self.feature.get_extended_window_size()
else:
lft_etd, rght_etd = self.feature.get_extended_window_size()
lft_etd = max(lft_etd + self.N, lft_etd)
rght_etd = max(rght_etd - self.N, rght_etd)
return lft_etd, rght_etd
class Mean(Rolling):
"""Rolling Mean (MA)
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling average
"""
def __init__(self, feature, N):
super(Mean, self).__init__(feature, N, "mean")
class Sum(Rolling):
"""Rolling Sum
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling sum
"""
def __init__(self, feature, N):
super(Sum, self).__init__(feature, N, "sum")
class Std(Rolling):
"""Rolling Std
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling std
"""
def __init__(self, feature, N):
super(Std, self).__init__(feature, N, "std")
class Var(Rolling):
"""Rolling Variance
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling variance
"""
def __init__(self, feature, N):
super(Var, self).__init__(feature, N, "var")
class Skew(Rolling):
"""Rolling Skewness
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling skewness
"""
def __init__(self, feature, N):
super(Skew, self).__init__(feature, N, "skew")
class Kurt(Rolling):
"""Rolling Kurtosis
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling kurtosis
"""
def __init__(self, feature, N):
super(Kurt, self).__init__(feature, N, "kurt")
class Max(Rolling):
"""Rolling Max
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling max
"""
def __init__(self, feature, N):
super(Max, self).__init__(feature, N, "max")
class IdxMax(Rolling):
"""Rolling Max Index
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling max index
"""
def __init__(self, feature, N):
super(IdxMax, self).__init__(feature, N, "idxmax")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
if self.N == 0:
series = series.expanding(min_periods=1).apply(lambda x: x.argmax() + 1, raw=True)
else:
series = series.rolling(self.N, min_periods=1).apply(lambda x: x.argmax() + 1, raw=True)
return series
class Min(Rolling):
"""Rolling Min
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling min
"""
def __init__(self, feature, N):
super(Min, self).__init__(feature, N, "min")
class IdxMin(Rolling):
"""Rolling Min Index
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling min index
"""
def __init__(self, feature, N):
super(IdxMin, self).__init__(feature, N, "idxmin")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
if self.N == 0:
series = series.expanding(min_periods=1).apply(lambda x: x.argmin() + 1, raw=True)
else:
series = series.rolling(self.N, min_periods=1).apply(lambda x: x.argmin() + 1, raw=True)
return series
class Quantile(Rolling):
"""Rolling Quantile
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling quantile
"""
def __init__(self, feature, N, qscore):
super(Quantile, self).__init__(feature, N, "quantile")
self.qscore = qscore
def __str__(self):
return "{}({},{},{})".format(type(self).__name__, self.feature, self.N, self.qscore)
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
if self.N == 0:
series = series.expanding(min_periods=1).quantile(self.qscore)
else:
series = series.rolling(self.N, min_periods=1).quantile(self.qscore)
return series
class Med(Rolling):
"""Rolling Median
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling median
"""
def __init__(self, feature, N):
super(Med, self).__init__(feature, N, "median")
class Mad(Rolling):
"""Rolling Mean Absolute Deviation
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling mean absolute deviation
"""
def __init__(self, feature, N):
super(Mad, self).__init__(feature, N, "mad")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
# TODO: implement in Cython
def mad(x):
x1 = x[~np.isnan(x)]
return np.mean(np.abs(x1 - x1.mean()))
if self.N == 0:
series = series.expanding(min_periods=1).apply(mad, raw=True)
else:
series = series.rolling(self.N, min_periods=1).apply(mad, raw=True)
return series
class Rank(Rolling):
"""Rolling Rank (Percentile)
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling rank
"""
def __init__(self, feature, N):
super(Rank, self).__init__(feature, N, "rank")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
# TODO: implement in Cython
def rank(x):
if np.isnan(x[-1]):
return np.nan
x1 = x[~np.isnan(x)]
if x1.shape[0] == 0:
return np.nan
return (x1.argsort()[-1] + 1) / len(x1)
if self.N == 0:
series = series.expanding(min_periods=1).apply(rank, raw=True)
else:
series = series.rolling(self.N, min_periods=1).apply(rank, raw=True)
return series
class Count(Rolling):
"""Rolling Count
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling count of number of non-NaN elements
"""
def __init__(self, feature, N):
super(Count, self).__init__(feature, N, "count")
class Delta(Rolling):
"""Rolling Delta
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with end minus start in rolling window
"""
def __init__(self, feature, N):
super(Delta, self).__init__(feature, N, "delta")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
if self.N == 0:
series = series - series.iloc[0]
else:
series = series - series.shift(self.N)
return series
# TODO:
# support pair-wise rolling like `Slope(A, B, N)`
class Slope(Rolling):
"""Rolling Slope
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with regression slope of given window
"""
def __init__(self, feature, N):
super(Slope, self).__init__(feature, N, "slope")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
if self.N == 0:
series = pd.Series(expanding_slope(series.values), index=series.index)
else:
series = pd.Series(rolling_slope(series.values, self.N), index=series.index)
return series
class Rsquare(Rolling):
"""Rolling R-value Square
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with regression r-value square of given window
"""
def __init__(self, feature, N):
super(Rsquare, self).__init__(feature, N, "rsquare")
def _load_internal(self, instrument, start_index, end_index, freq):
_series = self.feature.load(instrument, start_index, end_index, freq)
if self.N == 0:
series = pd.Series(expanding_rsquare(_series.values), index=_series.index)
else:
series = pd.Series(rolling_rsquare(_series.values, self.N), index=_series.index)
series.loc[np.isclose(_series.rolling(self.N, min_periods=1).std(), 0, atol=2e-05)] = np.nan
return series
class Resi(Rolling):
"""Rolling Regression Residuals
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with regression residuals of given window
"""
def __init__(self, feature, N):
super(Resi, self).__init__(feature, N, "resi")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
if self.N == 0:
series = pd.Series(expanding_resi(series.values), index=series.index)
else:
series = pd.Series(rolling_resi(series.values, self.N), index=series.index)
return series
class WMA(Rolling):
"""Rolling WMA
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with weighted moving average output
"""
def __init__(self, feature, N):
super(WMA, self).__init__(feature, N, "wma")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
# TODO: implement in Cython
def weighted_mean(x):
w = np.arange(len(x))
w /= w.sum()
return np.nanmean(w * x)
if self.N == 0:
series = series.expanding(min_periods=1).apply(weighted_mean, raw=True)
else:
series = series.rolling(self.N, min_periods=1).apply(weighted_mean, raw=True)
return series
class EMA(Rolling):
"""Rolling Exponential Mean (EMA)
Parameters
----------
feature : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with regression r-value square of given window
"""
def __init__(self, feature, N):
super(EMA, self).__init__(feature, N, "ema")
def _load_internal(self, instrument, start_index, end_index, freq):
series = self.feature.load(instrument, start_index, end_index, freq)
def exp_weighted_mean(x):
a = 1 - 2 / (1 + len(x))
w = a ** np.arange(len(x))[::-1]
w /= w.sum()
return np.nansum(w * x)
if self.N == 0:
series = series.expanding(min_periods=1).apply(exp_weighted_mean, raw=True)
else:
series = series.ewm(span=self.N, min_periods=1).mean()
return series
#################### Pair-Wise Rolling ####################
class PairRolling(ExpressionOps):
"""Pair Rolling Operator
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling output of two input features
"""
def __init__(self, feature_left, feature_right, N, func):
self.feature_left = feature_left
self.feature_right = feature_right
self.N = N
self.func = func
def __str__(self):
return "{}({},{},{})".format(type(self).__name__, self.feature_left, self.feature_right, self.N)
def _load_internal(self, instrument, start_index, end_index, freq):
series_left = self.feature_left.load(instrument, start_index, end_index, freq)
series_right = self.feature_right.load(instrument, start_index, end_index, freq)
if self.N == 0:
series = getattr(series_left.expanding(min_periods=1), self.func)(series_right)
else:
series = getattr(series_left.rolling(self.N, min_periods=1), self.func)(series_right)
return series
def get_longest_back_rolling(self):
if self.N == 0:
return np.inf
return (
max(self.feature_left.get_longest_back_rolling(), self.feature_right.get_longest_back_rolling())
+ self.N
- 1
)
def get_extended_window_size(self):
if self.N == 0:
get_module_logger(self.__class__.__name__).warning(
"The PairRolling(ATTR, 0) will not be accurately calculated"
)
return self.feature.get_extended_window_size()
else:
ll, lr = self.feature_left.get_extended_window_size()
rl, rr = self.feature_right.get_extended_window_size()
return max(ll, rl) + self.N - 1, max(lr, rr)
class Corr(PairRolling):
"""Rolling Correlation
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling correlation of two input features
"""
def __init__(self, feature_left, feature_right, N):
super(Corr, self).__init__(feature_left, feature_right, N, "corr")
def _load_internal(self, instrument, start_index, end_index, freq):
res = super(Corr, self)._load_internal(instrument, start_index, end_index, freq)
# NOTE: Load uses MemCache, so calling load again will not cause performance degradation
series_left = self.feature_left.load(instrument, start_index, end_index, freq)
series_right = self.feature_right.load(instrument, start_index, end_index, freq)
res.loc[
np.isclose(series_left.rolling(self.N, min_periods=1).std(), 0, atol=2e-05)
| np.isclose(series_right.rolling(self.N, min_periods=1).std(), 0, atol=2e-05)
] = np.nan
return res
class Cov(PairRolling):
"""Rolling Covariance
Parameters
----------
feature_left : Expression
feature instance
feature_right : Expression
feature instance
N : int
rolling window size
Returns
----------
Expression
a feature instance with rolling max of two input features
"""
def __init__(self, feature_left, feature_right, N):
super(Cov, self).__init__(feature_left, feature_right, N, "cov")