From b2e2142594d38c0afa4e31d560a17cfad05e2705 Mon Sep 17 00:00:00 2001 From: Charles Young Date: Mon, 22 Feb 2021 09:00:12 +0800 Subject: [PATCH] Applied slight modification to follow PEP 8. --- qlib/portfolio/optimizer.py | 114 +++--------------------------------- 1 file changed, 7 insertions(+), 107 deletions(-) diff --git a/qlib/portfolio/optimizer.py b/qlib/portfolio/optimizer.py index e04923ed6..104e2c441 100644 --- a/qlib/portfolio/optimizer.py +++ b/qlib/portfolio/optimizer.py @@ -42,6 +42,7 @@ class PortfolioOptimizer: lamb (float): risk aversion parameter (larger `lamb` means more focus on return) delta (float): turnover rate limit alpha (float): l2 norm regularizer + scale_alpha (bool): if to scale alpha to match the volatility of the covariance matrix tol (float): tolerance for optimization termination """ assert method in [self.OPT_GMV, self.OPT_MVO, self.OPT_RP, self.OPT_INV], f"method `{method}` is not supported" @@ -57,6 +58,7 @@ class PortfolioOptimizer: self.alpha = alpha self.tol = tol + self.scale_alpha = scale_alpha def __call__( self, @@ -94,7 +96,7 @@ class PortfolioOptimizer: w0 = w0.values # scale alpha to match volatility - if u is not None: + if u is not None and self.scale_alpha: u = u / u.std() u *= np.mean(np.diag(S)) ** 0.5 @@ -247,7 +249,10 @@ class PortfolioOptimizer: # add l2 regularization wrapped_obj = obj if self.alpha > 0: - wrapped_obj = lambda x: obj(x) + self.alpha * np.sum(np.square(x)) + def opt_obj(x): + return obj(x) + self.alpha * np.sum(np.square(x)) + + wrapped_obj = opt_obj # solve x0 = np.ones(n) / n # init results @@ -256,108 +261,3 @@ class PortfolioOptimizer: warnings.warn(f"optimization not success ({sol.status})") return sol.x - - -class EnhancedIndexingOptimizer: - """ - Portfolio Optimizer with Enhanced Indexing - - Note: - This optimizer always assumes full investment and no-shorting. - """ - - START_FROM_W0 = 'w0' - START_FROM_BENCH = 'benchmark' - DO_NOT_START_FROM = '' - - def __init__(self, lamb: float = 10, delta: float = 0.4, bench_dev: float = 0.01, inds_dev: float = 0.01, - scale_alpha=True, verbose: bool = False, warm_start: str = '', max_iters: int = 10000): - """ - Args: - lamb (float): risk aversion parameter (larger `lamb` means less focus on return) - delta (float): turnover rate limit - bench_dev (float): benchmark deviation limit - inds_dev (float): industry deviation limit - verbose (bool): if print detailed information about the solver - warm_start (str): whether try to warm start (`w0`/`benchmark`/``) - (https://www.cvxpy.org/tutorial/advanced/index.html#warm-start) - """ - - assert lamb >= 0, "risk aversion parameter `lamb` should be positive" - self.lamb = lamb - - assert delta >= 0, "turnover limit `delta` should be positive" - self.delta = delta - - assert bench_dev >= 0, "benchmark deviation limit `bench_dev` should be positive" - self.bench_dev = bench_dev - - assert inds_dev >= 0, "industry deviation limit `inds_dev` should be positive" - self.inds_dev = inds_dev - - assert warm_start in [self.DO_NOT_START_FROM, self.START_FROM_W0, - self.START_FROM_BENCH], "illegal warm start option" - self.start_from_w0 = (warm_start == self.START_FROM_W0) - self.start_from_bench = (warm_start == self.START_FROM_BENCH) - - self.scale_alpha = scale_alpha - self.verbose = verbose - self.max_iters = max_iters - - def __call__(self, u: np.ndarray, F: np.ndarray, covB: np.ndarray, varU: np.ndarray, w0: np.ndarray, - w_bench: np.ndarray, inds_onehot: np.ndarray - ) -> Union[np.ndarray, pd.Series]: - """ - Args: - u (np.ndarray): expected returns (a.k.a., alpha) - F, covB, varU (np.ndarray): see StructuredCovEstimator - w0 (np.ndarray): initial weights (for turnover control) - w_bench (np.ndarray): benchmark weights - inds_onehot (np.ndarray): industry (onehot) - - Returns: - np.ndarray or pd.Series: optimized portfolio allocation - """ - # scale alpha to match volatility - if self.scale_alpha: - u = u / u.std() - x_variance = np.mean(np.diag(F @ covB @ F.T) + varU) - u *= x_variance ** 0.5 - - w = cp.Variable(len(u)) # num_assets - v = w @ F # num_factors - ret = w @ u - risk = cp.quad_form(v, covB) + cp.sum(cp.multiply(varU, w ** 2)) - obj = cp.Maximize(ret - self.lamb * risk) - d_bench = w - w_bench - d_inds = d_bench @ inds_onehot - cons = [ - w >= 0, - cp.sum(w) == 1, - d_bench >= -self.bench_dev, - d_bench <= self.bench_dev, - d_inds >= -self.inds_dev, - d_inds <= self.inds_dev - ] - if w0 is not None: - turnover = cp.sum(cp.abs(w - w0)) - cons.append(turnover <= self.delta) - - warm_start = False - if self.start_from_w0: - if w0 is None: - print('Warning: try warm start with w0, but w0 is `None`.') - else: - w.value = w0 - warm_start = True - elif self.start_from_bench: - w.value = w_bench - warm_start = True - - prob = cp.Problem(obj, cons) - prob.solve(solver=cp.SCS, verbose=self.verbose, warm_start=warm_start, max_iters=self.max_iters) - - if prob.status != 'optimal': - print('Warning: solve failed.', prob.status) - - return np.asarray(w.value)