mirror of
https://github.com/microsoft/qlib.git
synced 2026-07-04 19:41:00 +08:00
Merge branch 'main' of https://github.com/you-n-g/qlib into main
This commit is contained in:
@@ -14,7 +14,7 @@ cdef class Expanding(object):
|
||||
cdef int na_count
|
||||
def __init__(self):
|
||||
self.na_count = 0
|
||||
|
||||
|
||||
cdef double update(self, double val):
|
||||
pass
|
||||
|
||||
@@ -25,7 +25,7 @@ cdef class Mean(Expanding):
|
||||
def __init__(self):
|
||||
super(Mean, self).__init__()
|
||||
self.vsum = 0
|
||||
|
||||
|
||||
cdef double update(self, double val):
|
||||
self.barv.push_back(val)
|
||||
if isnan(val):
|
||||
@@ -62,7 +62,7 @@ cdef class Slope(Expanding):
|
||||
return (N*self.xy_sum - self.x_sum*self.y_sum) / \
|
||||
(N*self.x2_sum - self.x_sum*self.x_sum)
|
||||
|
||||
|
||||
|
||||
cdef class Resi(Expanding):
|
||||
"""1-D array expanding residuals"""
|
||||
cdef double x_sum
|
||||
@@ -94,7 +94,7 @@ cdef class Resi(Expanding):
|
||||
interp = y_mean - slope*x_mean
|
||||
return val - (slope*size + interp)
|
||||
|
||||
|
||||
|
||||
cdef class Rsquare(Expanding):
|
||||
"""1-D array expanding rsquare"""
|
||||
cdef double x_sum
|
||||
@@ -117,7 +117,7 @@ cdef class Rsquare(Expanding):
|
||||
self.na_count += 1
|
||||
else:
|
||||
self.x_sum += size
|
||||
self.x2_sum += size
|
||||
self.x2_sum += size * size
|
||||
self.y_sum += val
|
||||
self.y2_sum += val * val
|
||||
self.xy_sum += size * val
|
||||
@@ -126,7 +126,7 @@ cdef class Rsquare(Expanding):
|
||||
sqrt((N*self.x2_sum - self.x_sum*self.x_sum) * (N*self.y2_sum - self.y_sum*self.y_sum))
|
||||
return rvalue * rvalue
|
||||
|
||||
|
||||
|
||||
cdef np.ndarray[double, ndim=1] expanding(Expanding r, np.ndarray a):
|
||||
cdef int i
|
||||
cdef int N = len(a)
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
import abc
|
||||
import bisect
|
||||
import logging
|
||||
from typing import Union, Tuple, List
|
||||
from typing import Union, Tuple, List, Iterator, Optional
|
||||
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
@@ -113,8 +113,7 @@ class DataHandler(Serializable):
|
||||
CS_ALL = "__all"
|
||||
|
||||
def _fetch_df_by_col(self, df: pd.DataFrame, col_set: str) -> pd.DataFrame:
|
||||
cln = len(df.columns.levels)
|
||||
if cln == 1:
|
||||
if not isinstance(df.columns, pd.MultiIndex):
|
||||
return df
|
||||
elif col_set == self.CS_ALL:
|
||||
return df.droplevel(axis=1, level=0)
|
||||
@@ -126,6 +125,7 @@ class DataHandler(Serializable):
|
||||
selector: Union[pd.Timestamp, slice, str],
|
||||
level: Union[str, int] = "datetime",
|
||||
col_set: Union[str, List[str]] = CS_ALL,
|
||||
squeeze: bool = False
|
||||
) -> pd.DataFrame:
|
||||
"""
|
||||
fetch data from underlying data source
|
||||
@@ -141,13 +141,22 @@ class DataHandler(Serializable):
|
||||
select a set of meaningful columns.(e.g. features, columns)
|
||||
if isinstance(col_set, List[str]):
|
||||
select several sets of meaningful columns, the returned data has multiple levels
|
||||
squeeze : bool
|
||||
whether squeeze columns and index
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame:
|
||||
"""
|
||||
df = self._fetch_df_by_index(self._data, selector, level)
|
||||
return self._fetch_df_by_col(df, col_set)
|
||||
df = self._fetch_df_by_col(df, col_set)
|
||||
if squeeze:
|
||||
# squeeze columns
|
||||
df = df.squeeze()
|
||||
# squeeze index
|
||||
if isinstance(selector, (str, pd.Timestamp)):
|
||||
df = df.reset_index(level=level, drop=True)
|
||||
return df
|
||||
|
||||
def get_cols(self, col_set=CS_ALL) -> list:
|
||||
"""
|
||||
@@ -167,6 +176,40 @@ class DataHandler(Serializable):
|
||||
df = self._fetch_df_by_col(df, col_set)
|
||||
return df.columns.to_list()
|
||||
|
||||
def get_range_selector(self, cur_date: Union[pd.Timestamp, str], periods: int) -> slice:
|
||||
"""
|
||||
get range selector by number of periods
|
||||
|
||||
Args:
|
||||
cur_date (pd.Timestamp or str): current date
|
||||
periods (int): number of periods
|
||||
"""
|
||||
trading_dates = self._data.index.unique(level='datetime')
|
||||
cur_loc = trading_dates.get_loc(cur_date)
|
||||
pre_loc = cur_loc - periods + 1
|
||||
if pre_loc < 0:
|
||||
warnings.warn('`periods` is too large. the first date will be returned.')
|
||||
pre_loc = 0
|
||||
ref_date = trading_dates[pre_loc]
|
||||
return slice(ref_date, cur_date)
|
||||
|
||||
def get_range_iterator(self, periods: int, min_periods: Optional[int] = None,
|
||||
**kwargs) -> Iterator[Tuple[pd.Timestamp, pd.DataFrame]]:
|
||||
"""
|
||||
get a iterator of sliced data with given periods
|
||||
|
||||
Args:
|
||||
periods (int): number of periods
|
||||
min_periods (int): minimum periods for sliced dataframe
|
||||
kwargs (dict): will be passed to `self.fetch`
|
||||
"""
|
||||
trading_dates = self._data.index.unique(level='datetime')
|
||||
if min_periods is None:
|
||||
min_periods = periods
|
||||
for cur_date in trading_dates[min_periods:]:
|
||||
selector = self.get_range_selector(cur_date, periods)
|
||||
yield cur_date, self.fetch(selector, **kwargs)
|
||||
|
||||
|
||||
class DataHandlerLP(DataHandler):
|
||||
"""
|
||||
|
||||
@@ -1,78 +1,75 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
import abc
|
||||
import warnings
|
||||
import pandas as pd
|
||||
from qlib.data import D
|
||||
|
||||
from typing import Tuple
|
||||
|
||||
from qlib.data import D
|
||||
|
||||
class DataLoader(ABC):
|
||||
"""
|
||||
class DataLoader(abc.ABC):
|
||||
'''
|
||||
DataLoader is designed for loading raw data from original data source.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
'''
|
||||
@abc.abstractmethod
|
||||
def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame:
|
||||
"""
|
||||
load the data as pd.DataFrame
|
||||
load the data as pd.DataFrame
|
||||
|
||||
Parameters
|
||||
----------
|
||||
self : [TODO:type]
|
||||
[TODO:description]
|
||||
instruments : [TODO:type]
|
||||
[TODO:description]
|
||||
start_time : [TODO:type]
|
||||
[TODO:description]
|
||||
end_time : [TODO:type]
|
||||
[TODO:description]
|
||||
Parameters
|
||||
----------
|
||||
self : [TODO:type]
|
||||
[TODO:description]
|
||||
instruments : [TODO:type]
|
||||
[TODO:description]
|
||||
start_time : [TODO:type]
|
||||
[TODO:description]
|
||||
end_time : [TODO:type]
|
||||
[TODO:description]
|
||||
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame:
|
||||
data load from the under layer source
|
||||
Returns
|
||||
-------
|
||||
pd.DataFrame:
|
||||
data load from the under layer source
|
||||
|
||||
Example of the data:
|
||||
The multi-index of the columns is optional.
|
||||
feature label
|
||||
$close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0
|
||||
datetime instrument
|
||||
2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032
|
||||
SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042
|
||||
SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
|
||||
Example of the data:
|
||||
(The multi-index of the columns is optional.)
|
||||
feature label
|
||||
$close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0
|
||||
datetime instrument
|
||||
2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032
|
||||
SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042
|
||||
SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class QlibDataLoader(DataLoader):
|
||||
"""Same as QlibDataLoader. The fields can be define by config"""
|
||||
|
||||
'''Same as QlibDataLoader. The fields can be define by config'''
|
||||
def __init__(self, config: Tuple[list, tuple, dict], filter_pipe=None):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
config : Tuple[list ,tuple, dict]
|
||||
config : Tuple[list, tuple, dict]
|
||||
Config will be used to describe the fields and column names
|
||||
|
||||
<config> := {
|
||||
"group_name1": <fields_info1>
|
||||
"group_name2": <fields_info2>
|
||||
}
|
||||
|
||||
or
|
||||
<config> := <fields_info>
|
||||
|
||||
<fields_info> := ["expr", ...] | (["expr", ...], ["col_name", ...])
|
||||
|
||||
Here is a few examples to describe the fields
|
||||
TODO:
|
||||
"""
|
||||
self.is_group = isinstance(config, dict)
|
||||
self.is_group = isinstance(config, dict)
|
||||
|
||||
if self.is_group:
|
||||
self.fields = {grp: self._parse_fields_info(fields_info) for grp, fields_info in config.items()}
|
||||
else:
|
||||
self.fields = self._parse_fields_info(fields_info)
|
||||
self.fields = self._parse_fields_info(config)
|
||||
|
||||
self.filter_pipe = filter_pipe
|
||||
|
||||
@@ -86,14 +83,18 @@ class QlibDataLoader(DataLoader):
|
||||
return exprs, names
|
||||
|
||||
def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame:
|
||||
if isinstance(instruments, str):
|
||||
instruments = D.instruments(instruments, filter_pipe=self.filter_pipe)
|
||||
elif self.filter_pipe is not None:
|
||||
warnings.warn('`filter_pipe` is not None, but it will not be used with `instruments` as list')
|
||||
def _get_df(exprs, names):
|
||||
df = D.features(D.instruments(instruments, filter_pipe=self.filter_pipe), exprs, start_time, end_time)
|
||||
df = D.features(instruments, exprs, start_time, end_time)
|
||||
df.columns = names
|
||||
return df
|
||||
|
||||
if self.is_group:
|
||||
df = pd.concat({grp: _get_df(exprs, names) for grp, (exprs, names) in self.fields.items()}, axis=1)
|
||||
else:
|
||||
exprs, names = self.fields
|
||||
df = _get_df(exprs, names)
|
||||
df = df.swaplevel().sort_index()
|
||||
df = df.swaplevel().sort_index() # NOTE: always return <datetime, instrument>
|
||||
return df
|
||||
|
||||
@@ -8,6 +8,8 @@ from __future__ import print_function
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
from scipy.stats import percentileofscore
|
||||
|
||||
from .base import Expression, ExpressionOps
|
||||
from ..log import get_module_logger
|
||||
|
||||
@@ -687,6 +689,8 @@ class Rolling(ExpressionOps):
|
||||
# isnull = series.isnull() # NOTE: isnull = NaN, inf is not null
|
||||
if self.N == 0:
|
||||
series = getattr(series.expanding(min_periods=1), self.func)()
|
||||
elif 0 < self.N < 1:
|
||||
series = series.ewm(alpha=self.N, min_periods=1).mean()
|
||||
else:
|
||||
series = getattr(series.rolling(self.N, min_periods=1), self.func)()
|
||||
# series.iloc[:self.N-1] = np.nan
|
||||
@@ -696,6 +700,8 @@ class Rolling(ExpressionOps):
|
||||
def get_longest_back_rolling(self):
|
||||
if self.N == 0:
|
||||
return np.inf
|
||||
if 0 < self.N < 1:
|
||||
return int(np.log(1e-6) / np.log(1 - self.N)) # (1 - N)**window == 1e-6
|
||||
return self.feature.get_longest_back_rolling() + self.N - 1
|
||||
|
||||
def get_extended_window_size(self):
|
||||
@@ -704,6 +710,11 @@ class Rolling(ExpressionOps):
|
||||
# remove such support for N == 0?
|
||||
get_module_logger(self.__class__.__name__).warning("The Rolling(ATTR, 0) will not be accurately calculated")
|
||||
return self.feature.get_extended_window_size()
|
||||
elif 0 < self.N < 1:
|
||||
lft_etd, rght_etd = self.feature.get_extended_window_size()
|
||||
size = int(np.log(1e-6) / np.log(1 - self.N))
|
||||
lft_etd = max(lft_etd + size - 1, lft_etd)
|
||||
return lft_etd, rght_etd
|
||||
else:
|
||||
lft_etd, rght_etd = self.feature.get_extended_window_size()
|
||||
lft_etd = max(lft_etd + self.N - 1, lft_etd)
|
||||
@@ -1087,7 +1098,7 @@ class Rank(Rolling):
|
||||
x1 = x[~np.isnan(x)]
|
||||
if x1.shape[0] == 0:
|
||||
return np.nan
|
||||
return (x1.argsort()[-1] + 1) / len(x1)
|
||||
return percentileofscore(x1, x1[-1]) / len(x1)
|
||||
|
||||
if self.N == 0:
|
||||
series = series.expanding(min_periods=1).apply(rank, raw=True)
|
||||
@@ -1273,7 +1284,7 @@ class EMA(Rolling):
|
||||
----------
|
||||
feature : Expression
|
||||
feature instance
|
||||
N : int
|
||||
N : int, float
|
||||
rolling window size
|
||||
|
||||
Returns
|
||||
@@ -1296,6 +1307,8 @@ class EMA(Rolling):
|
||||
|
||||
if self.N == 0:
|
||||
series = series.expanding(min_periods=1).apply(exp_weighted_mean, raw=True)
|
||||
elif 0 < self.N < 1:
|
||||
series = series.ewm(alpha=self.N, min_periods=1).mean()
|
||||
else:
|
||||
series = series.ewm(span=self.N, min_periods=1).mean()
|
||||
return series
|
||||
|
||||
455
qlib/model/riskmodel.py
Normal file
455
qlib/model/riskmodel.py
Normal file
@@ -0,0 +1,455 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import warnings
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
from typing import Union
|
||||
|
||||
from qlib.model.base import BaseModel
|
||||
|
||||
|
||||
class RiskModel(BaseModel):
|
||||
"""Risk Model
|
||||
|
||||
A risk model is used to estimate the covariance matrix of stock returns.
|
||||
"""
|
||||
|
||||
MASK_NAN = 'mask'
|
||||
FILL_NAN = 'fill'
|
||||
IGNORE_NAN = 'ignore'
|
||||
|
||||
def __init__(self, nan_option: str = 'ignore', assume_centered: bool = False, scale_return: bool = True):
|
||||
"""
|
||||
Args:
|
||||
nan_option (str): nan handling option (`ignore`/`mask`/`fill`)
|
||||
assume_centered (bool): whether the data is assumed to be centered
|
||||
scale_return (bool): whether scale returns as percentage
|
||||
"""
|
||||
# nan
|
||||
assert nan_option in [self.MASK_NAN, self.FILL_NAN, self.IGNORE_NAN], \
|
||||
f'`nan_option={nan_option}` is not supported'
|
||||
self.nan_option = nan_option
|
||||
|
||||
self.assume_centered = assume_centered
|
||||
self.scale_return = scale_return
|
||||
|
||||
def predict(self, X: Union[pd.Series, pd.DataFrame, np.ndarray],
|
||||
return_corr: bool = False, is_price: bool = True) -> Union[pd.DataFrame, np.ndarray]:
|
||||
"""
|
||||
Args:
|
||||
X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance,
|
||||
with variables as columns and observations as rows.
|
||||
return_corr (bool): whether return the correlation matrix
|
||||
is_price (bool): whether `X` contains price (if not assume stock returns)
|
||||
|
||||
Returns:
|
||||
pd.DataFrame or np.ndarray: estimated covariance (or correlation)
|
||||
"""
|
||||
# transform input into 2D array
|
||||
if not isinstance(X, (pd.Series, pd.DataFrame)):
|
||||
columns = None
|
||||
else:
|
||||
if isinstance(X.index, pd.MultiIndex):
|
||||
if isinstance(X, pd.DataFrame):
|
||||
X = X.iloc[:, 0].unstack(level='instrument') # always use the first column
|
||||
else:
|
||||
X = X.unstack(level='instrument')
|
||||
else:
|
||||
# X is 2D DataFrame
|
||||
pass
|
||||
columns = X.columns # will be used to restore dataframe
|
||||
X = X.values
|
||||
|
||||
# calculate pct_change
|
||||
if is_price:
|
||||
X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows
|
||||
|
||||
# scale return
|
||||
if self.scale_return:
|
||||
X *= 100
|
||||
|
||||
# handle nan and centered
|
||||
X = self._preprocess(X)
|
||||
|
||||
# estimate covariance
|
||||
S = self._predict(X)
|
||||
|
||||
# return correlation if needed
|
||||
if return_corr:
|
||||
vola = np.sqrt(np.diag(S))
|
||||
corr = S / np.outer(vola, vola)
|
||||
if columns is None:
|
||||
return corr
|
||||
return pd.DataFrame(corr, index=columns, columns=columns)
|
||||
|
||||
# return covariance
|
||||
if columns is None:
|
||||
return S
|
||||
return pd.DataFrame(S, index=columns, columns=columns)
|
||||
|
||||
def _predict(self, X: np.ndarray) -> np.ndarray:
|
||||
"""covariance estimation implementation
|
||||
|
||||
This method should be overridden by child classes.
|
||||
|
||||
By default, this method implements the empirical covariance estimation.
|
||||
|
||||
Args:
|
||||
X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows)
|
||||
|
||||
Returns:
|
||||
np.ndarray: covariance matrix
|
||||
"""
|
||||
xTx = np.asarray(X.T.dot(X))
|
||||
N = len(X)
|
||||
if isinstance(X, np.ma.MaskedArray):
|
||||
M = 1 - X.mask
|
||||
N = M.T.dot(M) # each pair has distinct number of samples
|
||||
return xTx / N
|
||||
|
||||
def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]:
|
||||
"""handle nan and centerize data
|
||||
|
||||
Note:
|
||||
if `nan_option='mask'` then the returned array will be `np.ma.MaskedArray`
|
||||
"""
|
||||
# handle nan
|
||||
if self.nan_option == self.FILL_NAN:
|
||||
X = np.nan_to_num(X)
|
||||
elif self.nan_option == self.MASK_NAN:
|
||||
X = np.ma.masked_invalid(X)
|
||||
# centerize
|
||||
if not self.assume_centered:
|
||||
X = X - np.nanmean(X, axis=0)
|
||||
return X
|
||||
|
||||
|
||||
class ShrinkCovEstimator(RiskModel):
|
||||
"""Shrinkage Covariance Estimator
|
||||
|
||||
This estimator will shrink the sample covariance matrix towards
|
||||
an identify matrix:
|
||||
S_hat = (1 - alpha) * S + alpha * F
|
||||
where `alpha` is the shrink parameter and `F` is the shrinking target.
|
||||
|
||||
The following shrinking parameters (`alpha`) are supported:
|
||||
- `lw` [1][2][3]: use Ledoit-Wolf shrinking parameter
|
||||
- `oas` [4]: use Oracle Approximating Shrinkage shrinking parameter
|
||||
- float: directly specify the shrink parameter, should be between [0, 1]
|
||||
|
||||
The following shrinking targets (`F`) are supported:
|
||||
- `const_var` [1][4][5]: assume stocks have the same constant variance and zero correlation
|
||||
- `const_corr` [2][6]: assume stocks have different variance but equal correlation
|
||||
- `single_factor` [3][7]: assume single factor model as the shrinking target
|
||||
- np.ndarray: provide the shrinking targets directly
|
||||
|
||||
Note:
|
||||
- The optimal shrinking parameter depends on the selection of the shrinking target.
|
||||
Currently, `oas` is not supported for `const_corr` and `single_factor`.
|
||||
- Remember to set `nan_option` to `fill` or `mask` if your data has missing values.
|
||||
|
||||
References:
|
||||
[1] Ledoit, O., & Wolf, M. (2004). A well-conditioned estimator for large-dimensional covariance matrices.
|
||||
Journal of Multivariate Analysis, 88(2), 365–411. https://doi.org/10.1016/S0047-259X(03)00096-4
|
||||
[2] Ledoit, O., & Wolf, M. (2004). Honey, I shrunk the sample covariance matrix.
|
||||
Journal of Portfolio Management, 30(4), 1–22. https://doi.org/10.3905/jpm.2004.110
|
||||
[3] Ledoit, O., & Wolf, M. (2003). Improved estimation of the covariance matrix of stock returns
|
||||
with an application to portfolio selection.
|
||||
Journal of Empirical Finance, 10(5), 603–621. https://doi.org/10.1016/S0927-5398(03)00007-0
|
||||
[4] Chen, Y., Wiesel, A., Eldar, Y. C., & Hero, A. O. (2010). Shrinkage algorithms for MMSE covariance estimation.
|
||||
IEEE Transactions on Signal Processing, 58(10), 5016–5029. https://doi.org/10.1109/TSP.2010.2053029
|
||||
[5] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-00007f64e5b9/cov1para.m.zip
|
||||
[6] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-ffff-ffffde5e2d4e/covCor.m.zip
|
||||
[7] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-0000648dfc98/covMarket.m.zip
|
||||
"""
|
||||
|
||||
SHR_LW = 'lw'
|
||||
SHR_OAS = 'oas'
|
||||
|
||||
TGT_CONST_VAR = 'const_var'
|
||||
TGT_CONST_CORR = 'const_corr'
|
||||
TGT_SINGLE_FACTOR = 'single_factor'
|
||||
|
||||
def __init__(self, alpha: Union[str, float] = 0.0, target: Union[str, np.ndarray] = 'const_var', **kwargs):
|
||||
"""
|
||||
Args:
|
||||
alpha (str or float): shrinking parameter or estimator (`lw`/`oas`)
|
||||
target (str or np.ndarray): shrinking target (`const_var`/`const_corr`/`single_factor`)
|
||||
kwargs: see `RiskModel` for more information
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
|
||||
# alpha
|
||||
if isinstance(alpha, str):
|
||||
assert alpha in [self.SHR_LW, self.SHR_OAS], \
|
||||
f'shrinking method `{alpha}` is not supported'
|
||||
elif isinstance(alpha, (float, np.floating)):
|
||||
assert 0 <= alpha <= 1, 'alpha should be between [0, 1]'
|
||||
else:
|
||||
raise TypeError('invalid argument type for `alpha`')
|
||||
self.alpha = alpha
|
||||
|
||||
# target
|
||||
if isinstance(target, str):
|
||||
assert target in [self.TGT_CONST_VAR, self.TGT_CONST_CORR, self.TGT_SINGLE_FACTOR], \
|
||||
f'shrinking target `{target} is not supported'
|
||||
elif isinstance(target, np.ndarray):
|
||||
pass
|
||||
else:
|
||||
raise TypeError('invalid argument type for `target`')
|
||||
if alpha == self.SHR_OAS and target != self.TGT_CONST_VAR:
|
||||
raise NotImplementedError('currently `oas` can only support `const_var` as target')
|
||||
self.target = target
|
||||
|
||||
def _predict(self, X: np.ndarray) -> np.ndarray:
|
||||
# sample covariance
|
||||
S = super()._predict(X)
|
||||
|
||||
# shrinking target
|
||||
F = self._get_shrink_target(X, S)
|
||||
|
||||
# get shrinking parameter
|
||||
alpha = self._get_shrink_param(X, S, F)
|
||||
|
||||
# shrink covariance
|
||||
if alpha > 0:
|
||||
S *= (1 - alpha)
|
||||
F *= alpha
|
||||
S += F
|
||||
|
||||
return S
|
||||
|
||||
def _get_shrink_target(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
|
||||
"""get shrinking target `F`"""
|
||||
if self.target == self.TGT_CONST_VAR:
|
||||
return self._get_shrink_target_const_var(X, S)
|
||||
if self.target == self.TGT_CONST_CORR:
|
||||
return self._get_shrink_target_const_corr(X, S)
|
||||
if self.target == self.TGT_SINGLE_FACTOR:
|
||||
return self._get_shrink_target_single_factor(X, S)
|
||||
|
||||
def _get_shrink_target_const_var(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
|
||||
"""get shrinking target with constant variance
|
||||
|
||||
This target assumes zero pair-wise correlation and constant variance.
|
||||
The constant variance is estimated by averaging all sample's variances.
|
||||
"""
|
||||
n = len(S)
|
||||
F = np.eye(n)
|
||||
np.fill_diagonal(F, np.mean(np.diag(S)))
|
||||
return F
|
||||
|
||||
def _get_shrink_target_const_corr(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
|
||||
"""get shrinking target with constant correlation
|
||||
|
||||
This target assumes constant pair-wise correlation but keep the sample variance.
|
||||
The constant correlation is estimated by averaging all pairwise correlations.
|
||||
"""
|
||||
n = len(S)
|
||||
var = np.diag(S)
|
||||
sqrt_var = np.sqrt(var)
|
||||
covar = np.outer(sqrt_var, sqrt_var)
|
||||
r_bar = (np.sum(S / covar) - n) / (n * (n - 1))
|
||||
F = r_bar * covar
|
||||
np.fill_diagonal(F, var)
|
||||
return F
|
||||
|
||||
def _get_shrink_target_single_factor(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
|
||||
"""get shrinking target with single factor model"""
|
||||
X_mkt = np.nanmean(X, axis=1)
|
||||
cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X))
|
||||
var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X))
|
||||
F = np.outer(cov_mkt, cov_mkt) / var_mkt
|
||||
np.fill_diagonal(F, np.diag(S))
|
||||
return F
|
||||
|
||||
def _get_shrink_param(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
|
||||
"""get shrinking parameter `alpha`
|
||||
|
||||
Note:
|
||||
The Ledoit-Wolf shrinking parameter estimator consists of three different
|
||||
"""
|
||||
if self.alpha == self.SHR_OAS:
|
||||
return self._get_shrink_param_oas(X, S, F)
|
||||
elif self.alpha == self.SHR_LW:
|
||||
if self.target == self.TGT_CONST_VAR:
|
||||
return self._get_shrink_param_lw_const_var(X, S, F)
|
||||
if self.target == self.TGT_CONST_CORR:
|
||||
return self._get_shrink_param_lw_const_corr(X, S, F)
|
||||
if self.target == self.TGT_SINGLE_FACTOR:
|
||||
return self._get_shrink_param_lw_single_factor(X, S, F)
|
||||
return self.alpha
|
||||
|
||||
def _get_shrink_param_oas(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
|
||||
"""Oracle Approximating Shrinkage Estimator
|
||||
|
||||
This method uses the following formula to estimate the `alpha`
|
||||
parameter for the shrink covariance estimator:
|
||||
A = (1 - 2 / p) * trace(S^2) + trace^2(S)
|
||||
B = (n + 1 - 2 / p) * (trace(S^2) - trace^2(S) / p)
|
||||
alpha = A / B
|
||||
where `n`, `p` are the dim of observations and variables respectively.
|
||||
"""
|
||||
trS2 = np.sum(S**2)
|
||||
tr2S = np.trace(S)**2
|
||||
|
||||
n, p = X.shape
|
||||
|
||||
A = (1 - 2 / p) * (trS2 + tr2S)
|
||||
B = (n + 1 - 2 / p) * (trS2 + tr2S / p)
|
||||
alpha = A / B
|
||||
|
||||
return alpha
|
||||
|
||||
def _get_shrink_param_lw_const_var(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
|
||||
"""Ledoit-Wolf Shrinkage Estimator (Constant Variance)
|
||||
|
||||
This method shrinks the covariance matrix towards the constand variance target.
|
||||
"""
|
||||
t, n = X.shape
|
||||
|
||||
y = X**2
|
||||
phi = np.sum(y.T.dot(y) / t - S**2)
|
||||
|
||||
gamma = np.linalg.norm(S - F, 'fro')**2
|
||||
|
||||
kappa = phi / gamma
|
||||
alpha = max(0, min(1, kappa / t))
|
||||
|
||||
return alpha
|
||||
|
||||
def _get_shrink_param_lw_const_corr(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
|
||||
"""Ledoit-Wolf Shrinkage Estimator (Constant Correlation)
|
||||
|
||||
This method shrinks the covariance matrix towards the constand correlation target.
|
||||
"""
|
||||
t, n = X.shape
|
||||
|
||||
var = np.diag(S)
|
||||
sqrt_var = np.sqrt(var)
|
||||
r_bar = (np.sum(S / np.outer(sqrt_var, sqrt_var)) - n) / (n * (n - 1))
|
||||
|
||||
y = X**2
|
||||
phi_mat = y.T.dot(y) / t - S**2
|
||||
phi = np.sum(phi_mat)
|
||||
|
||||
theta_mat = (X**3).T.dot(X) / t - var[:, None] * S
|
||||
np.fill_diagonal(theta_mat, 0)
|
||||
rho = np.sum(np.diag(phi_mat)) + r_bar * np.sum(np.outer(1 / sqrt_var, sqrt_var) * theta_mat)
|
||||
|
||||
gamma = np.linalg.norm(S - F, 'fro')**2
|
||||
|
||||
kappa = (phi - rho) / gamma
|
||||
alpha = max(0, min(1, kappa / t))
|
||||
|
||||
return alpha
|
||||
|
||||
def _get_shrink_param_lw_single_factor(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
|
||||
"""Ledoit-Wolf Shrinkage Estimator (Single Factor Model)
|
||||
|
||||
This method shrinks the covariance matrix towards the single factor model target.
|
||||
"""
|
||||
t, n = X.shape
|
||||
|
||||
X_mkt = np.nanmean(X, axis=1)
|
||||
cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X))
|
||||
var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X))
|
||||
|
||||
y = X**2
|
||||
phi = np.sum(y.T.dot(y)) / t - np.sum(S**2)
|
||||
|
||||
rdiag = np.sum(y**2) / t - np.sum(np.diag(S)**2)
|
||||
z = X * X_mkt[:, None]
|
||||
v1 = y.T.dot(z) / t - cov_mkt[:, None] * S
|
||||
roff1 = np.sum(v1 * cov_mkt[:, None].T) / var_mkt - np.sum(np.diag(v1) * cov_mkt) / var_mkt
|
||||
v3 = z.T.dot(z) / t - var_mkt * S
|
||||
roff3 = np.sum(v3 * np.outer(cov_mkt, cov_mkt)) / var_mkt**2 - np.sum(np.diag(v3) * cov_mkt**2) / var_mkt**2
|
||||
roff = 2 * roff1 - roff3
|
||||
rho = rdiag + roff
|
||||
|
||||
gamma = np.linalg.norm(S - F, 'fro')**2
|
||||
|
||||
kappa = (phi - rho) / gamma
|
||||
alpha = max(0, min(1, kappa / t))
|
||||
|
||||
return alpha
|
||||
|
||||
|
||||
class POETCovEstimator(RiskModel):
|
||||
"""Principal Orthogonal Complement Thresholding Estimator (POET)
|
||||
|
||||
Reference:
|
||||
[1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements.
|
||||
Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603–680. https://doi.org/10.1111/rssb.12016
|
||||
[2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m
|
||||
"""
|
||||
|
||||
THRESH_SOFT = 'soft'
|
||||
THRESH_HARD = 'hard'
|
||||
THRESH_SCAD = 'scad'
|
||||
|
||||
def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = 'soft', **kwargs):
|
||||
"""
|
||||
Args:
|
||||
num_factors (int): number of factors (if set to zero, no factor model will be used)
|
||||
thresh (float): the positive constant for thresholding
|
||||
thresh_method (str): thresholding method, which can be
|
||||
- 'soft': soft thresholding
|
||||
- 'hard': hard thresholding
|
||||
- 'scad': scad thresholding
|
||||
kwargs: see `RiskModel` for more information
|
||||
"""
|
||||
super().__init__(**kwargs)
|
||||
|
||||
assert num_factors >= 0, '`num_factors` requires a positive integer'
|
||||
self.num_factors = num_factors
|
||||
|
||||
assert thresh >= 0, '`thresh` requires a positive float number'
|
||||
self.thresh = thresh
|
||||
|
||||
assert thresh_method in [self.THRESH_HARD, self.THRESH_SOFT, self.THRESH_SCAD], \
|
||||
'`thresh_method` should be `soft`/`hard`/`scad`'
|
||||
self.thresh_method = thresh_method
|
||||
|
||||
def _predict(self, X: np.ndarray) -> np.ndarray:
|
||||
|
||||
Y = X.T # NOTE: to match POET's implementation
|
||||
p, n = Y.shape
|
||||
|
||||
if self.num_factors > 0:
|
||||
Dd, V = np.linalg.eig(Y.T.dot(Y))
|
||||
V = V[:, np.argsort(Dd)]
|
||||
F = V[:, -self.num_factors:][:, ::-1] * np.sqrt(n)
|
||||
LamPCA = Y.dot(F) / n
|
||||
uhat = np.asarray(Y - LamPCA.dot(F.T))
|
||||
Lowrank = np.asarray(LamPCA.dot(LamPCA.T))
|
||||
rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n)
|
||||
else:
|
||||
uhat = np.asarray(Y)
|
||||
rate = np.sqrt(np.log(p) / n)
|
||||
Lowrank = 0
|
||||
|
||||
lamb = rate * self.thresh
|
||||
SuPCA = uhat.dot(uhat.T) / n
|
||||
SuDiag = np.diag(np.diag(SuPCA))
|
||||
R = np.linalg.inv(SuDiag**0.5).dot(SuPCA).dot(np.linalg.inv(SuDiag**0.5))
|
||||
|
||||
if self.thresh_method == self.THRESH_HARD:
|
||||
M = R * (np.abs(R) > lamb)
|
||||
elif self.thresh_method == self.THRESH_SOFT:
|
||||
res = (np.abs(R) - lamb)
|
||||
res = (res + np.abs(res)) / 2
|
||||
M = np.sign(R) * res
|
||||
else:
|
||||
M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb)
|
||||
M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7
|
||||
M3 = (np.abs(R) >= 3.7 * lamb) * R
|
||||
M = M1 + M2 + M3
|
||||
|
||||
Rthresh = M - np.diag(np.diag(M)) + np.eye(p)
|
||||
SigmaU = (SuDiag**0.5).dot(Rthresh).dot(SuDiag**0.5)
|
||||
SigmaY = SigmaU + Lowrank
|
||||
|
||||
return SigmaY
|
||||
0
qlib/portfolio/__init__.py
Normal file
0
qlib/portfolio/__init__.py
Normal file
265
qlib/portfolio/optimizer.py
Normal file
265
qlib/portfolio/optimizer.py
Normal file
@@ -0,0 +1,265 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import warnings
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import scipy.optimize as so
|
||||
|
||||
from typing import Optional, Union, Callable, List
|
||||
|
||||
|
||||
class PortfolioOptimizer(object):
|
||||
"""Portfolio Optimizer
|
||||
|
||||
The following optimization algorithms are supported:
|
||||
- `gmv`: Global Minimum Variance Portfolio
|
||||
- `mvo`: Mean Variance Optimized Portfolio
|
||||
- `rp`: Risk Parity
|
||||
- `inv`: Inverse Volatility
|
||||
|
||||
Note:
|
||||
This optimizer always assumes full investment and no-shorting.
|
||||
"""
|
||||
|
||||
OPT_GMV = 'gmv'
|
||||
OPT_MVO = 'mvo'
|
||||
OPT_RP = 'rp'
|
||||
OPT_INV = 'inv'
|
||||
|
||||
def __init__(self, method: str = 'inv', lamb: float = 0, delta: float = 0,
|
||||
alpha: float = 0.0, scale_alpha: bool = True, tol: float = 1e-8):
|
||||
"""
|
||||
Args:
|
||||
method (str): portfolio optimization method
|
||||
lamb (float): risk aversion parameter (larger `lamb` means more focus on return)
|
||||
delta (float): turnover rate limit
|
||||
alpha (float): l2 norm regularizer
|
||||
tol (float): tolerance for optimization termination
|
||||
"""
|
||||
assert method in [self.OPT_GMV, self.OPT_MVO, self.OPT_RP, self.OPT_INV], \
|
||||
f'method `{method}` is not supported'
|
||||
self.method = method
|
||||
|
||||
assert lamb >= 0, f'risk aversion parameter `lamb` should be positive'
|
||||
self.lamb = lamb
|
||||
|
||||
assert delta >= 0, f'turnover limit `delta` should be positive'
|
||||
self.delta = delta
|
||||
|
||||
assert alpha >= 0, f'l2 norm regularizer `alpha` should be positive'
|
||||
self.alpha = alpha
|
||||
|
||||
self.tol = tol
|
||||
|
||||
def __call__(self, S: Union[np.ndarray, pd.DataFrame],
|
||||
u: Optional[Union[np.ndarray, pd.Series]] = None,
|
||||
w0: Optional[Union[np.ndarray, pd.Series]] = None) -> Union[np.ndarray, pd.Series]:
|
||||
"""
|
||||
Args:
|
||||
S (np.ndarray or pd.DataFrame): covariance matrix
|
||||
u (np.ndarray or pd.Series): expected returns (a.k.a., alpha)
|
||||
w0 (np.ndarray or pd.Series): initial weights (for turnover control)
|
||||
|
||||
Returns:
|
||||
np.ndarray or pd.Series: optimized portfolio allocation
|
||||
"""
|
||||
# transform dataframe into array
|
||||
index = None
|
||||
if isinstance(S, pd.DataFrame):
|
||||
index = S.index
|
||||
S = S.values
|
||||
|
||||
# transform alpha
|
||||
if u is not None:
|
||||
assert len(u) == len(S), '`u` has mismatched shape'
|
||||
if isinstance(u, pd.Series):
|
||||
assert all(u.index == index), '`u` has mismatched index'
|
||||
u = u.values
|
||||
|
||||
# transform initial weights
|
||||
if w0 is not None:
|
||||
assert len(w0) == len(S), '`w0` has mismatched shape'
|
||||
if isinstance(w0, pd.Series):
|
||||
assert all(w0.index == index), '`w0` has mismatched index'
|
||||
w0 = w0.values
|
||||
|
||||
# scale alpha to match volatility
|
||||
if u is not None:
|
||||
u = u / u.std()
|
||||
u *= np.mean(np.diag(S))**0.5
|
||||
|
||||
# optimize
|
||||
w = self._optimize(S, u, w0)
|
||||
|
||||
# restore index if needed
|
||||
if index is not None:
|
||||
w = pd.Series(w, index=index)
|
||||
|
||||
return w
|
||||
|
||||
def _optimize(self, S: np.ndarray, u: Optional[np.ndarray] = None,
|
||||
w0: Optional[np.ndarray] = None) -> np.ndarray:
|
||||
|
||||
# inverse volatility
|
||||
if self.method == self.OPT_INV:
|
||||
if u is not None:
|
||||
warnings.warn('`u` is set but will not be used for `inv` portfolio')
|
||||
if w0 is not None:
|
||||
warnings.warn('`w0` is set but will not be used for `inv` portfolio')
|
||||
return self._optimize_inv(S)
|
||||
|
||||
# global minimum variance
|
||||
if self.method == self.OPT_GMV:
|
||||
if u is not None:
|
||||
warnings.warn('`u` is set but will not be used for `gmv` portfolio')
|
||||
return self._optimize_gmv(S, w0)
|
||||
|
||||
# mean-variance
|
||||
if self.method == self.OPT_MVO:
|
||||
return self._optimize_mvo(S, u, w0)
|
||||
|
||||
# risk parity
|
||||
if self.method == self.OPT_RP:
|
||||
if u is not None:
|
||||
warnings.warn('`u` is set but will not be used for `rp` portfolio')
|
||||
return self._optimize_rp(S, w0)
|
||||
|
||||
def _optimize_inv(self, S: np.ndarray) -> np.ndarray:
|
||||
"""Inverse volatility"""
|
||||
vola = np.diag(S)**0.5
|
||||
w = 1 / vola
|
||||
w /= w.sum()
|
||||
return w
|
||||
|
||||
def _optimize_gmv(self, S: np.ndarray, w0: Optional[np.ndarray] = None) -> np.ndarray:
|
||||
"""optimize global minimum variance portfolio
|
||||
|
||||
This method solves the following optimization problem
|
||||
min_w w' S w
|
||||
s.t. w >= 0, sum(w) == 1
|
||||
where `S` is the covariance matrix.
|
||||
"""
|
||||
return self._solve(
|
||||
len(S),
|
||||
self._get_objective_gmv(S),
|
||||
*self._get_constrains(w0)
|
||||
)
|
||||
|
||||
def _optimize_mvo(self, S: np.ndarray, u: Optional[np.ndarray] = None,
|
||||
w0: Optional[np.ndarray] = None) -> np.ndarray:
|
||||
"""optimize mean-variance portfolio
|
||||
|
||||
This method solves the following optimization problem
|
||||
min_w - w' u + lamb * w' S w
|
||||
s.t. w >= 0, sum(w) == 1
|
||||
where `S` is the covariance matrix, `u` is the expected returns,
|
||||
and `lamb` is the risk aversion parameter.
|
||||
"""
|
||||
return self._solve(
|
||||
len(S),
|
||||
self._get_objective_mvo(S, u),
|
||||
*self._get_constrains(w0)
|
||||
)
|
||||
|
||||
def _optimize_rp(self, S: np.ndarray, w0: Optional[np.ndarray] = None) -> np.ndarray:
|
||||
"""optimize risk parity portfolio
|
||||
|
||||
This method solves the following optimization problem
|
||||
min_w sum_i [w_i - (w' S w) / ((S w)_i * N)]**2
|
||||
s.t. w >= 0, sum(w) == 1
|
||||
where `S` is the covariance matrix and `N` is the number of stocks.
|
||||
"""
|
||||
return self._solve(
|
||||
len(S),
|
||||
self._get_objective_rp(S),
|
||||
*self._get_constrains(w0)
|
||||
)
|
||||
|
||||
def _get_objective_gmv(self, S: np.ndarray) -> np.ndarray:
|
||||
"""global minimum variance optimization objective
|
||||
|
||||
Optimization objective
|
||||
min_w w' S w
|
||||
"""
|
||||
|
||||
def func(x):
|
||||
return x @ S @ x
|
||||
|
||||
return func
|
||||
|
||||
def _get_objective_mvo(self, S: np.ndarray, u: np.ndarray = None) -> np.ndarray:
|
||||
"""mean-variance optimization objective
|
||||
|
||||
Optimization objective
|
||||
min_w - w' u + lamb * w' S w
|
||||
"""
|
||||
|
||||
def func(x):
|
||||
risk = x @ S @ x
|
||||
ret = x @ u
|
||||
return -ret + self.lamb * risk
|
||||
|
||||
return func
|
||||
|
||||
def _get_objective_rp(self, S: np.ndarray) -> np.ndarray:
|
||||
"""risk-parity optimization objective
|
||||
|
||||
Optimization objective
|
||||
min_w sum_i [w_i - (w' S w) / ((S w)_i * N)]**2
|
||||
"""
|
||||
|
||||
def func(x):
|
||||
N = len(x)
|
||||
Sx = S @ x
|
||||
xSx = x @ Sx
|
||||
return np.sum((x - xSx / Sx / N)**2)
|
||||
|
||||
return func
|
||||
|
||||
def _get_constrains(self, w0: Optional[np.ndarray] = None):
|
||||
"""optimization constraints
|
||||
|
||||
Defines the following constraints:
|
||||
- no shorting and leverage: 0 <= w <= 1
|
||||
- full investment: sum(w) == 1
|
||||
- turnover constraint: |w - w0| <= delta
|
||||
"""
|
||||
|
||||
# no shorting and leverage
|
||||
bounds = so.Bounds(0.0, 1.0)
|
||||
|
||||
# full investment constraint
|
||||
cons = [
|
||||
{'type': 'eq', 'fun': lambda x: np.sum(x) - 1} # == 0
|
||||
]
|
||||
|
||||
# turnover constraint
|
||||
if w0 is not None:
|
||||
cons.append(
|
||||
{'type': 'ineq', 'fun': lambda x: self.delta - np.sum(np.abs(x - w0))} # >= 0
|
||||
)
|
||||
|
||||
return bounds, cons
|
||||
|
||||
def _solve(self, n: int, obj: Callable, bounds: so.Bounds, cons: List) -> np.ndarray:
|
||||
"""solve optimization
|
||||
|
||||
Args:
|
||||
n (int): number of parameters
|
||||
obj (callable): optimization objective
|
||||
bounds (Bounds): bounds of parameters
|
||||
cons (list): optimization constraints
|
||||
"""
|
||||
# add l2 regularization
|
||||
wrapped_obj = obj
|
||||
if self.alpha > 0:
|
||||
wrapped_obj = lambda x: obj(x) + self.alpha * np.sum(np.square(x))
|
||||
|
||||
# solve
|
||||
x0 = np.ones(n) / n # init results
|
||||
sol = so.minimize(wrapped_obj, x0, bounds=bounds, constraints=cons, tol=self.tol)
|
||||
if not sol.success:
|
||||
warnings.warn(f'optimization not success ({sol.status})')
|
||||
|
||||
return sol.x
|
||||
Reference in New Issue
Block a user