1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-05 03:50:57 +08:00

Fix backtest (#719)

* modify FileStorage to support multiple freqs

* modify backtest's sample documentation

* change the logging level of read data exception from error to debug

* fix the backtest exception when volume is 0 or np.nan

* fix test_storage.py

* add backtest_daily

* modify backtest_daily's docstring

* add __repr__/__str__ to Position

* fix the bug of nested_decision_execution example

Co-authored-by: Young <afe.young@gmail.com>
Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>
This commit is contained in:
Pengrong Zhu
2021-12-07 19:04:23 +08:00
committed by GitHub
parent 84103c7d43
commit c276de4040
19 changed files with 663 additions and 232 deletions

View File

@@ -84,31 +84,125 @@ Usage & Example
====================
``Portfolio Strategy`` can be specified in the ``Intraday Trading(Backtest)``, the example is as follows.
.. code-block:: python
- daily
from qlib.contrib.strategy.strategy import TopkDropoutStrategy
from qlib.contrib.evaluate import backtest
STRATEGY_CONFIG = {
"topk": 50,
"n_drop": 5,
}
BACKTEST_CONFIG = {
"limit_threshold": 0.095,
"account": 100000000,
"benchmark": BENCHMARK,
"deal_price": "close",
"open_cost": 0.0005,
"close_cost": 0.0015,
"min_cost": 5,
}
# use default strategy
strategy = TopkDropoutStrategy(**STRATEGY_CONFIG)
.. code-block:: python
from pprint import pprint
import qlib
import pandas as pd
from qlib.utils.time import Freq
from qlib.utils import flatten_dict
from qlib.contrib.evaluate import backtest_daily
from qlib.contrib.evaluate import risk_analysis
from qlib.contrib.strategy import TopkDropoutStrategy
# init qlib
qlib.init(provider_uri=<qlib data dir>)
CSI300_BENCH = "SH000300"
STRATEGY_CONFIG = {
"topk": 50,
"n_drop": 5,
# pred_score, pd.Series
"signal": pred_score,
}
strategy_obj = TopkDropoutStrategy(**STRATEGY_CONFIG)
report_normal, positions_normal = backtest_daily(
start_time="2017-01-01", end_time="2020-08-01", strategy=strategy_obj
)
analysis = dict()
analysis["excess_return_without_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"], freq=analysis_freq
)
analysis["excess_return_with_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"] - report_normal["cost"], freq=analysis_freq
)
analysis_df = pd.concat(analysis) # type: pd.DataFrame
pprint(analysis_df)
- nested decision execution
.. code-block:: python
from pprint import pprint
import qlib
import pandas as pd
from qlib.utils.time import Freq
from qlib.utils import flatten_dict
from qlib.backtest import backtest, executor
from qlib.contrib.evaluate import risk_analysis
from qlib.contrib.strategy import TopkDropoutStrategy
# init qlib
qlib.init(provider_uri=<qlib data dir>)
CSI300_BENCH = "SH000300"
FREQ = "day"
STRATEGY_CONFIG = {
"topk": 50,
"n_drop": 5,
# pred_score, pd.Series
"signal": pred_score,
}
EXECUTOR_CONFIG = {
"time_per_step": "day",
"generate_portfolio_metrics": True,
}
backtest_config = {
"start_time": "2017-01-01",
"end_time": "2020-08-01",
"account": 100000000,
"benchmark": CSI300_BENCH,
"exchange_kwargs": {
"freq": FREQ,
"limit_threshold": 0.095,
"deal_price": "close",
"open_cost": 0.0005,
"close_cost": 0.0015,
"min_cost": 5,
},
}
# strategy object
strategy_obj = TopkDropoutStrategy(**STRATEGY_CONFIG)
# executor object
executor_obj = executor.SimulatorExecutor(**EXECUTOR_CONFIG)
# backtest
portfolio_metric_dict, indicator_dict = backtest(executor=executor_obj, strategy=strategy_obj, **backtest_config)
analysis_freq = "{0}{1}".format(*Freq.parse(FREQ))
# backtest info
report_normal, positions_normal = portfolio_metric_dict.get(analysis_freq)
# analysis
analysis = dict()
analysis["excess_return_without_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"], freq=analysis_freq
)
analysis["excess_return_with_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"] - report_normal["cost"], freq=analysis_freq
)
analysis_df = pd.concat(analysis) # type: pd.DataFrame
# log metrics
analysis_dict = flatten_dict(analysis_df["risk"].unstack().T.to_dict())
# print out results
pprint(f"The following are analysis results of benchmark return({analysis_freq}).")
pprint(risk_analysis(report_normal["bench"], freq=analysis_freq))
pprint(f"The following are analysis results of the excess return without cost({analysis_freq}).")
pprint(analysis["excess_return_without_cost"])
pprint(f"The following are analysis results of the excess return with cost({analysis_freq}).")
pprint(analysis["excess_return_with_cost"])
# pred_score is the `prediction score` output by Model
report_normal, positions_normal = backtest(
pred_score, strategy=strategy, **BACKTEST_CONFIG
)
To know more about the `prediction score` `pred_score` output by ``Forecast Model``, please refer to `Forecast Model: Model Training & Prediction <model.html>`_.

View File

