mirror of
https://github.com/microsoft/qlib.git
synced 2026-07-03 19:10:58 +08:00
add portfolio module
This commit is contained in:
0
qlib/portfolio/__init__.py
Normal file
0
qlib/portfolio/__init__.py
Normal file
265
qlib/portfolio/optimizer.py
Normal file
265
qlib/portfolio/optimizer.py
Normal file
@@ -0,0 +1,265 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import warnings
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
import scipy.optimize as so
|
||||
|
||||
from typing import Optional, Union, Callable, List
|
||||
|
||||
|
||||
class PortfolioOptimizer(object):
|
||||
"""Portfolio Optimizer
|
||||
|
||||
The following optimization algorithms are supported:
|
||||
- `gmv`: Global Minimum Variance Portfolio
|
||||
- `mvo`: Mean Variance Optimized Portfolio
|
||||
- `rp`: Risk Parity
|
||||
- `inv`: Inverse Volatility
|
||||
|
||||
Note:
|
||||
This optimizer always assumes full investment and no-shorting.
|
||||
"""
|
||||
|
||||
OPT_GMV = 'gmv'
|
||||
OPT_MVO = 'mvo'
|
||||
OPT_RP = 'rp'
|
||||
OPT_INV = 'inv'
|
||||
|
||||
def __init__(self, method: str = 'inv', lamb: float = 0, delta: float = 0,
|
||||
alpha: float = 0.0, scale_alpha: bool = True, tol: float = 1e-8):
|
||||
"""
|
||||
Args:
|
||||
method (str): portfolio optimization method
|
||||
lamb (float): risk aversion parameter (larger `lamb` means more focus on return)
|
||||
delta (float): turnover rate limit
|
||||
alpha (float): l2 norm regularizer
|
||||
tol (float): tolerance for optimization termination
|
||||
"""
|
||||
assert method in [self.OPT_GMV, self.OPT_MVO, self.OPT_RP, self.OPT_INV], \
|
||||
f'method `{method}` is not supported'
|
||||
self.method = method
|
||||
|
||||
assert lamb >= 0, f'risk aversion parameter `lamb` should be positive'
|
||||
self.lamb = lamb
|
||||
|
||||
assert delta >= 0, f'turnover limit `delta` should be positive'
|
||||
self.delta = delta
|
||||
|
||||
assert alpha >= 0, f'l2 norm regularizer `alpha` should be positive'
|
||||
self.alpha = alpha
|
||||
|
||||
self.tol = tol
|
||||
|
||||
def __call__(self, S: Union[np.ndarray, pd.DataFrame],
|
||||
u: Optional[Union[np.ndarray, pd.Series]] = None,
|
||||
w0: Optional[Union[np.ndarray, pd.Series]] = None) -> Union[np.ndarray, pd.Series]:
|
||||
"""
|
||||
Args:
|
||||
S (np.ndarray or pd.DataFrame): covariance matrix
|
||||
u (np.ndarray or pd.Series): expected returns (a.k.a., alpha)
|
||||
w0 (np.ndarray or pd.Series): initial weights (for turnover control)
|
||||
|
||||
Returns:
|
||||
np.ndarray or pd.Series: optimized portfolio allocation
|
||||
"""
|
||||
# transform dataframe into array
|
||||
index = None
|
||||
if isinstance(S, pd.DataFrame):
|
||||
index = S.index
|
||||
S = S.values
|
||||
|
||||
# transform alpha
|
||||
if u is not None:
|
||||
assert len(u) == len(S), '`u` has mismatched shape'
|
||||
if isinstance(u, pd.Series):
|
||||
assert all(u.index == index), '`u` has mismatched index'
|
||||
u = u.values
|
||||
|
||||
# transform initial weights
|
||||
if w0 is not None:
|
||||
assert len(w0) == len(S), '`w0` has mismatched shape'
|
||||
if isinstance(w0, pd.Series):
|
||||
assert all(w0.index == index), '`w0` has mismatched index'
|
||||
w0 = w0.values
|
||||
|
||||
# scale alpha to match volatility
|
||||
if u is not None:
|
||||
u = u / u.std()
|
||||
u *= np.mean(np.diag(S))**0.5
|
||||
|
||||
# optimize
|
||||
w = self._optimize(S, u, w0)
|
||||
|
||||
# restore index if needed
|
||||
if index is not None:
|
||||
w = pd.Series(w, index=index)
|
||||
|
||||
return w
|
||||
|
||||
def _optimize(self, S: np.ndarray, u: Optional[np.ndarray] = None,
|
||||
w0: Optional[np.ndarray] = None) -> np.ndarray:
|
||||
|
||||
# inverse volatility
|
||||
if self.method == self.OPT_INV:
|
||||
if u is not None:
|
||||
warnings.warn('`u` is set but will not be used for `inv` portfolio')
|
||||
if w0 is not None:
|
||||
warnings.warn('`w0` is set but will not be used for `inv` portfolio')
|
||||
return self._optimize_inv(S)
|
||||
|
||||
# global minimum variance
|
||||
if self.method == self.OPT_GMV:
|
||||
if u is not None:
|
||||
warnings.warn('`u` is set but will not be used for `gmv` portfolio')
|
||||
return self._optimize_gmv(S, w0)
|
||||
|
||||
# mean-variance
|
||||
if self.method == self.OPT_MVO:
|
||||
return self._optimize_mvo(S, u, w0)
|
||||
|
||||
# risk parity
|
||||
if self.method == self.OPT_RP:
|
||||
if u is not None:
|
||||
warnings.warn('`u` is set but will not be used for `rp` portfolio')
|
||||
return self._optimize_rp(S, w0)
|
||||
|
||||
def _optimize_inv(self, S: np.ndarray) -> np.ndarray:
|
||||
"""Inverse volatility"""
|
||||
vola = np.diag(S)**0.5
|
||||
w = 1 / vola
|
||||
w /= w.sum()
|
||||
return w
|
||||
|
||||
def _optimize_gmv(self, S: np.ndarray, w0: Optional[np.ndarray] = None) -> np.ndarray:
|
||||
"""optimize global minimum variance portfolio
|
||||
|
||||
This method solves the following optimization problem
|
||||
min_w w' S w
|
||||
s.t. w >= 0, sum(w) == 1
|
||||
where `S` is the covariance matrix.
|
||||
"""
|
||||
return self._solve(
|
||||
len(S),
|
||||
self._get_objective_gmv(S),
|
||||
*self._get_constrains(w0)
|
||||
)
|
||||
|
||||
def _optimize_mvo(self, S: np.ndarray, u: Optional[np.ndarray] = None,
|
||||
w0: Optional[np.ndarray] = None) -> np.ndarray:
|
||||
"""optimize mean-variance portfolio
|
||||
|
||||
This method solves the following optimization problem
|
||||
min_w - w' u + lamb * w' S w
|
||||
s.t. w >= 0, sum(w) == 1
|
||||
where `S` is the covariance matrix, `u` is the expected returns,
|
||||
and `lamb` is the risk aversion parameter.
|
||||
"""
|
||||
return self._solve(
|
||||
len(S),
|
||||
self._get_objective_mvo(S, u),
|
||||
*self._get_constrains(w0)
|
||||
)
|
||||
|
||||
def _optimize_rp(self, S: np.ndarray, w0: Optional[np.ndarray] = None) -> np.ndarray:
|
||||
"""optimize risk parity portfolio
|
||||
|
||||
This method solves the following optimization problem
|
||||
min_w sum_i [w_i - (w' S w) / ((S w)_i * N)]**2
|
||||
s.t. w >= 0, sum(w) == 1
|
||||
where `S` is the covariance matrix and `N` is the number of stocks.
|
||||
"""
|
||||
return self._solve(
|
||||
len(S),
|
||||
self._get_objective_rp(S),
|
||||
*self._get_constrains(w0)
|
||||
)
|
||||
|
||||
def _get_objective_gmv(self, S: np.ndarray) -> np.ndarray:
|
||||
"""global minimum variance optimization objective
|
||||
|
||||
Optimization objective
|
||||
min_w w' S w
|
||||
"""
|
||||
|
||||
def func(x):
|
||||
return x @ S @ x
|
||||
|
||||
return func
|
||||
|
||||
def _get_objective_mvo(self, S: np.ndarray, u: np.ndarray = None) -> np.ndarray:
|
||||
"""mean-variance optimization objective
|
||||
|
||||
Optimization objective
|
||||
min_w - w' u + lamb * w' S w
|
||||
"""
|
||||
|
||||
def func(x):
|
||||
risk = x @ S @ x
|
||||
ret = x @ u
|
||||
return -ret + self.lamb * risk
|
||||
|
||||
return func
|
||||
|
||||
def _get_objective_rp(self, S: np.ndarray) -> np.ndarray:
|
||||
"""risk-parity optimization objective
|
||||
|
||||
Optimization objective
|
||||
min_w sum_i [w_i - (w' S w) / ((S w)_i * N)]**2
|
||||
"""
|
||||
|
||||
def func(x):
|
||||
N = len(x)
|
||||
Sx = S @ x
|
||||
xSx = x @ Sx
|
||||
return np.sum((x - xSx / Sx / N)**2)
|
||||
|
||||
return func
|
||||
|
||||
def _get_constrains(self, w0: Optional[np.ndarray] = None):
|
||||
"""optimization constraints
|
||||
|
||||
Defines the following constraints:
|
||||
- no shorting and leverage: 0 <= w <= 1
|
||||
- full investment: sum(w) == 1
|
||||
- turnover constraint: |w - w0| <= delta
|
||||
"""
|
||||
|
||||
# no shorting and leverage
|
||||
bounds = so.Bounds(0.0, 1.0)
|
||||
|
||||
# full investment constraint
|
||||
cons = [
|
||||
{'type': 'eq', 'fun': lambda x: np.sum(x) - 1} # == 0
|
||||
]
|
||||
|
||||
# turnover constraint
|
||||
if w0 is not None:
|
||||
cons.append(
|
||||
{'type': 'ineq', 'fun': lambda x: self.delta - np.sum(np.abs(x - w0))} # >= 0
|
||||
)
|
||||
|
||||
return bounds, cons
|
||||
|
||||
def _solve(self, n: int, obj: Callable, bounds: so.Bounds, cons: List) -> np.ndarray:
|
||||
"""solve optimization
|
||||
|
||||
Args:
|
||||
n (int): number of parameters
|
||||
obj (callable): optimization objective
|
||||
bounds (Bounds): bounds of parameters
|
||||
cons (list): optimization constraints
|
||||
"""
|
||||
# add l2 regularization
|
||||
wrapped_obj = obj
|
||||
if self.alpha > 0:
|
||||
wrapped_obj = lambda x: obj(x) + self.alpha * np.sum(np.square(x))
|
||||
|
||||
# solve
|
||||
x0 = np.ones(n) / n # init results
|
||||
sol = so.minimize(wrapped_obj, x0, bounds=bounds, constraints=cons, tol=self.tol)
|
||||
if not sol.success:
|
||||
warnings.warn(f'optimization not success ({sol.status})')
|
||||
|
||||
return sol.x
|
||||
Reference in New Issue
Block a user