mirror of
https://github.com/microsoft/qlib.git
synced 2026-07-01 18:11:18 +08:00
make the logic of online manager cleaner
This commit is contained in:
@@ -546,7 +546,7 @@ class TSDatasetH(DatasetH):
|
||||
dtype = kwargs.pop("dtype", None)
|
||||
start, end = slc.start, slc.stop
|
||||
flt_col = kwargs.pop("flt_col", None)
|
||||
# TSDatasetH will retrieve more data for complete
|
||||
# TSDatasetH will retrieve more data for complete time-series
|
||||
data = self._prepare_raw_seg(slc, **kwargs)
|
||||
|
||||
flt_kwargs = deepcopy(kwargs)
|
||||
|
||||
@@ -21,19 +21,65 @@ Situations Description
|
||||
Online + Trainer When you want to do a REAL routine, the Trainer will help you train the models. It
|
||||
will train models task by task and strategy by strategy.
|
||||
|
||||
Online + DelayTrainer When your models don't have any temporal dependence, the DelayTrainer will train
|
||||
nothing until all tasks have been prepared. It makes user can train all tasks in
|
||||
the end of `routine` or `first_train`.
|
||||
Online + DelayTrainer DelayTrainer will skip concrete training until all tasks have been prepared by
|
||||
different strategies. It makes users can parallelly train all tasks at the end of
|
||||
`routine` or `first_train`. Otherwise, these functions will get stuck when each
|
||||
strategy prepare tasks.
|
||||
|
||||
Simulation + Trainer When your models have some temporal dependence on the previous models, then you
|
||||
need to consider using Trainer. This means it will REAL train your models in
|
||||
every routine and prepare signals for every routine.
|
||||
Simulation + Trainer It will behave in the same way as `Online + Trainer`. The only difference is that it
|
||||
is for simulation/backtesting instead of online trading
|
||||
|
||||
Simulation + DelayTrainer When your models don't have any temporal dependence, you can use DelayTrainer
|
||||
for the ability to multitasking. It means all tasks in all routines
|
||||
can be REAL trained at the end of simulating. The signals will be prepared well at
|
||||
different time segments (based on whether or not any new model is online).
|
||||
========================= ===================================================================================
|
||||
|
||||
Here is some pseudo code the demonstrate the workflow of each situation
|
||||
|
||||
For simplicity
|
||||
- Only one strategy is used in the strategy
|
||||
- `update_online_pred` is only called in the online mode and is ignored
|
||||
|
||||
1) `Online + Trainer`
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
tasks = first_train()
|
||||
models = trainer.train(tasks)
|
||||
trainer.end_train(models)
|
||||
for day in online_trading_days:
|
||||
# OnlineManager.routine
|
||||
models = trainer.train(strategy.prepare_tasks()) # for each strategy
|
||||
strategy.prepare_online_models(models) # for each strategy
|
||||
|
||||
trainer.end_train(models)
|
||||
prepare_signals() # prepare trading signals daily
|
||||
|
||||
|
||||
`Online + DelayTrainer`: the workflow is the same as `Online + Trainer`.
|
||||
|
||||
|
||||
2) `Simulation + DelayTrainer`
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# simulate
|
||||
tasks = first_train()
|
||||
models = trainer.train(tasks)
|
||||
for day in historical_calendars:
|
||||
# OnlineManager.routine
|
||||
models = trainer.train(strategy.prepare_tasks()) # for each strategy
|
||||
strategy.prepare_online_models(models) # for each strategy
|
||||
# delay_prepare()
|
||||
# FIXME: Currently the delay_prepare is not implemented in a proper way.
|
||||
trainer.end_train(<for all previous models>)
|
||||
prepare_signals()
|
||||
|
||||
|
||||
# Can we simplify current workflow?
|
||||
- Can reduce the number of state of tasks?
|
||||
- For each task, we have three phases (i.e. task, partly trained task, final trained task)
|
||||
"""
|
||||
|
||||
import logging
|
||||
@@ -58,7 +104,7 @@ class OnlineManager(Serializable):
|
||||
"""
|
||||
|
||||
STATUS_SIMULATING = "simulating" # when calling `simulate`
|
||||
STATUS_NORMAL = "normal" # the normal status
|
||||
STATUS_ONLINE = "online" # the normal status. It is used when online trading
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -87,12 +133,24 @@ class OnlineManager(Serializable):
|
||||
self.begin_time = pd.Timestamp(begin_time)
|
||||
self.cur_time = self.begin_time
|
||||
# OnlineManager will recorder the history of online models, which is a dict like {pd.Timestamp, {strategy, [online_models]}}.
|
||||
# It records the online servnig models of each strategy for each day.
|
||||
self.history = {}
|
||||
if trainer is None:
|
||||
trainer = TrainerR()
|
||||
self.trainer = trainer
|
||||
self.signals = None
|
||||
self.status = self.STATUS_NORMAL
|
||||
self.status = self.STATUS_ONLINE
|
||||
|
||||
def _postpone_action(self):
|
||||
"""
|
||||
Should the workflow to postpone the following actions to the end (in delay_prepare)
|
||||
- trainer.end_train
|
||||
- prepare_signals
|
||||
|
||||
Postpone these actions is to support simulating/backtest online strategies without time dependencies.
|
||||
All the actions can be done parallelly at the end.
|
||||
"""
|
||||
return self.status == self.STATUS_SIMULATING and self.trainer.is_delay()
|
||||
|
||||
def first_train(self, strategies: List[OnlineStrategy] = None, model_kwargs: dict = {}):
|
||||
"""
|
||||
@@ -113,12 +171,12 @@ class OnlineManager(Serializable):
|
||||
models = self.trainer.train(tasks, experiment_name=strategy.name_id)
|
||||
models_list.append(models)
|
||||
self.logger.info(f"Finished training {len(models)} models.")
|
||||
# FIXME: Traing multiple online models at `first_train` will result in getting too much online models at the
|
||||
# FIXME: Train multiple online models at `first_train` will result in getting too much online models at the
|
||||
# start.
|
||||
online_models = strategy.prepare_online_models(models, **model_kwargs)
|
||||
self.history.setdefault(self.cur_time, {})[strategy] = online_models
|
||||
|
||||
if not self.status == self.STATUS_SIMULATING or not self.trainer.is_delay():
|
||||
if not self._postpone_action():
|
||||
for strategy, models in zip(strategies, models_list):
|
||||
models = self.trainer.end_train(models, experiment_name=strategy.name_id)
|
||||
|
||||
@@ -160,10 +218,10 @@ class OnlineManager(Serializable):
|
||||
|
||||
# The online model may changes in the above processes
|
||||
# So updating the predictions of online models should be the last step
|
||||
if self.status == self.STATUS_NORMAL:
|
||||
if self.status == self.STATUS_ONLINE:
|
||||
strategy.tool.update_online_pred()
|
||||
|
||||
if not self.status == self.STATUS_SIMULATING or not self.trainer.is_delay():
|
||||
if not self._postpone_action():
|
||||
for strategy, models in zip(self.strategies, models_list):
|
||||
models = self.trainer.end_train(models, experiment_name=strategy.name_id)
|
||||
self.prepare_signals(**signal_kwargs)
|
||||
@@ -278,13 +336,13 @@ class OnlineManager(Serializable):
|
||||
signal_kwargs=signal_kwargs,
|
||||
)
|
||||
# delay prepare the models and signals
|
||||
if self.trainer.is_delay():
|
||||
if self._postpone_action():
|
||||
self.delay_prepare(model_kwargs=model_kwargs, signal_kwargs=signal_kwargs)
|
||||
|
||||
# FIXME: get logging level firstly and restore it here
|
||||
set_global_logger_level(logging.DEBUG)
|
||||
self.logger.info(f"Finished preparing signals")
|
||||
self.status = self.STATUS_NORMAL
|
||||
self.status = self.STATUS_ONLINE
|
||||
return self.get_signals()
|
||||
|
||||
def delay_prepare(self, model_kwargs={}, signal_kwargs={}):
|
||||
@@ -295,6 +353,8 @@ class OnlineManager(Serializable):
|
||||
model_kwargs: the params for `end_train`
|
||||
signal_kwargs: the params for `prepare_signals`
|
||||
"""
|
||||
# FIXME:
|
||||
# This method is not implemented in the proper way!!!
|
||||
last_models = {}
|
||||
signals_time = D.calendar()[0]
|
||||
need_prepare = False
|
||||
|
||||
@@ -94,6 +94,11 @@ class TaskGen(metaclass=abc.ABCMeta):
|
||||
def handler_mod(task: dict, rolling_gen):
|
||||
"""
|
||||
Help to modify the handler end time when using RollingGen
|
||||
It try to handle the following case
|
||||
- Hander's data end_time is earlier than dataset's test_data's segments.
|
||||
- To handle this, handler's data's end_time is extended.
|
||||
|
||||
If the handler's end_time is None, then it is not necessary to change it's end time.
|
||||
|
||||
Args:
|
||||
task (dict): a task template
|
||||
|
||||
Reference in New Issue
Block a user