@@ -1,9 +1,105 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
The expect result of `backtest` is following in current version
'The following are analysis results of benchmark return(1day).'
risk
mean 0.000651
std 0.012472
annualized_return 0.154967
information_ratio 0.805422
max_drawdown -0.160445
'The following are analysis results of the excess return without cost(1day).'
risk
mean 0.001258
std 0.007575
annualized_return 0.299303
information_ratio 2.561219
max_drawdown -0.068386
'The following are analysis results of the excess return with cost(1day).'
risk
mean 0.001110
std 0.007575
annualized_return 0.264280
information_ratio 2.261392
max_drawdown -0.071842
[1706497:MainThread](2021-12-07 14:08:30,263) INFO - qlib.workflow - [record_temp.py:441] - Portfolio analysis record 'port_analysis_30minute.
pkl' has been saved as the artifact of the Experiment 2
'The following are analysis results of benchmark return(30minute).'
risk
mean 0.000078
std 0.003646
annualized_return 0.148787
information_ratio 0.935252
max_drawdown -0.142830
('The following are analysis results of the excess return without '
'cost(30minute).')
risk
mean 0.000174
std 0.003343
annualized_return 0.331867
information_ratio 2.275019
max_drawdown -0.074752
'The following are analysis results of the excess return with cost(30minute).'
risk
mean 0.000155
std 0.003343
annualized_return 0.294536
information_ratio 2.018860
max_drawdown -0.075579
[1706497:MainThread](2021-12-07 14:08:30,277) INFO - qlib.workflow - [record_temp.py:441] - Portfolio analysis record 'port_analysis_5minute.p
kl' has been saved as the artifact of the Experiment 2
'The following are analysis results of benchmark return(5minute).'
risk
mean 0.000015
std 0.001460
annualized_return 0.172170
information_ratio 1.103439
max_drawdown -0.144807
'The following are analysis results of the excess return without cost(5minute).'
risk
mean 0.000028
std 0.001412
annualized_return 0.319771
information_ratio 2.119563
max_drawdown -0.077426
'The following are analysis results of the excess return with cost(5minute).'
risk
mean 0.000025
std 0.001412
annualized_return 0.281536
information_ratio 1.866091
max_drawdown -0.078194
[1706497:MainThread](2021-12-07 14:08:30,287) INFO - qlib.workflow - [record_temp.py:466] - Indicator analysis record 'indicator_analysis_1day
.pkl' has been saved as the artifact of the Experiment 2
'The following are analysis results of indicators(1day).'
value
ffr 0.945821
pa 0.000324
pos 0.542882
[1706497:MainThread](2021-12-07 14:08:30,293) INFO - qlib.workflow - [record_temp.py:466] - Indicator analysis record 'indicator_analysis_30mi
nute.pkl' has been saved as the artifact of the Experiment 2
'The following are analysis results of indicators(30minute).'
value
ffr 0.982910
pa 0.000037
pos 0.500806
[1706497:MainThread](2021-12-07 14:08:30,302) INFO - qlib.workflow - [record_temp.py:466] - Indicator analysis record 'indicator_analysis_5min
ute.pkl' has been saved as the artifact of the Experiment 2
'The following are analysis results of indicators(5minute).'
value
ffr 0.991017
pa 0.000000
pos 0.000000
[1706497:MainThread](2021-12-07 14:08:30,627) INFO - qlib.timer - [log.py:113] - Time cost: 0.014s | waiting `async_log` Done
"""
from copy import deepcopy
import qlib
import fire
import pandas as pd
from qlib.config import REG_CN, HIGH_FREQ_CONFIG
from qlib.data import D
from qlib.utils import exists_qlib_data, init_instance_by_config, flatten_dict
@@ -14,6 +110,13 @@ from qlib.backtest import collect_data
class NestedDecisionExecutionWorkflow:
# TODO: add test for nested workflow.
# 1) comparing same backtest
# - Basic test idea: the shared accumulated value are equal in multiple levels
# - Aligning the profit calculation between multiple levels and single levels.
# 2) comparing different backtest
# - Basic test idea:
# - the daily backtest will be similar as multi-level(the data quality makes this gap samller)
market = "csi300"
benchmark = "SH000300"
@@ -167,8 +270,6 @@ class NestedDecisionExecutionWorkflow:
par = PortAnaRecord(
recorder,
self.port_analysis_config,
risk_analysis_freq=["day", "30min", "5min"],
indicator_analysis_freq=["day", "30min", "5min"],
indicator_analysis_method="value_weighted",
)
par.generate()
@@ -199,6 +300,93 @@ class NestedDecisionExecutionWorkflow:
for trade_decision in data_generator:
print(trade_decision)
# the code below are for checking, users don't have to care about it
def check_diff_freq(self):
self._init_qlib()
exp = R.get_exp(experiment_name="backtest")
rec = next(iter(exp.list_recorders().values())) # assuming this will get the latest recorder
for check_key in "account", "total_turnover", "total_cost":
check_key = "total_cost"
acc_dict = {}
for freq in ["30minute", "5minute", "1day"]:
acc_dict[freq] = rec.load_object(f"portfolio_analysis/report_normal_{freq}.pkl")[check_key]
acc_df = pd.DataFrame(acc_dict)
acc_resam = acc_df.resample("1d").last().dropna()
assert (acc_resam["30minute"] == acc_resam["1day"]).all()
def backtest_only_daily(self):
"""
This backtest is used for comparing the nested execution and single layer execution
Due to the low quality daily-level and miniute-level data, they are hardly comparable.
So it is used for detecting serious bugs which make the results different greatly.
.. code-block:: shell
[1724971:MainThread](2021-12-07 16:24:31,156) INFO - qlib.workflow - [record_temp.py:441] - Portfolio analysis record 'port_analysis_1day.pkl'
has been saved as the artifact of the Experiment 2
'The following are analysis results of benchmark return(1day).'
risk
mean 0.000651
std 0.012472
annualized_return 0.154967
information_ratio 0.805422
max_drawdown -0.160445
'The following are analysis results of the excess return without cost(1day).'
risk
mean 0.001375
std 0.006103
annualized_return 0.327204
information_ratio 3.475016
max_drawdown -0.024927
'The following are analysis results of the excess return with cost(1day).'
risk
mean 0.001184
std 0.006091
annualized_return 0.281801
information_ratio 2.998749
max_drawdown -0.029568
[1724971:MainThread](2021-12-07 16:24:31,170) INFO - qlib.workflow - [record_temp.py:466] - Indicator analysis record 'indicator_analysis_1day.
pkl' has been saved as the artifact of the Experiment 2
'The following are analysis results of indicators(1day).'
value
ffr 1.0
pa 0.0
pos 0.0
[1724971:MainThread](2021-12-07 16:24:31,188) INFO - qlib.timer - [log.py:113] - Time cost: 0.007s | waiting `async_log` Done
"""
self._init_qlib()
model = init_instance_by_config(self.task["model"])
dataset = init_instance_by_config(self.task["dataset"])
self._train_model(model, dataset)
strategy_config = {
"class": "TopkDropoutStrategy",
"module_path": "qlib.contrib.strategy.signal_strategy",
"kwargs": {
"signal": (model, dataset),
"topk": 50,
"n_drop": 5,
},
}
pa_conf = deepcopy(self.port_analysis_config)
pa_conf["strategy"] = strategy_config
pa_conf["executor"] = {
"class": "SimulatorExecutor",
"module_path": "qlib.backtest.executor",
"kwargs": {
"time_per_step": "day",
"generate_portfolio_metrics": True,
"verbose": True,
},
}
pa_conf["backtest"]["benchmark"] = self.benchmark
with R.start(experiment_name="backtest"):
recorder = R.get_recorder()
par = PortAnaRecord(recorder, pa_conf)
par.generate()
if __name__ == "__main__":
fire.Fire(NestedDecisionExecutionWorkflow)

View File

@@ -186,8 +186,10 @@ def get_strategy_executor(
trade_exchange = get_exchange(**exchange_kwargs)
common_infra = CommonInfrastructure(trade_account=trade_account, trade_exchange=trade_exchange)
trade_strategy = init_instance_by_config(strategy, accept_types=BaseStrategy, common_infra=common_infra)
trade_executor = init_instance_by_config(executor, accept_types=BaseExecutor, common_infra=common_infra)
trade_strategy = init_instance_by_config(strategy, accept_types=BaseStrategy)
trade_strategy.reset_common_infra(common_infra)
trade_executor = init_instance_by_config(executor, accept_types=BaseExecutor)
trade_executor.reset_common_infra(common_infra)
return trade_strategy, trade_executor

View File

@@ -29,7 +29,10 @@ rtn & earning in the Account
class AccumulatedInfo:
"""accumulated trading info, including accumulated return/cost/turnover"""
"""
accumulated trading info, including accumulated return/cost/turnover
AccumulatedInfo should be shared accross different levels
"""
def __init__(self):
self.reset()
@@ -62,6 +65,11 @@ class AccumulatedInfo:
class Account:
"""
The correctness of the metrics of Account in nested execution depends on the shallow copy of `trade_account` in qlib/backtest/executor.py:NestedExecutor
Different level of executor has different Account object when calculating metrics. But the position object is shared cross all the Account object.
"""
def __init__(
self,
init_cash: float = 1e9,
@@ -95,6 +103,8 @@ class Account:
self.init_vars(init_cash, position_dict, freq, benchmark_config)
def init_vars(self, init_cash, position_dict, freq: str, benchmark_config: dict):
# 1) the following variables are shared by multiple layers
# - you will see a shallow copy instead of deepcopy in the NestedExecutor;
self.init_cash = init_cash
self.current_position: BasePosition = init_instance_by_config(
{
@@ -106,6 +116,9 @@ class Account:
"module_path": "qlib.backtest.position",
}
)
self.accum_info = AccumulatedInfo()
# 2) following variables are not shared between layers
self.portfolio_metrics = None
self.hist_positions = {}
self.reset(freq=freq, benchmark_config=benchmark_config)
@@ -119,7 +132,8 @@ class Account:
def reset_report(self, freq, benchmark_config):
# portfolio related metrics
if self.is_port_metr_enabled():
self.accum_info = AccumulatedInfo()
# NOTE:
# `accum_info` and `current_position` are shared here
self.portfolio_metrics = PortfolioMetrics(freq, benchmark_config)
self.hist_positions = {}

View File

@@ -231,7 +231,7 @@ class Exchange:
self.extra_quote["limit_buy"] = False
self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.")
assert set(self.extra_quote.columns) == set(self.quote_df.columns) - {"$change"}
self.quote_df = pd.concat([self.quote_df, extra_quote], sort=False, axis=0)
self.quote_df = pd.concat([self.quote_df, self.extra_quote], sort=False, axis=0)
LT_TP_EXP = "(exp)" # Tuple[str, str]
LT_FLT = "float" # float
@@ -736,7 +736,11 @@ class Exchange:
# TODO: the adjusted cost ratio can be overestimated as deal_amount will be clipped in the next steps
trade_val = order.deal_amount * trade_price
adj_cost_ratio = self.impact_cost * (trade_val / total_trade_val) ** 2
if not total_trade_val or np.isnan(total_trade_val):
# TODO: assert trade_val == 0, f"trade_val != 0, total_trade_val: {total_trade_val}; order info: {order}"
adj_cost_ratio = self.impact_cost
else:
adj_cost_ratio = self.impact_cost * (trade_val / total_trade_val) ** 2
if order.direction == Order.SELL:
cost_ratio = self.close_cost + adj_cost_ratio

View File

@@ -130,7 +130,7 @@ class BaseExecutor:
if common_infra.has("trade_account"):
# NOTE: there is a trick in the code.
# copy is used instead of deepcopy. So positions are shared
# shallow copy is used instead of deepcopy. So positions are shared
self.trade_account: Account = copy.copy(common_infra.get("trade_account"))
self.trade_account.reset(freq=self.time_per_step, port_metr_enabled=self.generate_portfolio_metrics)

View File

@@ -223,6 +223,12 @@ class BasePosition:
"""
raise NotImplementedError(f"Please implement the `settle_commit` method")
def __str__(self):
return self.__dict__.__str__()
def __repr__(self):
return self.__dict__.__repr__()
class Position(BasePosition):
"""Position

View File

@@ -70,7 +70,7 @@ class TradeCalendarManager:
- If self.trade_step >= self.self.trade_len, it means the trading is finished
- If self.trade_step < self.self.trade_len, it means the number of trading step finished is self.trade_step
"""
return self.trade_step >= self.trade_len - 1
return self.trade_step >= self.trade_len
def step(self):
if self.finished():
@@ -222,7 +222,7 @@ class CommonInfrastructure(BaseInfrastructure):
class LevelInfrastructure(BaseInfrastructure):
"""level instrastructure is created by executor, and then shared to strategies on the same level"""
"""level infrastructure is created by executor, and then shared to strategies on the same level"""
def get_support_infra(self):
"""

View File

@@ -176,8 +176,6 @@ _default_config = {
# if min_data_shift == 0, use default market time [9:30, 11:29, 1:00, 2:59]
# if min_data_shift != 0, use shifted market time [9:30, 11:29, 1:00, 2:59] - shift*minute
"min_data_shift": 0,
# whether to display the ops warning log, default False
"ops_warning_log": False,
}
MODE_CONF = {
@@ -246,8 +244,8 @@ HIGH_FREQ_CONFIG = {
_default_region_config = {
REG_CN: {
"trade_unit": 100,
"limit_threshold": 0.099,
"deal_price": "vwap",
"limit_threshold": 0.095,
"deal_price": "close",
},
REG_US: {
"trade_unit": 1,
@@ -272,6 +270,20 @@ class QlibConfig(Config):
self.provider_uri = provider_uri
self.mount_path = mount_path
@staticmethod
def format_provider_uri(provider_uri: Union[str, dict, Path]) -> dict:
if provider_uri is None:
raise ValueError("provider_uri cannot be None")
if isinstance(provider_uri, (str, dict, Path)):
if not isinstance(provider_uri, dict):
provider_uri = {QlibConfig.DEFAULT_FREQ: provider_uri}
else:
raise TypeError(f"provider_uri does not support {type(provider_uri)}")
for freq, _uri in provider_uri.items():
if QlibConfig.DataPathManager.get_uri_type(_uri) == QlibConfig.LOCAL_URI:
provider_uri[freq] = str(Path(_uri).expanduser().resolve())
return provider_uri
@staticmethod
def get_uri_type(uri: Union[str, Path]):
uri = uri if isinstance(uri, str) else str(uri.expanduser().resolve())
@@ -318,11 +330,7 @@ class QlibConfig(Config):
def resolve_path(self):
# resolve path
_mount_path = self["mount_path"]
_provider_uri = self["provider_uri"]
if _provider_uri is None:
raise ValueError("provider_uri cannot be None")
if not isinstance(_provider_uri, dict):
_provider_uri = {self.DEFAULT_FREQ: _provider_uri}
_provider_uri = self.DataPathManager.format_provider_uri(self["provider_uri"])
if not isinstance(_mount_path, dict):
_mount_path = {_freq: _mount_path for _freq in _provider_uri.keys()}
@@ -331,10 +339,7 @@ class QlibConfig(Config):
assert len(_miss_freq) == 0, f"mount_path is missing freq: {_miss_freq}"
# resolve
for _freq, _uri in _provider_uri.items():
# provider_uri
if self.DataPathManager.get_uri_type(_uri) == QlibConfig.LOCAL_URI:
_provider_uri[_freq] = str(Path(_uri).expanduser().resolve())
for _freq in _provider_uri.keys():
# mount_path
_mount_path[_freq] = (
_mount_path[_freq]
@@ -344,20 +349,6 @@ class QlibConfig(Config):
self["provider_uri"] = _provider_uri
self["mount_path"] = _mount_path
def get_uri_type(self):
path = self["provider_uri"]
if isinstance(path, Path):
path = str(path)
is_win = re.match("^[a-zA-Z]:.*", path) is not None # such as 'C:\\data', 'D:'
is_nfs_or_win = (
re.match("^[^/]+:.+", path) is not None
) # such as 'host:/data/' (User may define short hostname by themselves or use localhost)
if is_nfs_or_win and not is_win:
return QlibConfig.NFS_URI
else:
return QlibConfig.LOCAL_URI
def set(self, default_conf: str = "client", **kwargs):
"""
configure qlib based on the input parameters

