diff --git a/qlib/model/riskmodel.py b/qlib/model/riskmodel.py deleted file mode 100644 index f19c60fc9..000000000 --- a/qlib/model/riskmodel.py +++ /dev/null @@ -1,611 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -import numpy as np -import pandas as pd -from typing import Union -from sklearn.decomposition import PCA, FactorAnalysis - -from qlib.model.base import BaseModel - - -class RiskModel(BaseModel): - """Risk Model - - A risk model is used to estimate the covariance matrix of stock returns. - """ - - MASK_NAN = "mask" - FILL_NAN = "fill" - IGNORE_NAN = "ignore" - - def __init__(self, nan_option: str = "ignore", assume_centered: bool = False, scale_return: bool = True): - """ - Args: - nan_option (str): nan handling option (`ignore`/`mask`/`fill`). - assume_centered (bool): whether the data is assumed to be centered. - scale_return (bool): whether scale returns as percentage. - """ - # nan - assert nan_option in [ - self.MASK_NAN, - self.FILL_NAN, - self.IGNORE_NAN, - ], f"`nan_option={nan_option}` is not supported" - self.nan_option = nan_option - - self.assume_centered = assume_centered - self.scale_return = scale_return - - def predict( - self, X: Union[pd.Series, pd.DataFrame, np.ndarray], return_corr: bool = False, is_price: bool = True - ) -> Union[pd.DataFrame, np.ndarray]: - """ - Args: - X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance, - with variables as columns and observations as rows. - return_corr (bool): whether return the correlation matrix. - is_price (bool): whether `X` contains price (if not assume stock returns). - - Returns: - pd.DataFrame or np.ndarray: estimated covariance (or correlation). - """ - # transform input into 2D array - if not isinstance(X, (pd.Series, pd.DataFrame)): - columns = None - else: - if isinstance(X.index, pd.MultiIndex): - if isinstance(X, pd.DataFrame): - X = X.iloc[:, 0].unstack(level="instrument") # always use the first column - else: - X = X.unstack(level="instrument") - else: - # X is 2D DataFrame - pass - columns = X.columns # will be used to restore dataframe - X = X.values - - # calculate pct_change - if is_price: - X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows - - # scale return - if self.scale_return: - X *= 100 - - # handle nan and centered - X = self._preprocess(X) - - # estimate covariance - S = self._predict(X) - - # return correlation if needed - if return_corr: - vola = np.sqrt(np.diag(S)) - corr = S / np.outer(vola, vola) - if columns is None: - return corr - return pd.DataFrame(corr, index=columns, columns=columns) - - # return covariance - if columns is None: - return S - return pd.DataFrame(S, index=columns, columns=columns) - - def _predict(self, X: np.ndarray) -> np.ndarray: - """covariance estimation implementation - - This method should be overridden by child classes. - - By default, this method implements the empirical covariance estimation. - - Args: - X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows). - - Returns: - np.ndarray: covariance matrix. - """ - xTx = np.asarray(X.T.dot(X)) - N = len(X) - if isinstance(X, np.ma.MaskedArray): - M = 1 - X.mask - N = M.T.dot(M) # each pair has distinct number of samples - return xTx / N - - def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]: - """handle nan and centerize data - - Note: - if `nan_option='mask'` then the returned array will be `np.ma.MaskedArray`. - """ - # handle nan - if self.nan_option == self.FILL_NAN: - X = np.nan_to_num(X) - elif self.nan_option == self.MASK_NAN: - X = np.ma.masked_invalid(X) - # centralize - if not self.assume_centered: - X = X - np.nanmean(X, axis=0) - return X - - -class ShrinkCovEstimator(RiskModel): - """Shrinkage Covariance Estimator - - This estimator will shrink the sample covariance matrix towards - an identify matrix: - S_hat = (1 - alpha) * S + alpha * F - where `alpha` is the shrink parameter and `F` is the shrinking target. - - The following shrinking parameters (`alpha`) are supported: - - `lw` [1][2][3]: use Ledoit-Wolf shrinking parameter. - - `oas` [4]: use Oracle Approximating Shrinkage shrinking parameter. - - float: directly specify the shrink parameter, should be between [0, 1]. - - The following shrinking targets (`F`) are supported: - - `const_var` [1][4][5]: assume stocks have the same constant variance and zero correlation. - - `const_corr` [2][6]: assume stocks have different variance but equal correlation. - - `single_factor` [3][7]: assume single factor model as the shrinking target. - - np.ndarray: provide the shrinking targets directly. - - Note: - - The optimal shrinking parameter depends on the selection of the shrinking target. - Currently, `oas` is not supported for `const_corr` and `single_factor`. - - Remember to set `nan_option` to `fill` or `mask` if your data has missing values. - - References: - [1] Ledoit, O., & Wolf, M. (2004). A well-conditioned estimator for large-dimensional covariance matrices. - Journal of Multivariate Analysis, 88(2), 365–411. https://doi.org/10.1016/S0047-259X(03)00096-4 - [2] Ledoit, O., & Wolf, M. (2004). Honey, I shrunk the sample covariance matrix. - Journal of Portfolio Management, 30(4), 1–22. https://doi.org/10.3905/jpm.2004.110 - [3] Ledoit, O., & Wolf, M. (2003). Improved estimation of the covariance matrix of stock returns - with an application to portfolio selection. - Journal of Empirical Finance, 10(5), 603–621. https://doi.org/10.1016/S0927-5398(03)00007-0 - [4] Chen, Y., Wiesel, A., Eldar, Y. C., & Hero, A. O. (2010). Shrinkage algorithms for MMSE covariance - estimation. IEEE Transactions on Signal Processing, 58(10), 5016–5029. - https://doi.org/10.1109/TSP.2010.2053029 - [5] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-00007f64e5b9/cov1para.m.zip - [6] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-ffff-ffffde5e2d4e/covCor.m.zip - [7] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-0000648dfc98/covMarket.m.zip - """ - - SHR_LW = "lw" - SHR_OAS = "oas" - - TGT_CONST_VAR = "const_var" - TGT_CONST_CORR = "const_corr" - TGT_SINGLE_FACTOR = "single_factor" - - def __init__(self, alpha: Union[str, float] = 0.0, target: Union[str, np.ndarray] = "const_var", **kwargs): - """ - Args: - alpha (str or float): shrinking parameter or estimator (`lw`/`oas`) - target (str or np.ndarray): shrinking target (`const_var`/`const_corr`/`single_factor`) - kwargs: see `RiskModel` for more information - """ - super().__init__(**kwargs) - - # alpha - if isinstance(alpha, str): - assert alpha in [self.SHR_LW, self.SHR_OAS], f"shrinking method `{alpha}` is not supported" - elif isinstance(alpha, (float, np.floating)): - assert 0 <= alpha <= 1, "alpha should be between [0, 1]" - else: - raise TypeError("invalid argument type for `alpha`") - self.alpha = alpha - - # target - if isinstance(target, str): - assert target in [ - self.TGT_CONST_VAR, - self.TGT_CONST_CORR, - self.TGT_SINGLE_FACTOR, - ], f"shrinking target `{target} is not supported" - elif isinstance(target, np.ndarray): - pass - else: - raise TypeError("invalid argument type for `target`") - if alpha == self.SHR_OAS and target != self.TGT_CONST_VAR: - raise NotImplementedError("currently `oas` can only support `const_var` as target") - self.target = target - - def _predict(self, X: np.ndarray) -> np.ndarray: - # sample covariance - S = super()._predict(X) - - # shrinking target - F = self._get_shrink_target(X, S) - - # get shrinking parameter - alpha = self._get_shrink_param(X, S, F) - - # shrink covariance - if alpha > 0: - S *= 1 - alpha - F *= alpha - S += F - - return S - - def _get_shrink_target(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: - """get shrinking target `F`""" - if self.target == self.TGT_CONST_VAR: - return self._get_shrink_target_const_var(X, S) - if self.target == self.TGT_CONST_CORR: - return self._get_shrink_target_const_corr(X, S) - if self.target == self.TGT_SINGLE_FACTOR: - return self._get_shrink_target_single_factor(X, S) - return self.target - - def _get_shrink_target_const_var(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: - """get shrinking target with constant variance - - This target assumes zero pair-wise correlation and constant variance. - The constant variance is estimated by averaging all sample's variances. - """ - n = len(S) - F = np.eye(n) - np.fill_diagonal(F, np.mean(np.diag(S))) - return F - - def _get_shrink_target_const_corr(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: - """get shrinking target with constant correlation - - This target assumes constant pair-wise correlation but keep the sample variance. - The constant correlation is estimated by averaging all pairwise correlations. - """ - n = len(S) - var = np.diag(S) - sqrt_var = np.sqrt(var) - covar = np.outer(sqrt_var, sqrt_var) - r_bar = (np.sum(S / covar) - n) / (n * (n - 1)) - F = r_bar * covar - np.fill_diagonal(F, var) - return F - - def _get_shrink_target_single_factor(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: - """get shrinking target with single factor model""" - X_mkt = np.nanmean(X, axis=1) - cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X)) - var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X)) - F = np.outer(cov_mkt, cov_mkt) / var_mkt - np.fill_diagonal(F, np.diag(S)) - return F - - def _get_shrink_param(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: - """get shrinking parameter `alpha` - - Note: - The Ledoit-Wolf shrinking parameter estimator consists of three different methods. - """ - if self.alpha == self.SHR_OAS: - return self._get_shrink_param_oas(X, S, F) - elif self.alpha == self.SHR_LW: - if self.target == self.TGT_CONST_VAR: - return self._get_shrink_param_lw_const_var(X, S, F) - if self.target == self.TGT_CONST_CORR: - return self._get_shrink_param_lw_const_corr(X, S, F) - if self.target == self.TGT_SINGLE_FACTOR: - return self._get_shrink_param_lw_single_factor(X, S, F) - return self.alpha - - def _get_shrink_param_oas(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: - """Oracle Approximating Shrinkage Estimator - - This method uses the following formula to estimate the `alpha` - parameter for the shrink covariance estimator: - A = (1 - 2 / p) * trace(S^2) + trace^2(S) - B = (n + 1 - 2 / p) * (trace(S^2) - trace^2(S) / p) - alpha = A / B - where `n`, `p` are the dim of observations and variables respectively. - """ - trS2 = np.sum(S ** 2) - tr2S = np.trace(S) ** 2 - - n, p = X.shape - - A = (1 - 2 / p) * (trS2 + tr2S) - B = (n + 1 - 2 / p) * (trS2 + tr2S / p) - alpha = A / B - - return alpha - - def _get_shrink_param_lw_const_var(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: - """Ledoit-Wolf Shrinkage Estimator (Constant Variance) - - This method shrinks the covariance matrix towards the constand variance target. - """ - t, n = X.shape - - y = X ** 2 - phi = np.sum(y.T.dot(y) / t - S ** 2) - - gamma = np.linalg.norm(S - F, "fro") ** 2 - - kappa = phi / gamma - alpha = max(0, min(1, kappa / t)) - - return alpha - - def _get_shrink_param_lw_const_corr(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: - """Ledoit-Wolf Shrinkage Estimator (Constant Correlation) - - This method shrinks the covariance matrix towards the constand correlation target. - """ - t, n = X.shape - - var = np.diag(S) - sqrt_var = np.sqrt(var) - r_bar = (np.sum(S / np.outer(sqrt_var, sqrt_var)) - n) / (n * (n - 1)) - - y = X ** 2 - phi_mat = y.T.dot(y) / t - S ** 2 - phi = np.sum(phi_mat) - - theta_mat = (X ** 3).T.dot(X) / t - var[:, None] * S - np.fill_diagonal(theta_mat, 0) - rho = np.sum(np.diag(phi_mat)) + r_bar * np.sum(np.outer(1 / sqrt_var, sqrt_var) * theta_mat) - - gamma = np.linalg.norm(S - F, "fro") ** 2 - - kappa = (phi - rho) / gamma - alpha = max(0, min(1, kappa / t)) - - return alpha - - def _get_shrink_param_lw_single_factor(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: - """Ledoit-Wolf Shrinkage Estimator (Single Factor Model) - - This method shrinks the covariance matrix towards the single factor model target. - """ - t, n = X.shape - - X_mkt = np.nanmean(X, axis=1) - cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X)) - var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X)) - - y = X ** 2 - phi = np.sum(y.T.dot(y)) / t - np.sum(S ** 2) - - rdiag = np.sum(y ** 2) / t - np.sum(np.diag(S) ** 2) - z = X * X_mkt[:, None] - v1 = y.T.dot(z) / t - cov_mkt[:, None] * S - roff1 = np.sum(v1 * cov_mkt[:, None].T) / var_mkt - np.sum(np.diag(v1) * cov_mkt) / var_mkt - v3 = z.T.dot(z) / t - var_mkt * S - roff3 = ( - np.sum(v3 * np.outer(cov_mkt, cov_mkt)) / var_mkt ** 2 - np.sum(np.diag(v3) * cov_mkt ** 2) / var_mkt ** 2 - ) - roff = 2 * roff1 - roff3 - rho = rdiag + roff - - gamma = np.linalg.norm(S - F, "fro") ** 2 - - kappa = (phi - rho) / gamma - alpha = max(0, min(1, kappa / t)) - - return alpha - - -class POETCovEstimator(RiskModel): - """Principal Orthogonal Complement Thresholding Estimator (POET) - - Reference: - [1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements. - Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603–680. https://doi.org/10.1111/rssb.12016 - [2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m - """ - - THRESH_SOFT = "soft" - THRESH_HARD = "hard" - THRESH_SCAD = "scad" - - def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = "soft", **kwargs): - """ - Args: - num_factors (int): number of factors (if set to zero, no factor model will be used). - thresh (float): the positive constant for thresholding. - thresh_method (str): thresholding method, which can be - - 'soft': soft thresholding. - - 'hard': hard thresholding. - - 'scad': scad thresholding. - kwargs: see `RiskModel` for more information. - """ - super().__init__(**kwargs) - - assert num_factors >= 0, "`num_factors` requires a positive integer" - self.num_factors = num_factors - - assert thresh >= 0, "`thresh` requires a positive float number" - self.thresh = thresh - - assert thresh_method in [ - self.THRESH_HARD, - self.THRESH_SOFT, - self.THRESH_SCAD, - ], "`thresh_method` should be `soft`/`hard`/`scad`" - self.thresh_method = thresh_method - - def _predict(self, X: np.ndarray) -> np.ndarray: - - Y = X.T # NOTE: to match POET's implementation - p, n = Y.shape - - if self.num_factors > 0: - Dd, V = np.linalg.eig(Y.T.dot(Y)) - V = V[:, np.argsort(Dd)] - F = V[:, -self.num_factors :][:, ::-1] * np.sqrt(n) - LamPCA = Y.dot(F) / n - uhat = np.asarray(Y - LamPCA.dot(F.T)) - Lowrank = np.asarray(LamPCA.dot(LamPCA.T)) - rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n) - else: - uhat = np.asarray(Y) - rate = np.sqrt(np.log(p) / n) - Lowrank = 0 - - lamb = rate * self.thresh - SuPCA = uhat.dot(uhat.T) / n - SuDiag = np.diag(np.diag(SuPCA)) - R = np.linalg.inv(SuDiag ** 0.5).dot(SuPCA).dot(np.linalg.inv(SuDiag ** 0.5)) - - if self.thresh_method == self.THRESH_HARD: - M = R * (np.abs(R) > lamb) - elif self.thresh_method == self.THRESH_SOFT: - res = np.abs(R) - lamb - res = (res + np.abs(res)) / 2 - M = np.sign(R) * res - else: - M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb) - M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7 - M3 = (np.abs(R) >= 3.7 * lamb) * R - M = M1 + M2 + M3 - - Rthresh = M - np.diag(np.diag(M)) + np.eye(p) - SigmaU = (SuDiag ** 0.5).dot(Rthresh).dot(SuDiag ** 0.5) - SigmaY = SigmaU + Lowrank - - return SigmaY - - -class StructuredCovEstimator(RiskModel): - """Structured Covariance Estimator - - This estimator assumes observations can be predicted by multiple factors - X = FB + U - where `F` can be specified by explicit risk factors or latent factors. - - Therefore the structured covariance can be estimated by - cov(X) = F cov(B) F.T + cov(U) - - We use latent factor models to estimate the structured covariance. - Specifically, the following latent factor models are supported: - - `pca`: Principal Component Analysis - - `fa`: Factor Analysis - - Reference: [1] Fan, J., Liao, Y., & Liu, H. (2016). An overview of the estimation of large covariance and - precision matrices. Econometrics Journal, 19(1), C1–C32. https://doi.org/10.1111/ectj.12061 - """ - - FACTOR_MODEL_PCA = "pca" - FACTOR_MODEL_FA = "fa" - - def __init__( - self, - factor_model: str = "pca", - num_factors: int = 10, - nan_option: str = "ignore", - assume_centered: bool = False, - scale_return: bool = True, - ): - """ - Args: - factor_model (str): the latent factor models used to estimate the structured covariance (`pca`/`fa`). - num_factors (int): number of components to keep. - nan_option (str): nan handling option (`ignore`/`fill`). - assume_centered (bool): whether the data is assumed to be centered. - scale_return (bool): whether scale returns as percentage. - """ - super().__init__(nan_option, assume_centered, scale_return) - - assert factor_model in [ - self.FACTOR_MODEL_PCA, - self.FACTOR_MODEL_FA, - ], "factor_model={} is not supported".format(factor_model) - self.solver = PCA if factor_model == self.FACTOR_MODEL_PCA else FactorAnalysis - - self.num_factors = num_factors - - def predict( - self, - X: Union[pd.Series, pd.DataFrame, np.ndarray], - return_corr: bool = False, - is_price: bool = True, - return_decomposed_components=False, - ) -> Union[pd.DataFrame, np.ndarray, tuple]: - """ - Args: - X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance, - with variables as columns and observations as rows. - return_corr (bool): whether return the correlation matrix. - is_price (bool): whether `X` contains price (if not assume stock returns). - return_decomposed_components (bool): whether return decomposed components of the covariance matrix. - - Returns: - tuple or pd.DataFrame or np.ndarray: decomposed covariance matrix or estimated covariance or correlation. - """ - assert ( - not return_corr or not return_decomposed_components - ), "Can only return either correlation matrix or decomposed components." - - # transform input into 2D array - if not isinstance(X, (pd.Series, pd.DataFrame)): - columns = None - else: - if isinstance(X.index, pd.MultiIndex): - if isinstance(X, pd.DataFrame): - X = X.iloc[:, 0].unstack(level="instrument") # always use the first column - else: - X = X.unstack(level="instrument") - else: - # X is 2D DataFrame - pass - columns = X.columns # will be used to restore dataframe - X = X.values - - # calculate pct_change - if is_price: - X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows - - # scale return - if self.scale_return: - X *= 100 - - # handle nan and centered - X = self._preprocess(X) - - if return_decomposed_components: - F, cov_b, var_u = self._predict(X, return_structured=True) - return F, cov_b, var_u - else: - # estimate covariance - S = self._predict(X) - - # return correlation if needed - if return_corr: - vola = np.sqrt(np.diag(S)) - corr = S / np.outer(vola, vola) - if columns is None: - return corr - return pd.DataFrame(corr, index=columns, columns=columns) - - # return covariance - if columns is None: - return S - return pd.DataFrame(S, index=columns, columns=columns) - - def _predict(self, X: np.ndarray, return_structured=False) -> Union[np.ndarray, tuple]: - """ - covariance estimation implementation - - Args: - X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows). - return_structured (bool): whether return decomposed components of the covariance matrix. - - Returns: - tuple or np.ndarray: decomposed covariance matrix or covariance matrix. - """ - - model = self.solver(self.num_factors, random_state=0).fit(X) - - F = model.components_.T # num_features x num_factors - B = model.transform(X) # num_samples x num_factors - U = X - B @ F.T - cov_b = np.cov(B.T) # num_factors x num_factors - var_u = np.var(U, axis=0) # diagonal - - if return_structured: - return F, cov_b, var_u - - cov_x = F @ cov_b @ F.T + np.diag(var_u) - - return cov_x diff --git a/qlib/model/riskmodel_poet.py b/qlib/model/riskmodel/__init__.py similarity index 100% rename from qlib/model/riskmodel_poet.py rename to qlib/model/riskmodel/__init__.py diff --git a/qlib/model/riskmodel/base.py b/qlib/model/riskmodel/base.py new file mode 100644 index 000000000..d5b009ccc --- /dev/null +++ b/qlib/model/riskmodel/base.py @@ -0,0 +1,141 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import numpy as np +import pandas as pd +from typing import Union + +from qlib.model.base import BaseModel + +from qlib.model.riskmodel_poet import POETCovEstimator +from qlib.model.riskmodel_shrink import ShrinkCovEstimator +from qlib.model.riskmodel_structured import StructuredCovEstimator + + +class RiskModel(BaseModel): + """Risk Model + + A risk model is used to estimate the covariance matrix of stock returns. + """ + + MASK_NAN = "mask" + FILL_NAN = "fill" + IGNORE_NAN = "ignore" + + def __init__(self, nan_option: str = "ignore", assume_centered: bool = False, scale_return: bool = True): + """ + Args: + nan_option (str): nan handling option (`ignore`/`mask`/`fill`). + assume_centered (bool): whether the data is assumed to be centered. + scale_return (bool): whether scale returns as percentage. + """ + # nan + assert nan_option in [ + self.MASK_NAN, + self.FILL_NAN, + self.IGNORE_NAN, + ], f"`nan_option={nan_option}` is not supported" + self.nan_option = nan_option + + self.assume_centered = assume_centered + self.scale_return = scale_return + + def predict( + self, X: Union[pd.Series, pd.DataFrame, np.ndarray], return_corr: bool = False, is_price: bool = True + ) -> Union[pd.DataFrame, np.ndarray]: + """ + Args: + X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance, + with variables as columns and observations as rows. + return_corr (bool): whether return the correlation matrix. + is_price (bool): whether `X` contains price (if not assume stock returns). + + Returns: + pd.DataFrame or np.ndarray: estimated covariance (or correlation). + """ + # transform input into 2D array + if not isinstance(X, (pd.Series, pd.DataFrame)): + columns = None + else: + if isinstance(X.index, pd.MultiIndex): + if isinstance(X, pd.DataFrame): + X = X.iloc[:, 0].unstack(level="instrument") # always use the first column + else: + X = X.unstack(level="instrument") + else: + # X is 2D DataFrame + pass + columns = X.columns # will be used to restore dataframe + X = X.values + + # calculate pct_change + if is_price: + X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows + + # scale return + if self.scale_return: + X *= 100 + + # handle nan and centered + X = self._preprocess(X) + + # estimate covariance + S = self._predict(X) + + # return correlation if needed + if return_corr: + vola = np.sqrt(np.diag(S)) + corr = S / np.outer(vola, vola) + if columns is None: + return corr + return pd.DataFrame(corr, index=columns, columns=columns) + + # return covariance + if columns is None: + return S + return pd.DataFrame(S, index=columns, columns=columns) + + def _predict(self, X: np.ndarray) -> np.ndarray: + """covariance estimation implementation + + This method should be overridden by child classes. + + By default, this method implements the empirical covariance estimation. + + Args: + X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows). + + Returns: + np.ndarray: covariance matrix. + """ + xTx = np.asarray(X.T.dot(X)) + N = len(X) + if isinstance(X, np.ma.MaskedArray): + M = 1 - X.mask + N = M.T.dot(M) # each pair has distinct number of samples + return xTx / N + + def _preprocess(self, X: np.ndarray) -> Union[np.ndarray, np.ma.MaskedArray]: + """handle nan and centerize data + + Note: + if `nan_option='mask'` then the returned array will be `np.ma.MaskedArray`. + """ + # handle nan + if self.nan_option == self.FILL_NAN: + X = np.nan_to_num(X) + elif self.nan_option == self.MASK_NAN: + X = np.ma.masked_invalid(X) + # centralize + if not self.assume_centered: + X = X - np.nanmean(X, axis=0) + return X + + + + + + + + + diff --git a/qlib/model/riskmodel/poet.py b/qlib/model/riskmodel/poet.py new file mode 100644 index 000000000..8dbe89036 --- /dev/null +++ b/qlib/model/riskmodel/poet.py @@ -0,0 +1,84 @@ +import numpy as np + +from qlib.model.riskmodel import RiskModel + + +class POETCovEstimator(RiskModel): + """Principal Orthogonal Complement Thresholding Estimator (POET) + + Reference: + [1] Fan, J., Liao, Y., & Mincheva, M. (2013). Large covariance estimation by thresholding principal orthogonal complements. + Journal of the Royal Statistical Society. Series B: Statistical Methodology, 75(4), 603–680. https://doi.org/10.1111/rssb.12016 + [2] http://econweb.rutgers.edu/yl1114/papers/poet/POET.m + """ + + THRESH_SOFT = "soft" + THRESH_HARD = "hard" + THRESH_SCAD = "scad" + + def __init__(self, num_factors: int = 0, thresh: float = 1.0, thresh_method: str = "soft", **kwargs): + """ + Args: + num_factors (int): number of factors (if set to zero, no factor model will be used). + thresh (float): the positive constant for thresholding. + thresh_method (str): thresholding method, which can be + - 'soft': soft thresholding. + - 'hard': hard thresholding. + - 'scad': scad thresholding. + kwargs: see `RiskModel` for more information. + """ + super().__init__(**kwargs) + + assert num_factors >= 0, "`num_factors` requires a positive integer" + self.num_factors = num_factors + + assert thresh >= 0, "`thresh` requires a positive float number" + self.thresh = thresh + + assert thresh_method in [ + self.THRESH_HARD, + self.THRESH_SOFT, + self.THRESH_SCAD, + ], "`thresh_method` should be `soft`/`hard`/`scad`" + self.thresh_method = thresh_method + + def _predict(self, X: np.ndarray) -> np.ndarray: + + Y = X.T # NOTE: to match POET's implementation + p, n = Y.shape + + if self.num_factors > 0: + Dd, V = np.linalg.eig(Y.T.dot(Y)) + V = V[:, np.argsort(Dd)] + F = V[:, -self.num_factors:][:, ::-1] * np.sqrt(n) + LamPCA = Y.dot(F) / n + uhat = np.asarray(Y - LamPCA.dot(F.T)) + Lowrank = np.asarray(LamPCA.dot(LamPCA.T)) + rate = 1 / np.sqrt(p) + np.sqrt(np.log(p) / n) + else: + uhat = np.asarray(Y) + rate = np.sqrt(np.log(p) / n) + Lowrank = 0 + + lamb = rate * self.thresh + SuPCA = uhat.dot(uhat.T) / n + SuDiag = np.diag(np.diag(SuPCA)) + R = np.linalg.inv(SuDiag ** 0.5).dot(SuPCA).dot(np.linalg.inv(SuDiag ** 0.5)) + + if self.thresh_method == self.THRESH_HARD: + M = R * (np.abs(R) > lamb) + elif self.thresh_method == self.THRESH_SOFT: + res = np.abs(R) - lamb + res = (res + np.abs(res)) / 2 + M = np.sign(R) * res + else: + M1 = (np.abs(R) < 2 * lamb) * np.sign(R) * (np.abs(R) - lamb) * (np.abs(R) > lamb) + M2 = (np.abs(R) < 3.7 * lamb) * (np.abs(R) >= 2 * lamb) * (2.7 * R - 3.7 * np.sign(R) * lamb) / 1.7 + M3 = (np.abs(R) >= 3.7 * lamb) * R + M = M1 + M2 + M3 + + Rthresh = M - np.diag(np.diag(M)) + np.eye(p) + SigmaU = (SuDiag ** 0.5).dot(Rthresh).dot(SuDiag ** 0.5) + SigmaY = SigmaU + Lowrank + + return SigmaY diff --git a/qlib/model/riskmodel/shrink.py b/qlib/model/riskmodel/shrink.py new file mode 100644 index 000000000..1298891fb --- /dev/null +++ b/qlib/model/riskmodel/shrink.py @@ -0,0 +1,262 @@ +import numpy as np +from typing import Union + +from qlib.model.riskmodel import RiskModel + + +class ShrinkCovEstimator(RiskModel): + """Shrinkage Covariance Estimator + + This estimator will shrink the sample covariance matrix towards + an identify matrix: + S_hat = (1 - alpha) * S + alpha * F + where `alpha` is the shrink parameter and `F` is the shrinking target. + + The following shrinking parameters (`alpha`) are supported: + - `lw` [1][2][3]: use Ledoit-Wolf shrinking parameter. + - `oas` [4]: use Oracle Approximating Shrinkage shrinking parameter. + - float: directly specify the shrink parameter, should be between [0, 1]. + + The following shrinking targets (`F`) are supported: + - `const_var` [1][4][5]: assume stocks have the same constant variance and zero correlation. + - `const_corr` [2][6]: assume stocks have different variance but equal correlation. + - `single_factor` [3][7]: assume single factor model as the shrinking target. + - np.ndarray: provide the shrinking targets directly. + + Note: + - The optimal shrinking parameter depends on the selection of the shrinking target. + Currently, `oas` is not supported for `const_corr` and `single_factor`. + - Remember to set `nan_option` to `fill` or `mask` if your data has missing values. + + References: + [1] Ledoit, O., & Wolf, M. (2004). A well-conditioned estimator for large-dimensional covariance matrices. + Journal of Multivariate Analysis, 88(2), 365–411. https://doi.org/10.1016/S0047-259X(03)00096-4 + [2] Ledoit, O., & Wolf, M. (2004). Honey, I shrunk the sample covariance matrix. + Journal of Portfolio Management, 30(4), 1–22. https://doi.org/10.3905/jpm.2004.110 + [3] Ledoit, O., & Wolf, M. (2003). Improved estimation of the covariance matrix of stock returns + with an application to portfolio selection. + Journal of Empirical Finance, 10(5), 603–621. https://doi.org/10.1016/S0927-5398(03)00007-0 + [4] Chen, Y., Wiesel, A., Eldar, Y. C., & Hero, A. O. (2010). Shrinkage algorithms for MMSE covariance + estimation. IEEE Transactions on Signal Processing, 58(10), 5016–5029. + https://doi.org/10.1109/TSP.2010.2053029 + [5] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-00007f64e5b9/cov1para.m.zip + [6] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-ffff-ffffde5e2d4e/covCor.m.zip + [7] https://www.econ.uzh.ch/dam/jcr:ffffffff-935a-b0d6-0000-0000648dfc98/covMarket.m.zip + """ + + SHR_LW = "lw" + SHR_OAS = "oas" + + TGT_CONST_VAR = "const_var" + TGT_CONST_CORR = "const_corr" + TGT_SINGLE_FACTOR = "single_factor" + + def __init__(self, alpha: Union[str, float] = 0.0, target: Union[str, np.ndarray] = "const_var", **kwargs): + """ + Args: + alpha (str or float): shrinking parameter or estimator (`lw`/`oas`) + target (str or np.ndarray): shrinking target (`const_var`/`const_corr`/`single_factor`) + kwargs: see `RiskModel` for more information + """ + super().__init__(**kwargs) + + # alpha + if isinstance(alpha, str): + assert alpha in [self.SHR_LW, self.SHR_OAS], f"shrinking method `{alpha}` is not supported" + elif isinstance(alpha, (float, np.floating)): + assert 0 <= alpha <= 1, "alpha should be between [0, 1]" + else: + raise TypeError("invalid argument type for `alpha`") + self.alpha = alpha + + # target + if isinstance(target, str): + assert target in [ + self.TGT_CONST_VAR, + self.TGT_CONST_CORR, + self.TGT_SINGLE_FACTOR, + ], f"shrinking target `{target} is not supported" + elif isinstance(target, np.ndarray): + pass + else: + raise TypeError("invalid argument type for `target`") + if alpha == self.SHR_OAS and target != self.TGT_CONST_VAR: + raise NotImplementedError("currently `oas` can only support `const_var` as target") + self.target = target + + def _predict(self, X: np.ndarray) -> np.ndarray: + # sample covariance + S = super()._predict(X) + + # shrinking target + F = self._get_shrink_target(X, S) + + # get shrinking parameter + alpha = self._get_shrink_param(X, S, F) + + # shrink covariance + if alpha > 0: + S *= 1 - alpha + F *= alpha + S += F + + return S + + def _get_shrink_target(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: + """get shrinking target `F`""" + if self.target == self.TGT_CONST_VAR: + return self._get_shrink_target_const_var(X, S) + if self.target == self.TGT_CONST_CORR: + return self._get_shrink_target_const_corr(X, S) + if self.target == self.TGT_SINGLE_FACTOR: + return self._get_shrink_target_single_factor(X, S) + return self.target + + def _get_shrink_target_const_var(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: + """get shrinking target with constant variance + + This target assumes zero pair-wise correlation and constant variance. + The constant variance is estimated by averaging all sample's variances. + """ + n = len(S) + F = np.eye(n) + np.fill_diagonal(F, np.mean(np.diag(S))) + return F + + def _get_shrink_target_const_corr(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: + """get shrinking target with constant correlation + + This target assumes constant pair-wise correlation but keep the sample variance. + The constant correlation is estimated by averaging all pairwise correlations. + """ + n = len(S) + var = np.diag(S) + sqrt_var = np.sqrt(var) + covar = np.outer(sqrt_var, sqrt_var) + r_bar = (np.sum(S / covar) - n) / (n * (n - 1)) + F = r_bar * covar + np.fill_diagonal(F, var) + return F + + def _get_shrink_target_single_factor(self, X: np.ndarray, S: np.ndarray) -> np.ndarray: + """get shrinking target with single factor model""" + X_mkt = np.nanmean(X, axis=1) + cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X)) + var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X)) + F = np.outer(cov_mkt, cov_mkt) / var_mkt + np.fill_diagonal(F, np.diag(S)) + return F + + def _get_shrink_param(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """get shrinking parameter `alpha` + + Note: + The Ledoit-Wolf shrinking parameter estimator consists of three different methods. + """ + if self.alpha == self.SHR_OAS: + return self._get_shrink_param_oas(X, S, F) + elif self.alpha == self.SHR_LW: + if self.target == self.TGT_CONST_VAR: + return self._get_shrink_param_lw_const_var(X, S, F) + if self.target == self.TGT_CONST_CORR: + return self._get_shrink_param_lw_const_corr(X, S, F) + if self.target == self.TGT_SINGLE_FACTOR: + return self._get_shrink_param_lw_single_factor(X, S, F) + return self.alpha + + def _get_shrink_param_oas(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """Oracle Approximating Shrinkage Estimator + + This method uses the following formula to estimate the `alpha` + parameter for the shrink covariance estimator: + A = (1 - 2 / p) * trace(S^2) + trace^2(S) + B = (n + 1 - 2 / p) * (trace(S^2) - trace^2(S) / p) + alpha = A / B + where `n`, `p` are the dim of observations and variables respectively. + """ + trS2 = np.sum(S ** 2) + tr2S = np.trace(S) ** 2 + + n, p = X.shape + + A = (1 - 2 / p) * (trS2 + tr2S) + B = (n + 1 - 2 / p) * (trS2 + tr2S / p) + alpha = A / B + + return alpha + + def _get_shrink_param_lw_const_var(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """Ledoit-Wolf Shrinkage Estimator (Constant Variance) + + This method shrinks the covariance matrix towards the constand variance target. + """ + t, n = X.shape + + y = X ** 2 + phi = np.sum(y.T.dot(y) / t - S ** 2) + + gamma = np.linalg.norm(S - F, "fro") ** 2 + + kappa = phi / gamma + alpha = max(0, min(1, kappa / t)) + + return alpha + + def _get_shrink_param_lw_const_corr(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """Ledoit-Wolf Shrinkage Estimator (Constant Correlation) + + This method shrinks the covariance matrix towards the constand correlation target. + """ + t, n = X.shape + + var = np.diag(S) + sqrt_var = np.sqrt(var) + r_bar = (np.sum(S / np.outer(sqrt_var, sqrt_var)) - n) / (n * (n - 1)) + + y = X ** 2 + phi_mat = y.T.dot(y) / t - S ** 2 + phi = np.sum(phi_mat) + + theta_mat = (X ** 3).T.dot(X) / t - var[:, None] * S + np.fill_diagonal(theta_mat, 0) + rho = np.sum(np.diag(phi_mat)) + r_bar * np.sum(np.outer(1 / sqrt_var, sqrt_var) * theta_mat) + + gamma = np.linalg.norm(S - F, "fro") ** 2 + + kappa = (phi - rho) / gamma + alpha = max(0, min(1, kappa / t)) + + return alpha + + def _get_shrink_param_lw_single_factor(self, X: np.ndarray, S: np.ndarray, F: np.ndarray) -> float: + """Ledoit-Wolf Shrinkage Estimator (Single Factor Model) + + This method shrinks the covariance matrix towards the single factor model target. + """ + t, n = X.shape + + X_mkt = np.nanmean(X, axis=1) + cov_mkt = np.asarray(X.T.dot(X_mkt) / len(X)) + var_mkt = np.asarray(X_mkt.dot(X_mkt) / len(X)) + + y = X ** 2 + phi = np.sum(y.T.dot(y)) / t - np.sum(S ** 2) + + rdiag = np.sum(y ** 2) / t - np.sum(np.diag(S) ** 2) + z = X * X_mkt[:, None] + v1 = y.T.dot(z) / t - cov_mkt[:, None] * S + roff1 = np.sum(v1 * cov_mkt[:, None].T) / var_mkt - np.sum(np.diag(v1) * cov_mkt) / var_mkt + v3 = z.T.dot(z) / t - var_mkt * S + roff3 = ( + np.sum(v3 * np.outer(cov_mkt, cov_mkt)) / var_mkt ** 2 - np.sum( + np.diag(v3) * cov_mkt ** 2) / var_mkt ** 2 + ) + roff = 2 * roff1 - roff3 + rho = rdiag + roff + + gamma = np.linalg.norm(S - F, "fro") ** 2 + + kappa = (phi - rho) / gamma + alpha = max(0, min(1, kappa / t)) + + return alpha diff --git a/qlib/model/riskmodel/structured.py b/qlib/model/riskmodel/structured.py new file mode 100644 index 000000000..e778c2faa --- /dev/null +++ b/qlib/model/riskmodel/structured.py @@ -0,0 +1,152 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import numpy as np +import pandas as pd +from typing import Union +from sklearn.decomposition import PCA, FactorAnalysis + +from qlib.model.riskmodel import RiskModel + + +class StructuredCovEstimator(RiskModel): + """Structured Covariance Estimator + + This estimator assumes observations can be predicted by multiple factors + X = FB + U + where `F` can be specified by explicit risk factors or latent factors. + + Therefore the structured covariance can be estimated by + cov(X) = F cov(B) F.T + cov(U) + + We use latent factor models to estimate the structured covariance. + Specifically, the following latent factor models are supported: + - `pca`: Principal Component Analysis + - `fa`: Factor Analysis + + Reference: [1] Fan, J., Liao, Y., & Liu, H. (2016). An overview of the estimation of large covariance and + precision matrices. Econometrics Journal, 19(1), C1–C32. https://doi.org/10.1111/ectj.12061 + """ + + FACTOR_MODEL_PCA = "pca" + FACTOR_MODEL_FA = "fa" + NAN_OPTION = "fill" + + def __init__( + self, + factor_model: str = "pca", + num_factors: int = 10, + assume_centered: bool = False, + scale_return: bool = True, + ): + """ + Args: + factor_model (str): the latent factor models used to estimate the structured covariance (`pca`/`fa`). + num_factors (int): number of components to keep. + assume_centered (bool): whether the data is assumed to be centered. + scale_return (bool): whether scale returns as percentage. + """ + super().__init__(self.NAN_OPTION, assume_centered, scale_return) + + assert factor_model in [ + self.FACTOR_MODEL_PCA, + self.FACTOR_MODEL_FA, + ], "factor_model={} is not supported".format(factor_model) + self.solver = PCA if factor_model == self.FACTOR_MODEL_PCA else FactorAnalysis + + self.num_factors = num_factors + + def predict( + self, + X: Union[pd.Series, pd.DataFrame, np.ndarray], + return_corr: bool = False, + is_price: bool = True, + return_decomposed_components=False, + ) -> Union[pd.DataFrame, np.ndarray, tuple]: + """ + Args: + X (pd.Series, pd.DataFrame or np.ndarray): data from which to estimate the covariance, + with variables as columns and observations as rows. + return_corr (bool): whether return the correlation matrix. + is_price (bool): whether `X` contains price (if not assume stock returns). + return_decomposed_components (bool): whether return decomposed components of the covariance matrix. + + Returns: + tuple or pd.DataFrame or np.ndarray: decomposed covariance matrix or estimated covariance or correlation. + """ + assert ( + not return_corr or not return_decomposed_components + ), "Can only return either correlation matrix or decomposed components." + + # transform input into 2D array + if not isinstance(X, (pd.Series, pd.DataFrame)): + columns = None + else: + if isinstance(X.index, pd.MultiIndex): + if isinstance(X, pd.DataFrame): + X = X.iloc[:, 0].unstack(level="instrument") # always use the first column + else: + X = X.unstack(level="instrument") + else: + # X is 2D DataFrame + pass + columns = X.columns # will be used to restore dataframe + X = X.values + + # calculate pct_change + if is_price: + X = X[1:] / X[:-1] - 1 # NOTE: resulting `n - 1` rows + + # scale return + if self.scale_return: + X *= 100 + + # handle nan and centered + X = self._preprocess(X) + + if return_decomposed_components: + F, cov_b, var_u = self._predict(X, return_structured=True) + return F, cov_b, var_u + else: + # estimate covariance + S = self._predict(X) + + # return correlation if needed + if return_corr: + vola = np.sqrt(np.diag(S)) + corr = S / np.outer(vola, vola) + if columns is None: + return corr + return pd.DataFrame(corr, index=columns, columns=columns) + + # return covariance + if columns is None: + return S + return pd.DataFrame(S, index=columns, columns=columns) + + def _predict(self, X: np.ndarray, return_structured=False) -> Union[np.ndarray, tuple]: + """ + covariance estimation implementation + + Args: + X (np.ndarray): data matrix containing multiple variables (columns) and observations (rows). + return_structured (bool): whether return decomposed components of the covariance matrix. + + Returns: + tuple or np.ndarray: decomposed covariance matrix or covariance matrix. + """ + + model = self.solver(self.num_factors, random_state=0).fit(X) + + F = model.components_.T # num_features x num_factors + B = model.transform(X) # num_samples x num_factors + U = X - B @ F.T + cov_b = np.cov(B.T) # num_factors x num_factors + var_u = np.var(U, axis=0) # diagonal + + if return_structured: + return F, cov_b, var_u + + cov_x = F @ cov_b @ F.T + np.diag(var_u) + + return cov_x diff --git a/qlib/portfolio/enhanced_indexing.py b/qlib/portfolio/enhanced_indexing.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/qlib/model/riskmodel_shrink.py b/qlib/portfolio/optimizer/__init__.py similarity index 100% rename from qlib/model/riskmodel_shrink.py rename to qlib/portfolio/optimizer/__init__.py diff --git a/qlib/model/riskmodel_structured.py b/qlib/portfolio/optimizer/base.py similarity index 100% rename from qlib/model/riskmodel_structured.py rename to qlib/portfolio/optimizer/base.py diff --git a/qlib/portfolio/optimizer/enhanced_indexing.py b/qlib/portfolio/optimizer/enhanced_indexing.py new file mode 100644 index 000000000..d988c776b --- /dev/null +++ b/qlib/portfolio/optimizer/enhanced_indexing.py @@ -0,0 +1,140 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import numpy as np +import cvxpy as cp +import pandas as pd +from typing import Union + +from qlib.portfolio.optimizer import BaseOptimizer + + +class EnhancedIndexingOptimizer(BaseOptimizer): + """ + Portfolio Optimizer with Enhanced Indexing + + Note: + This optimizer always assumes full investment and no-shorting. + """ + + START_FROM_W0 = "w0" + START_FROM_BENCH = "benchmark" + DO_NOT_START_FROM = "no_warm_start" + + def __init__( + self, + lamb: float = 10, + delta: float = 0.4, + bench_dev: float = 0.01, + inds_dev: float = None, + scale_alpha: bool = True, + verbose: bool = False, + warm_start: str = DO_NOT_START_FROM, + max_iters: int = 10000, + ): + """ + Args: + lamb (float): risk aversion parameter (larger `lamb` means less focus on return) + delta (float): turnover rate limit + bench_dev (float): benchmark deviation limit + inds_dev (float/None): industry deviation limit, set `inds_dev` to None to ignore industry specific + restriction + scale_alpha (bool): if to scale alpha to match the volatility of the covariance matrix + verbose (bool): if print detailed information about the solver + warm_start (str): whether try to warm start (`w0`/`benchmark`/``) + (https://www.cvxpy.org/tutorial/advanced/index.html#warm-start) + """ + + assert lamb >= 0, "risk aversion parameter `lamb` should be positive" + self.lamb = lamb + + assert delta >= 0, "turnover limit `delta` should be positive" + self.delta = delta + + assert bench_dev >= 0, "benchmark deviation limit `bench_dev` should be positive" + self.bench_dev = bench_dev + + assert inds_dev is None or inds_dev >= 0, "industry deviation limit `inds_dev` should be positive or None." + self.inds_dev = inds_dev + + assert warm_start in [ + self.DO_NOT_START_FROM, + self.START_FROM_W0, + self.START_FROM_BENCH, + ], "illegal warm start option" + self.start_from_w0 = warm_start == self.START_FROM_W0 + self.start_from_bench = warm_start == self.START_FROM_BENCH + + self.scale_alpha = scale_alpha + self.verbose = verbose + self.max_iters = max_iters + + def __call__( + self, + u: np.ndarray, + F: np.ndarray, + covB: np.ndarray, + varU: np.ndarray, + w0: np.ndarray, + w_bench: np.ndarray, + inds_onehot: np.ndarray = None, + ) -> Union[np.ndarray, pd.Series]: + """ + Args: + u (np.ndarray): expected returns (a.k.a., alpha) + F, covB, varU (np.ndarray): see StructuredCovEstimator + w0 (np.ndarray): initial weights (for turnover control) + w_bench (np.ndarray): benchmark weights + inds_onehot (np.ndarray): industry (onehot) + + Returns: + np.ndarray or pd.Series: optimized portfolio allocation + """ + assert inds_onehot is not None or self.inds_dev is None, "Industry onehot vector is required." + + # scale alpha to match volatility + if self.scale_alpha: + u = u / u.std() + x_variance = np.mean(np.diag(F @ covB @ F.T) + varU) + u *= x_variance ** 0.5 + + w = cp.Variable(len(u)) # num_assets + v = w @ F # num_factors + ret = w @ u + risk = cp.quad_form(v, covB) + cp.sum(cp.multiply(varU, w ** 2)) + obj = cp.Maximize(ret - self.lamb * risk) + d_bench = w - w_bench + cons = [ + w >= 0, + cp.sum(w) == 1, + d_bench >= -self.bench_dev, + d_bench <= self.bench_dev, + ] + + if self.inds_dev is not None: + d_inds = d_bench @ inds_onehot + cons.append(d_inds >= -self.inds_dev) + cons.append(d_inds <= self.inds_dev) + + if w0 is not None: + turnover = cp.sum(cp.abs(w - w0)) + cons.append(turnover <= self.delta) + + warm_start = False + if self.start_from_w0: + if w0 is None: + print("Warning: try warm start with w0, but w0 is `None`.") + else: + w.value = w0 + warm_start = True + elif self.start_from_bench: + w.value = w_bench + warm_start = True + + prob = cp.Problem(obj, cons) + prob.solve(solver=cp.SCS, verbose=self.verbose, warm_start=warm_start, max_iters=self.max_iters) + + if prob.status != "optimal": + print("Warning: solve failed.", prob.status) + + return np.asarray(w.value) diff --git a/qlib/portfolio/optimizer.py b/qlib/portfolio/optimizer/optimizer.py similarity index 62% rename from qlib/portfolio/optimizer.py rename to qlib/portfolio/optimizer/optimizer.py index 6ee396a51..17a7fc30a 100644 --- a/qlib/portfolio/optimizer.py +++ b/qlib/portfolio/optimizer/optimizer.py @@ -4,11 +4,12 @@ import abc import warnings import numpy as np -import cvxpy as cp import pandas as pd import scipy.optimize as so from typing import Optional, Union, Callable, List +from qlib.portfolio.enhanced_indexing import EnhancedIndexingOptimizer + class BaseOptimizer(abc.ABC): """ Construct portfolio with a optimization related method """ @@ -38,13 +39,13 @@ class PortfolioOptimizer(BaseOptimizer): OPT_INV = "inv" def __init__( - self, - method: str = "inv", - lamb: float = 0, - delta: float = 0, - alpha: float = 0.0, - scale_alpha: bool = True, - tol: float = 1e-8, + self, + method: str = "inv", + lamb: float = 0, + delta: float = 0, + alpha: float = 0.0, + scale_alpha: bool = True, + tol: float = 1e-8, ): """ Args: @@ -71,10 +72,10 @@ class PortfolioOptimizer(BaseOptimizer): self.scale_alpha = scale_alpha def __call__( - self, - S: Union[np.ndarray, pd.DataFrame], - u: Optional[Union[np.ndarray, pd.Series]] = None, - w0: Optional[Union[np.ndarray, pd.Series]] = None, + self, + S: Union[np.ndarray, pd.DataFrame], + u: Optional[Union[np.ndarray, pd.Series]] = None, + w0: Optional[Union[np.ndarray, pd.Series]] = None, ) -> Union[np.ndarray, pd.Series]: """ Args: @@ -163,7 +164,7 @@ class PortfolioOptimizer(BaseOptimizer): return self._solve(len(S), self._get_objective_gmv(S), *self._get_constrains(w0)) def _optimize_mvo( - self, S: np.ndarray, u: Optional[np.ndarray] = None, w0: Optional[np.ndarray] = None + self, S: np.ndarray, u: Optional[np.ndarray] = None, w0: Optional[np.ndarray] = None ) -> np.ndarray: """optimize mean-variance portfolio @@ -259,7 +260,6 @@ class PortfolioOptimizer(BaseOptimizer): # add l2 regularization wrapped_obj = obj if self.alpha > 0: - def opt_obj(x): return obj(x) + self.alpha * np.sum(np.square(x)) @@ -272,134 +272,3 @@ class PortfolioOptimizer(BaseOptimizer): warnings.warn(f"optimization not success ({sol.status})") return sol.x - - -class EnhancedIndexingOptimizer(BaseOptimizer): - """ - Portfolio Optimizer with Enhanced Indexing - - Note: - This optimizer always assumes full investment and no-shorting. - """ - - START_FROM_W0 = "w0" - START_FROM_BENCH = "benchmark" - DO_NOT_START_FROM = "no_warm_start" - - def __init__( - self, - lamb: float = 10, - delta: float = 0.4, - bench_dev: float = 0.01, - inds_dev: float = None, - scale_alpha: bool = True, - verbose: bool = False, - warm_start: str = DO_NOT_START_FROM, - max_iters: int = 10000, - ): - """ - Args: - lamb (float): risk aversion parameter (larger `lamb` means less focus on return) - delta (float): turnover rate limit - bench_dev (float): benchmark deviation limit - inds_dev (float/None): industry deviation limit, set `inds_dev` to None to ignore industry specific - restriction - scale_alpha (bool): if to scale alpha to match the volatility of the covariance matrix - verbose (bool): if print detailed information about the solver - warm_start (str): whether try to warm start (`w0`/`benchmark`/``) - (https://www.cvxpy.org/tutorial/advanced/index.html#warm-start) - """ - - assert lamb >= 0, "risk aversion parameter `lamb` should be positive" - self.lamb = lamb - - assert delta >= 0, "turnover limit `delta` should be positive" - self.delta = delta - - assert bench_dev >= 0, "benchmark deviation limit `bench_dev` should be positive" - self.bench_dev = bench_dev - - assert inds_dev >= 0, "industry deviation limit `inds_dev` should be positive" - self.inds_dev = inds_dev - - assert warm_start in [ - self.DO_NOT_START_FROM, - self.START_FROM_W0, - self.START_FROM_BENCH, - ], "illegal warm start option" - self.start_from_w0 = warm_start == self.START_FROM_W0 - self.start_from_bench = warm_start == self.START_FROM_BENCH - - self.scale_alpha = scale_alpha - self.verbose = verbose - self.max_iters = max_iters - - def __call__( - self, - u: np.ndarray, - F: np.ndarray, - covB: np.ndarray, - varU: np.ndarray, - w0: np.ndarray, - w_bench: np.ndarray, - inds_onehot: np.ndarray = None, - ) -> Union[np.ndarray, pd.Series]: - """ - Args: - u (np.ndarray): expected returns (a.k.a., alpha) - F, covB, varU (np.ndarray): see StructuredCovEstimator - w0 (np.ndarray): initial weights (for turnover control) - w_bench (np.ndarray): benchmark weights - inds_onehot (np.ndarray): industry (onehot) - - Returns: - np.ndarray or pd.Series: optimized portfolio allocation - """ - assert inds_onehot is not None or self.inds_dev is None, "Industry onehot vector is required." - - # scale alpha to match volatility - if self.scale_alpha: - u = u / u.std() - x_variance = np.mean(np.diag(F @ covB @ F.T) + varU) - u *= x_variance ** 0.5 - - w = cp.Variable(len(u)) # num_assets - v = w @ F # num_factors - ret = w @ u - risk = cp.quad_form(v, covB) + cp.sum(cp.multiply(varU, w ** 2)) - obj = cp.Maximize(ret - self.lamb * risk) - d_bench = w - w_bench - cons = [ - w >= 0, - cp.sum(w) == 1, - d_bench >= -self.bench_dev, - d_bench <= self.bench_dev, - ] - - if self.inds_dev is not None: - d_inds = d_bench @ inds_onehot - cons.append(d_inds >= -self.inds_dev) - cons.append(d_inds <= self.inds_dev) - - if w0 is not None: - turnover = cp.sum(cp.abs(w - w0)) - cons.append(turnover <= self.delta) - - warm_start = False - if self.start_from_w0: - if w0 is None: - print("Warning: try warm start with w0, but w0 is `None`.") - else: - w.value = w0 - warm_start = True - elif self.start_from_bench: - w.value = w_bench - warm_start = True - - prob = cp.Problem(obj, cons) - prob.solve(solver=cp.SCS, verbose=self.verbose, warm_start=warm_start, max_iters=self.max_iters) - - if prob.status != "optimal": - print("Warning: solve failed.", prob.status) - - return np.asarray(w.value) diff --git a/tests/test_enhanced_indexing.py b/tests/test_enhanced_indexing.py deleted file mode 100644 index f21d51984..000000000 --- a/tests/test_enhanced_indexing.py +++ /dev/null @@ -1,282 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -import sys -import math -import shutil -import unittest -import numpy as np -import pandas as pd -from tqdm import tqdm -from pathlib import Path - -import qlib -from qlib.config import C -from qlib.utils import init_instance_by_config, flatten_dict -from qlib.workflow import R -from qlib.config import REG_CN -from qlib.workflow.record_temp import SignalRecord, SigAnaRecord -from qlib.tests import TestAutoData -from qlib.portfolio.optimizer import EnhancedIndexingOptimizer -from qlib.model.riskmodel import StructuredCovEstimator -from qlib.data.dataset.loader import QlibDataLoader -from qlib.data.dataset.handler import DataHandler -from qlib.data import D -from qlib.utils import exists_qlib_data, init_instance_by_config - -market = "all" -trade_gap = 21 -label_config = "Ref($close, -{}) / Ref($close, -1) - 1".format(trade_gap) # reconstruct portfolio once a month - -provider_uri = "~/.qlib_ei/qlib_data/cn_data" # target_dir -if not exists_qlib_data(provider_uri): - print(f"Qlib data is not found in {provider_uri}") - sys.path.append(str(Path.cwd().parent.joinpath("scripts"))) - from get_data import GetData - GetData().qlib_data(target_dir=provider_uri, region=REG_CN) -qlib.init(provider_uri=provider_uri, region=REG_CN) - -################################### -# train model -################################### -data_handler_config = { - "start_time": "2008-01-01", - "end_time": "2020-08-01", - "fit_start_time": "2008-01-01", - "fit_end_time": "2014-11-30", - "instruments": market, - "label": [label_config] -} - -task = { - "model": { - "class": "LGBModel", - "module_path": "qlib.contrib.model.gbdt", - "kwargs": { - "loss": "mse", - "colsample_bytree": 0.8879, - "learning_rate": 0.0421, - "subsample": 0.8789, - "lambda_l1": 205.6999, - "lambda_l2": 580.9768, - "max_depth": 8, - "num_leaves": 210, - "num_threads": 32, - }, - }, - "dataset": { - "class": "DatasetH", - "module_path": "qlib.data.dataset", - "kwargs": { - "handler": { - "class": "Alpha158", - "module_path": "qlib.contrib.data.handler", - "kwargs": data_handler_config, - }, - "segments": { - "train": ("2008-01-01", "2014-11-30"), - "valid": ("2015-01-01", "2016-11-30"), - "test": ("2017-01-01", "2018-01-01"), - }, - }, - }, -} - - -class CSI300: - """Simulate CSI300 as the Benchmark for Enhanced Indexing to Track""" - - def __init__(self): - # provider_uri = '/nfs_data/qlib_data/ycz_daily/qlib' - # qlib.init(provider_uri=provider_uri, region=REG_CN, dataset_cache=None, expression_cache=None) - self.csi_weight = D.features(D.instruments('csi300'), ['$csi300_weight']) - - def __call__(self, pd_index, trade_date): - weights = np.zeros(len(pd_index)) - - for idx, instrument in enumerate(pd_index): - if (instrument, trade_date) in self.csi_weight.index: - weight = self.csi_weight.loc[(instrument, trade_date)].values[0] - if not math.isnan(weight): - weights[idx] = weight - - assert weights.sum() > 0, ' Fetch CSI Weights Error!' - weights = weights / weights.sum() - - return weights - - -class EnhancedIndexingStrategy: - """Enhanced Indexing Strategy""" - - def __init__(self): - self.benchmark = CSI300() - - provider_uri = "~/.qlib_ei/qlib_data/cn_data" - qlib.init(provider_uri=provider_uri, region=REG_CN) - - self.data_handler = DataHandler(market, "2015-01-01", "2019-01-01", QlibDataLoader(["$close"])) - self.label_handler = DataHandler(market, "2015-01-01", "2019-01-01", QlibDataLoader([label_config])) - self.cov_estimator = StructuredCovEstimator() - self.optimizer = EnhancedIndexingOptimizer(lamb=0.1, delta=0.4, bench_dev=0.03, max_iters=50000) - - def update(self, score_series, current, pred_date): - """ - Parameters - ----------- - score_series : pd.Series - stock_id , score. - current : Position() - current of account. - trade_exchange : Exchange() - exchange. - trade_date : pd.Timestamp - date. - """ - print(score_series) - score_series = score_series.dropna() - - # portfolio init weight - init_weight = current.reindex(score_series.index, fill_value=0).values.squeeze() - init_weight_sum = init_weight.sum() - if init_weight_sum > 0: - init_weight /= init_weight_sum - - # covariance estimation - selector = (self.data_handler.get_range_selector(pred_date, 252), score_series.index) - price = self.data_handler.fetch(selector, level=None, squeeze=True) - F, cov_b, var_u = self.cov_estimator.predict(price, return_decomposed_components=True) - - # optimize target portfolio - w_bench = self.benchmark(score_series.index, pred_date) - passed_init_weight = init_weight if init_weight_sum > 0 else None - # print(F) - # print(cov_b) - # print(var_u) - # print(passed_init_weight) - # print(w_bench) - target_weight = self.optimizer(score_series.values, F, cov_b, var_u, passed_init_weight, w_bench) - # print(target_weight) - target = pd.DataFrame(data=target_weight, index=score_series.index) - - active_weights = target_weight - w_bench - selector = (self.label_handler.get_range_selector(pred_date, 1), score_series.index) - label = self.label_handler.fetch(selector, level=None, squeeze=True) - alpha = 0 - for instrument, weight in zip(score_series.index, active_weights): - delta = label.loc[(pred_date, instrument)] - alpha += weight * (0 if math.isnan(delta) else delta) - - print(alpha) - - return alpha, target - - -def train(): - """train model - - Returns - ------- - pred_score: pandas.DataFrame - predict scores - performance: dict - model performance - """ - - # model initiation - model = init_instance_by_config(task["model"]) - dataset = init_instance_by_config(task["dataset"]) - - # start exp - with R.start(experiment_name="workflow"): - R.log_params(**flatten_dict(task)) - model.fit(dataset) - - # prediction - recorder = R.get_recorder() - rid = recorder.id - sr = SignalRecord(model, dataset, recorder) - sr.generate() - pred_score = sr.load() - - # calculate ic and ric - sar = SigAnaRecord(recorder) - sar.generate() - ic = sar.load(sar.get_path("ic.pkl")) - ric = sar.load(sar.get_path("ric.pkl")) - - return pred_score, {"ic": ic, "ric": ric}, rid - - -def backtest_analysis(scores): - """backtest enhanced indexing - - Parameters - ---------- - scores: pandas.DataFrame - predict scores - - Returns - ------- - sharpe_ratio: floating-point - sharpe ratio of the enhanced indexing portfolio - """ - - # backtest and analysis - with R.start(experiment_name="backtest_analysis"): - strategy = EnhancedIndexingStrategy() - dates = scores.index.get_level_values(0).unique() - - alphas = [] - current = pd.DataFrame() - gap_between_next_trade = 0 - for date in tqdm(dates): - if gap_between_next_trade == 0: - score_series = scores.loc[date] - alpha, current = strategy.update(score_series, current, date) - alphas.append(alpha) - gap_between_next_trade = trade_gap - else: - gap_between_next_trade -= 1 - - alphas = np.array(alphas) - sharpe_ratio = alphas.mean() / np.std(alphas) - print('Sharpe:', sharpe_ratio) - - return sharpe_ratio - - -class TestAllFlow(TestAutoData): - PRED_SCORE = None - REPORT_NORMAL = None - POSITIONS = None - RID = None - - @classmethod - def tearDownClass(cls) -> None: - shutil.rmtree(str(Path(C["exp_manager"]["kwargs"]["uri"].strip("file:")).resolve())) - - def test_0_train(self): - TestAllFlow.PRED_SCORE, ic_ric, TestAllFlow.RID = train() - self.assertGreaterEqual(ic_ric["ic"].all(), 0, "train failed") - self.assertGreaterEqual(ic_ric["ric"].all(), 0, "train failed") - - def test_1_backtest(self): - sharpe_ratio = backtest_analysis(TestAllFlow.PRED_SCORE) - self.assertGreaterEqual( - sharpe_ratio, - 0.90, - "backtest failed", - ) - - -def suite(): - _suite = unittest.TestSuite() - _suite.addTest(TestAllFlow("test_0_train")) - _suite.addTest(TestAllFlow("test_1_backtest")) - return _suite - - -if __name__ == "__main__": - runner = unittest.TextTestRunner() - runner.run(suite())