mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

support optimization based strategy (#754)

* support optimization based strategy

* fix riskdata not found & update doc

* refactor signal_strategy

* add portfolio example

* Update examples/portfolio/prepare_riskdata.py

Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>

* fix typo

Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>

* fix typo

Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>

* update doc

* fix riskmodel doc

Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>

Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>
Dong Zhou
2021-12-28 18:44:20 +08:00
committed by GitHub
parent 4709909782
commit 1b8f0b4575
14 changed files with 668 additions and 262 deletions

@@ -8,7 +8,7 @@ Portfolio Strategy: Portfolio Management
Introduction
===================
``Portfolio Strategy`` is designed to adopt different portfolio strategies, which means that users can adopt different algorithms to generate investment portfolios based on the prediction scores of the ``Forecast Model``. Users can use the ``Portfolio Strategy`` in an automatic workflow by ``Workflow`` module, please refer to `Workflow: Workflow Management <workflow.html>`_.
Because the components in ``Qlib`` are designed in a loosely-coupled way, ``Portfolio Strategy`` can be used as an independent module also.
@@ -28,14 +28,14 @@ Qlib provides a base class ``qlib.contrib.strategy.BaseStrategy``. All strategy
Return the proportion of your total value that will be used for investment. A dynamically changing risk_degree results in market timing.
- `generate_order_list`
Return the order list.
Users can inherit `BaseStrategy` to customize their strategy class.
WeightStrategyBase
--------------------
Qlib also provides a class ``qlib.contrib.strategy.WeightStrategyBase`` that is a subclass of `BaseStrategy`.
`WeightStrategyBase` only focuses on the target positions, and automatically generates an order list based on positions. It provides the `generate_target_weight_position` interface.
@@ -71,17 +71,27 @@ TopkDropoutStrategy
- `Topk`: The number of stocks held
- `Drop`: The number of stocks sold on each trading day
Currently, the number of held stocks is `Topk`.
On each trading day, the `Drop` number of held stocks with the worst `prediction score` will be sold, and the same number of unheld stocks with the best `prediction score` will be bought.
.. image:: ../_static/img/topk_drop.png
:alt: Topk-Drop
``TopkDrop`` algorithm sells `Drop` stocks every trading day, which guarantees a fixed turnover rate.
- Generate the order list from the target amount
EnhancedIndexingStrategy
------------------------
`EnhancedIndexingStrategy`: enhanced indexing combines the arts of active management and passive management,
with the aim of outperforming a benchmark index (e.g., S&P 500) in terms of portfolio return while controlling
the risk exposure (a.k.a. tracking error).
For more information, please refer to `qlib.contrib.strategy.signal_strategy.EnhancedIndexingStrategy`
and `qlib.contrib.strategy.optimizer.enhanced_indexing.EnhancedIndexingOptimizer`.
Usage & Example
====================

@@ -0,0 +1,46 @@
# Portfolio Optimization Strategy
## Introduction
In `qlib/examples/benchmarks` we have various **alpha** models that predict
the stock returns. We also use a simple rule based `TopkDropoutStrategy` to
evaluate the investing performance of these models. However, such a strategy
is too simple to control the portfolio risk like correlation and volatility.
To this end, an optimization based strategy should be used for the
trade-off between return and risk. In this doc, we will show how to use
`EnhancedIndexingStrategy` to maximize portfolio return while minimizing
tracking error relative to a benchmark.
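
To make the trade-off concrete, the sketch below evaluates the enhanced indexing objective (excess return minus `lamb` times tracking variance) on made-up numbers. The helper name `objective` and every input value are illustrative assumptions; only the formula follows the notation documented in `EnhancedIndexingOptimizer`.

```python
import numpy as np


def objective(w, wb, r, F, cov_b, var_u, lamb=1.0):
    """Excess return minus lamb * tracking variance (hypothetical helper)."""
    d = w - wb                            # benchmark deviation
    v = d @ F                             # factor deviation
    ret = d @ r                           # expected excess return
    risk = v @ cov_b @ v + var_u @ d**2   # factor risk + specific risk
    return ret - lamb * risk


# Toy inputs: 3 stocks, 1 factor (all values are made up)
wb = np.array([0.5, 0.3, 0.2])
r = np.array([0.02, 0.01, -0.01])
F = np.array([[1.0], [0.5], [-0.5]])
cov_b = np.array([[0.04]])
var_u = np.array([0.01, 0.02, 0.03])

# Holding the benchmark itself gives zero excess return and zero tracking variance
at_benchmark = objective(wb, wb, r, F, cov_b, var_u)

# Tilting toward the stock with the highest expected return
w = np.array([0.6, 0.3, 0.1])
tilted = objective(w, wb, r, F, cov_b, var_u)

print(at_benchmark, tilted)
```

A larger `lamb` penalizes the tracking variance more heavily, so the optimal weights stay closer to the benchmark.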
## Preparation
We use China stock market data for our example.
1. Prepare CSI300 weight:
```bash
wget http://fintech.msra.cn/stock_data/downloads/csi300_weight.zip
unzip -d ~/.qlib/qlib_data/cn_data csi300_weight.zip
rm -f csi300_weight.zip
```
2. Prepare risk model data:
```bash
python prepare_riskdata.py
```
Here we use a **Statistical Risk Model** implemented in `qlib.model.riskmodel`.
However, users are strongly encouraged to use other risk models for better quality:
* **Fundamental Risk Model** like MSCI BARRA
* [Deep Risk Model](https://arxiv.org/abs/2107.05201)
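
As a rough illustration of what a statistical risk model produces, the sketch below decomposes a toy return matrix into factor exposures, a factor covariance and specific variances, and then rebuilds the structured covariance. It uses a plain NumPy SVD as a stand-in for PCA and random data as returns; both are assumptions made for this example, and it is not the `StructuredCovEstimator` implementation.

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(240, 5))     # observations x variables (toy returns)
X = X - X.mean(axis=0)            # center the data, as PCA does

# PCA via SVD: rows of Vt are the principal directions
num_factors = 2
_, _, Vt = np.linalg.svd(X, full_matrices=False)
F = Vt[:num_factors].T            # variables x factors (factor exposure)
B = X @ F                         # observations x factors (factor returns)
U = X - B @ F.T                   # residuals, same shape as X

cov_b = np.cov(B.T)               # factors x factors
var_u = np.var(U, axis=0)         # specific variance (diagonal)

# Structured covariance: F @ cov_b @ F.T + diag(var_u)
cov = F @ cov_b @ F.T + np.diag(var_u)
print(cov.shape)
```

The three pieces `F`, `cov_b` and `var_u` correspond to the factor exposure, factor covariance and specific risk that the strategy loads for each date (specific risk is stored as volatility, i.e. the square root of `var_u`).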
## End-to-End Workflow
You can run the end-to-end workflow with `EnhancedIndexingStrategy` via
`qrun config_enhanced_indexing.yaml`.
In this config, we mainly changed the strategy section compared to
`qlib/examples/benchmarks/workflow_config_lightgbm_Alpha158.yaml`.
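
Before launching the workflow, it may help to check that every dated folder under `./riskdata` contains the three files written by `prepare_riskdata.py`. The helper below is a hypothetical convenience sketch (the name `missing_risk_files` is not part of Qlib); it relies only on the folder layout and file names used in this example.

```python
import os
import tempfile

REQUIRED = ("factor_exp.pkl", "factor_cov.pkl", "specific_risk.pkl")


def missing_risk_files(riskdata_root):
    """Return {date_folder: [missing file names]} for incomplete folders (hypothetical helper)."""
    missing = {}
    for date in sorted(os.listdir(riskdata_root)):
        folder = os.path.join(riskdata_root, date)
        if not os.path.isdir(folder):
            continue
        absent = [name for name in REQUIRED if not os.path.exists(os.path.join(folder, name))]
        if absent:
            missing[date] = absent
    return missing


# Demo on a throwaway directory: one complete folder, one incomplete folder
with tempfile.TemporaryDirectory() as root:
    for date, names in {"20170103": REQUIRED, "20170104": REQUIRED[:1]}.items():
        os.makedirs(os.path.join(root, date))
        for name in names:
            open(os.path.join(root, date, name), "wb").close()
    result = missing_risk_files(root)

print(result)
```

On real data one would call `missing_risk_files("./riskdata")`; an empty dictionary means every dated folder is complete.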

@@ -0,0 +1,71 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
start_time: 2008-01-01
end_time: 2020-08-01
fit_start_time: 2008-01-01
fit_end_time: 2014-12-31
instruments: *market
port_analysis_config: &port_analysis_config
strategy:
class: EnhancedIndexingStrategy
module_path: qlib.contrib.strategy
kwargs:
model: <MODEL>
dataset: <DATASET>
riskmodel_root: ./riskdata
backtest:
start_time: 2017-01-01
end_time: 2020-08-01
account: 100000000
benchmark: *benchmark
exchange_kwargs:
limit_threshold: 0.095
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
task:
model:
class: LGBModel
module_path: qlib.contrib.model.gbdt
kwargs:
loss: mse
colsample_bytree: 0.8879
learning_rate: 0.2
subsample: 0.8789
lambda_l1: 205.6999
lambda_l2: 580.9768
max_depth: 8
num_leaves: 210
num_threads: 20
dataset:
class: DatasetH
module_path: qlib.data.dataset
kwargs:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs:
model: <MODEL>
dataset: <DATASET>
- class: SigAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
ana_long_short: False
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config

@@ -0,0 +1,55 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
import numpy as np
import pandas as pd
from qlib.data import D
from qlib.model.riskmodel import StructuredCovEstimator
def prepare_data(riskdata_root="./riskdata", T=240, start_time="2016-01-01"):
universe = D.features(D.instruments("csi300"), ["$close"], start_time=start_time).swaplevel().sort_index()
price_all = (
D.features(D.instruments("all"), ["$close"], start_time=start_time).squeeze().unstack(level="instrument")
)
# StructuredCovEstimator is a statistical risk model
riskmodel = StructuredCovEstimator()
for i in range(T - 1, len(price_all)):
date = price_all.index[i]
ref_date = price_all.index[i - T + 1]
print(date)
codes = universe.loc[date].index
price = price_all.loc[ref_date:date, codes]
# calculate return and remove extreme return
ret = price.pct_change()
ret.clip(ret.quantile(0.025), ret.quantile(0.975), axis=1, inplace=True)
# run risk model
F, cov_b, var_u = riskmodel.predict(ret, is_price=False, return_decomposed_components=True)
# save risk data
root = riskdata_root + "/" + date.strftime("%Y%m%d")
os.makedirs(root, exist_ok=True)
pd.DataFrame(F, index=codes).to_pickle(root + "/factor_exp.pkl")
pd.DataFrame(cov_b).to_pickle(root + "/factor_cov.pkl")
# for specific_risk we follow the convention to save volatility
pd.Series(np.sqrt(var_u), index=codes).to_pickle(root + "/specific_risk.pkl")
if __name__ == "__main__":
import qlib
qlib.init(provider_uri="~/.qlib/qlib_data/cn_data")
prepare_data()

@@ -5,6 +5,7 @@
from .signal_strategy import (
TopkDropoutStrategy,
WeightStrategyBase,
EnhancedIndexingStrategy,
)
from .rule_strategy import (

@@ -0,0 +1,203 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import numpy as np
import cvxpy as cp
import pandas as pd
from typing import Union, Optional, Dict, Any, List
from qlib.log import get_module_logger
from .base import BaseOptimizer
logger = get_module_logger("EnhancedIndexingOptimizer")
class EnhancedIndexingOptimizer(BaseOptimizer):
"""
Portfolio Optimizer for Enhanced Indexing
Notations:
w0: current holding weights
wb: benchmark weight
r: expected return
F: factor exposure
cov_b: factor covariance
var_u: residual variance (diagonal)
lamb: risk aversion parameter
delta: total turnover limit
b_dev: benchmark deviation limit
f_dev: factor deviation limit
Also denote:
d = w - wb: benchmark deviation
v = d @ F: factor deviation
The optimization problem for enhanced indexing:
max_w d @ r - lamb * (v @ cov_b @ v + var_u @ d**2)
s.t. w >= 0
sum(w) == 1
sum(|w - w0|) <= delta
d >= -b_dev
d <= b_dev
v >= -f_dev
v <= f_dev
"""
def __init__(
self,
lamb: float = 1,
delta: Optional[float] = 0.2,
b_dev: Optional[float] = 0.01,
f_dev: Optional[Union[List[float], np.ndarray]] = None,
scale_return: bool = True,
epsilon: float = 5e-5,
solver_kwargs: Optional[Dict[str, Any]] = {},
):
"""
Args:
lamb (float): risk aversion parameter (larger `lamb` means more focus on risk)
delta (float): total turnover limit
b_dev (float): benchmark deviation limit
f_dev (list): factor deviation limit
scale_return (bool): whether scale return to match estimated volatility
epsilon (float): minimum weight
solver_kwargs (dict): kwargs for cvxpy solver
"""
assert lamb >= 0, "risk aversion parameter `lamb` should be non-negative"
self.lamb = lamb
# `delta` is Optional: None disables the turnover constraint
assert delta is None or delta >= 0, "turnover limit `delta` should be non-negative"
self.delta = delta
assert b_dev is None or b_dev >= 0, "benchmark deviation limit `b_dev` should be non-negative"
self.b_dev = b_dev
if isinstance(f_dev, float):
assert f_dev >= 0, "factor deviation limit `f_dev` should be positive"
elif f_dev is not None:
f_dev = np.array(f_dev)
assert all(f_dev >= 0), "factor deviation limit `f_dev` should be positive"
self.f_dev = f_dev
self.scale_return = scale_return
self.epsilon = epsilon
self.solver_kwargs = solver_kwargs
def __call__(
self,
r: np.ndarray,
F: np.ndarray,
cov_b: np.ndarray,
var_u: np.ndarray,
w0: np.ndarray,
wb: np.ndarray,
mfh: Optional[np.ndarray] = None,
mfs: Optional[np.ndarray] = None,
) -> np.ndarray:
"""
Args:
r (np.ndarray): expected returns
F (np.ndarray): factor exposure
cov_b (np.ndarray): factor covariance
var_u (np.ndarray): residual variance
w0 (np.ndarray): current holding weights
wb (np.ndarray): benchmark weights
mfh (np.ndarray): mask force holding
mfs (np.ndarray): mask force selling
Returns:
np.ndarray: optimized portfolio allocation
"""
# scale return to match volatility
if self.scale_return:
r = r / r.std()
r *= np.sqrt(np.mean(np.diag(F @ cov_b @ F.T) + var_u))
# target weight
w = cp.Variable(len(r), nonneg=True)
w.value = wb # for warm start
# precompute exposure
d = w - wb # benchmark exposure
v = d @ F # factor exposure
# objective
ret = d @ r # excess return
risk = cp.quad_form(v, cov_b) + var_u @ (d ** 2) # tracking error
obj = cp.Maximize(ret - self.lamb * risk)
# weight bounds
lb = np.zeros_like(wb)
ub = np.ones_like(wb)
# bench bounds
if self.b_dev is not None:
lb = np.maximum(lb, wb - self.b_dev)
ub = np.minimum(ub, wb + self.b_dev)
# force holding
if mfh is not None:
lb[mfh] = w0[mfh]
ub[mfh] = w0[mfh]
# force selling
# NOTE: this will override mfh
if mfs is not None:
lb[mfs] = 0
ub[mfs] = 0
# constraints
# TODO: currently we assume being fully invested in the stocks,
# in the future we should support holding cash as an asset
cons = [cp.sum(w) == 1, w >= lb, w <= ub]
# factor deviation
if self.f_dev is not None:
cons.extend([v >= -self.f_dev, v <= self.f_dev])
# total turnover constraint
t_cons = []
if self.delta is not None:
if w0 is not None and w0.sum() > 0:
t_cons.extend([cp.norm(w - w0, 1) <= self.delta])
# optimize
# trial 1: use all constraints
success = False
try:
prob = cp.Problem(obj, cons + t_cons)
prob.solve(solver=cp.ECOS, warm_start=True, **self.solver_kwargs)
assert prob.status == "optimal"
success = True
except Exception as e:
logger.warning(f"trial 1 failed {e} (status: {prob.status})")
# trial 2: remove turnover constraint
if not success and len(t_cons):
logger.info("try removing turnover constraint as the last optimization failed")
try:
w.value = wb
prob = cp.Problem(obj, cons)
prob.solve(solver=cp.ECOS, warm_start=True, **self.solver_kwargs)
assert prob.status in ["optimal", "optimal_inaccurate"]
success = True
except Exception as e:
logger.warning(f"trial 2 failed {e} (status: {prob.status})")
# return current weight if not success
if not success:
logger.warning("optimization failed, will return current holding weight")
return w0
if prob.status == "optimal_inaccurate":
logger.warning("the optimization is inaccurate")
# remove small weight
w = np.asarray(w.value)
w[w < self.epsilon] = 0
w /= w.sum()
return w

@@ -8,7 +8,7 @@ import pandas as pd
import scipy.optimize as so
from typing import Optional, Union, Callable, List
from qlib.portfolio.optimizer import BaseOptimizer
from .base import BaseOptimizer
class PortfolioOptimizer(BaseOptimizer):
@@ -35,7 +35,7 @@ class PortfolioOptimizer(BaseOptimizer):
lamb: float = 0,
delta: float = 0,
alpha: float = 0.0,
scale_alpha: bool = True,
scale_return: bool = True,
tol: float = 1e-8,
):
"""
@@ -44,7 +44,7 @@ class PortfolioOptimizer(BaseOptimizer):
lamb (float): risk aversion parameter (larger `lamb` means more focus on risk)
delta (float): turnover rate limit
alpha (float): l2 norm regularizer
scale_alpha (bool): if to scale alpha to match the volatility of the covariance matrix
scale_return (bool): whether to scale the expected return to match the volatility of the covariance matrix
tol (float): tolerance for optimization termination
"""
assert method in [self.OPT_GMV, self.OPT_MVO, self.OPT_RP, self.OPT_INV], f"method `{method}` is not supported"
@@ -60,18 +60,18 @@ class PortfolioOptimizer(BaseOptimizer):
self.alpha = alpha
self.tol = tol
self.scale_alpha = scale_alpha
self.scale_return = scale_return
def __call__(
self,
S: Union[np.ndarray, pd.DataFrame],
u: Optional[Union[np.ndarray, pd.Series]] = None,
r: Optional[Union[np.ndarray, pd.Series]] = None,
w0: Optional[Union[np.ndarray, pd.Series]] = None,
) -> Union[np.ndarray, pd.Series]:
"""
Args:
S (np.ndarray or pd.DataFrame): covariance matrix
u (np.ndarray or pd.Series): expected returns (a.k.a., alpha)
r (np.ndarray or pd.Series): expected return
w0 (np.ndarray or pd.Series): initial weights (for turnover control)
Returns:
@@ -83,12 +83,12 @@ class PortfolioOptimizer(BaseOptimizer):
index = S.index
S = S.values
# transform alpha
if u is not None:
assert len(u) == len(S), "`u` has mismatched shape"
if isinstance(u, pd.Series):
assert u.index.equals(index), "`u` has mismatched index"
u = u.values
# transform return
if r is not None:
assert len(r) == len(S), "`r` has mismatched shape"
if isinstance(r, pd.Series):
assert r.index.equals(index), "`r` has mismatched index"
r = r.values
# transform initial weights
if w0 is not None:
@@ -97,13 +97,13 @@ class PortfolioOptimizer(BaseOptimizer):
assert w0.index.equals(index), "`w0` has mismatched index"
w0 = w0.values
# scale alpha to match volatility
if u is not None and self.scale_alpha:
u = u / u.std()
u *= np.mean(np.diag(S)) ** 0.5
# scale return to match volatility
if r is not None and self.scale_return:
r = r / r.std()
r *= np.sqrt(np.mean(np.diag(S)))
# optimize
w = self._optimize(S, u, w0)
w = self._optimize(S, r, w0)
# restore index if needed
if index is not None:
@@ -111,30 +111,30 @@ class PortfolioOptimizer(BaseOptimizer):
return w
def _optimize(self, S: np.ndarray, u: Optional[np.ndarray] = None, w0: Optional[np.ndarray] = None) -> np.ndarray:
def _optimize(self, S: np.ndarray, r: Optional[np.ndarray] = None, w0: Optional[np.ndarray] = None) -> np.ndarray:
# inverse volatility
if self.method == self.OPT_INV:
if u is not None:
warnings.warn("`u` is set but will not be used for `inv` portfolio")
if r is not None:
warnings.warn("`r` is set but will not be used for `inv` portfolio")
if w0 is not None:
warnings.warn("`w0` is set but will not be used for `inv` portfolio")
return self._optimize_inv(S)
# global minimum variance
if self.method == self.OPT_GMV:
if u is not None:
warnings.warn("`u` is set but will not be used for `gmv` portfolio")
if r is not None:
warnings.warn("`r` is set but will not be used for `gmv` portfolio")
return self._optimize_gmv(S, w0)
# mean-variance
if self.method == self.OPT_MVO:
return self._optimize_mvo(S, u, w0)
return self._optimize_mvo(S, r, w0)
# risk parity
if self.method == self.OPT_RP:
if u is not None:
warnings.warn("`u` is set but will not be used for `rp` portfolio")
if r is not None:
warnings.warn("`r` is set but will not be used for `rp` portfolio")
return self._optimize_rp(S, w0)
def _optimize_inv(self, S: np.ndarray) -> np.ndarray:
@@ -155,17 +155,17 @@ class PortfolioOptimizer(BaseOptimizer):
return self._solve(len(S), self._get_objective_gmv(S), *self._get_constrains(w0))
def _optimize_mvo(
self, S: np.ndarray, u: Optional[np.ndarray] = None, w0: Optional[np.ndarray] = None
self, S: np.ndarray, r: Optional[np.ndarray] = None, w0: Optional[np.ndarray] = None
) -> np.ndarray:
"""optimize mean-variance portfolio
This method solves the following optimization problem
min_w - w' u + lamb * w' S w
min_w - w' r + lamb * w' S w
s.t. w >= 0, sum(w) == 1
where `S` is the covariance matrix, `r` is the expected returns,
and `lamb` is the risk aversion parameter.
"""
return self._solve(len(S), self._get_objective_mvo(S, u), *self._get_constrains(w0))
return self._solve(len(S), self._get_objective_mvo(S, r), *self._get_constrains(w0))
def _optimize_rp(self, S: np.ndarray, w0: Optional[np.ndarray] = None) -> np.ndarray:
"""optimize risk parity portfolio
@@ -189,16 +189,16 @@ class PortfolioOptimizer(BaseOptimizer):
return func
def _get_objective_mvo(self, S: np.ndarray, u: np.ndarray = None) -> Callable:
def _get_objective_mvo(self, S: np.ndarray, r: np.ndarray = None) -> Callable:
"""mean-variance optimization objective
Optimization objective
min_w - w' u + lamb * w' S w
min_w - w' r + lamb * w' S w
"""
def func(x):
risk = x @ S @ x
ret = x @ u
ret = x @ r
return -ret + self.lamb * risk
return func

@@ -1,70 +1,49 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
import copy
from qlib.backtest.signal import Signal, create_signal_from
from typing import Dict, List, Text, Tuple, Union
from qlib.data.dataset import Dataset
from qlib.model.base import BaseModel
from qlib.backtest.position import Position
import warnings
import cvxpy as cp
import numpy as np
import pandas as pd
from ...utils.resam import resam_ts_data
from ...strategy.base import BaseStrategy
from ...backtest.decision import Order, BaseTradeDecision, OrderDir, TradeDecisionWO
from typing import Dict, List, Text, Tuple, Union
from .order_generator import OrderGenWInteract
from qlib.data import D
from qlib.data.dataset import Dataset
from qlib.model.base import BaseModel
from qlib.strategy.base import BaseStrategy
from qlib.backtest.position import Position
from qlib.backtest.signal import Signal, create_signal_from
from qlib.backtest.decision import Order, BaseTradeDecision, OrderDir, TradeDecisionWO
from qlib.log import get_module_logger
from qlib.utils import get_pre_trading_date, load_dataset
from qlib.utils.resam import resam_ts_data
from qlib.contrib.strategy.order_generator import OrderGenWInteract, OrderGenWOInteract
from qlib.contrib.strategy.optimizer import EnhancedIndexingOptimizer
class TopkDropoutStrategy(BaseStrategy):
# TODO:
# 1. Supporting leverage the get_range_limit result from the decision
# 2. Supporting alter_outer_trade_decision
# 3. Supporting checking the availability of trade decision
class BaseSignalStrategy(BaseStrategy):
def __init__(
self,
*,
topk,
n_drop,
signal: Union[Signal, Tuple[BaseModel, Dataset], List, Dict, Text, pd.Series, pd.DataFrame] = None,
method_sell="bottom",
method_buy="top",
risk_degree=0.95,
hold_thresh=1,
only_tradable=False,
model=None,
dataset=None,
risk_degree: float = 0.95,
trade_exchange=None,
level_infra=None,
common_infra=None,
model=None,
dataset=None,
**kwargs,
):
"""
Parameters
-----------
topk : int
the number of stocks in the portfolio.
n_drop : int
number of stocks to be replaced in each trading date.
signal :
the information to describe a signal. Please refer to the docs of `qlib.backtest.signal.create_signal_from`
the decision of the strategy will base on the given signal
method_sell : str
dropout method_sell, random/bottom.
method_buy : str
dropout method_buy, random/top.
risk_degree : float
position percentage of total value.
hold_thresh : int
minimum holding days
before selling a stock, will check current.get_stock_count(order.stock_id) >= self.hold_thresh.
only_tradable : bool
will the strategy only consider the tradable stock when buying and selling.
if only_tradable:
strategy will make buy sell decision without checking the tradable state of the stock.
else:
strategy will make decision with the tradable state of the stock info and avoid buy and sell them.
trade_exchange : Exchange
exchange that provides market info, used to deal order and generate report
- If `trade_exchange` is None, self.trade_exchange will be set with common_infra
@@ -74,16 +53,9 @@ class TopkDropoutStrategy(BaseStrategy):
- In minutely execution, the daily exchange is not usable, only the minutely exchange is recommended.
"""
super(TopkDropoutStrategy, self).__init__(
level_infra=level_infra, common_infra=common_infra, trade_exchange=trade_exchange, **kwargs
)
self.topk = topk
self.n_drop = n_drop
self.method_sell = method_sell
self.method_buy = method_buy
super().__init__(level_infra=level_infra, common_infra=common_infra, trade_exchange=trade_exchange, **kwargs)
self.risk_degree = risk_degree
self.hold_thresh = hold_thresh
self.only_tradable = only_tradable
# This is trying to be compatible with previous version of qlib task config
if model is not None and dataset is not None:
@@ -100,6 +72,52 @@ class TopkDropoutStrategy(BaseStrategy):
# It will use 95% amount of your total value by default
return self.risk_degree
class TopkDropoutStrategy(BaseSignalStrategy):
# TODO:
# 1. Supporting leverage the get_range_limit result from the decision
# 2. Supporting alter_outer_trade_decision
# 3. Supporting checking the availability of trade decision
def __init__(
self,
*,
topk,
n_drop,
method_sell="bottom",
method_buy="top",
hold_thresh=1,
only_tradable=False,
**kwargs,
):
"""
Parameters
-----------
topk : int
the number of stocks in the portfolio.
n_drop : int
number of stocks to be replaced in each trading date.
method_sell : str
dropout method_sell, random/bottom.
method_buy : str
dropout method_buy, random/top.
hold_thresh : int
minimum holding days
before selling a stock, will check current.get_stock_count(order.stock_id) >= self.hold_thresh.
only_tradable : bool
will the strategy only consider the tradable stock when buying and selling.
if only_tradable:
strategy will make decisions using the tradable state of each stock and avoid buying or selling untradable stocks.
else:
strategy will make buy and sell decisions without checking the tradable state of the stock.
"""
super().__init__(**kwargs)
self.topk = topk
self.n_drop = n_drop
self.method_sell = method_sell
self.method_buy = method_buy
self.hold_thresh = hold_thresh
self.only_tradable = only_tradable
def generate_trade_decision(self, execute_result=None):
# get the number of trading step finished, trade_step can be [0, 1, 2, ..., trade_len - 1]
trade_step = self.trade_calendar.get_trade_step()
@@ -253,7 +271,7 @@ class TopkDropoutStrategy(BaseStrategy):
return TradeDecisionWO(sell_order_list + buy_order_list, self)
class WeightStrategyBase(BaseStrategy):
class WeightStrategyBase(BaseSignalStrategy):
# TODO:
# 1. Supporting leverage the get_range_limit result from the decision
# 2. Supporting alter_outer_trade_decision
@@ -261,11 +279,7 @@ class WeightStrategyBase(BaseStrategy):
def __init__(
self,
*,
signal: Union[Signal, Tuple[BaseModel, Dataset], List, Dict, Text, pd.Series, pd.DataFrame],
order_generator_cls_or_obj=OrderGenWInteract,
trade_exchange=None,
level_infra=None,
common_infra=None,
order_generator_cls_or_obj=OrderGenWOInteract,
**kwargs,
):
"""
@@ -280,24 +294,13 @@ class WeightStrategyBase(BaseStrategy):
- In daily execution, both daily and minutely exchanges are usable, but the daily exchange is recommended because it runs faster.
- In minutely execution, the daily exchange is not usable, only the minutely exchange is recommended.
"""
super(WeightStrategyBase, self).__init__(
level_infra=level_infra, common_infra=common_infra, trade_exchange=trade_exchange, **kwargs
)
super().__init__(**kwargs)
if isinstance(order_generator_cls_or_obj, type):
self.order_generator = order_generator_cls_or_obj()
else:
self.order_generator = order_generator_cls_or_obj
self.signal: Signal = create_signal_from(signal)
def get_risk_degree(self, trade_step=None):
"""get_risk_degree
Return the proportion of your total value you will used in investment.
Dynamically risk_degree will result in Market timing.
"""
# It will use 95% amount of your total value by default
return 0.95
def generate_target_weight_position(self, score, current, trade_start_time, trade_end_time):
"""
Generate target position from score for this date and the current position. The cash is not considered in the position
@@ -341,3 +344,154 @@ class WeightStrategyBase(BaseStrategy):
trade_end_time=trade_end_time,
)
return TradeDecisionWO(order_list, self)
class EnhancedIndexingStrategy(WeightStrategyBase):
"""Enhanced Indexing Strategy
Enhanced indexing combines the arts of active management and passive management,
with the aim of outperforming a benchmark index (e.g., S&P 500) in terms of
portfolio return while controlling the risk exposure (a.k.a. tracking error).
Users need to prepare their risk model data like below:
├── /path/to/riskmodel
├──── 20210101
├────── factor_exp.{csv|pkl|h5}
├────── factor_cov.{csv|pkl|h5}
├────── specific_risk.{csv|pkl|h5}
├────── blacklist.{csv|pkl|h5} # optional
The risk model data can be obtained from risk data provider. You can also use
`qlib.model.riskmodel.structured.StructuredCovEstimator` to prepare these data.
Args:
riskmodel_root (str): risk model root path
name_mapping (dict): alternative file names
"""
FACTOR_EXP_NAME = "factor_exp.pkl"
FACTOR_COV_NAME = "factor_cov.pkl"
SPECIFIC_RISK_NAME = "specific_risk.pkl"
BLACKLIST_NAME = "blacklist.pkl"
def __init__(
self,
*,
riskmodel_root,
market="csi500",
turn_limit=None,
name_mapping={},
optimizer_kwargs={},
verbose=False,
**kwargs,
):
super().__init__(**kwargs)
self.logger = get_module_logger("EnhancedIndexingStrategy")
self.riskmodel_root = riskmodel_root
self.market = market
self.turn_limit = turn_limit
self.factor_exp_path = name_mapping.get("factor_exp", self.FACTOR_EXP_NAME)
self.factor_cov_path = name_mapping.get("factor_cov", self.FACTOR_COV_NAME)
self.specific_risk_path = name_mapping.get("specific_risk", self.SPECIFIC_RISK_NAME)
self.blacklist_path = name_mapping.get("blacklist", self.BLACKLIST_NAME)
self.optimizer = EnhancedIndexingOptimizer(**optimizer_kwargs)
self.verbose = verbose
self._riskdata_cache = {}
def get_risk_data(self, date):
if date in self._riskdata_cache:
return self._riskdata_cache[date]
root = self.riskmodel_root + "/" + date.strftime("%Y%m%d")
if not os.path.exists(root):
return None
factor_exp = load_dataset(root + "/" + self.factor_exp_path, index_col=[0])
factor_cov = load_dataset(root + "/" + self.factor_cov_path, index_col=[0])
specific_risk = load_dataset(root + "/" + self.specific_risk_path, index_col=[0])
if not factor_exp.index.equals(specific_risk.index):
# NOTE: for stocks missing specific_risk, we always assume they have the highest volatility
specific_risk = specific_risk.reindex(factor_exp.index, fill_value=specific_risk.max())
universe = factor_exp.index.tolist()
blacklist = []
if os.path.exists(root + "/" + self.blacklist_path):
blacklist = load_dataset(root + "/" + self.blacklist_path).index.tolist()
self._riskdata_cache[date] = factor_exp.values, factor_cov.values, specific_risk.values, universe, blacklist
return self._riskdata_cache[date]
def generate_target_weight_position(self, score, current, trade_start_time, trade_end_time):
trade_date = trade_start_time
pre_date = get_pre_trading_date(trade_date, future=True) # previous trade date
# load risk data
outs = self.get_risk_data(pre_date)
if outs is None:
self.logger.warning(f"no risk data for {pre_date:%Y-%m-%d}, skip optimization")
return None
factor_exp, factor_cov, specific_risk, universe, blacklist = outs
# transform score
# NOTE: for stocks missing score, we always assume they have the lowest score
score = score.reindex(universe).fillna(score.min()).values
# get current weight
# NOTE: if a stock is not in universe, its current weight will be zero
cur_weight = current.get_stock_weight_dict(only_stock=False)
cur_weight = np.array([cur_weight.get(stock, 0) for stock in universe])
assert all(cur_weight >= 0), "current weight has negative values"
cur_weight = cur_weight / self.get_risk_degree(trade_date) # sum of weight should be risk_degree
if cur_weight.sum() > 1 and self.verbose:
self.logger.warning(f"previous total holdings exceed risk degree (current: {cur_weight.sum()})")
# load bench weight
bench_weight = D.features(
D.instruments("all"), [f"${self.market}_weight"], start_time=pre_date, end_time=pre_date
).squeeze()
bench_weight.index = bench_weight.index.droplevel(level="datetime")
bench_weight = bench_weight.reindex(universe).fillna(0).values
# whether stock tradable
# NOTE: currently we use last day volume to check whether tradable
tradable = D.features(D.instruments("all"), ["$volume"], start_time=pre_date, end_time=pre_date).squeeze()
tradable.index = tradable.index.droplevel(level="datetime")
tradable = tradable.reindex(universe).gt(0).values
mask_force_hold = ~tradable
# mask force sell
mask_force_sell = np.array([stock in blacklist for stock in universe], dtype=bool)
# optimize
weight = self.optimizer(
r=score,
F=factor_exp,
cov_b=factor_cov,
var_u=specific_risk ** 2,
w0=cur_weight,
wb=bench_weight,
mfh=mask_force_hold,
mfs=mask_force_sell,
)
target_weight_position = {stock: weight for stock, weight in zip(universe, weight) if weight > 0}
if self.verbose:
self.logger.info("trade date: {:%Y-%m-%d}".format(trade_date))
self.logger.info("number of holding stocks: {}".format(len(target_weight_position)))
self.logger.info("total holding weight: {:.6f}".format(weight.sum()))
return target_weight_position

@@ -13,19 +13,30 @@ class StructuredCovEstimator(RiskModel):
"""Structured Covariance Estimator
This estimator assumes observations can be predicted by multiple factors
X = FB + U
where `F` can be specified by explicit risk factors or latent factors.
X = B @ F.T + U
where `X` contains observations (row) of multiple variables (column),
`F` contains factor exposures (column) for all variables (row),
`B` is the regression coefficients matrix for all observations (row) on
all factors (columns), and `U` is the residual matrix with shape like `X`.
Therefore the structured covariance can be estimated by
cov(X) = F cov(B) F.T + cov(U)
cov(X.T) = F @ cov(B.T) @ F.T + diag(var(U))
We use latent factor models to estimate the structured covariance.
Specifically, the following latent factor models are supported:
In finance domain, there are mainly three methods to design `F` [1][2]:
- Statistical Risk Model (SRM): latent factors estimated from major components
- Fundamental Risk Model (FRM): human designed factors
- Deep Risk Model (DRM): neural network designed factors (like a blend of SRM & FRM)
In this implementation we use latent factor models to specify `F`.
Specifically, the following two latent factor models are supported:
- `pca`: Principal Component Analysis
- `fa`: Factor Analysis
Reference: [1] Fan, J., Liao, Y., & Liu, H. (2016). An overview of the estimation of large covariance and
precision matrices. Econometrics Journal, 19(1), C1-C32. https://doi.org/10.1111/ectj.12061
Reference:
[1] Fan, J., Liao, Y., & Liu, H. (2016). An overview of the estimation of large covariance and
precision matrices. Econometrics Journal, 19(1), C1-C32. https://doi.org/10.1111/ectj.12061
[2] Lin, H., Zhou, D., Liu, W., & Bian, J. (2021). Deep Risk Model: A Deep Learning Solution for
Mining Latent Risk Factors to Improve Covariance Matrix Estimation. arXiv preprint arXiv:2107.05201.
"""
FACTOR_MODEL_PCA = "pca"
@@ -70,10 +81,10 @@ class StructuredCovEstimator(RiskModel):
model = self.solver(self.num_factors, random_state=0).fit(X)
F = model.components_.T # num_features x num_factors
B = model.transform(X) # num_samples x num_factors
F = model.components_.T # variables x factors
B = model.transform(X) # observations x factors
U = X - B @ F.T
cov_b = np.cov(B.T) # num_factors x num_factors
cov_b = np.cov(B.T) # factors x factors
var_u = np.var(U, axis=0) # diagonal
if return_decomposed_components:
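The decomposition in this hunk can be reproduced end to end on random data. The sketch below is an assumption-laden stand-in: it uses a plain NumPy SVD in place of `self.solver`, and the function name `structured_cov_components` and the data shapes are hypothetical. It only shows how `F`, `cov_b` and `var_u` recombine into `F @ cov_b @ F.T + diag(var_u)`.

```python
import numpy as np


def structured_cov_components(X, num_factors):
    """Return (F, cov_b, var_u) for observations X (observations x variables)."""
    X = X - X.mean(axis=0)                   # center each variable
    # PCA via SVD: rows of Vt are the principal directions.
    _, _, Vt = np.linalg.svd(X, full_matrices=False)
    F = Vt[:num_factors].T                   # variables x factors
    B = X @ F                                # observations x factors
    U = X - B @ F.T                          # residuals, same shape as X
    cov_b = np.atleast_2d(np.cov(B.T))       # factors x factors
    var_u = np.var(U, axis=0)                # diagonal of the residual covariance
    return F, cov_b, var_u


rng = np.random.default_rng(0)
X = rng.normal(size=(200, 8))                # 200 observations, 8 variables
F, cov_b, var_u = structured_cov_components(X, num_factors=3)

# Recombine the pieces into the full structured covariance matrix.
cov = F @ cov_b @ F.T + np.diag(var_u)
print(cov.shape)
```

Keeping the three components separate, rather than materialising `cov`, is what lets an optimizer work with the much smaller factors x factors matrix.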

@@ -1,2 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

@@ -1,143 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import numpy as np
import cvxpy as cp
import pandas as pd
from typing import Union
from qlib.portfolio.optimizer import BaseOptimizer
class EnhancedIndexingOptimizer(BaseOptimizer):
"""
Portfolio Optimizer with Enhanced Indexing
Note:
This optimizer always assumes full investment and no-shorting.
"""
START_FROM_W0 = "w0"
START_FROM_BENCH = "benchmark"
def __init__(
self,
lamb: float = 10,
delta: float = 0.4,
bench_dev: float = 0.01,
inds_dev: float = None,
scale_alpha: bool = True,
verbose: bool = False,
warm_start: str = None,
max_iters: int = 10000,
):
"""
Args:
lamb (float): risk aversion parameter (larger `lamb` means less focus on return)
delta (float): turnover rate limit
bench_dev (float): benchmark deviation limit
inds_dev (float/None): industry deviation limit, set `inds_dev` to None to ignore industry specific
restriction
scale_alpha (bool): whether to scale alpha to match the volatility of the covariance matrix
verbose (bool): whether to print detailed information about the solver
warm_start (str): whether to try a warm start (`w0`/`benchmark`/None)
(https://www.cvxpy.org/tutorial/advanced/index.html#warm-start)
max_iters (int): maximum number of solver iterations
"""
assert lamb >= 0, "risk aversion parameter `lamb` should be non-negative"
self.lamb = lamb
assert delta >= 0, "turnover limit `delta` should be non-negative"
self.delta = delta
assert bench_dev >= 0, "benchmark deviation limit `bench_dev` should be non-negative"
self.bench_dev = bench_dev
assert inds_dev is None or inds_dev >= 0, "industry deviation limit `inds_dev` should be non-negative or None."
self.inds_dev = inds_dev
assert warm_start in [
None,
self.START_FROM_W0,
self.START_FROM_BENCH,
], "illegal warm start option"
self.start_from_w0 = warm_start == self.START_FROM_W0
self.start_from_bench = warm_start == self.START_FROM_BENCH
self.scale_alpha = scale_alpha
self.verbose = verbose
self.max_iters = max_iters
def __call__(
self,
u: Union[np.ndarray, pd.Series],
F: np.ndarray,
covB: np.ndarray,
varU: np.ndarray,
w0: np.ndarray,
w_bench: np.ndarray,
inds_onehot: np.ndarray = None,
) -> Union[np.ndarray, pd.Series]:
"""
Args:
u (np.ndarray or pd.Series): expected returns (a.k.a., alpha)
F, covB, varU (np.ndarray): see StructuredCovEstimator
w0 (np.ndarray): initial weights (for turnover control)
w_bench (np.ndarray): benchmark weights
inds_onehot (np.ndarray): industry (onehot)
Returns:
np.ndarray or pd.Series: optimized portfolio allocation
"""
assert inds_onehot is not None or self.inds_dev is None, "Industry onehot vector is required."
# transform dataframe into array
if isinstance(u, pd.Series):
u = u.values
# scale alpha to match volatility
if self.scale_alpha:
u = u / u.std()
x_variance = np.mean(np.diag(F @ covB @ F.T) + varU)
u *= x_variance ** 0.5
w = cp.Variable(len(u)) # num_assets
v = w @ F # num_factors
ret = w @ u
risk = cp.quad_form(v, covB) + cp.sum(cp.multiply(varU, w ** 2))
obj = cp.Maximize(ret - self.lamb * risk)
d_bench = w - w_bench
cons = [
w >= 0,
cp.sum(w) == 1,
d_bench >= -self.bench_dev,
d_bench <= self.bench_dev,
]
if self.inds_dev is not None:
d_inds = d_bench @ inds_onehot
cons.append(d_inds >= -self.inds_dev)
cons.append(d_inds <= self.inds_dev)
if w0 is not None:
turnover = cp.sum(cp.abs(w - w0))
cons.append(turnover <= self.delta)
warm_start = False
if self.start_from_w0:
if w0 is None:
print("Warning: try warm start with w0, but w0 is `None`.")
else:
w.value = w0
warm_start = True
elif self.start_from_bench:
w.value = w_bench
warm_start = True
prob = cp.Problem(obj, cons)
prob.solve(solver=cp.SCS, verbose=self.verbose, warm_start=warm_start, max_iters=self.max_iters)
if prob.status != "optimal":
print("Warning: solve failed.", prob.status)
return np.asarray(w.value)
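The risk term in the removed optimizer never builds the full assets x assets covariance matrix; it evaluates the quadratic form through the factor structure instead. The following NumPy check, on made-up random inputs with hypothetical sizes, is a sketch showing that the factored expression equals `w @ cov @ w` for `cov = F @ covB @ F.T + diag(varU)`.

```python
import numpy as np

rng = np.random.default_rng(0)
n_assets, n_factors = 6, 2                       # hypothetical sizes

F = rng.normal(size=(n_assets, n_factors))       # factor exposures
A = rng.normal(size=(n_factors, n_factors))
cov_b = A @ A.T                                  # a valid (PSD) factor covariance
var_u = rng.uniform(0.01, 0.05, size=n_assets)   # specific variances
w = np.full(n_assets, 1.0 / n_assets)            # equal-weight portfolio

# Factored form, mirroring the optimizer's risk expression.
v = w @ F                                        # portfolio factor exposure
risk_factored = v @ cov_b @ v + np.sum(var_u * w ** 2)

# Dense form, using the explicit covariance matrix.
cov_full = F @ cov_b @ F.T + np.diag(var_u)
risk_full = w @ cov_full @ w

print(np.isclose(risk_factored, risk_full))
```

With thousands of assets and a few dozen factors, the factored form keeps the quadratic term small, which is presumably why the optimizer takes `F`, `covB` and `varU` separately.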

@@ -877,7 +877,7 @@ def register_wrapper(wrapper, cls_or_obj, module_path=None):
wrapper.register(obj)
def load_dataset(path_or_obj):
def load_dataset(path_or_obj, index_col=[0, 1]):
"""load dataset from multiple file formats"""
if isinstance(path_or_obj, pd.DataFrame):
return path_or_obj
@@ -889,7 +889,7 @@ def load_dataset(path_or_obj):
elif extension == ".pkl":
return pd.read_pickle(path_or_obj)
elif extension == ".csv":
return pd.read_csv(path_or_obj, parse_dates=True, index_col=[0, 1])
return pd.read_csv(path_or_obj, parse_dates=True, index_col=index_col)
raise ValueError(f"unsupported file type `{extension}`")
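The new `index_col` parameter is passed straight through to `pd.read_csv`, so the effect can be shown with pandas alone. This sketch uses an in-memory CSV with made-up column names and values; it does not call `load_dataset` itself.

```python
import io

import pandas as pd

# Hypothetical CSV content (illustration only).
csv_text = (
    "datetime,instrument,score\n"
    "2021-01-04,SH600000,0.1\n"
    "2021-01-04,SH600004,0.2\n"
)

# Default behaviour: the first two columns form a (datetime, instrument) MultiIndex.
two_level = pd.read_csv(io.StringIO(csv_text), parse_dates=True, index_col=[0, 1])

# Overriding index_col: only the first column is used as the index.
one_level = pd.read_csv(io.StringIO(csv_text), parse_dates=True, index_col=0)

print(two_level.index.nlevels, one_level.index.nlevels)
```

A single-level index is the natural choice for files keyed by one column only, which the previously hard-coded `[0, 1]` could not express.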