View File

@@ -3,15 +3,18 @@
from __future__ import division
from __future__ import print_function
from logging import warn
import numpy as np
import pandas as pd
import warnings
from typing import Union
from ..log import get_module_logger
from ..backtest import get_exchange, backtest as backtest_func
from ..utils import get_date_range
from ..utils.resam import Freq
from ..strategy.base import BaseStrategy
from ..backtest import get_exchange, position, backtest as backtest_func, executor as _executor
from ..data import D
from ..config import C
@@ -117,84 +120,129 @@ def indicator_analysis(df, method="mean"):
# This is the API for compatibility for legacy code
def backtest(pred, account=1e9, shift=1, benchmark="SH000905", verbose=True, **kwargs):
"""This function will help you set a reasonable Exchange and provide default value for strategy
def backtest_daily(
start_time: Union[str, pd.Timestamp],
end_time: Union[str, pd.Timestamp],
strategy: Union[str, dict, BaseStrategy],
executor: Union[str, dict, _executor.BaseExecutor] = None,
account: Union[float, int, position.Position] = 1e8,
benchmark: str = "SH000300",
exchange_kwargs: dict = None,
pos_type: str = "Position",
):
"""initialize the strategy and executor, then executor the backtest of daily frequency
Parameters
----------
start_time : Union[str, pd.Timestamp]
closed start time for backtest
**NOTE**: This will be applied to the outmost executor's calendar.
end_time : Union[str, pd.Timestamp]
closed end time for backtest
**NOTE**: This will be applied to the outmost executor's calendar.
E.g. Executor[day](Executor[1min]), setting `end_time == 20XX0301` will include all the minutes on 20XX0301
strategy : Union[str, dict, BaseStrategy]
for initializing outermost portfolio strategy. Please refer to the docs of init_instance_by_config for more information.
- **backtest workflow related or commmon arguments**
E.g.
pred : pandas.DataFrame
predict should has <datetime, instrument> index and one `score` column.
account : float
init account value.
shift : int
whether to shift prediction by one day.
benchmark : str
benchmark code, default is SH000905 CSI 500.
verbose : bool
whether to print log.
.. code-block:: python
# dict
strategy = {
"class": "TopkDropoutStrategy",
"module_path": "qlib.contrib.strategy.signal_strategy",
"kwargs": {
"signal": (model, dataset),
"topk": 50,
"n_drop": 5,
},
}
# BaseStrategy
pred_score = pd.read_pickle("score.pkl")["score"]
STRATEGY_CONFIG = {
"topk": 50,
"n_drop": 5,
"signal": pred_score,
}
strategy = TopkDropoutStrategy(**STRATEGY_CONFIG)
# str example.
# 1) specify a pickle object
# - path like 'file:///<path to pickle file>/obj.pkl'
# 2) specify a class name
# - "ClassName": getattr(module, "ClassName")() will be used.
# 3) specify module path with class name
# - "a.b.c.ClassName" getattr(<a.b.c.module>, "ClassName")() will be used.
- **strategy related arguments**
strategy : Strategy()
strategy used in backtest.
topk : int (Default value: 50)
top-N stocks to buy.
margin : int or float(Default value: 0.5)
- if isinstance(margin, int):
executor : Union[str, dict, BaseExecutor]
for initializing the outermost executor.
benchmark: str
the benchmark for reporting.
account : Union[float, int, Position]
information for describing how to creating the account
For `float` or `int`:
Using Account with only initial cash
For `Position`:
Using Account with a Position
exchange_kwargs : dict
the kwargs for initializing Exchange
E.g.
sell_limit = margin
.. code-block:: python
- else:
exchange_kwargs = {
"freq": freq,
"limit_threshold": None, # limit_threshold is None, using C.limit_threshold
"deal_price": None, # deal_price is None, using C.deal_price
"open_cost": 0.0005,
"close_cost": 0.0015,
"min_cost": 5,
}
sell_limit = pred_in_a_day.count() * margin
pos_type : str
the type of Position.
buffer margin, in single score_mode, continue holding stock if it is in nlargest(sell_limit).
sell_limit should be no less than topk.
n_drop : int
number of stocks to be replaced in each trading date.
risk_degree: float
0-1, 0.95 for example, use 95% money to trade.
str_type: 'amount', 'weight' or 'dropout'
strategy type: TopkAmountStrategy ,TopkWeightStrategy or TopkDropoutStrategy.
- **exchange related arguments**
exchange: Exchange()
pass the exchange for speeding up.
subscribe_fields: list
subscribe fields.
open_cost : float
open transaction cost. The default value is 0.002(0.2%).
close_cost : float
close transaction cost. The default value is 0.002(0.2%).
min_cost : float
min transaction cost.
trade_unit : int
100 for China A.
deal_price: str
dealing price type: 'close', 'open', 'vwap'.
limit_threshold : float
limit move 0.1 (10%) for example, long and short with same limit.
extract_codes: bool
will we pass the codes extracted from the pred to the exchange.
.. note:: This will be faster with offline qlib.
- **executor related arguments**
executor : BaseExecutor()
executor used in backtest.
verbose : bool
whether to print log.
Returns
-------
report_normal: pd.DataFrame
backtest report
positions_normal: pd.DataFrame
backtest positions
"""
warnings.warn("this function is deprecated, please use backtest function in qlib.backtest", DeprecationWarning)
report_dict = backtest_func(
pred=pred, account=account, shift=shift, benchmark=benchmark, verbose=verbose, return_order=False, **kwargs
freq = "day"
if executor is None:
executor_config = {
"time_per_step": freq,
"generate_portfolio_metrics": True,
}
executor = _executor.SimulatorExecutor(**executor_config)
_exchange_kwargs = {
"freq": freq,
"limit_threshold": None,
"deal_price": None,
"open_cost": 0.0005,
"close_cost": 0.0015,
"min_cost": 5,
}
if exchange_kwargs is not None:
_exchange_kwargs.update(exchange_kwargs)
portfolio_metric_dict, indicator_dict = backtest_func(
start_time=start_time,
end_time=end_time,
strategy=strategy,
executor=executor,
account=account,
benchmark=benchmark,
exchange_kwargs=_exchange_kwargs,
pos_type=pos_type,
)
return report_dict.get("report_df"), report_dict.get("positions")
analysis_freq = "{0}{1}".format(*Freq.parse(freq))
report_normal, positions_normal = portfolio_metric_dict.get(analysis_freq)
return report_normal, positions_normal
def long_short_backtest(
@@ -327,7 +375,12 @@ def t_run():
pred["datetime"] = pd.to_datetime(pred["datetime"])
pred = pred.set_index([pred.columns[0], pred.columns[1]])
pred = pred.iloc[:9000]
report_df, positions = backtest(pred=pred)
strategy_config = {
"topk": 50,
"n_drop": 5,
"signal": pred,
}
report_df, positions = backtest_daily(start_time="2017-01-01", end_time="2020-08-01", strategy=strategy_config)
print(report_df.head())
print(positions.keys())
print(positions[list(positions.keys())[0]])

View File

@@ -171,20 +171,55 @@ def report_graph(report_df: pd.DataFrame, show_notebook: bool = True) -> [list,
.. code-block:: python
from qlib.contrib.evaluate import backtest
import qlib
import pandas as pd
from qlib.utils.time import Freq
from qlib.utils import flatten_dict
from qlib.backtest import backtest, executor
from qlib.contrib.evaluate import risk_analysis
from qlib.contrib.strategy import TopkDropoutStrategy
# backtest parameters
bparas = {}
bparas['limit_threshold'] = 0.095
bparas['account'] = 1000000000
# init qlib
qlib.init(provider_uri=<qlib data dir>)
sparas = {}
sparas['topk'] = 50
sparas['n_drop'] = 230
strategy = TopkDropoutStrategy(**sparas)
CSI300_BENCH = "SH000300"
FREQ = "day"
STRATEGY_CONFIG = {
"topk": 50,
"n_drop": 5,
# pred_score, pd.Series
"signal": pred_score,
}
report_normal_df, _ = backtest(pred_df, strategy, **bparas)
EXECUTOR_CONFIG = {
"time_per_step": "day",
"generate_portfolio_metrics": True,
}
backtest_config = {
"start_time": "2017-01-01",
"end_time": "2020-08-01",
"account": 100000000,
"benchmark": CSI300_BENCH,
"exchange_kwargs": {
"freq": FREQ,
"limit_threshold": 0.095,
"deal_price": "close",
"open_cost": 0.0005,
"close_cost": 0.0015,
"min_cost": 5,
},
}
# strategy object
strategy_obj = TopkDropoutStrategy(**STRATEGY_CONFIG)
# executor object
executor_obj = executor.SimulatorExecutor(**EXECUTOR_CONFIG)
# backtest
portfolio_metric_dict, indicator_dict = backtest(executor=executor_obj, strategy=strategy_obj, **backtest_config)
analysis_freq = "{0}{1}".format(*Freq.parse(FREQ))
# backtest info
report_normal_df, positions_normal = portfolio_metric_dict.get(analysis_freq)
qcr.analysis_position.report_graph(report_normal_df)

View File

@@ -170,32 +170,64 @@ def risk_analysis_graph(
.. code-block:: python
from qlib.contrib.evaluate import risk_analysis, backtest, long_short_backtest
import qlib
import pandas as pd
from qlib.utils.time import Freq
from qlib.utils import flatten_dict
from qlib.backtest import backtest, executor
from qlib.contrib.evaluate import risk_analysis
from qlib.contrib.strategy import TopkDropoutStrategy
from qlib.contrib.report import analysis_position
# backtest parameters
bparas = {}
bparas['limit_threshold'] = 0.095
bparas['account'] = 1000000000
# init qlib
qlib.init(provider_uri=<qlib data dir>)
sparas = {}
sparas['topk'] = 50
sparas['n_drop'] = 230
strategy = TopkDropoutStrategy(**sparas)
CSI300_BENCH = "SH000300"
FREQ = "day"
STRATEGY_CONFIG = {
"topk": 50,
"n_drop": 5,
# pred_score, pd.Series
"signal": pred_score,
}
report_normal_df, positions = backtest(pred_df, strategy, **bparas)
# long_short_map = long_short_backtest(pred_df)
# report_long_short_df = pd.DataFrame(long_short_map)
EXECUTOR_CONFIG = {
"time_per_step": "day",
"generate_portfolio_metrics": True,
}
backtest_config = {
"start_time": "2017-01-01",
"end_time": "2020-08-01",
"account": 100000000,
"benchmark": CSI300_BENCH,
"exchange_kwargs": {
"freq": FREQ,
"limit_threshold": 0.095,
"deal_price": "close",
"open_cost": 0.0005,
"close_cost": 0.0015,
"min_cost": 5,
},
}
# strategy object
strategy_obj = TopkDropoutStrategy(**STRATEGY_CONFIG)
# executor object
executor_obj = executor.SimulatorExecutor(**EXECUTOR_CONFIG)
# backtest
portfolio_metric_dict, indicator_dict = backtest(executor=executor_obj, strategy=strategy_obj, **backtest_config)
analysis_freq = "{0}{1}".format(*Freq.parse(FREQ))
# backtest info
report_normal_df, positions_normal = portfolio_metric_dict.get(analysis_freq)
analysis = dict()
# analysis['pred_long'] = risk_analysis(report_long_short_df['long'])
# analysis['pred_short'] = risk_analysis(report_long_short_df['short'])
# analysis['pred_long_short'] = risk_analysis(report_long_short_df['long_short'])
analysis['excess_return_without_cost'] = risk_analysis(report_normal_df['return'] - report_normal_df['bench'])
analysis['excess_return_with_cost'] = risk_analysis(report_normal_df['return'] - report_normal_df['bench'] - report_normal_df['cost'])
analysis_df = pd.concat(analysis)
analysis["excess_return_without_cost"] = risk_analysis(
report_normal_df["return"] - report_normal_df["bench"], freq=analysis_freq
)
analysis["excess_return_with_cost"] = risk_analysis(
report_normal_df["return"] - report_normal_df["bench"] - report_normal_df["cost"], freq=analysis_freq
)
analysis_df = pd.concat(analysis) # type: pd.DataFrame
analysis_position.risk_analysis_graph(analysis_df, report_normal_df)

View File

@@ -155,7 +155,7 @@ class Expression(abc.ABC):
try:
series = self._load_internal(instrument, start_index, end_index, freq)
except Exception as e:
get_module_logger("data").error(
get_module_logger("data").debug(
f"Loading data error: instrument={instrument}, expression={str(self)}, "
f"start_index={start_index}, end_index={end_index}, freq={freq}. "
f"error info: {str(e)}"

View File

@@ -58,34 +58,13 @@ class ProviderBackendMixin:
backend = copy.deepcopy(backend)
# set default storage kwargs
# NOTE: provider_uri priority
# 1. backend_config: backend_obj["kwargs"]["provider_uri"]
# 2. qlib.init: provider_uri
backend_kwargs = backend.setdefault("kwargs", {})
# default provider_uri map
if "provider_uri" not in backend_kwargs:
# if the user has no uri configured, use: uri = uri_map[freq]
# NOTE: provider_uri priority
# 1. backend_config: backend_obj["kwargs"]["provider_uri"]
# 2. backend_config: backend_obj["kwargs"]["provider_uri_map"]
# 3. qlib.init: provider_uri
provider_uri_map = backend_kwargs.setdefault("provider_uri_map", {})
freq = kwargs.get("freq", "day")
if freq not in provider_uri_map:
# NOTE: uri
# 1. If `freq` in C.dpm.provider_uri.keys(), uri = C.dpm.provider_uri[freq]
# 2. If `freq` not in C.dpm.provider_uri.keys()
# - Get the `min_freq` closest to `freq` from C.dpm.provider_uri.keys(), uri = C.dpm.provider_uri[min_freq]
# NOTE: In Storage, only CalendarStorage is supported
# 1. If `uri` does not exist
# - Get the `min_uri` of the closest `freq` under the same "directory" as the `uri`
# - Read data from `min_uri` and resample to `freq`
try:
_uri = C.dpm.get_data_uri(freq)
except KeyError:
# provider_uri is dict and freq not in list(provider_uri.keys())
# use the nearest freq greater than 0
min_freq = Freq.get_recent_freq(freq, C.dpm.provider_uri.keys())
_uri = C.dpm.get_data_uri(freq) if min_freq is None else C.dpm.get_data_uri(min_freq)
provider_uri_map[freq] = _uri
backend_kwargs["provider_uri"] = provider_uri_map[freq]
provider_uri = backend_kwargs.get("provider_uri", None)
provider_uri = C.dpm.provider_uri if provider_uri is None else C.dpm.format_provider_uri(provider_uri)
backend_kwargs["provider_uri"] = provider_uri
backend.setdefault("kwargs", {}).update(**kwargs)
return init_instance_by_config(backend)
@@ -730,7 +709,7 @@ class LocalExpressionProvider(ExpressionProvider):
try:
series = expression.load(instrument, max(0, start_index - lft_etd), end_index + rght_etd, freq)
except Exception as e:
get_module_logger("data").error(
get_module_logger("data").debug(
f"Loading expression error: "
f"instrument={instrument}, field=({field}), start_time={start_time}, end_time={end_time}, freq={freq}. "
f"error info: {str(e)}"

View File

@@ -324,11 +324,11 @@ class NpPairOperator(PairOperator):
try:
res = getattr(np, self.func)(series_left, series_right)
except ValueError as e:
get_module_logger("ops").error(warning_info)
get_module_logger("ops").debug(warning_info)
raise ValueError(f"{str(e)}. \n\t{warning_info}")
else:
if check_length and len(series_left) != len(series_right) and C.ops_warning_log:
get_module_logger("ops").warning(warning_info)
if check_length and len(series_left) != len(series_right):
get_module_logger("ops").debug(warning_info)
return res

View File

@@ -10,23 +10,44 @@ import pandas as pd
from qlib.utils.time import Freq
from qlib.utils.resam import resam_calendar
from qlib.config import C
from qlib.log import get_module_logger
from qlib.data.storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT
from qlib.data.cache import H
logger = get_module_logger("file_storage")
class FileStorageMixin:
"""FileStorageMixin, applicable to FileXXXStorage
Subclasses need to have provider_uri, freq, storage_name, file_name attributes
"""
@property
def dpm(self):
return C.DataPathManager(self.provider_uri, None)
@property
def support_freq(self) -> List[str]:
_v = "_support_freq"
if hasattr(self, _v):
return getattr(self, _v)
if len(self.provider_uri) == 1 and C.DEFAULT_FREQ in self.provider_uri:
freq = filter(
lambda _freq: not _freq.endswith("_future"),
map(lambda x: x.stem, self.dpm.get_data_uri(C.DEFAULT_FREQ).joinpath("calendars").glob("*.txt")),
)
else:
freq = self.provider_uri.keys()
freq = list(freq)
setattr(self, _v, freq)
return freq
@property
def uri(self) -> Path:
_provider_uri = self.kwargs.get("provider_uri", None)
if _provider_uri is None:
raise ValueError(
f"The `provider_uri` parameter is not found in {self.__class__.__name__}, "
f'please specify `provider_uri` in the "provider\'s backend"'
)
return Path(_provider_uri).expanduser().joinpath(f"{self.storage_name}s", self.file_name)
if self.freq not in self.support_freq:
raise ValueError(f"{self.storage_name}: {self.provider_uri} does not contain data for {self.freq}")
return self.dpm.get_data_uri(self.freq).joinpath(f"{self.storage_name}s", self.file_name)
def check(self):
"""check self.uri
@@ -40,10 +61,19 @@ class FileStorageMixin:
class FileCalendarStorage(FileStorageMixin, CalendarStorage):
def __init__(self, freq: str, future: bool, **kwargs):
def __init__(self, freq: str, future: bool, provider_uri: dict, **kwargs):
super(FileCalendarStorage, self).__init__(freq, future, **kwargs)
self.future = future
self.file_name = f"{freq}_future.txt" if future else f"{freq}.txt".lower()
self.provider_uri = C.DataPathManager.format_provider_uri(provider_uri)
self.resample_freq = None
@property
def file_name(self) -> str:
return f"{self.use_freq}_future.txt" if self.future else f"{self.use_freq}.txt".lower()
@property
def use_freq(self) -> str:
return self.freq if self.resample_freq is None else self.resample_freq
def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> List[CalVT]:
if not self.uri.exists():
@@ -59,28 +89,26 @@ class FileCalendarStorage(FileStorageMixin, CalendarStorage):
np.savetxt(fp, values, fmt="%s", encoding="utf-8")
@property
def data(self) -> List[CalVT]:
# NOTE: uri
# 1. If `uri` does not exist
# - Get the `min_uri` of the closest `freq` under the same "directory" as the `uri`
# - Read data from `min_uri` and resample to `freq`
try:
self.check()
_calendar = self._read_calendar()
except ValueError:
freq_list = self._get_storage_freq()
_freq = Freq.get_recent_freq(self.freq, freq_list)
if _freq is None:
raise ValueError(f"can't find a freq from {freq_list} that can resample to {self.freq}!")
self.file_name = f"{_freq}_future.txt" if self.future else f"{_freq}.txt".lower()
# The cache is useful for the following cases
# - multiple frequencies are sampled from the same calendar
cache_key = self.uri
if cache_key not in H["c"]:
H["c"][cache_key] = self._read_calendar()
_calendar = H["c"][cache_key]
_calendar = resam_calendar(np.array(list(map(pd.Timestamp, _calendar))), _freq, self.freq)
def uri(self) -> Path:
freq = self.freq
if freq not in self.support_freq:
# NOTE: uri
# 1. If `uri` does not exist
# - Get the `min_uri` of the closest `freq` under the same "directory" as the `uri`
# - Read data from `min_uri` and resample to `freq`
freq = Freq.get_recent_freq(freq, self.support_freq)
if freq is None:
raise ValueError(f"can't find a freq from {self.support_freq} that can resample to {self.freq}!")
self.resample_freq = freq
return self.dpm.get_data_uri(self.use_freq).joinpath(f"{self.storage_name}s", self.file_name)
@property
def data(self) -> List[CalVT]:
self.check()
_calendar = self._read_calendar()
if self.resample_freq is not None:
_calendar = resam_calendar(np.array(list(map(pd.Timestamp, _calendar))), self.resample_freq, self.freq)
return _calendar
def _get_storage_freq(self) -> List[str]:
@@ -135,8 +163,9 @@ class FileInstrumentStorage(FileStorageMixin, InstrumentStorage):
INSTRUMENT_END_FIELD = "end_datetime"
SYMBOL_FIELD_NAME = "instrument"
def __init__(self, market: str, **kwargs):
super(FileInstrumentStorage, self).__init__(market, **kwargs)
def __init__(self, market: str, freq: str, provider_uri: dict, **kwargs):
super(FileInstrumentStorage, self).__init__(market, freq, **kwargs)
self.provider_uri = C.DataPathManager.format_provider_uri(provider_uri)
self.file_name = f"{market.lower()}.txt"
def _read_instrument(self) -> Dict[InstKT, InstVT]:
@@ -223,8 +252,9 @@ class FileInstrumentStorage(FileStorageMixin, InstrumentStorage):
class FileFeatureStorage(FileStorageMixin, FeatureStorage):
def __init__(self, instrument: str, field: str, freq: str, **kwargs):
def __init__(self, instrument: str, field: str, freq: str, provider_uri: dict, **kwargs):
super(FileFeatureStorage, self).__init__(instrument, field, freq, **kwargs)
self.provider_uri = C.DataPathManager.format_provider_uri(provider_uri)
self.file_name = f"{instrument.lower()}/{field.lower()}.{freq.lower()}.bin"
def clear(self):

