1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-04 11:30:57 +08:00

fix account bug & update indicator_analysis & fix some comments

This commit is contained in:
bxdd
2021-06-22 02:42:09 +08:00
parent 9e45528165
commit 4ac6e6e246
9 changed files with 524 additions and 302 deletions

View File

@@ -17,7 +17,6 @@ class NestedDecisonExecutionWorkflow:
market = "csi300"
benchmark = "SH000300"
data_handler_config = {
"start_time": "2008-01-01",
"end_time": "2021-05-28",
@@ -67,28 +66,19 @@ class NestedDecisonExecutionWorkflow:
"kwargs": {
"time_per_step": "week",
"inner_executor": {
"class": "NestedExecutor",
"class": "SimulatorExecutor",
"module_path": "qlib.backtest.executor",
"kwargs": {
"time_per_step": "day",
"inner_executor": {
"class": "SimulatorExecutor",
"module_path": "qlib.backtest.executor",
"kwargs": {
"time_per_step": "15min",
"generate_report": True,
"verbose": True,
},
"generate_report": True,
"verbose": True,
"indicator_config": {
"show_indicator": True,
},
"inner_strategy": {
"class": "TWAPStrategy",
"module_path": "qlib.contrib.strategy.rule_strategy",
},
"show_indicator": True,
},
},
"inner_strategy": {
"class": "VAStrategy",
"class": "SBBStrategyEMA",
"module_path": "qlib.contrib.strategy.rule_strategy",
"kwargs": {
"freq": "day",
@@ -96,7 +86,10 @@ class NestedDecisonExecutionWorkflow:
},
},
"track_data": True,
"show_indicator": True,
"generate_report": True,
"indicator_config": {
"show_indicator": True,
},
},
},
"backtest": {
@@ -105,7 +98,7 @@ class NestedDecisonExecutionWorkflow:
"account": 100000000,
"benchmark": benchmark,
"exchange_kwargs": {
"freq": "1min",
"freq": "day",
"limit_threshold": 0.095,
"deal_price": "close",
"open_cost": 0.0005,
@@ -124,7 +117,7 @@ class NestedDecisonExecutionWorkflow:
GetData().qlib_data(
target_dir=provider_uri_1min, interval="1min", region=REG_CN, version="v2", exists_skip=True
)
provider_uri_day = "/data/csdesign/qlib"
provider_uri_map = {"1min": provider_uri_1min, "day": provider_uri_day}
client_config = {
"calendar_provider": {
@@ -179,12 +172,25 @@ class NestedDecisonExecutionWorkflow:
},
}
self.port_analysis_config["strategy"] = strategy_config
self.port_analysis_config["backtest"]["benchmark"] = D.list_instruments(
instruments=D.instruments(market=self.market), as_list=True
)
with R.start(experiment_name="backtest"):
recorder = R.get_recorder()
par = PortAnaRecord(recorder, self.port_analysis_config, "15minute")
par = PortAnaRecord(
recorder,
self.port_analysis_config,
risk_analysis_freq=["week", "day"],
indicator_analysis_freq=["week", "day"],
indicator_analysis_method="value_weighted",
)
par.generate()
# report_normal_df = recorder.load_object("portfolio_analysis/report_normal_1day.pkl")
# from qlib.contrib.report import analysis_position
# analysis_position.report_graph(report_normal_df)
def collect_data(self):
self._init_qlib()
model = init_instance_by_config(self.task["model"])
@@ -192,6 +198,7 @@ class NestedDecisonExecutionWorkflow:
self._train_model(model, dataset)
executor_config = self.port_analysis_config["executor"]
backtest_config = self.port_analysis_config["backtest"]
backtest_config["benchmark"] = D.list_instruments(instruments=D.instruments(market=self.market), as_list=True)
strategy_config = {
"class": "TopkDropoutStrategy",
"module_path": "qlib.contrib.strategy.model_strategy",

View File

@@ -116,9 +116,9 @@ def backtest(start_time, end_time, strategy, executor, benchmark="SH000300", acc
trade_strategy, trade_executor = get_strategy_executor(
start_time, end_time, strategy, executor, benchmark, account, exchange_kwargs
)
report_dict = backtest_loop(start_time, end_time, trade_strategy, trade_executor)
report_dict, indicator_dict = backtest_loop(start_time, end_time, trade_strategy, trade_executor)
return report_dict
return report_dict, indicator_dict
def collect_data(start_time, end_time, strategy, executor, benchmark="SH000300", account=1e9, exchange_kwargs={}):
@@ -126,6 +126,4 @@ def collect_data(start_time, end_time, strategy, executor, benchmark="SH000300",
trade_strategy, trade_executor = get_strategy_executor(
start_time, end_time, strategy, executor, benchmark, account, exchange_kwargs
)
report_dict = yield from collect_data_loop(start_time, end_time, trade_strategy, trade_executor)
return report_dict
yield from collect_data_loop(start_time, end_time, trade_strategy, trade_executor)

View File

@@ -9,7 +9,7 @@ import pandas as pd
from .position import Position
from .report import Report, Indicator
from .order import Order
from .exchange import Exchange
"""
rtn & earning in the Account
@@ -25,10 +25,42 @@ rtn & earning in the Account
while earning is the difference of two position value, so it considers cost, it is the true return rate
in the specific accomplishment for rtn, it does not consider cost, in other words, rtn - cost = earning
Now rtn has been removed in the hierarchical backtest implemention.
"""
class AccumulatedInfo:
"""accumulated trading info, including accumulated return\cost\turnover"""
def __init__(self):
self.reset()
def reset(self):
self.rtn = 0 # accumulated return, do not consider cost
self.cost = 0 # accumulated cost
self.to = 0 # accumulated turnover
def add_return_value(self, value):
self.rtn += value
def add_cost(self, value):
self.cost += value
def add_turnover(self, value):
self.to += value
@property
def get_return(self):
return self.rtn
@property
def get_cost(self):
return self.cost
@property
def get_turnover(self):
return self.to
class Account:
def __init__(self, init_cash, freq: str = "day", benchmark_config: dict = {}):
self.init_vars(init_cash, freq, benchmark_config)
@@ -38,17 +70,13 @@ class Account:
# init cash
self.init_cash = init_cash
self.current = Position(cash=init_cash)
self.accum_info = AccumulatedInfo()
self.reset(freq=freq, benchmark_config=benchmark_config, init_report=True)
def reset_report(self, freq, benchmark_config):
self.report = Report(freq, benchmark_config)
self.indicator = Indicator()
self.positions = {}
self.rtn = 0
self.ct = 0
self.to = 0
self.val = 0
self.earning = 0
def reset(self, freq=None, benchmark_config=None, init_report=False):
"""reset freq and report of account
@@ -78,21 +106,22 @@ class Account:
def _update_state_from_order(self, order, trade_val, cost, trade_price):
# update turnover
self.to += trade_val
self.accum_info.add_turnover(trade_val)
# update cost
self.ct += cost
# update return
# update self.rtn from order
self.accum_info.add_cost(cost)
# update return from order
trade_amount = trade_val / trade_price
if order.direction == Order.SELL: # 0 for sell
# when sell stock, get profit from price change
profit = trade_val - self.current.get_stock_price(order.stock_id) * trade_amount
self.rtn += profit # note here do not consider cost
self.accum_info.add_return_value(profit) # note here do not consider cost
elif order.direction == Order.BUY: # 1 for buy
# when buy stock, we get return for the rtn computing method
# profit in buy order is to make self.rtn is consistent with self.earning at the end of date
# profit in buy order is to make rtn is consistent with earning at the end of bar
profit = self.current.get_stock_price(order.stock_id) * trade_amount - trade_val
self.rtn += profit
self.accum_info.add_return_value(profit) # note here do not consider cost
def update_order(self, order, trade_val, cost, trade_price):
# if stock is sold out, no stock price information in Position, then we should update account first, then update current position
@@ -111,23 +140,12 @@ class Account:
self._update_state_from_order(order, trade_val, cost, trade_price)
def update_bar_count(self):
"""at the end of the trading bar, update holding bar, count of stock"""
# update holding day count
self.current.add_count_all(bar=self.freq)
def update_bar_report(self, trade_start_time, trade_end_time, trade_exchange):
"""
trade_start_time: pd.TimeStamp
trade_end_time: pd.TimeStamp
quote: pd.DataFrame (code, date), collumns
when the end of trade date
- update rtn
- update price for each asset
- update value for this account
- update earning (2nd view of return )
- update holding day, count of stock
- update position hitory
- update report
:return: None
"""
def update_current(self, trade_start_time, trade_end_time, trade_exchange):
"""update current to make rtn consistent with earning at the end of bar"""
# update price for stock in the position and the profit from changed_price
stock_list = self.current.get_stock_list()
for code in stock_list:
@@ -136,22 +154,28 @@ class Account:
continue
bar_close = trade_exchange.get_close(code, trade_start_time, trade_end_time)
self.current.update_stock_price(stock_id=code, price=bar_close)
# update holding day count
# update value
self.val = self.current.calculate_value()
# update earning
def update_report(self, trade_start_time, trade_end_time):
"""update position history, report"""
# calculate earning
# account_value - last_account_value
# for the first trade date, account_value - init_cash
# self.report.is_empty() to judge is_first_trade_date
# get last_account_value, now_account_value, now_stock_value
# get last_account_value, last_total_cost, last_total_turnover
if self.report.is_empty():
last_account_value = self.init_cash
last_total_cost = 0
last_total_turnover = 0
else:
last_account_value = self.report.get_latest_account_value()
last_total_cost = self.report.get_latest_total_cost()
last_total_turnover = self.report.get_latest_total_turnover()
# get now_account_value, now_stock_value, now_earning, now_cost, now_turnover
now_account_value = self.current.calculate_value()
now_stock_value = self.current.calculate_stock_value()
self.earning = now_account_value - last_account_value
now_earning = now_account_value - last_account_value
now_cost = self.accum_info.get_cost - last_total_cost
now_turnover = self.accum_info.get_turnover - last_total_turnover
# update report for today
# judge whether the the trading is begin.
# and don't add init account state into report, due to we don't have excess return in those days.
@@ -160,11 +184,13 @@ class Account:
trade_end_time=trade_end_time,
account_value=now_account_value,
cash=self.current.position["cash"],
return_rate=(self.earning + self.ct) / last_account_value,
return_rate=(now_earning + now_cost) / last_account_value,
# here use earning to calculate return, position's view, earning consider cost, true return
# in order to make same definition with original backtest in evaluate.py
turnover_rate=self.to / last_account_value,
cost_rate=self.ct / last_account_value,
total_turnover=self.accum_info.get_turnover,
turnover_rate=now_turnover / last_account_value,
total_cost=self.accum_info.get_cost,
cost_rate=now_cost / last_account_value,
stock_value=now_stock_value,
)
# set now_account_value to position
@@ -174,8 +200,60 @@ class Account:
# note use deepcopy
self.positions[trade_start_time] = copy.deepcopy(self.current)
# finish today's updation
# reset the bar variables
self.rtn = 0
self.ct = 0
self.to = 0
def update_bar_end(
self,
trade_start_time: pd.Timestamp,
trade_end_time: pd.Timestamp,
trade_exchange: Exchange,
atomic: bool,
generate_report: bool = False,
trade_info: list = None,
inner_order_indicators: Indicator = None,
indicator_config: dict = {},
):
"""update account at each trading bar step
Parameters
----------
trade_start_time : pd.Timestamp
closed start time of step
trade_end_time : pd.Timestamp
closed end time of step
trade_exchange : Exchange
trading exchange, used to update current
atomic : bool
whether the trading executor is atomic, which means there is no higher-frequency trading executor inside it
- if atomic is True, calculate the indicators with trade_info
- else, aggregate indicators with inner indicators
generate_report : bool, optional
whether to generate report, by default False
trade_info : List[(Order, float, float, float)], optional
trading information, by default None
- necessary if atomic is True
- list of tuple(order, trade_val, trade_cost, trade_price)
inner_order_indicators : Indicator, optional
indicators of inner executor, by default None
- necessary if atomic is False
- used to aggregate outer indicators
indicator_config : dict, optional
config of calculating indicators, by default {}
"""
if atomic is True and trade_info is None:
raise ValueError("trade_info is necessary in atomic executor")
elif atomic is False and inner_order_indicators is None:
raise ValueError("inner_order_indicators is necessary in unatomic executor")
self.update_bar_count()
self.update_current(trade_start_time, trade_end_time, trade_exchange)
if generate_report:
self.update_report(trade_start_time, trade_end_time)
self.indicator.clear()
if atomic:
self.indicator.update_order_indicators(trade_start_time, trade_end_time, trade_info, trade_exchange)
else:
self.indicator.agg_order_indicators(inner_order_indicators, indicator_config)
self.indicator.cal_trade_indicators(trade_start_time, self.freq, indicator_config)
self.indicator.record(trade_start_time)

View File

@@ -1,10 +1,25 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from ..utils.resam import parse_freq
def backtest_loop(start_time, end_time, trade_strategy, trade_executor):
"""backtest funciton for the interaction of the outermost strategy and executor in the nested decison execution
Returns
-------
report: Report
it records the trading report information
"""
return_value = {}
for _decison in collect_data_loop(start_time, end_time, trade_strategy, trade_executor, return_value):
pass
return return_value.get("report"), return_value.get("indicator")
def collect_data_loop(start_time, end_time, trade_strategy, trade_executor, return_value: dict = None):
"""Generator for collecting the trade decision data for rl training
Parameters
----------
start_time : pd.Timestamp|str
@@ -15,26 +30,8 @@ def backtest_loop(start_time, end_time, trade_strategy, trade_executor):
the outermost portfolio strategy
trade_executor : BaseExecutor
the outermost executor
Returns
-------
report: Report
it records the trading report information
"""
trade_executor.reset(start_time=start_time, end_time=end_time)
level_infra = trade_executor.get_level_infra()
trade_strategy.reset(level_infra=level_infra)
_execute_result = None
while not trade_executor.finished():
_trade_decision = trade_strategy.generate_trade_decision(_execute_result)
_execute_result = trade_executor.execute(_trade_decision)
return trade_executor.get_report()
def collect_data_loop(start_time, end_time, trade_strategy, trade_executor):
"""Generator for collecting the trade decision data for rl training
return_value : dict
used for backtest_loop
Yields
-------
@@ -49,3 +46,19 @@ def collect_data_loop(start_time, end_time, trade_strategy, trade_executor):
while not trade_executor.finished():
_trade_decision = trade_strategy.generate_trade_decision(_execute_result)
_execute_result = yield from trade_executor.collect_data(_trade_decision)
if return_value is not None:
all_executors = trade_executor.get_all_executors()
all_reports = {
"{}{}".format(*parse_freq(_executor.time_per_step)): _executor.get_report()
for _executor in all_executors
if _executor.generate_report
}
all_indicators = {
"{}{}".format(
*parse_freq(_executor.time_per_step)
): _executor.get_trade_indicator().generate_trade_indicators_dataframe()
for _executor in all_executors
}
return_value.update({"report": all_reports, "indicator": all_indicators})

View File

@@ -48,14 +48,17 @@ class Exchange:
:param trade_unit: trade unit, 100 for China A market
:param min_cost: min cost, default 5
:param extra_quote: pandas, dataframe consists of
columns: like ['$vwap', '$close', '$factor', 'limit'].
columns: like ['$vwap', '$close', '$volume', '$factor', 'limit_sell', 'limit_buy'].
The limit indicates that the etf is tradable on a specific day.
Necessary fields:
$close is for calculating the total value at end of each day.
Optional fields:
$volume is only necessary when we limit the trade amount or caculate PA(vwap) indicator
$vwap is only necessary when we use the $vwap price as the deal price
$factor is for rounding to the trading unit
limit will be set to False by default(False indicates we can buy this
limit_sell will be set to False by default(False indicates we can sell this
target on this day).
limit_buy will be set to False by default(False indicates we can buy this
target on this day).
index: MultipleIndex(instrument, pd.Datetime)
"""

View File

@@ -20,7 +20,7 @@ class BaseExecutor:
time_per_step: str,
start_time: Union[str, pd.Timestamp] = None,
end_time: Union[str, pd.Timestamp] = None,
show_indicator: bool = False,
indicator_config: dict = {},
generate_report: bool = False,
verbose: bool = False,
track_data: bool = False,
@@ -33,7 +33,40 @@ class BaseExecutor:
time_per_step : str
trade time per trading step, used for genreate the trade calendar
show_indicator: bool, optional
whether to show indicators, such as FFR/PA/POS, .etc
whether to show indicators, :
- 'pa', the price advantage
- 'pos', the positive rate
- 'ffr', the fulfill rate
indicator_config: dict, optional
config for calculating trade indicator, including the following fields:
- 'show_indicator': whether to show indicators, optional, default by False. The indicators includes
- 'pa', the price advantage
- 'pos', the positive rate
- 'ffr', the fulfill rate
- 'pa_config': config for calculating price advantage(pa), optional
- 'base_price': the based price than which the trading price is advanced, Optional, default by 'twap'
- If 'base_price' is 'twap', the based price is the time weighted average price
- If 'base_price' is 'vwap', the based price is the volume weighted average price
- 'weight_method': weighted method when calculating total trading pa by different orders' pa in each step, optional, default by 'mean'
- If 'weight_method' is 'mean', calculating mean value of different orders' pa
- If 'weight_method' is 'amount_weighted', calculating amount weighted average value of different orders' pa
- If 'weight_method' is 'value_weighted', calculating value weighted average value of different orders' pa
- 'ffr_config': config for calculating fulfill rate(ffr), optional
- 'weight_method': weighted method when calculating total trading ffr by different orders' ffr in each step, optional, default by 'mean'
- If 'weight_method' is 'mean', calculating mean value of different orders' ffr
- If 'weight_method' is 'amount_weighted', calculating amount weighted average value of different orders' ffr
- If 'weight_method' is 'value_weighted', calculating value weighted average value of different orders' ffr
Example:
{
'show_indicator': True,
'pa_config': {
'base_value': 'twap',
'weight_method': 'value_weighted',
},
'ffr_config':{
'weight_method': 'value_weighted',
}
}
generate_report : bool, optional
whether to generate report, by default False
verbose : bool, optional
@@ -51,7 +84,7 @@ class BaseExecutor:
"""
self.time_per_step = time_per_step
self.show_indicator = show_indicator
self.indicator_config = indicator_config
self.generate_report = generate_report
self.verbose = verbose
self.track_data = track_data
@@ -132,18 +165,20 @@ class BaseExecutor:
yield trade_decision
return self.execute(trade_decision)
def get_trade_account(self):
raise NotImplementedError("get_trade_account is not implemented!")
def get_report(self):
raise NotImplementedError("get_report is not implemented!")
if self.generate_report:
_report = self.trade_account.report.generate_report_dataframe()
_positions = self.trade_account.get_positions()
return _report, _positions
else:
raise ValueError("generate_report should be True if you want to generate report")
def get_all_executors(self):
"""Return all executors"""
return [self]
def get_trade_indicator(self):
return self.trade_account.indicator.trade_indicator
return self.trade_account.indicator
class NestedExecutor(BaseExecutor):
@@ -159,7 +194,7 @@ class NestedExecutor(BaseExecutor):
inner_strategy: Union[BaseStrategy, dict],
start_time: Union[str, pd.Timestamp] = None,
end_time: Union[str, pd.Timestamp] = None,
show_indicator: bool = False,
indicator_config: dict = {},
generate_report: bool = False,
verbose: bool = False,
track_data: bool = False,
@@ -190,7 +225,7 @@ class NestedExecutor(BaseExecutor):
time_per_step=time_per_step,
start_time=start_time,
end_time=end_time,
show_indicator=show_indicator,
indicator_config=indicator_config,
generate_report=generate_report,
verbose=verbose,
track_data=track_data,
@@ -198,7 +233,7 @@ class NestedExecutor(BaseExecutor):
**kwargs,
)
if generate_report and trade_exchange is not None:
if trade_exchange is not None:
self.trade_exchange = trade_exchange
def reset_common_infra(self, common_infra):
@@ -209,7 +244,7 @@ class NestedExecutor(BaseExecutor):
"""
super(NestedExecutor, self).reset_common_infra(common_infra)
if self.generate_report and common_infra.has("trade_exchange"):
if common_infra.has("trade_exchange"):
self.trade_exchange = common_infra.get("trade_exchange")
self.inner_executor.reset_common_infra(common_infra)
@@ -222,66 +257,43 @@ class NestedExecutor(BaseExecutor):
sub_level_infra = self.inner_executor.get_level_infra()
self.inner_strategy.reset(level_infra=sub_level_infra, outer_trade_decision=trade_decision)
def _update_trade_account(self, inner_indicators):
trade_step = self.trade_calendar.get_trade_step()
trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step)
self.trade_account.update_bar_count()
if self.generate_report:
self.trade_account.update_bar_report(
trade_start_time=trade_start_time,
trade_end_time=trade_end_time,
trade_exchange=self.trade_exchange,
)
self.trade_account.indicator.clear()
self.trade_account.indicator.agg_report_info(inner_indicators=inner_indicators)
self.trade_account.indicator.agg_FFR()
self.trade_account.indicator.agg_PA(inner_indicators=inner_indicators)
if self.show_indicator:
FFR_value = self.trade_account.indicator.get_statistics_FFR(method="value_weighted")
PA_value = self.trade_account.indicator.get_statistics_PA(method="value_weighted")
POS_values = self.trade_account.indicator.get_statistics_POS()
print(
"[Indicator({}) {:%Y-%m-%d}]: FFR: {}, PA: {}, POS: {}".format(
self.time_per_step, trade_start_time, FFR_value, PA_value, POS_values
)
)
def execute(self, trade_decision):
for _data in self.collect_data(trade_decision):
return_value = {}
for _decison in self.collect_data(trade_decision, return_value):
pass
return self._execute_result
return return_value.get("execute_result")
def collect_data(self, trade_decision):
def collect_data(self, trade_decision, return_value=None):
if self.track_data:
yield trade_decision
self._init_sub_trading(trade_decision)
execute_result = []
inner_indicators = []
inner_order_indicators = []
_inner_execute_result = None
while not self.inner_executor.finished():
_inner_trade_decision = self.inner_strategy.generate_trade_decision(_inner_execute_result)
_inner_execute_result = yield from self.inner_executor.collect_data(trade_decision=_inner_trade_decision)
execute_result.extend(_inner_execute_result)
inner_indicators.append(self.inner_executor.get_trade_indicator())
inner_order_indicators.append(self.inner_executor.get_trade_indicator().get_order_indicator)
if hasattr(self, "trade_account"):
self._update_trade_account(inner_indicators=inner_indicators)
trade_step = self.trade_calendar.get_trade_step()
trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step)
self.trade_account.update_bar_end(
trade_start_time,
trade_end_time,
self.trade_exchange,
atomic=False,
generate_report=self.generate_report,
inner_order_indicators=inner_order_indicators,
indicator_config=self.indicator_config,
)
self.trade_calendar.step()
self._execute_result = execute_result
if return_value is not None:
return_value.update({"execute_result": execute_result})
return execute_result
def get_report(self):
sub_env_report_dict = self.inner_executor.get_report()
if self.generate_report:
_report = self.trade_account.report.generate_report_dataframe()
_positions = self.trade_account.get_positions()
_count, _freq = parse_freq(self.time_per_step)
sub_env_report_dict.update({f"{_count}{_freq}": (_report, _positions)})
return sub_env_report_dict
def get_all_executors(self):
"""Return all executors, including self and inner_executor.get_all_executors()"""
return [self, *self.inner_executor.get_all_executors()]
@@ -295,7 +307,7 @@ class SimulatorExecutor(BaseExecutor):
time_per_step: str,
start_time: Union[str, pd.Timestamp] = None,
end_time: Union[str, pd.Timestamp] = None,
show_indicator: bool = False,
indicator_config: dict = {},
generate_report: bool = False,
verbose: bool = False,
track_data: bool = False,
@@ -314,7 +326,7 @@ class SimulatorExecutor(BaseExecutor):
time_per_step=time_per_step,
start_time=start_time,
end_time=end_time,
show_indicator=show_indicator,
indicator_config=indicator_config,
generate_report=generate_report,
verbose=verbose,
track_data=track_data,
@@ -377,41 +389,14 @@ class SimulatorExecutor(BaseExecutor):
# do nothing
pass
self.trade_account.update_bar_count()
if self.generate_report:
self.trade_account.update_bar_report(
trade_start_time=trade_start_time,
trade_end_time=trade_end_time,
trade_exchange=self.trade_exchange,
)
self.trade_account.indicator.clear()
self.trade_account.indicator.update_trade_info(trade_info=execute_result)
self.trade_account.indicator.update_FFR()
self.trade_account.indicator.update_PA(
freq=self.time_per_step, trade_start_time=trade_start_time, trade_end_time=trade_end_time
self.trade_account.update_bar_end(
trade_start_time,
trade_end_time,
self.trade_exchange,
atomic=True,
generate_report=self.generate_report,
trade_info=execute_result,
indicator_config=self.indicator_config,
)
self.trade_account.indicator.record(trade_start_time=trade_start_time)
if self.show_indicator:
FFR_value = self.trade_account.indicator.get_statistics_FFR(method="value_weighted")
PA_value = self.trade_account.indicator.get_statistics_PA(method="value_weighted")
POS_values = self.trade_account.indicator.get_statistics_POS()
print(
"[Indicator({}) {:%Y-%m-%d %H:%M:%S}]: FFR: {}, PA: {}, POS: {}".format(
self.time_per_step, trade_start_time, FFR_value, PA_value, POS_values
)
)
self.trade_calendar.step()
return execute_result
def get_report(self):
if self.generate_report:
_report = self.trade_account.report.generate_report_dataframe()
_positions = self.trade_account.get_positions()
_count, _freq = parse_freq(self.time_per_step)
return {f"{_count}{_freq}": (_report, _positions)}
else:
return {}

View File

@@ -52,11 +52,13 @@ class Report:
self.init_bench(freq=freq, benchmark_config=benchmark_config)
def init_vars(self):
self.accounts = OrderedDict() # account postion value for each trade date
self.returns = OrderedDict() # daily return rate for each trade date
self.turnovers = OrderedDict() # turnover for each trade date
self.costs = OrderedDict() # trade cost for each trade date
self.values = OrderedDict() # value for each trade date
self.accounts = OrderedDict() # account postion value for each trade time
self.returns = OrderedDict() # daily return rate for each trade time
self.total_turnovers = OrderedDict() # total turnover for each trade time
self.turnovers = OrderedDict() # turnover for each trade time
self.total_costs = OrderedDict() # total trade cost for each trade time
self.costs = OrderedDict() # trade cost rate for each trade time
self.values = OrderedDict() # value for each trade time
self.cashes = OrderedDict()
self.benches = OrderedDict()
self.latest_report_time = None # pd.TimeStamp
@@ -87,10 +89,10 @@ class Report:
def _sample_benchmark(self, bench, trade_start_time, trade_end_time):
def cal_change(x):
return (x + 1).prod() - 1
return (x + 1).prod()
_ret = resam_ts_data(bench, trade_start_time, trade_end_time, method=cal_change)
return 0.0 if _ret is None else _ret
return 0.0 if _ret is None else _ret - 1
def is_empty(self):
return len(self.accounts) == 0
@@ -101,6 +103,12 @@ class Report:
def get_latest_account_value(self):
return self.accounts[self.latest_report_time]
def get_latest_total_cost(self):
return self.total_costs[self.latest_report_time]
def get_latest_total_turnover(self):
return self.total_turnovers[self.latest_report_time]
def update_report_record(
self,
trade_start_time=None,
@@ -108,7 +116,9 @@ class Report:
account_value=None,
cash=None,
return_rate=None,
total_turnover=None,
turnover_rate=None,
total_cost=None,
cost_rate=None,
stock_value=None,
bench_value=None,
@@ -119,12 +129,14 @@ class Report:
account_value,
cash,
return_rate,
total_turnover,
turnover_rate,
total_cost,
cost_rate,
stock_value,
]:
raise ValueError(
"None in [trade_start_time, account_value, cash, return_rate, turnover_rate, cost_rate, stock_value]"
"None in [trade_start_time, account_value, cash, return_rate, total_turnover, turnover_rate, total_cost, cost_rate, stock_value]"
)
if trade_end_time is None and bench_value is None:
@@ -135,20 +147,24 @@ class Report:
# update report data
self.accounts[trade_start_time] = account_value
self.returns[trade_start_time] = return_rate
self.total_turnovers[trade_start_time] = total_turnover
self.turnovers[trade_start_time] = turnover_rate
self.total_costs[trade_start_time] = total_cost
self.costs[trade_start_time] = cost_rate
self.values[trade_start_time] = stock_value
self.cashes[trade_start_time] = cash
self.benches[trade_start_time] = bench_value
# update latest_report_date
self.latest_report_time = trade_start_time
# finish daily report update
# finish report update in each step
def generate_report_dataframe(self):
report = pd.DataFrame()
report["account"] = pd.Series(self.accounts)
report["return"] = pd.Series(self.returns)
report["total_turnover"] = pd.Series(self.total_turnovers)
report["turnover"] = pd.Series(self.turnovers)
report["total_cost"] = pd.Series(self.total_costs)
report["cost"] = pd.Series(self.costs)
report["value"] = pd.Series(self.values)
report["cash"] = pd.Series(self.cashes)
@@ -163,7 +179,7 @@ class Report:
def load_report(self, path):
"""load report from a file
should have format like
columns = ['account', 'return', 'turnover', 'cost', 'value', 'cash', 'bench']
columns = ['account', 'return', 'total_turnover', 'turnover', 'cost', 'total_cost', 'value', 'cash', 'bench']
:param
path: str/ pathlib.Path()
"""
@@ -179,7 +195,9 @@ class Report:
account_value=r.loc[trade_start_time]["account"],
cash=r.loc[trade_start_time]["cash"],
return_rate=r.loc[trade_start_time]["return"],
total_turnover=r.loc[trade_start_time]["total_turnover"],
turnover_rate=r.loc[trade_start_time]["turnover"],
total_cost=r.loc[trade_start_time]["total_cost"],
cost_rate=r.loc[trade_start_time]["cost"],
stock_value=r.loc[trade_start_time]["value"],
bench_value=r.loc[trade_start_time]["bench"],
@@ -188,147 +206,184 @@ class Report:
class Indicator:
def __init__(self):
self.indicator_his = dict()
self.trade_indicator = dict()
def __getitem__(self, key):
return self.trade_indicator[key]
def __setitem__(self, key, value):
self.trade_indicator[key] = value
def __contains__(self, key):
return key in self.trade_indicator
self.order_indicator_his = OrderedDict()
self.order_indicator = OrderedDict()
self.trade_indicator_his = OrderedDict()
self.trade_indicator = OrderedDict()
def clear(self):
self.trade_indicator = dict()
self.order_indicator = OrderedDict()
self.trade_indicator = OrderedDict()
def record(self, trade_start_time):
self.indicator_his[trade_start_time] = pd.DataFrame(self.trade_indicator)
self.order_indicator_his[trade_start_time] = self.order_indicator
self.trade_indicator_his[trade_start_time] = self.trade_indicator
def update_trade_info(self, trade_info: list):
def _update_order_trade_info(self, trade_info: list):
amount = dict()
deal_amount = dict()
trade_price = dict()
trade_value = dict()
trade_cost = dict()
for order, _trade_val, _trade_cost, _trade_price in trade_info:
amount[order.stock_id] = order.amount * (order.direction * 2 - 1)
deal_amount[order.stock_id] = order.deal_amount * (order.direction * 2 - 1)
trade_price[order.stock_id] = _trade_price
trade_value[order.stock_id] = _trade_val * (order.direction * 2 - 1)
trade_cost[order.stock_id] = _trade_cost
self["amount"] = pd.Series(amount)
self["deal_amount"] = pd.Series(deal_amount)
self["trade_price"] = pd.Series(trade_price)
self["trade_cost"] = pd.Series(trade_cost)
self.order_indicator["amount"] = pd.Series(amount)
self.order_indicator["deal_amount"] = pd.Series(deal_amount)
self.order_indicator["trade_price"] = pd.Series(trade_price)
self.order_indicator["trade_value"] = pd.Series(trade_value)
self.order_indicator["trade_cost"] = pd.Series(trade_cost)
def update_FFR(self):
self["fulfill_rate"] = self["deal_amount"] / self["amount"]
def _update_order_fulfill_rate(self):
self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"]
def update_PA(self, freq, trade_start_time, trade_end_time, base_price="twap"):
base_price = base_price.lower()
def _update_order_price_advantage(self, trade_exchange, trade_start_time, trade_end_time):
self.order_indicator["base_price"] = self.order_indicator["trade_price"]
instruments = list(self.order_indicator["base_price"].index)
self.order_indicator["volume"] = pd.Series(
[
trade_exchange.get_volume(stock_id=inst, start_time=trade_start_time, end_time=trade_end_time)
for inst in instruments
],
index=instruments,
)
self.order_indicator["pa"] = (
self.order_indicator["trade_price"] - self.order_indicator["base_price"]
) / self.order_indicator["base_price"]
instruments = list(self["amount"].index)
if base_price == "twap":
# too slow
# price_info, _ = get_higher_freq_feature(instruments, fields=["$close"], start_time=trade_start_time, end_time=trade_end_time, freq=freq)
# price_info = price_info.astype(float)
# self["base_price"] = price_info["$close"].groupby(level="instrument").mean()
self["base_price"] = self["trade_price"]
elif base_price == "vwap":
# too slow
price_info, _ = get_higher_freq_feature(
instruments,
fields=["$close", "$volume"],
start_time=trade_start_time,
end_time=trade_end_time,
freq=freq,
)
price_info = price_info.astype(float)
self["base_price"] = price_info.groupby(level="instrument").apply(
lambda x: (x["$close"] * x["$volume"]).sum() / x["$volume"].sum()
)
self["volume"] = price_info["$volume"].groupby(level="instrument").sum()
else:
raise ValueError(f"base_price {base_price} is not supported!")
self["pa"] = (self["trade_price"] - self["base_price"]) / self["base_price"]
def agg_report_info(self, inner_indicators):
def _agg_order_trade_info(self, inner_order_indicators):
amount = pd.Series()
deal_amount = pd.Series()
trade_price = pd.Series()
trade_value = pd.Series()
trade_cost = pd.Series()
for inner_indicator in inner_indicators:
amount = amount.add(inner_indicator["amount"], fill_value=0)
deal_amount = deal_amount.add(inner_indicator["deal_amount"], fill_value=0)
trade_price = trade_price.add(inner_indicator["trade_price"] * inner_indicator["deal_amount"], fill_value=0)
trade_cost = trade_cost.add(inner_indicator["trade_cost"], fill_value=0)
for _order_indicator in inner_order_indicators:
amount = amount.add(_order_indicator["amount"], fill_value=0)
deal_amount = deal_amount.add(_order_indicator["deal_amount"], fill_value=0)
trade_price = trade_price.add(
_order_indicator["trade_price"] * _order_indicator["deal_amount"], fill_value=0
)
trade_value = trade_value.add(_order_indicator["trade_value"], fill_value=0)
trade_cost = trade_cost.add(_order_indicator["trade_cost"], fill_value=0)
self["amount"] = amount
self["deal_amount"] = deal_amount
trade_price /= self["deal_amount"]
self["trade_price"] = trade_price
self["trade_cost"] = trade_cost
self.order_indicator["amount"] = amount
self.order_indicator["deal_amount"] = deal_amount
trade_price /= self.order_indicator["deal_amount"]
self.order_indicator["trade_price"] = trade_price
self.order_indicator["trade_value"] = trade_value
self.order_indicator["trade_cost"] = trade_cost
def agg_FFR(self):
self["fulfill_rate"] = self["deal_amount"] / self["amount"]
def _agg_order_fulfill_rate(self):
self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"]
def agg_PA(self, inner_indicators, base_price="twap"):
def _agg_order_price_advantage(self, inner_order_indicators, base_price="twap"):
base_price = base_price.lower()
volume = pd.Series()
for _order_indicator in inner_order_indicators:
volume = volume.add(_order_indicator["volume"], fill_value=0)
self.order_indicator["volume"] = volume
if base_price == "twap":
base_price = pd.Series()
price_count = pd.Series()
for inner_indicator in inner_indicators:
base_price = base_price.add(inner_indicator["base_price"], fill_value=0)
price_count = price_count.add(pd.Series(1, index=inner_indicator["base_price"].index), fill_value=0)
for _order_indicator in inner_order_indicators:
base_price = base_price.add(_order_indicator["base_price"], fill_value=0)
price_count = price_count.add(pd.Series(1, index=_order_indicator["base_price"].index), fill_value=0)
base_price /= price_count
self["base_price"] = base_price
self.order_indicator["base_price"] = base_price
elif base_price == "vwap":
base_price = pd.Series()
volume = pd.Series()
for inner_indicator in inner_indicators:
base_price = base_price.add(inner_indicator["base_price"] * inner_indicator["volume"], fill_value=0)
volume = volume.add(inner_indicator["volume"], fill_value=0)
base_price /= volume
self["base_price"] = base_price
self["volume"] = volume
for _order_indicator in inner_order_indicators:
base_price = base_price.add(_order_indicator["base_price"] * _order_indicator["volume"], fill_value=0)
base_price /= self.order_indicator["volume"]
self.order_indicator["base_price"] = base_price
else:
raise ValueError(f"base_price {base_price} is not supported!")
self["pa"] = (self["trade_price"] - self["base_price"]) / self["base_price"]
self.order_indicator["pa"] = self.order_indicator["trade_price"] / self.order_indicator["base_price"] - 1
def get_statistics_FFR(self, method="mean"):
def _cal_trade_fulfill_rate(self, method="mean"):
if method == "mean":
return self["fulfill_rate"].mean()
return self.order_indicator["ffr"].mean()
elif method == "amount_weighted":
weights = self["deal_amount"].abs()
return (self["fulfill_rate"] * weights).sum() / weights.sum()
weights = self.order_indicator["deal_amount"].abs()
return (self.order_indicator["ffr"] * weights).sum() / weights.sum()
elif method == "value_weighted":
weights = (self["deal_amount"] * self["trade_price"]).abs()
return (self["fulfill_rate"] * weights).sum() / weights.sum()
weights = self.order_indicator["trade_value"].abs()
return (self.order_indicator["ffr"] * weights).sum() / weights.sum()
else:
raise ValueError(f"method {method} is not supported!")
def get_statistics_PA(self, method="mean"):
pa_order = self["pa"] * (self["amount"] < 0).astype(int)
def _cal_trade_price_advantage(self, method="mean"):
pa_order = self.order_indicator["pa"] * (self.order_indicator["amount"] < 0).astype(int)
if method == "mean":
return pa_order.mean()
elif method == "amount_weighted":
weights = self["deal_amount"].abs()
weights = self.order_indicator["deal_amount"].abs()
return (pa_order * weights).sum() / weights.sum()
elif method == "value_weighted":
weights = (self["deal_amount"] * self["trade_price"]).abs()
weights = self.order_indicator["trade_value"].abs()
return (pa_order * weights).sum() / weights.sum()
else:
raise ValueError(f"method {method} is not supported!")
def get_statistics_POS(self):
pa_order = self["pa"] * (self["amount"] < 0).astype(int)
return (pa_order > 1e-8).astype(int).sum() / len(pa_order)
def _cal_trade_positive_rate(self):
pa_order = self.order_indicator["pa"] * (self.order_indicator["amount"] < 0).astype(int)
return (pa_order > 0).astype(int).sum() / len(pa_order)
def _cal_trade_amount(self):
return self.order_indicator["deal_amount"].abs().sum()
def _cal_trade_value(self):
return self.order_indicator["trade_value"].abs().sum()
def update_order_indicators(self, trade_start_time, trade_end_time, trade_info, trade_exchange):
self._update_order_trade_info(trade_info=trade_info)
self._update_order_fulfill_rate()
self._update_order_price_advantage(trade_exchange, trade_start_time, trade_end_time)
def agg_order_indicators(self, inner_order_indicators, indicator_config={}):
self._agg_order_trade_info(inner_order_indicators)
self._agg_order_fulfill_rate()
pa_config = indicator_config.get("pa_config", {})
self._agg_order_price_advantage(inner_order_indicators, base_price=pa_config.get("base_price", "twap"))
def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}):
show_indicator = indicator_config.get("show_indicator", False)
ffr_config = indicator_config.get("ffr_config", {})
pa_config = indicator_config.get("pa_config", {})
fulfill_rate = self._cal_trade_fulfill_rate(method=ffr_config.get("weight_method", "mean"))
price_advantage = self._cal_trade_price_advantage(method=pa_config.get("weight_method", "mean"))
positive_rate = self._cal_trade_positive_rate()
trade_amount = self._cal_trade_amount()
trade_value = self._cal_trade_value()
self.trade_indicator["ffr"] = fulfill_rate
self.trade_indicator["pa"] = price_advantage
self.trade_indicator["pos"] = positive_rate
self.trade_indicator["amount"] = trade_amount
self.trade_indicator["value"] = trade_value
if show_indicator:
print(
"[Indicator({}) {:%Y-%m-%d %H:%M:%S}]: FFR: {}, PA: {}, POS: {}".format(
freq, trade_start_time, fulfill_rate, price_advantage, positive_rate
)
)
@property
def get_order_indicator(self):
return self.order_indicator
@property
def get_trade_indicator(self):
return self.trade_indicator
def generate_trade_indicators_dataframe(self):
return pd.DataFrame.from_dict(self.trade_indicator_his, orient="index")

View File

@@ -11,7 +11,7 @@ import warnings
from ..log import get_module_logger
from ..backtest import get_exchange, backtest as backtest_func
from ..utils import get_date_range
from ..utils.resam import parse_freq
from ..utils.resam import parse_freq, NORM_FREQ_MONTH, NORM_FREQ_WEEK, NORM_FREQ_DAY, NORM_FREQ_MINUTE
from ..data import D
from ..config import C
@@ -37,12 +37,12 @@ def risk_analysis(r, N: int = None, freq: str = "day"):
def cal_risk_analysis_scaler(freq):
_count, _freq = parse_freq(freq)
_freq_scaler = {
"minute": 240 * 252,
"day": 252,
"week": 50,
"month": 12,
NORM_FREQ_MINUTE: 240 * 252,
NORM_FREQ_DAY: 252,
NORM_FREQ_WEEK: 50,
NORM_FREQ_MONTH: 12,
}
return _count * _freq_scaler[_freq]
return _freq_scaler[_freq] / _count
if N is None and freq is None:
raise ValueError("at least one of `N` and `freq` should exist")
@@ -63,7 +63,51 @@ def risk_analysis(r, N: int = None, freq: str = "day"):
"information_ratio": information_ratio,
"max_drawdown": max_drawdown,
}
res = pd.Series(data, index=data.keys()).to_frame("risk")
res = pd.Series(data).to_frame("risk")
return res
def indicator_analysis(df, method="mean"):
"""analyze statistical time-series indicators of trading
Parameters
----------
df : pandas.DataFrame
columns: like ['pa', 'pos', 'ffr', 'amount', 'value'].
Necessary fields:
- 'pa' is the price advantage in trade indicators
- 'pos' is the positive rate in trade indicators
- 'ffr' is the fulfill rate in trade indicators
Optional fields:
- 'amount' is the total deal amount, only necessary when method is 'amount_weighted'
- 'value' is the total trade value, only necessary when method is 'value_weighted'
index: Index(datetime)
method : str, optional
statistics method, by default "mean"
- if method is 'mean', count the mean statistical value of each trade indicator
- if method is 'amount_weighted', count the amount weighted mean statistical value of each trade indicator
- if method is 'value_weighted', count the value weighted mean statistical value of each trade indicator
Returns
-------
pd.DataFrame
statistical value of each trade indicator
"""
indicators_df = df[["pa", "pos", "ffr"]]
if method == "mean":
res = indicators_df.mean()
elif method == "amount_weighted":
weights = df["amount"].abs()
res = indicators_df.mul(weights, axis=0).sum() / weights.sum()
elif method == "value_weighted":
weights = df["value"].abs()
res = indicators_df.mul(weights, axis=0).sum() / weights.sum()
else:
raise ValueError(f"indicator_analysis method {method} is not supported!")
res = res.to_frame("value")
return res

View File

@@ -7,7 +7,7 @@ import warnings
import pandas as pd
from pathlib import Path
from pprint import pprint
from ..contrib.evaluate import risk_analysis
from ..contrib.evaluate import indicator_analysis, risk_analysis, indicator_analysis
from ..data.dataset import DatasetH
from ..data.dataset.handler import DataHandlerLP
@@ -294,7 +294,9 @@ class PortAnaRecord(RecordTemp):
artifact_path = "portfolio_analysis"
def __init__(self, recorder, config, risk_analysis_freq, **kwargs):
def __init__(
self, recorder, config, risk_analysis_freq, indicator_analysis_freq, indicator_analysis_method=None, **kwargs
):
"""
config["strategy"] : dict
define the strategy class as well as the kwargs.
@@ -304,6 +306,10 @@ class PortAnaRecord(RecordTemp):
define the backtest kwargs.
risk_analysis_freq : str|List[str]
risk analysis freq of report
indicator_analysis_freq : str|List[str]
indicator analysis freq of report
indicator_analysis_method : str, optional, default by None
the candidated values include 'mean', 'amount_weighted', 'value_weighted'
"""
super().__init__(recorder=recorder, **kwargs)
@@ -312,10 +318,17 @@ class PortAnaRecord(RecordTemp):
self.backtest_config = config["backtest"]
if isinstance(risk_analysis_freq, str):
risk_analysis_freq = [risk_analysis_freq]
if isinstance(indicator_analysis_freq, str):
indicator_analysis_freq = [indicator_analysis_freq]
self.risk_analysis_freq = [
"{0}{1}".format(*parse_freq(_analysis_freq)) for _analysis_freq in risk_analysis_freq
]
self.report_freq = self._get_report_freq(self.executor_config)
self.indicator_analysis_freq = [
"{0}{1}".format(*parse_freq(_analysis_freq)) for _analysis_freq in indicator_analysis_freq
]
self.indicator_analysis_method = indicator_analysis_method
self.all_freq = self._get_report_freq(self.executor_config)
def _get_report_freq(self, executor_config):
ret_freq = []
@@ -328,21 +341,26 @@ class PortAnaRecord(RecordTemp):
def generate(self, **kwargs):
# custom strategy and get backtest
report_dict = normal_backtest(
report_dict, indicator_dict = normal_backtest(
executor=self.executor_config, strategy=self.strategy_config, **self.backtest_config
)
for report_freq, (report_normal, positions_normal) in report_dict.items():
for _freq, (report_normal, positions_normal) in report_dict.items():
self.recorder.save_objects(
**{f"report_normal_{report_freq}.pkl": report_normal}, artifact_path=PortAnaRecord.get_path()
**{f"report_normal_{_freq}.pkl": report_normal}, artifact_path=PortAnaRecord.get_path()
)
self.recorder.save_objects(
**{f"positions_normal_{report_freq}.pkl": positions_normal}, artifact_path=PortAnaRecord.get_path()
**{f"positions_normal_{_freq}.pkl": positions_normal}, artifact_path=PortAnaRecord.get_path()
)
for _freq, indicators_normal in indicator_dict.items():
self.recorder.save_objects(
**{f"indicators_normal_{_freq}.pkl": indicators_normal}, artifact_path=PortAnaRecord.get_path()
)
for _analysis_freq in self.risk_analysis_freq:
if _analysis_freq not in report_dict:
warnings.warn(
f"the freq {_analysis_freq} report is not found, please set the corresponding env with `generate_report==True`"
f"the freq {_analysis_freq} report is not found, please set the corresponding env with `generate_report=True`"
)
else:
report_normal, _ = report_dict.get(_analysis_freq)
@@ -353,25 +371,46 @@ class PortAnaRecord(RecordTemp):
analysis["excess_return_with_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"] - report_normal["cost"], freq=_analysis_freq
)
analysis_df = pd.concat(analysis) # type: pd.DataFrame
# log metrics
self.recorder.log_metrics(**flatten_dict(analysis_df["risk"].unstack().T.to_dict()))
analysis_dict = flatten_dict(analysis_df["risk"].unstack().T.to_dict())
self.recorder.log_metrics(**{f"{_analysis_freq}.{k}": v for k, v in analysis_dict.items()})
# save results
self.recorder.save_objects(
**{f"port_analysis_{report_freq}.pkl": analysis_df}, artifact_path=PortAnaRecord.get_path()
**{f"port_analysis_{_analysis_freq}.pkl": analysis_df}, artifact_path=PortAnaRecord.get_path()
)
logger.info(
f"Portfolio analysis record 'port_analysis_{report_freq}.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}"
f"Portfolio analysis record 'port_analysis_{_analysis_freq}.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}"
)
# print out results
pprint("The following are analysis results of the excess return without cost.")
pprint(f"The following are analysis results of benchmark return({_analysis_freq}).")
pprint(risk_analysis(report_normal["bench"], freq=_analysis_freq))
pprint(f"The following are analysis results of the excess return without cost({_analysis_freq}).")
pprint(analysis["excess_return_without_cost"])
pprint("The following are analysis results of the excess return with cost.")
pprint(f"The following are analysis results of the excess return with cost({_analysis_freq}).")
pprint(analysis["excess_return_with_cost"])
for _analysis_freq in self.indicator_analysis_freq:
indicators_normal = indicator_dict.get(_analysis_freq)
if self.indicator_analysis_method is None:
analysis_df = indicator_analysis(indicators_normal)
else:
analysis_df = indicator_analysis(indicators_normal, method=self.indicator_analysis_method)
# log metrics
analysis_dict = analysis_df["value"].to_dict()
self.recorder.log_metrics(**{f"{_analysis_freq}.{k}": v for k, v in analysis_dict.items()})
# save results
self.recorder.save_objects(
**{f"indicator_analysis_{_analysis_freq}.pkl": analysis_df}, artifact_path=PortAnaRecord.get_path()
)
pprint(f"The following are analysis results of indicators({_analysis_freq}).")
pprint(analysis_df)
def list(self):
list_path = []
for _freq in self.report_freq:
for _freq in self.all_freq:
list_path.extend(
[
PortAnaRecord.get_path(f"report_normal_{_freq}.pkl"),
@@ -380,7 +419,7 @@ class PortAnaRecord(RecordTemp):
)
for _analysis_freq in self.risk_analysis_freq:
if _analysis_freq in self.report_freq:
if _analysis_freq in self.all_freq:
list_path.append(PortAnaRecord.get_path(f"port_analysis_{_analysis_freq}.pkl"))
else:
warnings.warn(f"{_analysis_freq} is not found")