From 4ac6e6e246c8f1a542c21ab288f6c8eb77c5180e Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 22 Jun 2021 02:42:09 +0800 Subject: [PATCH] fix account bug & update indicator_analysis & fix some comments --- .../nested_decision_execution/workflow.py | 47 ++-- qlib/backtest/__init__.py | 8 +- qlib/backtest/account.py | 164 +++++++++--- qlib/backtest/backtest.py | 53 ++-- qlib/backtest/exchange.py | 7 +- qlib/backtest/executor.py | 167 ++++++------ qlib/backtest/report.py | 253 +++++++++++------- qlib/contrib/evaluate.py | 58 +++- qlib/workflow/record_temp.py | 69 +++-- 9 files changed, 524 insertions(+), 302 deletions(-) diff --git a/examples/nested_decision_execution/workflow.py b/examples/nested_decision_execution/workflow.py index 910011887..689602013 100644 --- a/examples/nested_decision_execution/workflow.py +++ b/examples/nested_decision_execution/workflow.py @@ -17,7 +17,6 @@ class NestedDecisonExecutionWorkflow: market = "csi300" benchmark = "SH000300" - data_handler_config = { "start_time": "2008-01-01", "end_time": "2021-05-28", @@ -67,28 +66,19 @@ class NestedDecisonExecutionWorkflow: "kwargs": { "time_per_step": "week", "inner_executor": { - "class": "NestedExecutor", + "class": "SimulatorExecutor", "module_path": "qlib.backtest.executor", "kwargs": { "time_per_step": "day", - "inner_executor": { - "class": "SimulatorExecutor", - "module_path": "qlib.backtest.executor", - "kwargs": { - "time_per_step": "15min", - "generate_report": True, - "verbose": True, - }, + "generate_report": True, + "verbose": True, + "indicator_config": { + "show_indicator": True, }, - "inner_strategy": { - "class": "TWAPStrategy", - "module_path": "qlib.contrib.strategy.rule_strategy", - }, - "show_indicator": True, }, }, "inner_strategy": { - "class": "VAStrategy", + "class": "SBBStrategyEMA", "module_path": "qlib.contrib.strategy.rule_strategy", "kwargs": { "freq": "day", @@ -96,7 +86,10 @@ class NestedDecisonExecutionWorkflow: }, }, "track_data": True, - "show_indicator": True, + "generate_report": True, + "indicator_config": { + "show_indicator": True, + }, }, }, "backtest": { @@ -105,7 +98,7 @@ class NestedDecisonExecutionWorkflow: "account": 100000000, "benchmark": benchmark, "exchange_kwargs": { - "freq": "1min", + "freq": "day", "limit_threshold": 0.095, "deal_price": "close", "open_cost": 0.0005, @@ -124,7 +117,7 @@ class NestedDecisonExecutionWorkflow: GetData().qlib_data( target_dir=provider_uri_1min, interval="1min", region=REG_CN, version="v2", exists_skip=True ) - + provider_uri_day = "/data/csdesign/qlib" provider_uri_map = {"1min": provider_uri_1min, "day": provider_uri_day} client_config = { "calendar_provider": { @@ -179,12 +172,25 @@ class NestedDecisonExecutionWorkflow: }, } self.port_analysis_config["strategy"] = strategy_config + self.port_analysis_config["backtest"]["benchmark"] = D.list_instruments( + instruments=D.instruments(market=self.market), as_list=True + ) with R.start(experiment_name="backtest"): recorder = R.get_recorder() - par = PortAnaRecord(recorder, self.port_analysis_config, "15minute") + par = PortAnaRecord( + recorder, + self.port_analysis_config, + risk_analysis_freq=["week", "day"], + indicator_analysis_freq=["week", "day"], + indicator_analysis_method="value_weighted", + ) par.generate() + # report_normal_df = recorder.load_object("portfolio_analysis/report_normal_1day.pkl") + # from qlib.contrib.report import analysis_position + # analysis_position.report_graph(report_normal_df) + def collect_data(self): self._init_qlib() model = init_instance_by_config(self.task["model"]) @@ -192,6 +198,7 @@ class NestedDecisonExecutionWorkflow: self._train_model(model, dataset) executor_config = self.port_analysis_config["executor"] backtest_config = self.port_analysis_config["backtest"] + backtest_config["benchmark"] = D.list_instruments(instruments=D.instruments(market=self.market), as_list=True) strategy_config = { "class": "TopkDropoutStrategy", "module_path": "qlib.contrib.strategy.model_strategy", diff --git a/qlib/backtest/__init__.py b/qlib/backtest/__init__.py index a3706008a..f8f30f183 100644 --- a/qlib/backtest/__init__.py +++ b/qlib/backtest/__init__.py @@ -116,9 +116,9 @@ def backtest(start_time, end_time, strategy, executor, benchmark="SH000300", acc trade_strategy, trade_executor = get_strategy_executor( start_time, end_time, strategy, executor, benchmark, account, exchange_kwargs ) - report_dict = backtest_loop(start_time, end_time, trade_strategy, trade_executor) + report_dict, indicator_dict = backtest_loop(start_time, end_time, trade_strategy, trade_executor) - return report_dict + return report_dict, indicator_dict def collect_data(start_time, end_time, strategy, executor, benchmark="SH000300", account=1e9, exchange_kwargs={}): @@ -126,6 +126,4 @@ def collect_data(start_time, end_time, strategy, executor, benchmark="SH000300", trade_strategy, trade_executor = get_strategy_executor( start_time, end_time, strategy, executor, benchmark, account, exchange_kwargs ) - report_dict = yield from collect_data_loop(start_time, end_time, trade_strategy, trade_executor) - - return report_dict + yield from collect_data_loop(start_time, end_time, trade_strategy, trade_executor) diff --git a/qlib/backtest/account.py b/qlib/backtest/account.py index 71214036a..85ca57fa5 100644 --- a/qlib/backtest/account.py +++ b/qlib/backtest/account.py @@ -9,7 +9,7 @@ import pandas as pd from .position import Position from .report import Report, Indicator from .order import Order - +from .exchange import Exchange """ rtn & earning in the Account @@ -25,10 +25,42 @@ rtn & earning in the Account while earning is the difference of two position value, so it considers cost, it is the true return rate in the specific accomplishment for rtn, it does not consider cost, in other words, rtn - cost = earning -Now rtn has been removed in the hierarchical backtest implemention. """ +class AccumulatedInfo: + """accumulated trading info, including accumulated return\cost\turnover""" + + def __init__(self): + self.reset() + + def reset(self): + self.rtn = 0 # accumulated return, do not consider cost + self.cost = 0 # accumulated cost + self.to = 0 # accumulated turnover + + def add_return_value(self, value): + self.rtn += value + + def add_cost(self, value): + self.cost += value + + def add_turnover(self, value): + self.to += value + + @property + def get_return(self): + return self.rtn + + @property + def get_cost(self): + return self.cost + + @property + def get_turnover(self): + return self.to + + class Account: def __init__(self, init_cash, freq: str = "day", benchmark_config: dict = {}): self.init_vars(init_cash, freq, benchmark_config) @@ -38,17 +70,13 @@ class Account: # init cash self.init_cash = init_cash self.current = Position(cash=init_cash) + self.accum_info = AccumulatedInfo() self.reset(freq=freq, benchmark_config=benchmark_config, init_report=True) def reset_report(self, freq, benchmark_config): self.report = Report(freq, benchmark_config) self.indicator = Indicator() self.positions = {} - self.rtn = 0 - self.ct = 0 - self.to = 0 - self.val = 0 - self.earning = 0 def reset(self, freq=None, benchmark_config=None, init_report=False): """reset freq and report of account @@ -78,21 +106,22 @@ class Account: def _update_state_from_order(self, order, trade_val, cost, trade_price): # update turnover - self.to += trade_val + self.accum_info.add_turnover(trade_val) # update cost - self.ct += cost - # update return - # update self.rtn from order + self.accum_info.add_cost(cost) + + # update return from order trade_amount = trade_val / trade_price if order.direction == Order.SELL: # 0 for sell # when sell stock, get profit from price change profit = trade_val - self.current.get_stock_price(order.stock_id) * trade_amount - self.rtn += profit # note here do not consider cost + self.accum_info.add_return_value(profit) # note here do not consider cost + elif order.direction == Order.BUY: # 1 for buy # when buy stock, we get return for the rtn computing method - # profit in buy order is to make self.rtn is consistent with self.earning at the end of date + # profit in buy order is to make rtn is consistent with earning at the end of bar profit = self.current.get_stock_price(order.stock_id) * trade_amount - trade_val - self.rtn += profit + self.accum_info.add_return_value(profit) # note here do not consider cost def update_order(self, order, trade_val, cost, trade_price): # if stock is sold out, no stock price information in Position, then we should update account first, then update current position @@ -111,23 +140,12 @@ class Account: self._update_state_from_order(order, trade_val, cost, trade_price) def update_bar_count(self): + """at the end of the trading bar, update holding bar, count of stock""" + # update holding day count self.current.add_count_all(bar=self.freq) - def update_bar_report(self, trade_start_time, trade_end_time, trade_exchange): - """ - trade_start_time: pd.TimeStamp - trade_end_time: pd.TimeStamp - quote: pd.DataFrame (code, date), collumns - when the end of trade date - - update rtn - - update price for each asset - - update value for this account - - update earning (2nd view of return ) - - update holding day, count of stock - - update position hitory - - update report - :return: None - """ + def update_current(self, trade_start_time, trade_end_time, trade_exchange): + """update current to make rtn consistent with earning at the end of bar""" # update price for stock in the position and the profit from changed_price stock_list = self.current.get_stock_list() for code in stock_list: @@ -136,22 +154,28 @@ class Account: continue bar_close = trade_exchange.get_close(code, trade_start_time, trade_end_time) self.current.update_stock_price(stock_id=code, price=bar_close) - # update holding day count - # update value - self.val = self.current.calculate_value() - # update earning + def update_report(self, trade_start_time, trade_end_time): + """update position history, report""" + # calculate earning # account_value - last_account_value # for the first trade date, account_value - init_cash # self.report.is_empty() to judge is_first_trade_date - # get last_account_value, now_account_value, now_stock_value + # get last_account_value, last_total_cost, last_total_turnover if self.report.is_empty(): last_account_value = self.init_cash + last_total_cost = 0 + last_total_turnover = 0 else: last_account_value = self.report.get_latest_account_value() + last_total_cost = self.report.get_latest_total_cost() + last_total_turnover = self.report.get_latest_total_turnover() + # get now_account_value, now_stock_value, now_earning, now_cost, now_turnover now_account_value = self.current.calculate_value() now_stock_value = self.current.calculate_stock_value() - self.earning = now_account_value - last_account_value + now_earning = now_account_value - last_account_value + now_cost = self.accum_info.get_cost - last_total_cost + now_turnover = self.accum_info.get_turnover - last_total_turnover # update report for today # judge whether the the trading is begin. # and don't add init account state into report, due to we don't have excess return in those days. @@ -160,11 +184,13 @@ class Account: trade_end_time=trade_end_time, account_value=now_account_value, cash=self.current.position["cash"], - return_rate=(self.earning + self.ct) / last_account_value, + return_rate=(now_earning + now_cost) / last_account_value, # here use earning to calculate return, position's view, earning consider cost, true return # in order to make same definition with original backtest in evaluate.py - turnover_rate=self.to / last_account_value, - cost_rate=self.ct / last_account_value, + total_turnover=self.accum_info.get_turnover, + turnover_rate=now_turnover / last_account_value, + total_cost=self.accum_info.get_cost, + cost_rate=now_cost / last_account_value, stock_value=now_stock_value, ) # set now_account_value to position @@ -174,8 +200,60 @@ class Account: # note use deepcopy self.positions[trade_start_time] = copy.deepcopy(self.current) - # finish today's updation - # reset the bar variables - self.rtn = 0 - self.ct = 0 - self.to = 0 + def update_bar_end( + self, + trade_start_time: pd.Timestamp, + trade_end_time: pd.Timestamp, + trade_exchange: Exchange, + atomic: bool, + generate_report: bool = False, + trade_info: list = None, + inner_order_indicators: Indicator = None, + indicator_config: dict = {}, + ): + """update account at each trading bar step + + Parameters + ---------- + trade_start_time : pd.Timestamp + closed start time of step + trade_end_time : pd.Timestamp + closed end time of step + trade_exchange : Exchange + trading exchange, used to update current + atomic : bool + whether the trading executor is atomic, which means there is no higher-frequency trading executor inside it + - if atomic is True, calculate the indicators with trade_info + - else, aggregate indicators with inner indicators + generate_report : bool, optional + whether to generate report, by default False + trade_info : List[(Order, float, float, float)], optional + trading information, by default None + - necessary if atomic is True + - list of tuple(order, trade_val, trade_cost, trade_price) + inner_order_indicators : Indicator, optional + indicators of inner executor, by default None + - necessary if atomic is False + - used to aggregate outer indicators + indicator_config : dict, optional + config of calculating indicators, by default {} + """ + if atomic is True and trade_info is None: + raise ValueError("trade_info is necessary in atomic executor") + elif atomic is False and inner_order_indicators is None: + raise ValueError("inner_order_indicators is necessary in unatomic executor") + + self.update_bar_count() + self.update_current(trade_start_time, trade_end_time, trade_exchange) + if generate_report: + self.update_report(trade_start_time, trade_end_time) + + self.indicator.clear() + + if atomic: + self.indicator.update_order_indicators(trade_start_time, trade_end_time, trade_info, trade_exchange) + else: + self.indicator.agg_order_indicators(inner_order_indicators, indicator_config) + + self.indicator.cal_trade_indicators(trade_start_time, self.freq, indicator_config) + self.indicator.record(trade_start_time) diff --git a/qlib/backtest/backtest.py b/qlib/backtest/backtest.py index e9d864c92..3892fde41 100644 --- a/qlib/backtest/backtest.py +++ b/qlib/backtest/backtest.py @@ -1,10 +1,25 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from ..utils.resam import parse_freq def backtest_loop(start_time, end_time, trade_strategy, trade_executor): """backtest funciton for the interaction of the outermost strategy and executor in the nested decison execution + Returns + ------- + report: Report + it records the trading report information + """ + return_value = {} + for _decison in collect_data_loop(start_time, end_time, trade_strategy, trade_executor, return_value): + pass + return return_value.get("report"), return_value.get("indicator") + + +def collect_data_loop(start_time, end_time, trade_strategy, trade_executor, return_value: dict = None): + """Generator for collecting the trade decision data for rl training + Parameters ---------- start_time : pd.Timestamp|str @@ -15,26 +30,8 @@ def backtest_loop(start_time, end_time, trade_strategy, trade_executor): the outermost portfolio strategy trade_executor : BaseExecutor the outermost executor - - Returns - ------- - report: Report - it records the trading report information - """ - trade_executor.reset(start_time=start_time, end_time=end_time) - level_infra = trade_executor.get_level_infra() - trade_strategy.reset(level_infra=level_infra) - - _execute_result = None - while not trade_executor.finished(): - _trade_decision = trade_strategy.generate_trade_decision(_execute_result) - _execute_result = trade_executor.execute(_trade_decision) - - return trade_executor.get_report() - - -def collect_data_loop(start_time, end_time, trade_strategy, trade_executor): - """Generator for collecting the trade decision data for rl training + return_value : dict + used for backtest_loop Yields ------- @@ -49,3 +46,19 @@ def collect_data_loop(start_time, end_time, trade_strategy, trade_executor): while not trade_executor.finished(): _trade_decision = trade_strategy.generate_trade_decision(_execute_result) _execute_result = yield from trade_executor.collect_data(_trade_decision) + + if return_value is not None: + all_executors = trade_executor.get_all_executors() + + all_reports = { + "{}{}".format(*parse_freq(_executor.time_per_step)): _executor.get_report() + for _executor in all_executors + if _executor.generate_report + } + all_indicators = { + "{}{}".format( + *parse_freq(_executor.time_per_step) + ): _executor.get_trade_indicator().generate_trade_indicators_dataframe() + for _executor in all_executors + } + return_value.update({"report": all_reports, "indicator": all_indicators}) diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 6accb5e05..b80663245 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -48,14 +48,17 @@ class Exchange: :param trade_unit: trade unit, 100 for China A market :param min_cost: min cost, default 5 :param extra_quote: pandas, dataframe consists of - columns: like ['$vwap', '$close', '$factor', 'limit']. + columns: like ['$vwap', '$close', '$volume', '$factor', 'limit_sell', 'limit_buy']. The limit indicates that the etf is tradable on a specific day. Necessary fields: $close is for calculating the total value at end of each day. Optional fields: + $volume is only necessary when we limit the trade amount or caculate PA(vwap) indicator $vwap is only necessary when we use the $vwap price as the deal price $factor is for rounding to the trading unit - limit will be set to False by default(False indicates we can buy this + limit_sell will be set to False by default(False indicates we can sell this + target on this day). + limit_buy will be set to False by default(False indicates we can buy this target on this day). index: MultipleIndex(instrument, pd.Datetime) """ diff --git a/qlib/backtest/executor.py b/qlib/backtest/executor.py index d68ff3ab1..c216a461c 100644 --- a/qlib/backtest/executor.py +++ b/qlib/backtest/executor.py @@ -20,7 +20,7 @@ class BaseExecutor: time_per_step: str, start_time: Union[str, pd.Timestamp] = None, end_time: Union[str, pd.Timestamp] = None, - show_indicator: bool = False, + indicator_config: dict = {}, generate_report: bool = False, verbose: bool = False, track_data: bool = False, @@ -33,7 +33,40 @@ class BaseExecutor: time_per_step : str trade time per trading step, used for genreate the trade calendar show_indicator: bool, optional - whether to show indicators, such as FFR/PA/POS, .etc + whether to show indicators, : + - 'pa', the price advantage + - 'pos', the positive rate + - 'ffr', the fulfill rate + indicator_config: dict, optional + config for calculating trade indicator, including the following fields: + - 'show_indicator': whether to show indicators, optional, default by False. The indicators includes + - 'pa', the price advantage + - 'pos', the positive rate + - 'ffr', the fulfill rate + - 'pa_config': config for calculating price advantage(pa), optional + - 'base_price': the based price than which the trading price is advanced, Optional, default by 'twap' + - If 'base_price' is 'twap', the based price is the time weighted average price + - If 'base_price' is 'vwap', the based price is the volume weighted average price + - 'weight_method': weighted method when calculating total trading pa by different orders' pa in each step, optional, default by 'mean' + - If 'weight_method' is 'mean', calculating mean value of different orders' pa + - If 'weight_method' is 'amount_weighted', calculating amount weighted average value of different orders' pa + - If 'weight_method' is 'value_weighted', calculating value weighted average value of different orders' pa + - 'ffr_config': config for calculating fulfill rate(ffr), optional + - 'weight_method': weighted method when calculating total trading ffr by different orders' ffr in each step, optional, default by 'mean' + - If 'weight_method' is 'mean', calculating mean value of different orders' ffr + - If 'weight_method' is 'amount_weighted', calculating amount weighted average value of different orders' ffr + - If 'weight_method' is 'value_weighted', calculating value weighted average value of different orders' ffr + Example: + { + 'show_indicator': True, + 'pa_config': { + 'base_value': 'twap', + 'weight_method': 'value_weighted', + }, + 'ffr_config':{ + 'weight_method': 'value_weighted', + } + } generate_report : bool, optional whether to generate report, by default False verbose : bool, optional @@ -51,7 +84,7 @@ class BaseExecutor: """ self.time_per_step = time_per_step - self.show_indicator = show_indicator + self.indicator_config = indicator_config self.generate_report = generate_report self.verbose = verbose self.track_data = track_data @@ -132,18 +165,20 @@ class BaseExecutor: yield trade_decision return self.execute(trade_decision) - def get_trade_account(self): - raise NotImplementedError("get_trade_account is not implemented!") - def get_report(self): - raise NotImplementedError("get_report is not implemented!") + if self.generate_report: + _report = self.trade_account.report.generate_report_dataframe() + _positions = self.trade_account.get_positions() + return _report, _positions + else: + raise ValueError("generate_report should be True if you want to generate report") def get_all_executors(self): """Return all executors""" return [self] def get_trade_indicator(self): - return self.trade_account.indicator.trade_indicator + return self.trade_account.indicator class NestedExecutor(BaseExecutor): @@ -159,7 +194,7 @@ class NestedExecutor(BaseExecutor): inner_strategy: Union[BaseStrategy, dict], start_time: Union[str, pd.Timestamp] = None, end_time: Union[str, pd.Timestamp] = None, - show_indicator: bool = False, + indicator_config: dict = {}, generate_report: bool = False, verbose: bool = False, track_data: bool = False, @@ -190,7 +225,7 @@ class NestedExecutor(BaseExecutor): time_per_step=time_per_step, start_time=start_time, end_time=end_time, - show_indicator=show_indicator, + indicator_config=indicator_config, generate_report=generate_report, verbose=verbose, track_data=track_data, @@ -198,7 +233,7 @@ class NestedExecutor(BaseExecutor): **kwargs, ) - if generate_report and trade_exchange is not None: + if trade_exchange is not None: self.trade_exchange = trade_exchange def reset_common_infra(self, common_infra): @@ -209,7 +244,7 @@ class NestedExecutor(BaseExecutor): """ super(NestedExecutor, self).reset_common_infra(common_infra) - if self.generate_report and common_infra.has("trade_exchange"): + if common_infra.has("trade_exchange"): self.trade_exchange = common_infra.get("trade_exchange") self.inner_executor.reset_common_infra(common_infra) @@ -222,66 +257,43 @@ class NestedExecutor(BaseExecutor): sub_level_infra = self.inner_executor.get_level_infra() self.inner_strategy.reset(level_infra=sub_level_infra, outer_trade_decision=trade_decision) - def _update_trade_account(self, inner_indicators): - trade_step = self.trade_calendar.get_trade_step() - trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step) - self.trade_account.update_bar_count() - if self.generate_report: - self.trade_account.update_bar_report( - trade_start_time=trade_start_time, - trade_end_time=trade_end_time, - trade_exchange=self.trade_exchange, - ) - - self.trade_account.indicator.clear() - self.trade_account.indicator.agg_report_info(inner_indicators=inner_indicators) - self.trade_account.indicator.agg_FFR() - self.trade_account.indicator.agg_PA(inner_indicators=inner_indicators) - - if self.show_indicator: - FFR_value = self.trade_account.indicator.get_statistics_FFR(method="value_weighted") - PA_value = self.trade_account.indicator.get_statistics_PA(method="value_weighted") - POS_values = self.trade_account.indicator.get_statistics_POS() - print( - "[Indicator({}) {:%Y-%m-%d}]: FFR: {}, PA: {}, POS: {}".format( - self.time_per_step, trade_start_time, FFR_value, PA_value, POS_values - ) - ) - def execute(self, trade_decision): - for _data in self.collect_data(trade_decision): + return_value = {} + for _decison in self.collect_data(trade_decision, return_value): pass - return self._execute_result + return return_value.get("execute_result") - def collect_data(self, trade_decision): + def collect_data(self, trade_decision, return_value=None): if self.track_data: yield trade_decision self._init_sub_trading(trade_decision) execute_result = [] - inner_indicators = [] + inner_order_indicators = [] _inner_execute_result = None while not self.inner_executor.finished(): _inner_trade_decision = self.inner_strategy.generate_trade_decision(_inner_execute_result) _inner_execute_result = yield from self.inner_executor.collect_data(trade_decision=_inner_trade_decision) execute_result.extend(_inner_execute_result) - inner_indicators.append(self.inner_executor.get_trade_indicator()) + inner_order_indicators.append(self.inner_executor.get_trade_indicator().get_order_indicator) if hasattr(self, "trade_account"): - self._update_trade_account(inner_indicators=inner_indicators) + trade_step = self.trade_calendar.get_trade_step() + trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step) + self.trade_account.update_bar_end( + trade_start_time, + trade_end_time, + self.trade_exchange, + atomic=False, + generate_report=self.generate_report, + inner_order_indicators=inner_order_indicators, + indicator_config=self.indicator_config, + ) self.trade_calendar.step() - self._execute_result = execute_result + if return_value is not None: + return_value.update({"execute_result": execute_result}) return execute_result - def get_report(self): - sub_env_report_dict = self.inner_executor.get_report() - if self.generate_report: - _report = self.trade_account.report.generate_report_dataframe() - _positions = self.trade_account.get_positions() - _count, _freq = parse_freq(self.time_per_step) - sub_env_report_dict.update({f"{_count}{_freq}": (_report, _positions)}) - return sub_env_report_dict - def get_all_executors(self): """Return all executors, including self and inner_executor.get_all_executors()""" return [self, *self.inner_executor.get_all_executors()] @@ -295,7 +307,7 @@ class SimulatorExecutor(BaseExecutor): time_per_step: str, start_time: Union[str, pd.Timestamp] = None, end_time: Union[str, pd.Timestamp] = None, - show_indicator: bool = False, + indicator_config: dict = {}, generate_report: bool = False, verbose: bool = False, track_data: bool = False, @@ -314,7 +326,7 @@ class SimulatorExecutor(BaseExecutor): time_per_step=time_per_step, start_time=start_time, end_time=end_time, - show_indicator=show_indicator, + indicator_config=indicator_config, generate_report=generate_report, verbose=verbose, track_data=track_data, @@ -377,41 +389,14 @@ class SimulatorExecutor(BaseExecutor): # do nothing pass - self.trade_account.update_bar_count() - - if self.generate_report: - self.trade_account.update_bar_report( - trade_start_time=trade_start_time, - trade_end_time=trade_end_time, - trade_exchange=self.trade_exchange, - ) - - self.trade_account.indicator.clear() - self.trade_account.indicator.update_trade_info(trade_info=execute_result) - self.trade_account.indicator.update_FFR() - self.trade_account.indicator.update_PA( - freq=self.time_per_step, trade_start_time=trade_start_time, trade_end_time=trade_end_time + self.trade_account.update_bar_end( + trade_start_time, + trade_end_time, + self.trade_exchange, + atomic=True, + generate_report=self.generate_report, + trade_info=execute_result, + indicator_config=self.indicator_config, ) - self.trade_account.indicator.record(trade_start_time=trade_start_time) - - if self.show_indicator: - FFR_value = self.trade_account.indicator.get_statistics_FFR(method="value_weighted") - PA_value = self.trade_account.indicator.get_statistics_PA(method="value_weighted") - POS_values = self.trade_account.indicator.get_statistics_POS() - print( - "[Indicator({}) {:%Y-%m-%d %H:%M:%S}]: FFR: {}, PA: {}, POS: {}".format( - self.time_per_step, trade_start_time, FFR_value, PA_value, POS_values - ) - ) - self.trade_calendar.step() return execute_result - - def get_report(self): - if self.generate_report: - _report = self.trade_account.report.generate_report_dataframe() - _positions = self.trade_account.get_positions() - _count, _freq = parse_freq(self.time_per_step) - return {f"{_count}{_freq}": (_report, _positions)} - else: - return {} diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index d12595db5..5052a1e88 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -52,11 +52,13 @@ class Report: self.init_bench(freq=freq, benchmark_config=benchmark_config) def init_vars(self): - self.accounts = OrderedDict() # account postion value for each trade date - self.returns = OrderedDict() # daily return rate for each trade date - self.turnovers = OrderedDict() # turnover for each trade date - self.costs = OrderedDict() # trade cost for each trade date - self.values = OrderedDict() # value for each trade date + self.accounts = OrderedDict() # account postion value for each trade time + self.returns = OrderedDict() # daily return rate for each trade time + self.total_turnovers = OrderedDict() # total turnover for each trade time + self.turnovers = OrderedDict() # turnover for each trade time + self.total_costs = OrderedDict() # total trade cost for each trade time + self.costs = OrderedDict() # trade cost rate for each trade time + self.values = OrderedDict() # value for each trade time self.cashes = OrderedDict() self.benches = OrderedDict() self.latest_report_time = None # pd.TimeStamp @@ -87,10 +89,10 @@ class Report: def _sample_benchmark(self, bench, trade_start_time, trade_end_time): def cal_change(x): - return (x + 1).prod() - 1 + return (x + 1).prod() _ret = resam_ts_data(bench, trade_start_time, trade_end_time, method=cal_change) - return 0.0 if _ret is None else _ret + return 0.0 if _ret is None else _ret - 1 def is_empty(self): return len(self.accounts) == 0 @@ -101,6 +103,12 @@ class Report: def get_latest_account_value(self): return self.accounts[self.latest_report_time] + def get_latest_total_cost(self): + return self.total_costs[self.latest_report_time] + + def get_latest_total_turnover(self): + return self.total_turnovers[self.latest_report_time] + def update_report_record( self, trade_start_time=None, @@ -108,7 +116,9 @@ class Report: account_value=None, cash=None, return_rate=None, + total_turnover=None, turnover_rate=None, + total_cost=None, cost_rate=None, stock_value=None, bench_value=None, @@ -119,12 +129,14 @@ class Report: account_value, cash, return_rate, + total_turnover, turnover_rate, + total_cost, cost_rate, stock_value, ]: raise ValueError( - "None in [trade_start_time, account_value, cash, return_rate, turnover_rate, cost_rate, stock_value]" + "None in [trade_start_time, account_value, cash, return_rate, total_turnover, turnover_rate, total_cost, cost_rate, stock_value]" ) if trade_end_time is None and bench_value is None: @@ -135,20 +147,24 @@ class Report: # update report data self.accounts[trade_start_time] = account_value self.returns[trade_start_time] = return_rate + self.total_turnovers[trade_start_time] = total_turnover self.turnovers[trade_start_time] = turnover_rate + self.total_costs[trade_start_time] = total_cost self.costs[trade_start_time] = cost_rate self.values[trade_start_time] = stock_value self.cashes[trade_start_time] = cash self.benches[trade_start_time] = bench_value # update latest_report_date self.latest_report_time = trade_start_time - # finish daily report update + # finish report update in each step def generate_report_dataframe(self): report = pd.DataFrame() report["account"] = pd.Series(self.accounts) report["return"] = pd.Series(self.returns) + report["total_turnover"] = pd.Series(self.total_turnovers) report["turnover"] = pd.Series(self.turnovers) + report["total_cost"] = pd.Series(self.total_costs) report["cost"] = pd.Series(self.costs) report["value"] = pd.Series(self.values) report["cash"] = pd.Series(self.cashes) @@ -163,7 +179,7 @@ class Report: def load_report(self, path): """load report from a file should have format like - columns = ['account', 'return', 'turnover', 'cost', 'value', 'cash', 'bench'] + columns = ['account', 'return', 'total_turnover', 'turnover', 'cost', 'total_cost', 'value', 'cash', 'bench'] :param path: str/ pathlib.Path() """ @@ -179,7 +195,9 @@ class Report: account_value=r.loc[trade_start_time]["account"], cash=r.loc[trade_start_time]["cash"], return_rate=r.loc[trade_start_time]["return"], + total_turnover=r.loc[trade_start_time]["total_turnover"], turnover_rate=r.loc[trade_start_time]["turnover"], + total_cost=r.loc[trade_start_time]["total_cost"], cost_rate=r.loc[trade_start_time]["cost"], stock_value=r.loc[trade_start_time]["value"], bench_value=r.loc[trade_start_time]["bench"], @@ -188,147 +206,184 @@ class Report: class Indicator: def __init__(self): - self.indicator_his = dict() - self.trade_indicator = dict() - - def __getitem__(self, key): - return self.trade_indicator[key] - - def __setitem__(self, key, value): - self.trade_indicator[key] = value - - def __contains__(self, key): - return key in self.trade_indicator + self.order_indicator_his = OrderedDict() + self.order_indicator = OrderedDict() + self.trade_indicator_his = OrderedDict() + self.trade_indicator = OrderedDict() def clear(self): - self.trade_indicator = dict() + self.order_indicator = OrderedDict() + self.trade_indicator = OrderedDict() def record(self, trade_start_time): - self.indicator_his[trade_start_time] = pd.DataFrame(self.trade_indicator) + self.order_indicator_his[trade_start_time] = self.order_indicator + self.trade_indicator_his[trade_start_time] = self.trade_indicator - def update_trade_info(self, trade_info: list): + def _update_order_trade_info(self, trade_info: list): amount = dict() deal_amount = dict() trade_price = dict() + trade_value = dict() trade_cost = dict() for order, _trade_val, _trade_cost, _trade_price in trade_info: amount[order.stock_id] = order.amount * (order.direction * 2 - 1) deal_amount[order.stock_id] = order.deal_amount * (order.direction * 2 - 1) trade_price[order.stock_id] = _trade_price + trade_value[order.stock_id] = _trade_val * (order.direction * 2 - 1) trade_cost[order.stock_id] = _trade_cost - self["amount"] = pd.Series(amount) - self["deal_amount"] = pd.Series(deal_amount) - self["trade_price"] = pd.Series(trade_price) - self["trade_cost"] = pd.Series(trade_cost) + self.order_indicator["amount"] = pd.Series(amount) + self.order_indicator["deal_amount"] = pd.Series(deal_amount) + self.order_indicator["trade_price"] = pd.Series(trade_price) + self.order_indicator["trade_value"] = pd.Series(trade_value) + self.order_indicator["trade_cost"] = pd.Series(trade_cost) - def update_FFR(self): - self["fulfill_rate"] = self["deal_amount"] / self["amount"] + def _update_order_fulfill_rate(self): + self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"] - def update_PA(self, freq, trade_start_time, trade_end_time, base_price="twap"): - base_price = base_price.lower() + def _update_order_price_advantage(self, trade_exchange, trade_start_time, trade_end_time): + self.order_indicator["base_price"] = self.order_indicator["trade_price"] + instruments = list(self.order_indicator["base_price"].index) + self.order_indicator["volume"] = pd.Series( + [ + trade_exchange.get_volume(stock_id=inst, start_time=trade_start_time, end_time=trade_end_time) + for inst in instruments + ], + index=instruments, + ) + self.order_indicator["pa"] = ( + self.order_indicator["trade_price"] - self.order_indicator["base_price"] + ) / self.order_indicator["base_price"] - instruments = list(self["amount"].index) - if base_price == "twap": - # too slow - # price_info, _ = get_higher_freq_feature(instruments, fields=["$close"], start_time=trade_start_time, end_time=trade_end_time, freq=freq) - # price_info = price_info.astype(float) - - # self["base_price"] = price_info["$close"].groupby(level="instrument").mean() - self["base_price"] = self["trade_price"] - - elif base_price == "vwap": - # too slow - price_info, _ = get_higher_freq_feature( - instruments, - fields=["$close", "$volume"], - start_time=trade_start_time, - end_time=trade_end_time, - freq=freq, - ) - price_info = price_info.astype(float) - self["base_price"] = price_info.groupby(level="instrument").apply( - lambda x: (x["$close"] * x["$volume"]).sum() / x["$volume"].sum() - ) - self["volume"] = price_info["$volume"].groupby(level="instrument").sum() - else: - raise ValueError(f"base_price {base_price} is not supported!") - - self["pa"] = (self["trade_price"] - self["base_price"]) / self["base_price"] - - def agg_report_info(self, inner_indicators): + def _agg_order_trade_info(self, inner_order_indicators): amount = pd.Series() deal_amount = pd.Series() trade_price = pd.Series() + trade_value = pd.Series() trade_cost = pd.Series() - for inner_indicator in inner_indicators: - amount = amount.add(inner_indicator["amount"], fill_value=0) - deal_amount = deal_amount.add(inner_indicator["deal_amount"], fill_value=0) - trade_price = trade_price.add(inner_indicator["trade_price"] * inner_indicator["deal_amount"], fill_value=0) - trade_cost = trade_cost.add(inner_indicator["trade_cost"], fill_value=0) + for _order_indicator in inner_order_indicators: + amount = amount.add(_order_indicator["amount"], fill_value=0) + deal_amount = deal_amount.add(_order_indicator["deal_amount"], fill_value=0) + trade_price = trade_price.add( + _order_indicator["trade_price"] * _order_indicator["deal_amount"], fill_value=0 + ) + trade_value = trade_value.add(_order_indicator["trade_value"], fill_value=0) + trade_cost = trade_cost.add(_order_indicator["trade_cost"], fill_value=0) - self["amount"] = amount - self["deal_amount"] = deal_amount - trade_price /= self["deal_amount"] - self["trade_price"] = trade_price - self["trade_cost"] = trade_cost + self.order_indicator["amount"] = amount + self.order_indicator["deal_amount"] = deal_amount + trade_price /= self.order_indicator["deal_amount"] + self.order_indicator["trade_price"] = trade_price + self.order_indicator["trade_value"] = trade_value + self.order_indicator["trade_cost"] = trade_cost - def agg_FFR(self): - self["fulfill_rate"] = self["deal_amount"] / self["amount"] + def _agg_order_fulfill_rate(self): + self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"] - def agg_PA(self, inner_indicators, base_price="twap"): + def _agg_order_price_advantage(self, inner_order_indicators, base_price="twap"): base_price = base_price.lower() + volume = pd.Series() + for _order_indicator in inner_order_indicators: + volume = volume.add(_order_indicator["volume"], fill_value=0) + self.order_indicator["volume"] = volume if base_price == "twap": base_price = pd.Series() price_count = pd.Series() - for inner_indicator in inner_indicators: - base_price = base_price.add(inner_indicator["base_price"], fill_value=0) - price_count = price_count.add(pd.Series(1, index=inner_indicator["base_price"].index), fill_value=0) + for _order_indicator in inner_order_indicators: + base_price = base_price.add(_order_indicator["base_price"], fill_value=0) + price_count = price_count.add(pd.Series(1, index=_order_indicator["base_price"].index), fill_value=0) base_price /= price_count - self["base_price"] = base_price + self.order_indicator["base_price"] = base_price elif base_price == "vwap": base_price = pd.Series() - volume = pd.Series() - for inner_indicator in inner_indicators: - base_price = base_price.add(inner_indicator["base_price"] * inner_indicator["volume"], fill_value=0) - volume = volume.add(inner_indicator["volume"], fill_value=0) - base_price /= volume - self["base_price"] = base_price - self["volume"] = volume + for _order_indicator in inner_order_indicators: + base_price = base_price.add(_order_indicator["base_price"] * _order_indicator["volume"], fill_value=0) + base_price /= self.order_indicator["volume"] + self.order_indicator["base_price"] = base_price + else: raise ValueError(f"base_price {base_price} is not supported!") - self["pa"] = (self["trade_price"] - self["base_price"]) / self["base_price"] + self.order_indicator["pa"] = self.order_indicator["trade_price"] / self.order_indicator["base_price"] - 1 - def get_statistics_FFR(self, method="mean"): + def _cal_trade_fulfill_rate(self, method="mean"): if method == "mean": - return self["fulfill_rate"].mean() + return self.order_indicator["ffr"].mean() elif method == "amount_weighted": - weights = self["deal_amount"].abs() - return (self["fulfill_rate"] * weights).sum() / weights.sum() + weights = self.order_indicator["deal_amount"].abs() + return (self.order_indicator["ffr"] * weights).sum() / weights.sum() elif method == "value_weighted": - weights = (self["deal_amount"] * self["trade_price"]).abs() - return (self["fulfill_rate"] * weights).sum() / weights.sum() + weights = self.order_indicator["trade_value"].abs() + return (self.order_indicator["ffr"] * weights).sum() / weights.sum() else: raise ValueError(f"method {method} is not supported!") - def get_statistics_PA(self, method="mean"): - pa_order = self["pa"] * (self["amount"] < 0).astype(int) + def _cal_trade_price_advantage(self, method="mean"): + pa_order = self.order_indicator["pa"] * (self.order_indicator["amount"] < 0).astype(int) if method == "mean": return pa_order.mean() elif method == "amount_weighted": - weights = self["deal_amount"].abs() + weights = self.order_indicator["deal_amount"].abs() return (pa_order * weights).sum() / weights.sum() elif method == "value_weighted": - weights = (self["deal_amount"] * self["trade_price"]).abs() + weights = self.order_indicator["trade_value"].abs() return (pa_order * weights).sum() / weights.sum() else: raise ValueError(f"method {method} is not supported!") - def get_statistics_POS(self): - pa_order = self["pa"] * (self["amount"] < 0).astype(int) - return (pa_order > 1e-8).astype(int).sum() / len(pa_order) + def _cal_trade_positive_rate(self): + pa_order = self.order_indicator["pa"] * (self.order_indicator["amount"] < 0).astype(int) + return (pa_order > 0).astype(int).sum() / len(pa_order) + + def _cal_trade_amount(self): + return self.order_indicator["deal_amount"].abs().sum() + + def _cal_trade_value(self): + return self.order_indicator["trade_value"].abs().sum() + + def update_order_indicators(self, trade_start_time, trade_end_time, trade_info, trade_exchange): + self._update_order_trade_info(trade_info=trade_info) + self._update_order_fulfill_rate() + self._update_order_price_advantage(trade_exchange, trade_start_time, trade_end_time) + + def agg_order_indicators(self, inner_order_indicators, indicator_config={}): + self._agg_order_trade_info(inner_order_indicators) + self._agg_order_fulfill_rate() + pa_config = indicator_config.get("pa_config", {}) + self._agg_order_price_advantage(inner_order_indicators, base_price=pa_config.get("base_price", "twap")) + + def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}): + show_indicator = indicator_config.get("show_indicator", False) + ffr_config = indicator_config.get("ffr_config", {}) + pa_config = indicator_config.get("pa_config", {}) + fulfill_rate = self._cal_trade_fulfill_rate(method=ffr_config.get("weight_method", "mean")) + price_advantage = self._cal_trade_price_advantage(method=pa_config.get("weight_method", "mean")) + positive_rate = self._cal_trade_positive_rate() + trade_amount = self._cal_trade_amount() + trade_value = self._cal_trade_value() + self.trade_indicator["ffr"] = fulfill_rate + self.trade_indicator["pa"] = price_advantage + self.trade_indicator["pos"] = positive_rate + self.trade_indicator["amount"] = trade_amount + self.trade_indicator["value"] = trade_value + if show_indicator: + print( + "[Indicator({}) {:%Y-%m-%d %H:%M:%S}]: FFR: {}, PA: {}, POS: {}".format( + freq, trade_start_time, fulfill_rate, price_advantage, positive_rate + ) + ) + + @property + def get_order_indicator(self): + return self.order_indicator + + @property + def get_trade_indicator(self): + return self.trade_indicator + + def generate_trade_indicators_dataframe(self): + return pd.DataFrame.from_dict(self.trade_indicator_his, orient="index") diff --git a/qlib/contrib/evaluate.py b/qlib/contrib/evaluate.py index 0ef8f95a5..a048ead30 100644 --- a/qlib/contrib/evaluate.py +++ b/qlib/contrib/evaluate.py @@ -11,7 +11,7 @@ import warnings from ..log import get_module_logger from ..backtest import get_exchange, backtest as backtest_func from ..utils import get_date_range -from ..utils.resam import parse_freq +from ..utils.resam import parse_freq, NORM_FREQ_MONTH, NORM_FREQ_WEEK, NORM_FREQ_DAY, NORM_FREQ_MINUTE from ..data import D from ..config import C @@ -37,12 +37,12 @@ def risk_analysis(r, N: int = None, freq: str = "day"): def cal_risk_analysis_scaler(freq): _count, _freq = parse_freq(freq) _freq_scaler = { - "minute": 240 * 252, - "day": 252, - "week": 50, - "month": 12, + NORM_FREQ_MINUTE: 240 * 252, + NORM_FREQ_DAY: 252, + NORM_FREQ_WEEK: 50, + NORM_FREQ_MONTH: 12, } - return _count * _freq_scaler[_freq] + return _freq_scaler[_freq] / _count if N is None and freq is None: raise ValueError("at least one of `N` and `freq` should exist") @@ -63,7 +63,51 @@ def risk_analysis(r, N: int = None, freq: str = "day"): "information_ratio": information_ratio, "max_drawdown": max_drawdown, } - res = pd.Series(data, index=data.keys()).to_frame("risk") + res = pd.Series(data).to_frame("risk") + return res + + +def indicator_analysis(df, method="mean"): + """analyze statistical time-series indicators of trading + + Parameters + ---------- + df : pandas.DataFrame + columns: like ['pa', 'pos', 'ffr', 'amount', 'value']. + Necessary fields: + - 'pa' is the price advantage in trade indicators + - 'pos' is the positive rate in trade indicators + - 'ffr' is the fulfill rate in trade indicators + Optional fields: + - 'amount' is the total deal amount, only necessary when method is 'amount_weighted' + - 'value' is the total trade value, only necessary when method is 'value_weighted' + + index: Index(datetime) + method : str, optional + statistics method, by default "mean" + - if method is 'mean', count the mean statistical value of each trade indicator + - if method is 'amount_weighted', count the amount weighted mean statistical value of each trade indicator + - if method is 'value_weighted', count the value weighted mean statistical value of each trade indicator + + Returns + ------- + pd.DataFrame + statistical value of each trade indicator + """ + indicators_df = df[["pa", "pos", "ffr"]] + + if method == "mean": + res = indicators_df.mean() + elif method == "amount_weighted": + weights = df["amount"].abs() + res = indicators_df.mul(weights, axis=0).sum() / weights.sum() + elif method == "value_weighted": + weights = df["value"].abs() + res = indicators_df.mul(weights, axis=0).sum() / weights.sum() + else: + raise ValueError(f"indicator_analysis method {method} is not supported!") + + res = res.to_frame("value") return res diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index 9516d363a..4ecd5ccdf 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -7,7 +7,7 @@ import warnings import pandas as pd from pathlib import Path from pprint import pprint -from ..contrib.evaluate import risk_analysis +from ..contrib.evaluate import indicator_analysis, risk_analysis, indicator_analysis from ..data.dataset import DatasetH from ..data.dataset.handler import DataHandlerLP @@ -294,7 +294,9 @@ class PortAnaRecord(RecordTemp): artifact_path = "portfolio_analysis" - def __init__(self, recorder, config, risk_analysis_freq, **kwargs): + def __init__( + self, recorder, config, risk_analysis_freq, indicator_analysis_freq, indicator_analysis_method=None, **kwargs + ): """ config["strategy"] : dict define the strategy class as well as the kwargs. @@ -304,6 +306,10 @@ class PortAnaRecord(RecordTemp): define the backtest kwargs. risk_analysis_freq : str|List[str] risk analysis freq of report + indicator_analysis_freq : str|List[str] + indicator analysis freq of report + indicator_analysis_method : str, optional, default by None + the candidated values include 'mean', 'amount_weighted', 'value_weighted' """ super().__init__(recorder=recorder, **kwargs) @@ -312,10 +318,17 @@ class PortAnaRecord(RecordTemp): self.backtest_config = config["backtest"] if isinstance(risk_analysis_freq, str): risk_analysis_freq = [risk_analysis_freq] + if isinstance(indicator_analysis_freq, str): + indicator_analysis_freq = [indicator_analysis_freq] + self.risk_analysis_freq = [ "{0}{1}".format(*parse_freq(_analysis_freq)) for _analysis_freq in risk_analysis_freq ] - self.report_freq = self._get_report_freq(self.executor_config) + self.indicator_analysis_freq = [ + "{0}{1}".format(*parse_freq(_analysis_freq)) for _analysis_freq in indicator_analysis_freq + ] + self.indicator_analysis_method = indicator_analysis_method + self.all_freq = self._get_report_freq(self.executor_config) def _get_report_freq(self, executor_config): ret_freq = [] @@ -328,21 +341,26 @@ class PortAnaRecord(RecordTemp): def generate(self, **kwargs): # custom strategy and get backtest - report_dict = normal_backtest( + report_dict, indicator_dict = normal_backtest( executor=self.executor_config, strategy=self.strategy_config, **self.backtest_config ) - for report_freq, (report_normal, positions_normal) in report_dict.items(): + for _freq, (report_normal, positions_normal) in report_dict.items(): self.recorder.save_objects( - **{f"report_normal_{report_freq}.pkl": report_normal}, artifact_path=PortAnaRecord.get_path() + **{f"report_normal_{_freq}.pkl": report_normal}, artifact_path=PortAnaRecord.get_path() ) self.recorder.save_objects( - **{f"positions_normal_{report_freq}.pkl": positions_normal}, artifact_path=PortAnaRecord.get_path() + **{f"positions_normal_{_freq}.pkl": positions_normal}, artifact_path=PortAnaRecord.get_path() + ) + + for _freq, indicators_normal in indicator_dict.items(): + self.recorder.save_objects( + **{f"indicators_normal_{_freq}.pkl": indicators_normal}, artifact_path=PortAnaRecord.get_path() ) for _analysis_freq in self.risk_analysis_freq: if _analysis_freq not in report_dict: warnings.warn( - f"the freq {_analysis_freq} report is not found, please set the corresponding env with `generate_report==True`" + f"the freq {_analysis_freq} report is not found, please set the corresponding env with `generate_report=True`" ) else: report_normal, _ = report_dict.get(_analysis_freq) @@ -353,25 +371,46 @@ class PortAnaRecord(RecordTemp): analysis["excess_return_with_cost"] = risk_analysis( report_normal["return"] - report_normal["bench"] - report_normal["cost"], freq=_analysis_freq ) + analysis_df = pd.concat(analysis) # type: pd.DataFrame # log metrics - self.recorder.log_metrics(**flatten_dict(analysis_df["risk"].unstack().T.to_dict())) + analysis_dict = flatten_dict(analysis_df["risk"].unstack().T.to_dict()) + self.recorder.log_metrics(**{f"{_analysis_freq}.{k}": v for k, v in analysis_dict.items()}) # save results self.recorder.save_objects( - **{f"port_analysis_{report_freq}.pkl": analysis_df}, artifact_path=PortAnaRecord.get_path() + **{f"port_analysis_{_analysis_freq}.pkl": analysis_df}, artifact_path=PortAnaRecord.get_path() ) logger.info( - f"Portfolio analysis record 'port_analysis_{report_freq}.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}" + f"Portfolio analysis record 'port_analysis_{_analysis_freq}.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}" ) # print out results - pprint("The following are analysis results of the excess return without cost.") + pprint(f"The following are analysis results of benchmark return({_analysis_freq}).") + pprint(risk_analysis(report_normal["bench"], freq=_analysis_freq)) + pprint(f"The following are analysis results of the excess return without cost({_analysis_freq}).") pprint(analysis["excess_return_without_cost"]) - pprint("The following are analysis results of the excess return with cost.") + pprint(f"The following are analysis results of the excess return with cost({_analysis_freq}).") pprint(analysis["excess_return_with_cost"]) + for _analysis_freq in self.indicator_analysis_freq: + indicators_normal = indicator_dict.get(_analysis_freq) + if self.indicator_analysis_method is None: + analysis_df = indicator_analysis(indicators_normal) + else: + analysis_df = indicator_analysis(indicators_normal, method=self.indicator_analysis_method) + + # log metrics + analysis_dict = analysis_df["value"].to_dict() + self.recorder.log_metrics(**{f"{_analysis_freq}.{k}": v for k, v in analysis_dict.items()}) + # save results + self.recorder.save_objects( + **{f"indicator_analysis_{_analysis_freq}.pkl": analysis_df}, artifact_path=PortAnaRecord.get_path() + ) + pprint(f"The following are analysis results of indicators({_analysis_freq}).") + pprint(analysis_df) + def list(self): list_path = [] - for _freq in self.report_freq: + for _freq in self.all_freq: list_path.extend( [ PortAnaRecord.get_path(f"report_normal_{_freq}.pkl"), @@ -380,7 +419,7 @@ class PortAnaRecord(RecordTemp): ) for _analysis_freq in self.risk_analysis_freq: - if _analysis_freq in self.report_freq: + if _analysis_freq in self.all_freq: list_path.append(PortAnaRecord.get_path(f"port_analysis_{_analysis_freq}.pkl")) else: warnings.warn(f"{_analysis_freq} is not found")