# Copyright (c) Microsoft Corporation. # Licensed under the MIT License. from __future__ import division from __future__ import print_function import numpy as np import pandas as pd from scipy.stats import percentileofscore from .base import Expression, ExpressionOps from ..log import get_module_logger try: from ._libs.rolling import rolling_slope, rolling_rsquare, rolling_resi from ._libs.expanding import expanding_slope, expanding_rsquare, expanding_resi except ImportError as err: print(err) print("Do not import qlib package in the repository directory") exit(-1) __all__ = ( "Ref", "Max", "Min", "Sum", "Mean", "Std", "Var", "Skew", "Kurt", "Med", "Mad", "Slope", "Rsquare", "Resi", "Rank", "Quantile", "Count", "EMA", "WMA", "Corr", "Cov", "Delta", "Abs", "Sign", "Log", "Power", "Add", "Sub", "Mul", "Div", "Greater", "Less", "And", "Or", "Not", "Gt", "Ge", "Lt", "Le", "Eq", "Ne", "Mask", "IdxMax", "IdxMin", "If", ) np.seterr(invalid="ignore") #################### Element-Wise Operator #################### class ElemOperator(ExpressionOps): """Element-wise Operator Parameters ---------- feature : Expression feature instance func : str feature operation method Returns ---------- Expression feature operation output """ def __init__(self, feature, func): self.feature = feature self.func = func def __str__(self): return "{}({})".format(type(self).__name__, self.feature) def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) return getattr(np, self.func)(series) def get_longest_back_rolling(self): return self.feature.get_longest_back_rolling() def get_extended_window_size(self): return self.feature.get_extended_window_size() class Abs(ElemOperator): """Feature Absolute Value Parameters ---------- feature : Expression feature instance Returns ---------- Expression a feature instance with absolute output """ def __init__(self, feature): super(Abs, self).__init__(feature, "abs") class Sign(ElemOperator): """Feature Sign Parameters ---------- feature : Expression feature instance Returns ---------- Expression a feature instance with sign """ def __init__(self, feature): super(Sign, self).__init__(feature, "sign") class Log(ElemOperator): """Feature Log Parameters ---------- feature : Expression feature instance Returns ---------- Expression a feature instance with log """ def __init__(self, feature): super(Log, self).__init__(feature, "log") class Power(ElemOperator): """Feature Power Parameters ---------- feature : Expression feature instance Returns ---------- Expression a feature instance with power """ def __init__(self, feature, exponent): super(Power, self).__init__(feature, "power") self.exponent = exponent def __str__(self): return "{}({},{})".format(type(self).__name__, self.feature, self.exponent) def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) return getattr(np, self.func)(series, self.exponent) class Mask(ElemOperator): """Feature Mask Parameters ---------- feature : Expression feature instance instrument : str instrument mask Returns ---------- Expression a feature instance with masked instrument """ def __init__(self, feature, instrument): super(Mask, self).__init__(feature, "mask") self.instrument = instrument def __str__(self): return "{}({},{})".format(type(self).__name__, self.feature, self.instrument.lower()) def _load_internal(self, instrument, start_index, end_index, freq): return self.feature.load(self.instrument, start_index, end_index, freq) class Not(ElemOperator): """Not Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: feature elementwise not output """ def __init__(self, feature): super(Not, self).__init__(feature, "bitwise_not") #################### Pair-Wise Operator #################### class PairOperator(ExpressionOps): """Pair-wise operator Parameters ---------- feature_left : Expression feature instance or numeric value feature_right : Expression feature instance or numeric value func : str operator function Returns ---------- Feature: two features' operation output """ def __init__(self, feature_left, feature_right, func): self.feature_left = feature_left self.feature_right = feature_right self.func = func def __str__(self): return "{}({},{})".format(type(self).__name__, self.feature_left, self.feature_right) def _load_internal(self, instrument, start_index, end_index, freq): assert any( [isinstance(self.feature_left, Expression), self.feature_right, Expression] ), "at least one of two inputs is Expression instance" if isinstance(self.feature_left, Expression): series_left = self.feature_left.load(instrument, start_index, end_index, freq) else: series_left = self.feature_left # numeric value if isinstance(self.feature_right, Expression): series_right = self.feature_right.load(instrument, start_index, end_index, freq) else: series_right = self.feature_right return getattr(np, self.func)(series_left, series_right) def get_longest_back_rolling(self): if isinstance(self.feature_left, Expression): left_br = self.feature_left.get_longest_back_rolling() else: left_br = 0 if isinstance(self.feature_right, Expression): right_br = self.feature_right.get_longest_back_rolling() else: right_br = 0 return max(left_br, right_br) def get_extended_window_size(self): if isinstance(self.feature_left, Expression): ll, lr = self.feature_left.get_extended_window_size() else: ll, lr = 0, 0 if isinstance(self.feature_right, Expression): rl, rr = self.feature_right.get_extended_window_size() else: rl, rr = 0, 0 return max(ll, rl), max(lr, rr) class Add(PairOperator): """Add Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: two features' sum """ def __init__(self, feature_left, feature_right): super(Add, self).__init__(feature_left, feature_right, "add") class Sub(PairOperator): """Subtract Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: two features' subtraction """ def __init__(self, feature_left, feature_right): super(Sub, self).__init__(feature_left, feature_right, "subtract") class Mul(PairOperator): """Multiply Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: two features' product """ def __init__(self, feature_left, feature_right): super(Mul, self).__init__(feature_left, feature_right, "multiply") class Div(PairOperator): """Division Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: two features' division """ def __init__(self, feature_left, feature_right): super(Div, self).__init__(feature_left, feature_right, "divide") class Greater(PairOperator): """Greater Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: greater elements taken from the input two features """ def __init__(self, feature_left, feature_right): super(Greater, self).__init__(feature_left, feature_right, "maximum") class Less(PairOperator): """Less Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: smaller elements taken from the input two features """ def __init__(self, feature_left, feature_right): super(Less, self).__init__(feature_left, feature_right, "minimum") class Gt(PairOperator): """Greater Than Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: bool series indicate `left > right` """ def __init__(self, feature_left, feature_right): super(Gt, self).__init__(feature_left, feature_right, "greater") class Ge(PairOperator): """Greater Equal Than Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: bool series indicate `left >= right` """ def __init__(self, feature_left, feature_right): super(Ge, self).__init__(feature_left, feature_right, "greater_equal") class Lt(PairOperator): """Less Than Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: bool series indicate `left < right` """ def __init__(self, feature_left, feature_right): super(Lt, self).__init__(feature_left, feature_right, "less") class Le(PairOperator): """Less Equal Than Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: bool series indicate `left <= right` """ def __init__(self, feature_left, feature_right): super(Le, self).__init__(feature_left, feature_right, "less_equal") class Eq(PairOperator): """Equal Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: bool series indicate `left == right` """ def __init__(self, feature_left, feature_right): super(Eq, self).__init__(feature_left, feature_right, "equal") class Ne(PairOperator): """Not Equal Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: bool series indicate `left != right` """ def __init__(self, feature_left, feature_right): super(Ne, self).__init__(feature_left, feature_right, "not_equal") class And(PairOperator): """And Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: two features' row by row & output """ def __init__(self, feature_left, feature_right): super(And, self).__init__(feature_left, feature_right, "bitwise_and") class Or(PairOperator): """Or Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance Returns ---------- Feature: two features' row by row | outputs """ def __init__(self, feature_left, feature_right): super(Or, self).__init__(feature_left, feature_right, "bitwise_or") #################### Triple-wise Operator #################### class If(ExpressionOps): """If Operator Parameters ---------- condition : Expression feature instance with bool values as condition feature_left : Expression feature instance feature_right : Expression feature instance """ def __init__(self, condition, feature_left, feature_right): self.condition = condition self.feature_left = feature_left self.feature_right = feature_right def __str__(self): return "If({},{},{})".format(self.condition, self.feature_left, self.feature_right) def _load_internal(self, instrument, start_index, end_index, freq): series_cond = self.condition.load(instrument, start_index, end_index, freq) if isinstance(self.feature_left, Expression): series_left = self.feature_left.load(instrument, start_index, end_index, freq) else: series_left = self.feature_left if isinstance(self.feature_right, Expression): series_right = self.feature_right.load(instrument, start_index, end_index, freq) else: series_right = self.feature_right series = pd.Series(np.where(series_cond, series_left, series_right), index=series_cond.index) return series def get_longest_back_rolling(self): if isinstance(self.feature_left, Expression): left_br = self.feature_left.get_longest_back_rolling() else: left_br = 0 if isinstance(self.feature_right, Expression): right_br = self.feature_right.get_longest_back_rolling() else: right_br = 0 if isinstance(self.condition, Expression): c_br = self.condition.get_longest_back_rolling() else: c_br = 0 return max(left_br, right_br, c_br) def get_extended_window_size(self): if isinstance(self.feature_left, Expression): ll, lr = self.feature_left.get_extended_window_size() else: ll, lr = 0, 0 if isinstance(self.feature_right, Expression): rl, rr = self.feature_right.get_extended_window_size() else: rl, rr = 0, 0 if isinstance(self.condition, Expression): cl, cr = self.condition.get_extended_window_size() else: cl, cr = 0, 0 return max(ll, rl, cl), max(lr, rr, cr) #################### Rolling #################### # NOTE: methods like `rolling.mean` are optimized with cython, # and are super faster than `rolling.apply(np.mean)` class Rolling(ExpressionOps): """Rolling Operator Parameters ---------- feature : Expression feature instance N : int rolling window size func : str rolling method Returns ---------- Expression rolling outputs """ def __init__(self, feature, N, func): self.feature = feature self.N = N self.func = func def __str__(self): return "{}({},{})".format(type(self).__name__, self.feature, self.N) def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) # NOTE: remove all null check, # now it's user's responsibility to decide whether use features in null days # isnull = series.isnull() # NOTE: isnull = NaN, inf is not null if self.N == 0: series = getattr(series.expanding(min_periods=1), self.func)() elif 0 < self.N < 1: series = series.ewm(alpha=self.N, min_periods=1).mean() else: series = getattr(series.rolling(self.N, min_periods=1), self.func)() # series.iloc[:self.N-1] = np.nan # series[isnull] = np.nan return series def get_longest_back_rolling(self): if self.N == 0: return np.inf if 0 < self.N < 1: return int(np.log(1e-6) / np.log(1 - self.N)) # (1 - N)**window == 1e-6 return self.feature.get_longest_back_rolling() + self.N - 1 def get_extended_window_size(self): if self.N == 0: # FIXME: How to make this accurate and efficiently? Or should we # remove such support for N == 0? get_module_logger(self.__class__.__name__).warning("The Rolling(ATTR, 0) will not be accurately calculated") return self.feature.get_extended_window_size() elif 0 < self.N < 1: lft_etd, rght_etd = self.feature.get_extended_window_size() size = int(np.log(1e-6) / np.log(1 - self.N)) lft_etd = max(lft_etd + size - 1, lft_etd) return lft_etd, rght_etd else: lft_etd, rght_etd = self.feature.get_extended_window_size() lft_etd = max(lft_etd + self.N - 1, lft_etd) return lft_etd, rght_etd class Ref(Rolling): """Feature Reference Parameters ---------- feature : Expression feature instance N : int N = 0, retrieve the first data; N > 0, retrieve data of N periods ago; N < 0, future data Returns ---------- Expression a feature instance with target reference """ def __init__(self, feature, N): super(Ref, self).__init__(feature, N, "ref") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) # N = 0, return first day if series.empty: return series # Pandas bug, see: https://github.com/pandas-dev/pandas/issues/21049 elif self.N == 0: series = pd.Series(series.iloc[0], index=series.index) else: series = series.shift(self.N) # copy return series def get_longest_back_rolling(self): if self.N == 0: return np.inf return self.feature.get_longest_back_rolling() + self.N def get_extended_window_size(self): if self.N == 0: get_module_logger(self.__class__.__name__).warning("The Ref(ATTR, 0) will not be accurately calculated") return self.feature.get_extended_window_size() else: lft_etd, rght_etd = self.feature.get_extended_window_size() lft_etd = max(lft_etd + self.N, lft_etd) rght_etd = max(rght_etd - self.N, rght_etd) return lft_etd, rght_etd class Mean(Rolling): """Rolling Mean (MA) Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling average """ def __init__(self, feature, N): super(Mean, self).__init__(feature, N, "mean") class Sum(Rolling): """Rolling Sum Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling sum """ def __init__(self, feature, N): super(Sum, self).__init__(feature, N, "sum") class Std(Rolling): """Rolling Std Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling std """ def __init__(self, feature, N): super(Std, self).__init__(feature, N, "std") class Var(Rolling): """Rolling Variance Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling variance """ def __init__(self, feature, N): super(Var, self).__init__(feature, N, "var") class Skew(Rolling): """Rolling Skewness Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling skewness """ def __init__(self, feature, N): super(Skew, self).__init__(feature, N, "skew") class Kurt(Rolling): """Rolling Kurtosis Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling kurtosis """ def __init__(self, feature, N): super(Kurt, self).__init__(feature, N, "kurt") class Max(Rolling): """Rolling Max Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling max """ def __init__(self, feature, N): super(Max, self).__init__(feature, N, "max") class IdxMax(Rolling): """Rolling Max Index Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling max index """ def __init__(self, feature, N): super(IdxMax, self).__init__(feature, N, "idxmax") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) if self.N == 0: series = series.expanding(min_periods=1).apply(lambda x: x.argmax() + 1, raw=True) else: series = series.rolling(self.N, min_periods=1).apply(lambda x: x.argmax() + 1, raw=True) return series class Min(Rolling): """Rolling Min Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling min """ def __init__(self, feature, N): super(Min, self).__init__(feature, N, "min") class IdxMin(Rolling): """Rolling Min Index Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling min index """ def __init__(self, feature, N): super(IdxMin, self).__init__(feature, N, "idxmin") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) if self.N == 0: series = series.expanding(min_periods=1).apply(lambda x: x.argmin() + 1, raw=True) else: series = series.rolling(self.N, min_periods=1).apply(lambda x: x.argmin() + 1, raw=True) return series class Quantile(Rolling): """Rolling Quantile Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling quantile """ def __init__(self, feature, N, qscore): super(Quantile, self).__init__(feature, N, "quantile") self.qscore = qscore def __str__(self): return "{}({},{},{})".format(type(self).__name__, self.feature, self.N, self.qscore) def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) if self.N == 0: series = series.expanding(min_periods=1).quantile(self.qscore) else: series = series.rolling(self.N, min_periods=1).quantile(self.qscore) return series class Med(Rolling): """Rolling Median Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling median """ def __init__(self, feature, N): super(Med, self).__init__(feature, N, "median") class Mad(Rolling): """Rolling Mean Absolute Deviation Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling mean absolute deviation """ def __init__(self, feature, N): super(Mad, self).__init__(feature, N, "mad") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) # TODO: implement in Cython def mad(x): x1 = x[~np.isnan(x)] return np.mean(np.abs(x1 - x1.mean())) if self.N == 0: series = series.expanding(min_periods=1).apply(mad, raw=True) else: series = series.rolling(self.N, min_periods=1).apply(mad, raw=True) return series class Rank(Rolling): """Rolling Rank (Percentile) Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling rank """ def __init__(self, feature, N): super(Rank, self).__init__(feature, N, "rank") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) # TODO: implement in Cython def rank(x): if np.isnan(x[-1]): return np.nan x1 = x[~np.isnan(x)] if x1.shape[0] == 0: return np.nan return percentileofscore(x1, x1[-1]) / len(x1) if self.N == 0: series = series.expanding(min_periods=1).apply(rank, raw=True) else: series = series.rolling(self.N, min_periods=1).apply(rank, raw=True) return series class Count(Rolling): """Rolling Count Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling count of number of non-NaN elements """ def __init__(self, feature, N): super(Count, self).__init__(feature, N, "count") class Delta(Rolling): """Rolling Delta Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with end minus start in rolling window """ def __init__(self, feature, N): super(Delta, self).__init__(feature, N, "delta") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) if self.N == 0: series = series - series.iloc[0] else: series = series - series.shift(self.N) return series # TODO: # support pair-wise rolling like `Slope(A, B, N)` class Slope(Rolling): """Rolling Slope Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with regression slope of given window """ def __init__(self, feature, N): super(Slope, self).__init__(feature, N, "slope") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) if self.N == 0: series = pd.Series(expanding_slope(series.values), index=series.index) else: series = pd.Series(rolling_slope(series.values, self.N), index=series.index) return series class Rsquare(Rolling): """Rolling R-value Square Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with regression r-value square of given window """ def __init__(self, feature, N): super(Rsquare, self).__init__(feature, N, "rsquare") def _load_internal(self, instrument, start_index, end_index, freq): _series = self.feature.load(instrument, start_index, end_index, freq) if self.N == 0: series = pd.Series(expanding_rsquare(_series.values), index=_series.index) else: series = pd.Series(rolling_rsquare(_series.values, self.N), index=_series.index) series.loc[np.isclose(_series.rolling(self.N, min_periods=1).std(), 0, atol=2e-05)] = np.nan return series class Resi(Rolling): """Rolling Regression Residuals Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with regression residuals of given window """ def __init__(self, feature, N): super(Resi, self).__init__(feature, N, "resi") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) if self.N == 0: series = pd.Series(expanding_resi(series.values), index=series.index) else: series = pd.Series(rolling_resi(series.values, self.N), index=series.index) return series class WMA(Rolling): """Rolling WMA Parameters ---------- feature : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with weighted moving average output """ def __init__(self, feature, N): super(WMA, self).__init__(feature, N, "wma") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) # TODO: implement in Cython def weighted_mean(x): w = np.arange(len(x)) w /= w.sum() return np.nanmean(w * x) if self.N == 0: series = series.expanding(min_periods=1).apply(weighted_mean, raw=True) else: series = series.rolling(self.N, min_periods=1).apply(weighted_mean, raw=True) return series class EMA(Rolling): """Rolling Exponential Mean (EMA) Parameters ---------- feature : Expression feature instance N : int, float rolling window size Returns ---------- Expression a feature instance with regression r-value square of given window """ def __init__(self, feature, N): super(EMA, self).__init__(feature, N, "ema") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) def exp_weighted_mean(x): a = 1 - 2 / (1 + len(x)) w = a ** np.arange(len(x))[::-1] w /= w.sum() return np.nansum(w * x) if self.N == 0: series = series.expanding(min_periods=1).apply(exp_weighted_mean, raw=True) elif 0 < self.N < 1: series = series.ewm(alpha=self.N, min_periods=1).mean() else: series = series.ewm(span=self.N, min_periods=1).mean() return series #################### Pair-Wise Rolling #################### class PairRolling(ExpressionOps): """Pair Rolling Operator Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling output of two input features """ def __init__(self, feature_left, feature_right, N, func): self.feature_left = feature_left self.feature_right = feature_right self.N = N self.func = func def __str__(self): return "{}({},{},{})".format(type(self).__name__, self.feature_left, self.feature_right, self.N) def _load_internal(self, instrument, start_index, end_index, freq): series_left = self.feature_left.load(instrument, start_index, end_index, freq) series_right = self.feature_right.load(instrument, start_index, end_index, freq) if self.N == 0: series = getattr(series_left.expanding(min_periods=1), self.func)(series_right) else: series = getattr(series_left.rolling(self.N, min_periods=1), self.func)(series_right) return series def get_longest_back_rolling(self): if self.N == 0: return np.inf return ( max(self.feature_left.get_longest_back_rolling(), self.feature_right.get_longest_back_rolling()) + self.N - 1 ) def get_extended_window_size(self): if self.N == 0: get_module_logger(self.__class__.__name__).warning( "The PairRolling(ATTR, 0) will not be accurately calculated" ) return self.feature.get_extended_window_size() else: ll, lr = self.feature_left.get_extended_window_size() rl, rr = self.feature_right.get_extended_window_size() return max(ll, rl) + self.N - 1, max(lr, rr) class Corr(PairRolling): """Rolling Correlation Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling correlation of two input features """ def __init__(self, feature_left, feature_right, N): super(Corr, self).__init__(feature_left, feature_right, N, "corr") def _load_internal(self, instrument, start_index, end_index, freq): res = super(Corr, self)._load_internal(instrument, start_index, end_index, freq) # NOTE: Load uses MemCache, so calling load again will not cause performance degradation series_left = self.feature_left.load(instrument, start_index, end_index, freq) series_right = self.feature_right.load(instrument, start_index, end_index, freq) res.loc[ np.isclose(series_left.rolling(self.N, min_periods=1).std(), 0, atol=2e-05) | np.isclose(series_right.rolling(self.N, min_periods=1).std(), 0, atol=2e-05) ] = np.nan return res class Cov(PairRolling): """Rolling Covariance Parameters ---------- feature_left : Expression feature instance feature_right : Expression feature instance N : int rolling window size Returns ---------- Expression a feature instance with rolling max of two input features """ def __init__(self, feature_left, feature_right, N): super(Cov, self).__init__(feature_left, feature_right, N, "cov")