1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

Reindex files.

This commit is contained in:
Charles Young
2021-03-04 22:30:38 +08:00
parent 2bff6eb781
commit 83c6e74783
12 changed files with 793 additions and 1038 deletions

View File

@@ -1,611 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import numpy as np
import pandas as pd
from typing import Union
from sklearn.decomposition import PCA, FactorAnalysis
from qlib.model.base import BaseModel
class RiskModel(BaseModel):
"""Risk Model
A risk model is used to estimate the covariance matrix of stock returns.
"""
MASK_NAN = "mask"
FILL_NAN = "fill"
IGNORE_NAN = "ignore"
def __init__(self, nan_option: str = "ignore", assume_centered: bool = False, scale_return: bool = True):
"""
Args:
nan_option (str): nan handling option (`ignore`/`mask`/`fill`).
assume_centered (bool): whether the data is assumed to be centered.
scale_return (bool): whether scale returns as percentage.
"""
# nan
assert nan_option in [
self.MASK_NAN,
self.FILL_NAN,
self.IGNORE_NAN,
], f"`nan_option={nan_option}` is not supported"
self.nan_option = nan_option
self.assume_centered = assume_centered
self.scale_return = scale_return
def predict(
self, X: Union[pd.Series, pd.DataFrame, np.ndarray], return_corr: bool = False, is_price: bool = True
) -> Union[pd.DataFrame, np.ndarray]:
"""
Args:
X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance,
with variables as columns and observations as rows.
return_corr (bool): whether return the correlation matrix.
is_price (bool): whether `X` contains price (if not assume stock returns).
Returns:
pd.DataFrame or np.ndarray: estimated covariance (or correlation).
"""
# transform input into 2D array
if not isinstance(X, (pd.Series, pd.DataFrame)):
columns = None
else:
if isinstance(X.index, pd.MultiIndex):
if isinstance(X, pd.DataFrame):
X = X.iloc[:, 0].unstack(level="instrument") # always use the first column
else:
X = X.unstack(level="instrument")
else:
# X is 2D DataFrame
pass
columns = X.columns # will be used to restore dataframe
X = X.values
# calculate pct_change
if is_price:
X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows
# scale return
if self.scale_return:
X *= 100
# handle nan and centered
X = self._preprocess(X)
# estimate covariance
S = self._predict(X)
# return correlation if needed
if return_corr:
vola = np.sqrt(np.diag(S))
corr = S / np.outer(vola, vola)
if columns is None:
return corr
return pd.DataFrame(corr, index=columns, columns=columns)
# return covariance
if columns is None:
return S
return pd.DataFrame(S, index=columns, columns=columns)
def _predict(self, X: np.ndarray) -> np.ndarray:
"""covariance estimation implementation
This method should be overridden by child classes.
By default, this method implements the empirical covariance estimation.
Args:
X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows).
Returns:
np.ndarray: covariance matrix.
"""
xTx = np.asarray(X.T.dot(X))
N = len(X)
if isinstance(X, np.ma.MaskedArray):
M = 1 - X.mask
N = M.T.dot(M) # each pair has distinct number of samples
return xTx / N
def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]:
"""handle nan and centerize data
Note:
if `nan_option='mask'` then the returned array will be `np.ma.MaskedArray`.
"""
# handle nan
if self.nan_option == self.FILL_NAN:
X = np.nan_to_num(X)
elif self.nan_option == self.MASK_NAN:
X = np.ma.masked_invalid(X)
# centralize
if not self.assume_centered:
X = X - np.nanmean(X, axis=0)
return X
class ShrinkCovEstimator(RiskModel):
"""Shrinkage Covariance Estimator
This estimator will shrink the sample covariance matrix towards
an identify matrix:
S_hat = (1 - alpha) * S + alpha * F
where `alpha` is the shrink parameter and `F` is the shrinking target.
The following shrinking parameters (`alpha`) are supported:
- `lw` [1][2][3]: use Ledoit-Wolf shrinking parameter.
- `oas` [4]: use Oracle Approximating Shrinkage shrinking parameter.
- float: directly specify the shrink parameter, should be between [0, 1].
The following shrinking targets (`F`) are supported:
- `const_var` [1][4][5]: assume stocks have the same constant variance and zero correlation.
- `const_corr` [2][6]: assume stocks have different variance but equal correlation.
- `single_factor` [3][7]: assume single factor model as the shrinking target.
- np.ndarray: provide the shrinking targets directly.
Note:
- The optimal shrinking parameter depends on the selection of the shrinking target.
Currently, `oas` is not supported for `const_corr` and `single_factor`.
- Remember to set `nan_option` to `fill` or `mask` if your data has missing values.
References:
[1] Ledoit, O., & Wolf, M. (2004). A well-conditioned estimator for large-dimensional covariance matrices.
Journal of Multivariate Analysis, 88(2), 365411. https://doi.org/10.1016/S0047-259X(03)00096-4
[2] Ledoit, O., & Wolf, M. (2004). Honey, I shrunk the sample covariance matrix.
Journal of Portfolio Management, 30(4), 122. https://doi.org/10.3905/jpm.2004.110
[3] Ledoit, O., & Wolf, M. (2003). Improved estimation of the covariance matrix of stock returns
with an application to portfolio selection.
Journal of Empirical Finance, 10(5), 603621. https://doi.org/10.1016/S0927-5398(03)00007-0
[4] Chen, Y., Wiesel, A., Eldar, Y. C., & Hero, A. O. (2010). Shrinkage algorithms for MMSE covariance
estimation. IEEE Transactions on Signal Processing, 58(10), 50165029.
https://doi.org/10.1109/TSP.2010.2053029
[5] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-00007f64e5b9/cov1para.m.zip
[6] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-ffff-ffffde5e2d4e/covCor.m.zip
[7] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-0000648dfc98/covMarket.m.zip
"""
SHR_LW = "lw"
SHR_OAS = "oas"
TGT_CONST_VAR = "const_var"
TGT_CONST_CORR = "const_corr"
TGT_SINGLE_FACTOR = "single_factor"
def __init__(self, alpha: Union[str, float] = 0.0, target: Union[str, np.ndarray] = "const_var", **kwargs):
"""
Args:
alpha (str or float): shrinking parameter or estimator (`lw`/`oas`)
target (str or np.ndarray): shrinking target (`const_var`/`const_corr`/`single_factor`)
kwargs: see `RiskModel` for more information
"""
super().__init__(**kwargs)
# alpha
if isinstance(alpha, str):
assert alpha in [self.SHR_LW, self.SHR_OAS], f"shrinking method `{alpha}` is not supported"
elif isinstance(alpha, (float, np.floating)):
assert 0 <= alpha <= 1, "alpha should be between [0, 1]"
else:
raise TypeError("invalid argument type for `alpha`")
self.alpha = alpha
# target
if isinstance(target, str):
assert target in [
self.TGT_CONST_VAR,
self.TGT_CONST_CORR,
self.TGT_SINGLE_FACTOR,
], f"shrinking target `{target} is not supported"
elif isinstance(target, np.ndarray):
pass
else:
raise TypeError("invalid argument type for `target`")
if alpha == self.SHR_OAS and target != self.TGT_CONST_VAR:
raise NotImplementedError("currently `oas` can only support `const_var` as target")
self.target = target
def _predict(self, X: np.ndarray) -> np.ndarray:
# sample covariance
S = super()._predict(X)
# shrinking target
F = self._get_shrink_target(X, S)
# get shrinking parameter
alpha = self._get_shrink_param(X, S, F)
# shrink covariance
if alpha > 0:
S *= 1 - alpha
F *= alpha
S += F
return S
def _get_shrink_target(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""get shrinking target `F`"""
if self.target == self.TGT_CONST_VAR:
return self._get_shrink_target_const_var(X, S)
if self.target == self.TGT_CONST_CORR:
return self._get_shrink_target_const_corr(X, S)
if self.target == self.TGT_SINGLE_FACTOR:
return self._get_shrink_target_single_factor(X, S)
return self.target
def _get_shrink_target_const_var(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""get shrinking target with constant variance
This target assumes zero pair-wise correlation and constant variance.
The constant variance is estimated by averaging all sample's variances.
"""
n = len(S)
F = np.eye(n)
np.fill_diagonal(F, np.mean(np.diag(S)))
return F
def _get_shrink_target_const_corr(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""get shrinking target with constant correlation
This target assumes constant pair-wise correlation but keep the sample variance.
The constant correlation is estimated by averaging all pairwise correlations.
"""
n = len(S)
var = np.diag(S)
sqrt_var = np.sqrt(var)
covar = np.outer(sqrt_var, sqrt_var)
r_bar = (np.sum(S / covar) - n) / (n * (n - 1))
F = r_bar * covar
np.fill_diagonal(F, var)
return F
def _get_shrink_target_single_factor(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""get shrinking target with single factor model"""
X_mkt = np.nanmean(X, axis=1)
cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X))
var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X))
F = np.outer(cov_mkt, cov_mkt) / var_mkt
np.fill_diagonal(F, np.diag(S))
return F
def _get_shrink_param(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""get shrinking parameter `alpha`
Note:
The Ledoit-Wolf shrinking parameter estimator consists of three different methods.
"""
if self.alpha == self.SHR_OAS:
return self._get_shrink_param_oas(X, S, F)
elif self.alpha == self.SHR_LW:
if self.target == self.TGT_CONST_VAR:
return self._get_shrink_param_lw_const_var(X, S, F)
if self.target == self.TGT_CONST_CORR:
return self._get_shrink_param_lw_const_corr(X, S, F)
if self.target == self.TGT_SINGLE_FACTOR:
return self._get_shrink_param_lw_single_factor(X, S, F)
return self.alpha
def _get_shrink_param_oas(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""Oracle Approximating Shrinkage Estimator
This method uses the following formula to estimate the `alpha`
parameter for the shrink covariance estimator:
A = (1 - 2 / p) * trace(S^2) + trace^2(S)
B = (n + 1 - 2 / p) * (trace(S^2) - trace^2(S) / p)
alpha = A / B
where `n`, `p` are the dim of observations and variables respectively.
"""
trS2 = np.sum(S ** 2)
tr2S = np.trace(S) ** 2
n, p = X.shape
A = (1 - 2 / p) * (trS2 + tr2S)
B = (n + 1 - 2 / p) * (trS2 + tr2S / p)
alpha = A / B
return alpha
def _get_shrink_param_lw_const_var(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""Ledoit-Wolf Shrinkage Estimator (Constant Variance)
This method shrinks the covariance matrix towards the constand variance target.
"""
t, n = X.shape
y = X ** 2
phi = np.sum(y.T.dot(y) / t - S ** 2)
gamma = np.linalg.norm(S - F, "fro") ** 2
kappa = phi / gamma
alpha = max(0, min(1, kappa / t))
return alpha
def _get_shrink_param_lw_const_corr(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""Ledoit-Wolf Shrinkage Estimator (Constant Correlation)
This method shrinks the covariance matrix towards the constand correlation target.
"""
t, n = X.shape
var = np.diag(S)
sqrt_var = np.sqrt(var)
r_bar = (np.sum(S / np.outer(sqrt_var, sqrt_var)) - n) / (n * (n - 1))
y = X ** 2
phi_mat = y.T.dot(y) / t - S ** 2
phi = np.sum(phi_mat)
theta_mat = (X ** 3).T.dot(X) / t - var[:, None] * S
np.fill_diagonal(theta_mat, 0)
rho = np.sum(np.diag(phi_mat)) + r_bar * np.sum(np.outer(1 / sqrt_var, sqrt_var) * theta_mat)
gamma = np.linalg.norm(S - F, "fro") ** 2
kappa = (phi - rho) / gamma
alpha = max(0, min(1, kappa / t))
return alpha
def _get_shrink_param_lw_single_factor(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""Ledoit-Wolf Shrinkage Estimator (Single Factor Model)
This method shrinks the covariance matrix towards the single factor model target.
"""
t, n = X.shape
X_mkt = np.nanmean(X, axis=1)
cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X))
var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X))
y = X ** 2
phi = np.sum(y.T.dot(y)) / t - np.sum(S ** 2)
rdiag = np.sum(y ** 2) / t - np.sum(np.diag(S) ** 2)
z = X * X_mkt[:, None]
v1 = y.T.dot(z) / t - cov_mkt[:, None] * S
roff1 = np.sum(v1 * cov_mkt[:, None].T) / var_mkt - np.sum(np.diag(v1) * cov_mkt) / var_mkt
v3 = z.T.dot(z) / t - var_mkt * S
roff3 = (
np.sum(v3 * np.outer(cov_mkt, cov_mkt)) / var_mkt ** 2 - np.sum(np.diag(v3) * cov_mkt ** 2) / var_mkt ** 2
)
roff = 2 * roff1 - roff3
rho = rdiag + roff
gamma = np.linalg.norm(S - F, "fro") ** 2
kappa = (phi - rho) / gamma
alpha = max(0, min(1, kappa / t))
return alpha
class POETCovEstimator(RiskModel):
"""Principal Orthogonal Complement Thresholding Estimator (POET)
Reference:
[1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements.
Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603680. https://doi.org/10.1111/rssb.12016
[2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m
"""
THRESH_SOFT = "soft"
THRESH_HARD = "hard"
THRESH_SCAD = "scad"
def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = "soft", **kwargs):
"""
Args:
num_factors (int): number of factors (if set to zero, no factor model will be used).
thresh (float): the positive constant for thresholding.
thresh_method (str): thresholding method, which can be
- 'soft': soft thresholding.
- 'hard': hard thresholding.
- 'scad': scad thresholding.
kwargs: see `RiskModel` for more information.
"""
super().__init__(**kwargs)
assert num_factors >= 0, "`num_factors` requires a positive integer"
self.num_factors = num_factors
assert thresh >= 0, "`thresh` requires a positive float number"
self.thresh = thresh
assert thresh_method in [
self.THRESH_HARD,
self.THRESH_SOFT,
self.THRESH_SCAD,
], "`thresh_method` should be `soft`/`hard`/`scad`"
self.thresh_method = thresh_method
def _predict(self, X: np.ndarray) -> np.ndarray:
Y = X.T # NOTE: to match POET's implementation
p, n = Y.shape
if self.num_factors > 0:
Dd, V = np.linalg.eig(Y.T.dot(Y))
V = V[:, np.argsort(Dd)]
F = V[:, -self.num_factors :][:, ::-1] * np.sqrt(n)
LamPCA = Y.dot(F) / n
uhat = np.asarray(Y - LamPCA.dot(F.T))
Lowrank = np.asarray(LamPCA.dot(LamPCA.T))
rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n)
else:
uhat = np.asarray(Y)
rate = np.sqrt(np.log(p) / n)
Lowrank = 0
lamb = rate * self.thresh
SuPCA = uhat.dot(uhat.T) / n
SuDiag = np.diag(np.diag(SuPCA))
R = np.linalg.inv(SuDiag ** 0.5).dot(SuPCA).dot(np.linalg.inv(SuDiag ** 0.5))
if self.thresh_method == self.THRESH_HARD:
M = R * (np.abs(R) > lamb)
elif self.thresh_method == self.THRESH_SOFT:
res = np.abs(R) - lamb
res = (res + np.abs(res)) / 2
M = np.sign(R) * res
else:
M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb)
M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7
M3 = (np.abs(R) >= 3.7 * lamb) * R
M = M1 + M2 + M3
Rthresh = M - np.diag(np.diag(M)) + np.eye(p)
SigmaU = (SuDiag ** 0.5).dot(Rthresh).dot(SuDiag ** 0.5)
SigmaY = SigmaU + Lowrank
return SigmaY
class StructuredCovEstimator(RiskModel):
"""Structured Covariance Estimator
This estimator assumes observations can be predicted by multiple factors
X = FB + U
where `F` can be specified by explicit risk factors or latent factors.
Therefore the structured covariance can be estimated by
cov(X) = F cov(B) F.T + cov(U)
We use latent factor models to estimate the structured covariance.
Specifically, the following latent factor models are supported:
- `pca`: Principal Component Analysis
- `fa`: Factor Analysis
Reference: [1] Fan, J., Liao, Y., & Liu, H. (2016). An overview of the estimation of large covariance and
precision matrices. Econometrics Journal, 19(1), C1C32. https://doi.org/10.1111/ectj.12061
"""
FACTOR_MODEL_PCA = "pca"
FACTOR_MODEL_FA = "fa"
def __init__(
self,
factor_model: str = "pca",
num_factors: int = 10,
nan_option: str = "ignore",
assume_centered: bool = False,
scale_return: bool = True,
):
"""
Args:
factor_model (str): the latent factor models used to estimate the structured covariance (`pca`/`fa`).
num_factors (int): number of components to keep.
nan_option (str): nan handling option (`ignore`/`fill`).
assume_centered (bool): whether the data is assumed to be centered.
scale_return (bool): whether scale returns as percentage.
"""
super().__init__(nan_option, assume_centered, scale_return)
assert factor_model in [
self.FACTOR_MODEL_PCA,
self.FACTOR_MODEL_FA,
], "factor_model={} is not supported".format(factor_model)
self.solver = PCA if factor_model == self.FACTOR_MODEL_PCA else FactorAnalysis
self.num_factors = num_factors
def predict(
self,
X: Union[pd.Series, pd.DataFrame, np.ndarray],
return_corr: bool = False,
is_price: bool = True,
return_decomposed_components=False,
) -> Union[pd.DataFrame, np.ndarray, tuple]:
"""
Args:
X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance,
with variables as columns and observations as rows.
return_corr (bool): whether return the correlation matrix.
is_price (bool): whether `X` contains price (if not assume stock returns).
return_decomposed_components (bool): whether return decomposed components of the covariance matrix.
Returns:
tuple or pd.DataFrame or np.ndarray: decomposed covariance matrix or estimated covariance or correlation.
"""
assert (
not return_corr or not return_decomposed_components
), "Can only return either correlation matrix or decomposed components."
# transform input into 2D array
if not isinstance(X, (pd.Series, pd.DataFrame)):
columns = None
else:
if isinstance(X.index, pd.MultiIndex):
if isinstance(X, pd.DataFrame):
X = X.iloc[:, 0].unstack(level="instrument") # always use the first column
else:
X = X.unstack(level="instrument")
else:
# X is 2D DataFrame
pass
columns = X.columns # will be used to restore dataframe
X = X.values
# calculate pct_change
if is_price:
X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows
# scale return
if self.scale_return:
X *= 100
# handle nan and centered
X = self._preprocess(X)
if return_decomposed_components:
F, cov_b, var_u = self._predict(X, return_structured=True)
return F, cov_b, var_u
else:
# estimate covariance
S = self._predict(X)
# return correlation if needed
if return_corr:
vola = np.sqrt(np.diag(S))
corr = S / np.outer(vola, vola)
if columns is None:
return corr
return pd.DataFrame(corr, index=columns, columns=columns)
# return covariance
if columns is None:
return S
return pd.DataFrame(S, index=columns, columns=columns)
def _predict(self, X: np.ndarray, return_structured=False) -> Union[np.ndarray, tuple]:
"""
covariance estimation implementation
Args:
X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows).
return_structured (bool): whether return decomposed components of the covariance matrix.
Returns:
tuple or np.ndarray: decomposed covariance matrix or covariance matrix.
"""
model = self.solver(self.num_factors, random_state=0).fit(X)
F = model.components_.T # num_features x num_factors
B = model.transform(X) # num_samples x num_factors
U = X - B @ F.T
cov_b = np.cov(B.T) # num_factors x num_factors
var_u = np.var(U, axis=0) # diagonal
if return_structured:
return F, cov_b, var_u
cov_x = F @ cov_b @ F.T + np.diag(var_u)
return cov_x

View File

@@ -0,0 +1,141 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import numpy as np
import pandas as pd
from typing import Union
from qlib.model.base import BaseModel
from qlib.model.riskmodel_poet import POETCovEstimator
from qlib.model.riskmodel_shrink import ShrinkCovEstimator
from qlib.model.riskmodel_structured import StructuredCovEstimator
class RiskModel(BaseModel):
"""Risk Model
A risk model is used to estimate the covariance matrix of stock returns.
"""
MASK_NAN = "mask"
FILL_NAN = "fill"
IGNORE_NAN = "ignore"
def __init__(self, nan_option: str = "ignore", assume_centered: bool = False, scale_return: bool = True):
"""
Args:
nan_option (str): nan handling option (`ignore`/`mask`/`fill`).
assume_centered (bool): whether the data is assumed to be centered.
scale_return (bool): whether scale returns as percentage.
"""
# nan
assert nan_option in [
self.MASK_NAN,
self.FILL_NAN,
self.IGNORE_NAN,
], f"`nan_option={nan_option}` is not supported"
self.nan_option = nan_option
self.assume_centered = assume_centered
self.scale_return = scale_return
def predict(
self, X: Union[pd.Series, pd.DataFrame, np.ndarray], return_corr: bool = False, is_price: bool = True
) -> Union[pd.DataFrame, np.ndarray]:
"""
Args:
X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance,
with variables as columns and observations as rows.
return_corr (bool): whether return the correlation matrix.
is_price (bool): whether `X` contains price (if not assume stock returns).
Returns:
pd.DataFrame or np.ndarray: estimated covariance (or correlation).
"""
# transform input into 2D array
if not isinstance(X, (pd.Series, pd.DataFrame)):
columns = None
else:
if isinstance(X.index, pd.MultiIndex):
if isinstance(X, pd.DataFrame):
X = X.iloc[:, 0].unstack(level="instrument") # always use the first column
else:
X = X.unstack(level="instrument")
else:
# X is 2D DataFrame
pass
columns = X.columns # will be used to restore dataframe
X = X.values
# calculate pct_change
if is_price:
X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows
# scale return
if self.scale_return:
X *= 100
# handle nan and centered
X = self._preprocess(X)
# estimate covariance
S = self._predict(X)
# return correlation if needed
if return_corr:
vola = np.sqrt(np.diag(S))
corr = S / np.outer(vola, vola)
if columns is None:
return corr
return pd.DataFrame(corr, index=columns, columns=columns)
# return covariance
if columns is None:
return S
return pd.DataFrame(S, index=columns, columns=columns)
def _predict(self, X: np.ndarray) -> np.ndarray:
"""covariance estimation implementation
This method should be overridden by child classes.
By default, this method implements the empirical covariance estimation.
Args:
X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows).
Returns:
np.ndarray: covariance matrix.
"""
xTx = np.asarray(X.T.dot(X))
N = len(X)
if isinstance(X, np.ma.MaskedArray):
M = 1 - X.mask
N = M.T.dot(M) # each pair has distinct number of samples
return xTx / N
def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]:
"""handle nan and centerize data
Note:
if `nan_option='mask'` then the returned array will be `np.ma.MaskedArray`.
"""
# handle nan
if self.nan_option == self.FILL_NAN:
X = np.nan_to_num(X)
elif self.nan_option == self.MASK_NAN:
X = np.ma.masked_invalid(X)
# centralize
if not self.assume_centered:
X = X - np.nanmean(X, axis=0)
return X

View File

@@ -0,0 +1,84 @@
import numpy as np
from qlib.model.riskmodel import RiskModel
class POETCovEstimator(RiskModel):
"""Principal Orthogonal Complement Thresholding Estimator (POET)
Reference:
[1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements.
Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603680. https://doi.org/10.1111/rssb.12016
[2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m
"""
THRESH_SOFT = "soft"
THRESH_HARD = "hard"
THRESH_SCAD = "scad"
def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = "soft", **kwargs):
"""
Args:
num_factors (int): number of factors (if set to zero, no factor model will be used).
thresh (float): the positive constant for thresholding.
thresh_method (str): thresholding method, which can be
- 'soft': soft thresholding.
- 'hard': hard thresholding.
- 'scad': scad thresholding.
kwargs: see `RiskModel` for more information.
"""
super().__init__(**kwargs)
assert num_factors >= 0, "`num_factors` requires a positive integer"
self.num_factors = num_factors
assert thresh >= 0, "`thresh` requires a positive float number"
self.thresh = thresh
assert thresh_method in [
self.THRESH_HARD,
self.THRESH_SOFT,
self.THRESH_SCAD,
], "`thresh_method` should be `soft`/`hard`/`scad`"
self.thresh_method = thresh_method
def _predict(self, X: np.ndarray) -> np.ndarray:
Y = X.T # NOTE: to match POET's implementation
p, n = Y.shape
if self.num_factors > 0:
Dd, V = np.linalg.eig(Y.T.dot(Y))
V = V[:, np.argsort(Dd)]
F = V[:, -self.num_factors:][:, ::-1] * np.sqrt(n)
LamPCA = Y.dot(F) / n
uhat = np.asarray(Y - LamPCA.dot(F.T))
Lowrank = np.asarray(LamPCA.dot(LamPCA.T))
rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n)
else:
uhat = np.asarray(Y)
rate = np.sqrt(np.log(p) / n)
Lowrank = 0
lamb = rate * self.thresh
SuPCA = uhat.dot(uhat.T) / n
SuDiag = np.diag(np.diag(SuPCA))
R = np.linalg.inv(SuDiag ** 0.5).dot(SuPCA).dot(np.linalg.inv(SuDiag ** 0.5))
if self.thresh_method == self.THRESH_HARD:
M = R * (np.abs(R) > lamb)
elif self.thresh_method == self.THRESH_SOFT:
res = np.abs(R) - lamb
res = (res + np.abs(res)) / 2
M = np.sign(R) * res
else:
M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb)
M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7
M3 = (np.abs(R) >= 3.7 * lamb) * R
M = M1 + M2 + M3
Rthresh = M - np.diag(np.diag(M)) + np.eye(p)
SigmaU = (SuDiag ** 0.5).dot(Rthresh).dot(SuDiag ** 0.5)
SigmaY = SigmaU + Lowrank
return SigmaY

View File

@@ -0,0 +1,262 @@
import numpy as np
from typing import Union
from qlib.model.riskmodel import RiskModel
class ShrinkCovEstimator(RiskModel):
"""Shrinkage Covariance Estimator
This estimator will shrink the sample covariance matrix towards
an identify matrix:
S_hat = (1 - alpha) * S + alpha * F
where `alpha` is the shrink parameter and `F` is the shrinking target.
The following shrinking parameters (`alpha`) are supported:
- `lw` [1][2][3]: use Ledoit-Wolf shrinking parameter.
- `oas` [4]: use Oracle Approximating Shrinkage shrinking parameter.
- float: directly specify the shrink parameter, should be between [0, 1].
The following shrinking targets (`F`) are supported:
- `const_var` [1][4][5]: assume stocks have the same constant variance and zero correlation.
- `const_corr` [2][6]: assume stocks have different variance but equal correlation.
- `single_factor` [3][7]: assume single factor model as the shrinking target.
- np.ndarray: provide the shrinking targets directly.
Note:
- The optimal shrinking parameter depends on the selection of the shrinking target.
Currently, `oas` is not supported for `const_corr` and `single_factor`.
- Remember to set `nan_option` to `fill` or `mask` if your data has missing values.
References:
[1] Ledoit, O., & Wolf, M. (2004). A well-conditioned estimator for large-dimensional covariance matrices.
Journal of Multivariate Analysis, 88(2), 365411. https://doi.org/10.1016/S0047-259X(03)00096-4
[2] Ledoit, O., & Wolf, M. (2004). Honey, I shrunk the sample covariance matrix.
Journal of Portfolio Management, 30(4), 122. https://doi.org/10.3905/jpm.2004.110
[3] Ledoit, O., & Wolf, M. (2003). Improved estimation of the covariance matrix of stock returns
with an application to portfolio selection.
Journal of Empirical Finance, 10(5), 603621. https://doi.org/10.1016/S0927-5398(03)00007-0
[4] Chen, Y., Wiesel, A., Eldar, Y. C., & Hero, A. O. (2010). Shrinkage algorithms for MMSE covariance
estimation. IEEE Transactions on Signal Processing, 58(10), 50165029.
https://doi.org/10.1109/TSP.2010.2053029
[5] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-00007f64e5b9/cov1para.m.zip
[6] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-ffff-ffffde5e2d4e/covCor.m.zip
[7] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-0000648dfc98/covMarket.m.zip
"""
SHR_LW = "lw"
SHR_OAS = "oas"
TGT_CONST_VAR = "const_var"
TGT_CONST_CORR = "const_corr"
TGT_SINGLE_FACTOR = "single_factor"
def __init__(self, alpha: Union[str, float] = 0.0, target: Union[str, np.ndarray] = "const_var", **kwargs):
"""
Args:
alpha (str or float): shrinking parameter or estimator (`lw`/`oas`)
target (str or np.ndarray): shrinking target (`const_var`/`const_corr`/`single_factor`)
kwargs: see `RiskModel` for more information
"""
super().__init__(**kwargs)
# alpha
if isinstance(alpha, str):
assert alpha in [self.SHR_LW, self.SHR_OAS], f"shrinking method `{alpha}` is not supported"
elif isinstance(alpha, (float, np.floating)):
assert 0 <= alpha <= 1, "alpha should be between [0, 1]"
else:
raise TypeError("invalid argument type for `alpha`")
self.alpha = alpha
# target
if isinstance(target, str):
assert target in [
self.TGT_CONST_VAR,
self.TGT_CONST_CORR,
self.TGT_SINGLE_FACTOR,
], f"shrinking target `{target} is not supported"
elif isinstance(target, np.ndarray):
pass
else:
raise TypeError("invalid argument type for `target`")
if alpha == self.SHR_OAS and target != self.TGT_CONST_VAR:
raise NotImplementedError("currently `oas` can only support `const_var` as target")
self.target = target
def _predict(self, X: np.ndarray) -> np.ndarray:
# sample covariance
S = super()._predict(X)
# shrinking target
F = self._get_shrink_target(X, S)
# get shrinking parameter
alpha = self._get_shrink_param(X, S, F)
# shrink covariance
if alpha > 0:
S *= 1 - alpha
F *= alpha
S += F
return S
def _get_shrink_target(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""get shrinking target `F`"""
if self.target == self.TGT_CONST_VAR:
return self._get_shrink_target_const_var(X, S)
if self.target == self.TGT_CONST_CORR:
return self._get_shrink_target_const_corr(X, S)
if self.target == self.TGT_SINGLE_FACTOR:
return self._get_shrink_target_single_factor(X, S)
return self.target
def _get_shrink_target_const_var(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""get shrinking target with constant variance
This target assumes zero pair-wise correlation and constant variance.
The constant variance is estimated by averaging all sample's variances.
"""
n = len(S)
F = np.eye(n)
np.fill_diagonal(F, np.mean(np.diag(S)))
return F
def _get_shrink_target_const_corr(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""get shrinking target with constant correlation
This target assumes constant pair-wise correlation but keep the sample variance.
The constant correlation is estimated by averaging all pairwise correlations.
"""
n = len(S)
var = np.diag(S)
sqrt_var = np.sqrt(var)
covar = np.outer(sqrt_var, sqrt_var)
r_bar = (np.sum(S / covar) - n) / (n * (n - 1))
F = r_bar * covar
np.fill_diagonal(F, var)
return F
def _get_shrink_target_single_factor(self, X: np.ndarray, S: np.ndarray) -> np.ndarray:
"""get shrinking target with single factor model"""
X_mkt = np.nanmean(X, axis=1)
cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X))
var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X))
F = np.outer(cov_mkt, cov_mkt) / var_mkt
np.fill_diagonal(F, np.diag(S))
return F
def _get_shrink_param(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""get shrinking parameter `alpha`
Note:
The Ledoit-Wolf shrinking parameter estimator consists of three different methods.
"""
if self.alpha == self.SHR_OAS:
return self._get_shrink_param_oas(X, S, F)
elif self.alpha == self.SHR_LW:
if self.target == self.TGT_CONST_VAR:
return self._get_shrink_param_lw_const_var(X, S, F)
if self.target == self.TGT_CONST_CORR:
return self._get_shrink_param_lw_const_corr(X, S, F)
if self.target == self.TGT_SINGLE_FACTOR:
return self._get_shrink_param_lw_single_factor(X, S, F)
return self.alpha
def _get_shrink_param_oas(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""Oracle Approximating Shrinkage Estimator
This method uses the following formula to estimate the `alpha`
parameter for the shrink covariance estimator:
A = (1 - 2 / p) * trace(S^2) + trace^2(S)
B = (n + 1 - 2 / p) * (trace(S^2) - trace^2(S) / p)
alpha = A / B
where `n`, `p` are the dim of observations and variables respectively.
"""
trS2 = np.sum(S ** 2)
tr2S = np.trace(S) ** 2
n, p = X.shape
A = (1 - 2 / p) * (trS2 + tr2S)
B = (n + 1 - 2 / p) * (trS2 + tr2S / p)
alpha = A / B
return alpha
def _get_shrink_param_lw_const_var(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""Ledoit-Wolf Shrinkage Estimator (Constant Variance)
This method shrinks the covariance matrix towards the constand variance target.
"""
t, n = X.shape
y = X ** 2
phi = np.sum(y.T.dot(y) / t - S ** 2)
gamma = np.linalg.norm(S - F, "fro") ** 2
kappa = phi / gamma
alpha = max(0, min(1, kappa / t))
return alpha
def _get_shrink_param_lw_const_corr(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""Ledoit-Wolf Shrinkage Estimator (Constant Correlation)
This method shrinks the covariance matrix towards the constand correlation target.
"""
t, n = X.shape
var = np.diag(S)
sqrt_var = np.sqrt(var)
r_bar = (np.sum(S / np.outer(sqrt_var, sqrt_var)) - n) / (n * (n - 1))
y = X ** 2
phi_mat = y.T.dot(y) / t - S ** 2
phi = np.sum(phi_mat)
theta_mat = (X ** 3).T.dot(X) / t - var[:, None] * S
np.fill_diagonal(theta_mat, 0)
rho = np.sum(np.diag(phi_mat)) + r_bar * np.sum(np.outer(1 / sqrt_var, sqrt_var) * theta_mat)
gamma = np.linalg.norm(S - F, "fro") ** 2
kappa = (phi - rho) / gamma
alpha = max(0, min(1, kappa / t))
return alpha
def _get_shrink_param_lw_single_factor(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float:
"""Ledoit-Wolf Shrinkage Estimator (Single Factor Model)
This method shrinks the covariance matrix towards the single factor model target.
"""
t, n = X.shape
X_mkt = np.nanmean(X, axis=1)
cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X))
var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X))
y = X ** 2
phi = np.sum(y.T.dot(y)) / t - np.sum(S ** 2)
rdiag = np.sum(y ** 2) / t - np.sum(np.diag(S) ** 2)
z = X * X_mkt[:, None]
v1 = y.T.dot(z) / t - cov_mkt[:, None] * S
roff1 = np.sum(v1 * cov_mkt[:, None].T) / var_mkt - np.sum(np.diag(v1) * cov_mkt) / var_mkt
v3 = z.T.dot(z) / t - var_mkt * S
roff3 = (
np.sum(v3 * np.outer(cov_mkt, cov_mkt)) / var_mkt ** 2 - np.sum(
np.diag(v3) * cov_mkt ** 2) / var_mkt ** 2
)
roff = 2 * roff1 - roff3
rho = rdiag + roff
gamma = np.linalg.norm(S - F, "fro") ** 2
kappa = (phi - rho) / gamma
alpha = max(0, min(1, kappa / t))
return alpha

View File

@@ -0,0 +1,152 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import numpy as np
import pandas as pd
from typing import Union
from sklearn.decomposition import PCA, FactorAnalysis
from qlib.model.riskmodel import RiskModel
class StructuredCovEstimator(RiskModel):
"""Structured Covariance Estimator
This estimator assumes observations can be predicted by multiple factors
X = FB + U
where `F` can be specified by explicit risk factors or latent factors.
Therefore the structured covariance can be estimated by
cov(X) = F cov(B) F.T + cov(U)
We use latent factor models to estimate the structured covariance.
Specifically, the following latent factor models are supported:
- `pca`: Principal Component Analysis
- `fa`: Factor Analysis
Reference: [1] Fan, J., Liao, Y., & Liu, H. (2016). An overview of the estimation of large covariance and
precision matrices. Econometrics Journal, 19(1), C1C32. https://doi.org/10.1111/ectj.12061
"""
FACTOR_MODEL_PCA = "pca"
FACTOR_MODEL_FA = "fa"
NAN_OPTION = "fill"
def __init__(
self,
factor_model: str = "pca",
num_factors: int = 10,
assume_centered: bool = False,
scale_return: bool = True,
):
"""
Args:
factor_model (str): the latent factor models used to estimate the structured covariance (`pca`/`fa`).
num_factors (int): number of components to keep.
assume_centered (bool): whether the data is assumed to be centered.
scale_return (bool): whether scale returns as percentage.
"""
super().__init__(self.NAN_OPTION, assume_centered, scale_return)
assert factor_model in [
self.FACTOR_MODEL_PCA,
self.FACTOR_MODEL_FA,
], "factor_model={} is not supported".format(factor_model)
self.solver = PCA if factor_model == self.FACTOR_MODEL_PCA else FactorAnalysis
self.num_factors = num_factors
def predict(
self,
X: Union[pd.Series, pd.DataFrame, np.ndarray],
return_corr: bool = False,
is_price: bool = True,
return_decomposed_components=False,
) -> Union[pd.DataFrame, np.ndarray, tuple]:
"""
Args:
X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance,
with variables as columns and observations as rows.
return_corr (bool): whether return the correlation matrix.
is_price (bool): whether `X` contains price (if not assume stock returns).
return_decomposed_components (bool): whether return decomposed components of the covariance matrix.
Returns:
tuple or pd.DataFrame or np.ndarray: decomposed covariance matrix or estimated covariance or correlation.
"""
assert (
not return_corr or not return_decomposed_components
), "Can only return either correlation matrix or decomposed components."
# transform input into 2D array
if not isinstance(X, (pd.Series, pd.DataFrame)):
columns = None
else:
if isinstance(X.index, pd.MultiIndex):
if isinstance(X, pd.DataFrame):
X = X.iloc[:, 0].unstack(level="instrument") # always use the first column
else:
X = X.unstack(level="instrument")
else:
# X is 2D DataFrame
pass
columns = X.columns # will be used to restore dataframe
X = X.values
# calculate pct_change
if is_price:
X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows
# scale return
if self.scale_return:
X *= 100
# handle nan and centered
X = self._preprocess(X)
if return_decomposed_components:
F, cov_b, var_u = self._predict(X, return_structured=True)
return F, cov_b, var_u
else:
# estimate covariance
S = self._predict(X)
# return correlation if needed
if return_corr:
vola = np.sqrt(np.diag(S))
corr = S / np.outer(vola, vola)
if columns is None:
return corr
return pd.DataFrame(corr, index=columns, columns=columns)
# return covariance
if columns is None:
return S
return pd.DataFrame(S, index=columns, columns=columns)
def _predict(self, X: np.ndarray, return_structured=False) -> Union[np.ndarray, tuple]:
"""
covariance estimation implementation
Args:
X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows).
return_structured (bool): whether return decomposed components of the covariance matrix.
Returns:
tuple or np.ndarray: decomposed covariance matrix or covariance matrix.
"""
model = self.solver(self.num_factors, random_state=0).fit(X)
F = model.components_.T # num_features x num_factors
B = model.transform(X) # num_samples x num_factors
U = X - B @ F.T
cov_b = np.cov(B.T) # num_factors x num_factors
var_u = np.var(U, axis=0) # diagonal
if return_structured:
return F, cov_b, var_u
cov_x = F @ cov_b @ F.T + np.diag(var_u)
return cov_x

View File

@@ -0,0 +1,140 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import numpy as np
import cvxpy as cp
import pandas as pd
from typing import Union
from qlib.portfolio.optimizer import BaseOptimizer
class EnhancedIndexingOptimizer(BaseOptimizer):
"""
Portfolio Optimizer with Enhanced Indexing
Note:
This optimizer always assumes full investment and no-shorting.
"""
START_FROM_W0 = "w0"
START_FROM_BENCH = "benchmark"
DO_NOT_START_FROM = "no_warm_start"
def __init__(
self,
lamb: float = 10,
delta: float = 0.4,
bench_dev: float = 0.01,
inds_dev: float = None,
scale_alpha: bool = True,
verbose: bool = False,
warm_start: str = DO_NOT_START_FROM,
max_iters: int = 10000,
):
"""
Args:
lamb (float): risk aversion parameter (larger `lamb` means less focus on return)
delta (float): turnover rate limit
bench_dev (float): benchmark deviation limit
inds_dev (float/None): industry deviation limit, set `inds_dev` to None to ignore industry specific
restriction
scale_alpha (bool): if to scale alpha to match the volatility of the covariance matrix
verbose (bool): if print detailed information about the solver
warm_start (str): whether try to warm start (`w0`/`benchmark`/``)
(https://www.cvxpy.org/tutorial/advanced/index.html#warm-start)
"""
assert lamb >= 0, "risk aversion parameter `lamb` should be positive"
self.lamb = lamb
assert delta >= 0, "turnover limit `delta` should be positive"
self.delta = delta
assert bench_dev >= 0, "benchmark deviation limit `bench_dev` should be positive"
self.bench_dev = bench_dev
assert inds_dev is None or inds_dev >= 0, "industry deviation limit `inds_dev` should be positive or None."
self.inds_dev = inds_dev
assert warm_start in [
self.DO_NOT_START_FROM,
self.START_FROM_W0,
self.START_FROM_BENCH,
], "illegal warm start option"
self.start_from_w0 = warm_start == self.START_FROM_W0
self.start_from_bench = warm_start == self.START_FROM_BENCH
self.scale_alpha = scale_alpha
self.verbose = verbose
self.max_iters = max_iters
def __call__(
self,
u: np.ndarray,
F: np.ndarray,
covB: np.ndarray,
varU: np.ndarray,
w0: np.ndarray,
w_bench: np.ndarray,
inds_onehot: np.ndarray = None,
) -> Union[np.ndarray, pd.Series]:
"""
Args:
u (np.ndarray): expected returns (a.k.a., alpha)
F, covB, varU (np.ndarray): see StructuredCovEstimator
w0 (np.ndarray): initial weights (for turnover control)
w_bench (np.ndarray): benchmark weights
inds_onehot (np.ndarray): industry (onehot)
Returns:
np.ndarray or pd.Series: optimized portfolio allocation
"""
assert inds_onehot is not None or self.inds_dev is None, "Industry onehot vector is required."
# scale alpha to match volatility
if self.scale_alpha:
u = u / u.std()
x_variance = np.mean(np.diag(F @ covB @ F.T) + varU)
u *= x_variance ** 0.5
w = cp.Variable(len(u)) # num_assets
v = w @ F # num_factors
ret = w @ u
risk = cp.quad_form(v, covB) + cp.sum(cp.multiply(varU, w ** 2))
obj = cp.Maximize(ret - self.lamb * risk)
d_bench = w - w_bench
cons = [
w >= 0,
cp.sum(w) == 1,
d_bench >= -self.bench_dev,
d_bench <= self.bench_dev,
]
if self.inds_dev is not None:
d_inds = d_bench @ inds_onehot
cons.append(d_inds >= -self.inds_dev)
cons.append(d_inds <= self.inds_dev)
if w0 is not None:
turnover = cp.sum(cp.abs(w - w0))
cons.append(turnover <= self.delta)
warm_start = False
if self.start_from_w0:
if w0 is None:
print("Warning: try warm start with w0, but w0 is `None`.")
else:
w.value = w0
warm_start = True
elif self.start_from_bench:
w.value = w_bench
warm_start = True
prob = cp.Problem(obj, cons)
prob.solve(solver=cp.SCS, verbose=self.verbose, warm_start=warm_start, max_iters=self.max_iters)
if prob.status != "optimal":
print("Warning: solve failed.", prob.status)
return np.asarray(w.value)

View File

@@ -4,11 +4,12 @@
import abc
import warnings
import numpy as np
import cvxpy as cp
import pandas as pd
import scipy.optimize as so
from typing import Optional, Union, Callable, List
from qlib.portfolio.enhanced_indexing import EnhancedIndexingOptimizer
class BaseOptimizer(abc.ABC):
""" Construct portfolio with a optimization related method """
@@ -38,13 +39,13 @@ class PortfolioOptimizer(BaseOptimizer):
OPT_INV = "inv"
def __init__(
self,
method: str = "inv",
lamb: float = 0,
delta: float = 0,
alpha: float = 0.0,
scale_alpha: bool = True,
tol: float = 1e-8,
self,
method: str = "inv",
lamb: float = 0,
delta: float = 0,
alpha: float = 0.0,
scale_alpha: bool = True,
tol: float = 1e-8,
):
"""
Args:
@@ -71,10 +72,10 @@ class PortfolioOptimizer(BaseOptimizer):
self.scale_alpha = scale_alpha
def __call__(
self,
S: Union[np.ndarray, pd.DataFrame],
u: Optional[Union[np.ndarray, pd.Series]] = None,
w0: Optional[Union[np.ndarray, pd.Series]] = None,
self,
S: Union[np.ndarray, pd.DataFrame],
u: Optional[Union[np.ndarray, pd.Series]] = None,
w0: Optional[Union[np.ndarray, pd.Series]] = None,
) -> Union[np.ndarray, pd.Series]:
"""
Args:
@@ -163,7 +164,7 @@ class PortfolioOptimizer(BaseOptimizer):
return self._solve(len(S), self._get_objective_gmv(S), *self._get_constrains(w0))
def _optimize_mvo(
self, S: np.ndarray, u: Optional[np.ndarray] = None, w0: Optional[np.ndarray] = None
self, S: np.ndarray, u: Optional[np.ndarray] = None, w0: Optional[np.ndarray] = None
) -> np.ndarray:
"""optimize mean-variance portfolio
@@ -259,7 +260,6 @@ class PortfolioOptimizer(BaseOptimizer):
# add l2 regularization
wrapped_obj = obj
if self.alpha > 0:
def opt_obj(x):
return obj(x) + self.alpha * np.sum(np.square(x))
@@ -272,134 +272,3 @@ class PortfolioOptimizer(BaseOptimizer):
warnings.warn(f"optimization not success ({sol.status})")
return sol.x
class EnhancedIndexingOptimizer(BaseOptimizer):
"""
Portfolio Optimizer with Enhanced Indexing
Note:
This optimizer always assumes full investment and no-shorting.
"""
START_FROM_W0 = "w0"
START_FROM_BENCH = "benchmark"
DO_NOT_START_FROM = "no_warm_start"
def __init__(
self,
lamb: float = 10,
delta: float = 0.4,
bench_dev: float = 0.01,
inds_dev: float = None,
scale_alpha: bool = True,
verbose: bool = False,
warm_start: str = DO_NOT_START_FROM,
max_iters: int = 10000,
):
"""
Args:
lamb (float): risk aversion parameter (larger `lamb` means less focus on return)
delta (float): turnover rate limit
bench_dev (float): benchmark deviation limit
inds_dev (float/None): industry deviation limit, set `inds_dev` to None to ignore industry specific
restriction
scale_alpha (bool): if to scale alpha to match the volatility of the covariance matrix
verbose (bool): if print detailed information about the solver
warm_start (str): whether try to warm start (`w0`/`benchmark`/``)
(https://www.cvxpy.org/tutorial/advanced/index.html#warm-start)
"""
assert lamb >= 0, "risk aversion parameter `lamb` should be positive"
self.lamb = lamb
assert delta >= 0, "turnover limit `delta` should be positive"
self.delta = delta
assert bench_dev >= 0, "benchmark deviation limit `bench_dev` should be positive"
self.bench_dev = bench_dev
assert inds_dev >= 0, "industry deviation limit `inds_dev` should be positive"
self.inds_dev = inds_dev
assert warm_start in [
self.DO_NOT_START_FROM,
self.START_FROM_W0,
self.START_FROM_BENCH,
], "illegal warm start option"
self.start_from_w0 = warm_start == self.START_FROM_W0
self.start_from_bench = warm_start == self.START_FROM_BENCH
self.scale_alpha = scale_alpha
self.verbose = verbose
self.max_iters = max_iters
def __call__(
self,
u: np.ndarray,
F: np.ndarray,
covB: np.ndarray,
varU: np.ndarray,
w0: np.ndarray,
w_bench: np.ndarray,
inds_onehot: np.ndarray = None,
) -> Union[np.ndarray, pd.Series]:
"""
Args:
u (np.ndarray): expected returns (a.k.a., alpha)
F, covB, varU (np.ndarray): see StructuredCovEstimator
w0 (np.ndarray): initial weights (for turnover control)
w_bench (np.ndarray): benchmark weights
inds_onehot (np.ndarray): industry (onehot)
Returns:
np.ndarray or pd.Series: optimized portfolio allocation
"""
assert inds_onehot is not None or self.inds_dev is None, "Industry onehot vector is required."
# scale alpha to match volatility
if self.scale_alpha:
u = u / u.std()
x_variance = np.mean(np.diag(F @ covB @ F.T) + varU)
u *= x_variance ** 0.5
w = cp.Variable(len(u)) # num_assets
v = w @ F # num_factors
ret = w @ u
risk = cp.quad_form(v, covB) + cp.sum(cp.multiply(varU, w ** 2))
obj = cp.Maximize(ret - self.lamb * risk)
d_bench = w - w_bench
cons = [
w >= 0,
cp.sum(w) == 1,
d_bench >= -self.bench_dev,
d_bench <= self.bench_dev,
]
if self.inds_dev is not None:
d_inds = d_bench @ inds_onehot
cons.append(d_inds >= -self.inds_dev)
cons.append(d_inds <= self.inds_dev)
if w0 is not None:
turnover = cp.sum(cp.abs(w - w0))
cons.append(turnover <= self.delta)
warm_start = False
if self.start_from_w0:
if w0 is None:
print("Warning: try warm start with w0, but w0 is `None`.")
else:
w.value = w0
warm_start = True
elif self.start_from_bench:
w.value = w_bench
warm_start = True
prob = cp.Problem(obj, cons)
prob.solve(solver=cp.SCS, verbose=self.verbose, warm_start=warm_start, max_iters=self.max_iters)
if prob.status != "optimal":
print("Warning: solve failed.", prob.status)
return np.asarray(w.value)

View File

@@ -1,282 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import sys
import math
import shutil
import unittest
import numpy as np
import pandas as pd
from tqdm import tqdm
from pathlib import Path
import qlib
from qlib.config import C
from qlib.utils import init_instance_by_config, flatten_dict
from qlib.workflow import R
from qlib.config import REG_CN
from qlib.workflow.record_temp import SignalRecord, SigAnaRecord
from qlib.tests import TestAutoData
from qlib.portfolio.optimizer import EnhancedIndexingOptimizer
from qlib.model.riskmodel import StructuredCovEstimator
from qlib.data.dataset.loader import QlibDataLoader
from qlib.data.dataset.handler import DataHandler
from qlib.data import D
from qlib.utils import exists_qlib_data, init_instance_by_config
market = "all"
trade_gap = 21
label_config = "Ref($close, -{}) / Ref($close, -1) - 1".format(trade_gap) # reconstruct portfolio once a month
provider_uri = "~/.qlib_ei/qlib_data/cn_data" # target_dir
if not exists_qlib_data(provider_uri):
print(f"Qlib data is not found in {provider_uri}")
sys.path.append(str(Path.cwd().parent.joinpath("scripts")))
from get_data import GetData
GetData().qlib_data(target_dir=provider_uri, region=REG_CN)
qlib.init(provider_uri=provider_uri, region=REG_CN)
###################################
# train model
###################################
data_handler_config = {
"start_time": "2008-01-01",
"end_time": "2020-08-01",
"fit_start_time": "2008-01-01",
"fit_end_time": "2014-11-30",
"instruments": market,
"label": [label_config]
}
task = {
"model": {
"class": "LGBModel",
"module_path": "qlib.contrib.model.gbdt",
"kwargs": {
"loss": "mse",
"colsample_bytree": 0.8879,
"learning_rate": 0.0421,
"subsample": 0.8789,
"lambda_l1": 205.6999,
"lambda_l2": 580.9768,
"max_depth": 8,
"num_leaves": 210,
"num_threads": 32,
},
},
"dataset": {
"class": "DatasetH",
"module_path": "qlib.data.dataset",
"kwargs": {
"handler": {
"class": "Alpha158",
"module_path": "qlib.contrib.data.handler",
"kwargs": data_handler_config,
},
"segments": {
"train": ("2008-01-01", "2014-11-30"),
"valid": ("2015-01-01", "2016-11-30"),
"test": ("2017-01-01", "2018-01-01"),
},
},
},
}
class CSI300:
"""Simulate CSI300 as the Benchmark for Enhanced Indexing to Track"""
def __init__(self):
# provider_uri = '/nfs_data/qlib_data/ycz_daily/qlib'
# qlib.init(provider_uri=provider_uri, region=REG_CN, dataset_cache=None, expression_cache=None)
self.csi_weight = D.features(D.instruments('csi300'), ['$csi300_weight'])
def __call__(self, pd_index, trade_date):
weights = np.zeros(len(pd_index))
for idx, instrument in enumerate(pd_index):
if (instrument, trade_date) in self.csi_weight.index:
weight = self.csi_weight.loc[(instrument, trade_date)].values[0]
if not math.isnan(weight):
weights[idx] = weight
assert weights.sum() > 0, ' Fetch CSI Weights Error!'
weights = weights / weights.sum()
return weights
class EnhancedIndexingStrategy:
"""Enhanced Indexing Strategy"""
def __init__(self):
self.benchmark = CSI300()
provider_uri = "~/.qlib_ei/qlib_data/cn_data"
qlib.init(provider_uri=provider_uri, region=REG_CN)
self.data_handler = DataHandler(market, "2015-01-01", "2019-01-01", QlibDataLoader(["$close"]))
self.label_handler = DataHandler(market, "2015-01-01", "2019-01-01", QlibDataLoader([label_config]))
self.cov_estimator = StructuredCovEstimator()
self.optimizer = EnhancedIndexingOptimizer(lamb=0.1, delta=0.4, bench_dev=0.03, max_iters=50000)
def update(self, score_series, current, pred_date):
"""
Parameters
-----------
score_series : pd.Series
stock_id , score.
current : Position()
current of account.
trade_exchange : Exchange()
exchange.
trade_date : pd.Timestamp
date.
"""
print(score_series)
score_series = score_series.dropna()
# portfolio init weight
init_weight = current.reindex(score_series.index, fill_value=0).values.squeeze()
init_weight_sum = init_weight.sum()
if init_weight_sum > 0:
init_weight /= init_weight_sum
# covariance estimation
selector = (self.data_handler.get_range_selector(pred_date, 252), score_series.index)
price = self.data_handler.fetch(selector, level=None, squeeze=True)
F, cov_b, var_u = self.cov_estimator.predict(price, return_decomposed_components=True)
# optimize target portfolio
w_bench = self.benchmark(score_series.index, pred_date)
passed_init_weight = init_weight if init_weight_sum > 0 else None
# print(F)
# print(cov_b)
# print(var_u)
# print(passed_init_weight)
# print(w_bench)
target_weight = self.optimizer(score_series.values, F, cov_b, var_u, passed_init_weight, w_bench)
# print(target_weight)
target = pd.DataFrame(data=target_weight, index=score_series.index)
active_weights = target_weight - w_bench
selector = (self.label_handler.get_range_selector(pred_date, 1), score_series.index)
label = self.label_handler.fetch(selector, level=None, squeeze=True)
alpha = 0
for instrument, weight in zip(score_series.index, active_weights):
delta = label.loc[(pred_date, instrument)]
alpha += weight * (0 if math.isnan(delta) else delta)
print(alpha)
return alpha, target
def train():
"""train model
Returns
-------
pred_score: pandas.DataFrame
predict scores
performance: dict
model performance
"""
# model initiation
model = init_instance_by_config(task["model"])
dataset = init_instance_by_config(task["dataset"])
# start exp
with R.start(experiment_name="workflow"):
R.log_params(**flatten_dict(task))
model.fit(dataset)
# prediction
recorder = R.get_recorder()
rid = recorder.id
sr = SignalRecord(model, dataset, recorder)
sr.generate()
pred_score = sr.load()
# calculate ic and ric
sar = SigAnaRecord(recorder)
sar.generate()
ic = sar.load(sar.get_path("ic.pkl"))
ric = sar.load(sar.get_path("ric.pkl"))
return pred_score, {"ic": ic, "ric": ric}, rid
def backtest_analysis(scores):
"""backtest enhanced indexing
Parameters
----------
scores: pandas.DataFrame
predict scores
Returns
-------
sharpe_ratio: floating-point
sharpe ratio of the enhanced indexing portfolio
"""
# backtest and analysis
with R.start(experiment_name="backtest_analysis"):
strategy = EnhancedIndexingStrategy()
dates = scores.index.get_level_values(0).unique()
alphas = []
current = pd.DataFrame()
gap_between_next_trade = 0
for date in tqdm(dates):
if gap_between_next_trade == 0:
score_series = scores.loc[date]
alpha, current = strategy.update(score_series, current, date)
alphas.append(alpha)
gap_between_next_trade = trade_gap
else:
gap_between_next_trade -= 1
alphas = np.array(alphas)
sharpe_ratio = alphas.mean() / np.std(alphas)
print('Sharpe:', sharpe_ratio)
return sharpe_ratio
class TestAllFlow(TestAutoData):
PRED_SCORE = None
REPORT_NORMAL = None
POSITIONS = None
RID = None
@classmethod
def tearDownClass(cls) -> None:
shutil.rmtree(str(Path(C["exp_manager"]["kwargs"]["uri"].strip("file:")).resolve()))
def test_0_train(self):
TestAllFlow.PRED_SCORE, ic_ric, TestAllFlow.RID = train()
self.assertGreaterEqual(ic_ric["ic"].all(), 0, "train failed")
self.assertGreaterEqual(ic_ric["ric"].all(), 0, "train failed")
def test_1_backtest(self):
sharpe_ratio = backtest_analysis(TestAllFlow.PRED_SCORE)
self.assertGreaterEqual(
sharpe_ratio,
0.90,
"backtest failed",
)
def suite():
_suite = unittest.TestSuite()
_suite.addTest(TestAllFlow("test_0_train"))
_suite.addTest(TestAllFlow("test_1_backtest"))
return _suite
if __name__ == "__main__":
runner = unittest.TextTestRunner()
runner.run(suite())