View File

@@ -195,8 +195,9 @@ class CalendarStorage(BaseStorage):
class InstrumentStorage(BaseStorage):
def __init__(self, market: str, **kwargs):
def __init__(self, market: str, freq: str, **kwargs):
self.market = market
self.freq = freq
self.kwargs = kwargs
@property

View File

@@ -75,7 +75,7 @@ class TestStorage(TestAutoData):
"""
instrument = InstrumentStorage(market="csi300", provider_uri=self.provider_uri)
instrument = InstrumentStorage(market="csi300", provider_uri=self.provider_uri, freq="day")
for inst, spans in instrument.data.items():
assert isinstance(inst, str) and isinstance(
@@ -88,7 +88,7 @@ class TestStorage(TestAutoData):
print(f"instrument['SH600000']: {instrument['SH600000']}")
instrument = InstrumentStorage(market="csi300", provider_uri="not_found")
instrument = InstrumentStorage(market="csi300", provider_uri="not_found", freq="day")
with self.assertRaises(ValueError):
print(instrument.data)
@@ -163,8 +163,9 @@ class TestStorage(TestAutoData):
feature = FeatureStorage(instrument="SH600004", field="close", freq="day", provider_uri="not_fount")
assert feature[0] == (None, None), "FeatureStorage does not exist, feature[i] should return `(None, None)`"
assert feature[:].empty, "FeatureStorage does not exist, feature[:] should return `pd.Series(dtype=np.float32)`"
assert (
feature.data.empty
), "FeatureStorage does not exist, feature.data should return `pd.Series(dtype=np.float32)`"
with self.assertRaises(ValueError):
print(feature[0])
with self.assertRaises(ValueError):
print(feature[:].empty)
with self.assertRaises(ValueError):
print(feature.data.empty)

View File

@@ -201,6 +201,7 @@ class TestAllFlow(TestAutoData):
0.10,
"backtest failed",
)
self.assertTrue(not analyze_df.isna().any().any(), "backtest failed")
def test_3_expmanager(self):
pass_default, pass_current, uri_path = fake_experiment()