1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 02:50:58 +08:00

docs and bug fixed

This commit is contained in:
lzh222333
2021-05-06 04:18:55 +00:00
parent 1c99fb35da
commit 84c56f13bd
17 changed files with 312 additions and 145 deletions

View File

@@ -1,4 +1,4 @@
.. _task_managment:
.. _task_management:
=================================
Task Management
@@ -10,15 +10,17 @@ Introduction
=============
The `Workflow <../component/introduction.html>`_ part introduces how to run research workflow in a loosely-coupled way. But it can only execute one ``task`` when you use ``qrun``.
To automatically generate and execute different tasks, ``Task Management`` provides a whole process including `Task Generating`_, `Task Storing`_, `Task Running`_ and `Task Collecting`_.
To automatically generate and execute different tasks, ``Task Management`` provides a whole process including `Task Generating`_, `Task Storing`_, `Task Training`_ and `Task Collecting`_.
With this module, users can run their ``task`` automatically at different periods, in different losses, or even by different models.
An example of the entire process is shown `here <https://github.com/microsoft/qlib/tree/main/examples/taskmanager/task_manager_rolling.py>`_.
This whole process can be used in `Online Serving <../component/online.html>`_.
An example of the entire process is shown `here <https://github.com/microsoft/qlib/tree/main/examples/model_rolling/task_manager_rolling.py>`_.
Task Generating
===============
A ``task`` consists of `Model`, `Dataset`, `Record` or anything added by users.
The specific task template(/definition/config) can be viewed in
The specific task template can be viewed in
`Task Section <../component/workflow.html#task-section>`_.
Even though the task template is fixed, users can customize their ``TaskGen`` to generate different ``task`` by task template.
@@ -27,15 +29,16 @@ Here is the base class of ``TaskGen``:
.. autoclass:: qlib.workflow.task.gen.TaskGen
:members:
``Qlib`` provider a class `RollingGen <https://github.com/microsoft/qlib/tree/main/qlib/workflow/task/gen.py>`_ to generate a list of ``task`` of the dataset in different date segments.
This class allows users to verify the effect of data from different periods on the model in one experiment.
``Qlib`` provides a class `RollingGen <https://github.com/microsoft/qlib/tree/main/qlib/workflow/task/gen.py>`_ to generate a list of ``task`` of the dataset in different date segments.
This class allows users to verify the effect of data from different periods on the model in one experiment. More information in `here <../reference/api.html#TaskGen>`_.
Task Storing
===============
To achieve higher efficiency and the possibility of cluster operation, ``Task Manager`` will store all tasks in `MongoDB <https://www.mongodb.com/>`_.
``TaskManager`` can fetch undone tasks automatically and manage the lifecycle of a set of tasks with error handling.
Users **MUST** finished the configuration of `MongoDB <https://www.mongodb.com/>`_ when using this module.
Users need to provide the URL and database name of ``task`` storing like this.
Users need to provide the MongoDB URL and database name for using ``TaskManager`` in `initialization <../start/initialization.html#Parameters>`_ or make statement like this.
.. code-block:: python
@@ -45,13 +48,12 @@ Users need to provide the URL and database name of ``task`` storing like this.
"task_db_name" : "rolling_db" # database name
}
The CRUD methods of ``task`` can be found in TaskManager.
More methods can be seen in the `Github <https://github.com/microsoft/qlib/tree/main/qlib/workflow/task/manage.py>`_.
.. autoclass:: qlib.workflow.task.manage.TaskManager
:members:
Task Running
More information of ``Task Manager`` can be found in `here <../reference/api.html#TaskManager>`_.
Task Training
===============
After generating and storing those ``task``, it's time to run the ``task`` which are in the *WAITING* status.
``Qlib`` provides a method called ``run_task`` to run those ``task`` in task pool, however, users can also customize how tasks are executed.
@@ -60,14 +62,24 @@ It will run the whole workflow defined by ``task``, which includes *Model*, *Dat
.. autofunction:: qlib.workflow.task.manage.run_task
Meanwhile, ``Qlib`` provides a module called ``Trainer``.
``Trainer`` will train a list of tasks and return a list of model recorder.
``Qlib`` offer two kind of Trainer, TrainerR is the simplest way and TrainerRM is based on TaskManager to help manager tasks lifecycle automatically.
If you do not want to use ``Task Manager`` to manage tasks, then use TrainerR to train a list of tasks generated by ``TaskGen`` is enough.
More information is in `here <../reference/api.html#Trainer>`_.
Task Collecting
===============
To see the results of ``task`` after running or to update something, ``Qlib`` provides a ``TaskCollector`` to collect the tasks by filter condition (optional).
Here are some methods in this class.
To collect the results of ``task`` after training, ``Qlib`` provides `Collector <../reference/api.html#Collector>`_, `Group <../reference/api.html#Group>`_ and `Ensemble <../reference/api.html#Ensemble>`_ to collect the results in a readable, expandable and loosely-coupled way.
.. autoclass:: qlib.workflow.task.collect.TaskCollector
:members:
`Collector <../reference/api.html#Collector>`_ can collect object from everywhere and process them such as merging, grouping, averaging and so on. It has 2 step action including ``collect`` (collect anything in a dict) and ``process_collect`` (process collected dict).
``Qlib`` provides a concrete `example <https://github.com/microsoft/qlib/tree/main/examples/taskmanager/task_manager_rolling_with_updating.py>`_, including a whole process of `Task Generating`_ (using `RollingGen <https://github.com/microsoft/qlib/tree/main/qlib/workflow/task/gen.py>`_), `Task Storing`_, `Task Running`_ and `Task Collecting`_.
Besides, the `example <https://github.com/microsoft/qlib/tree/main/examples/taskmanager/task_manager_rolling_with_updating.py>`_ uses a ``ModelUpdater`` inherited from ``TaskCollector``, which can update the inferences and retrain the model if it is out of date.
Actually, the model updating can be viewed as a subset of ``Online Serving``.
`Group <../reference/api.html#Group>`_ also has 2 steps including ``group`` (can group a set of object based on `group_func` and change them to a dict) and ``reduce`` (can make a dict become an ensemble based on some rule).
For example: {(A,B,C1): object, (A,B,C2): object} ---``group``---> {(A,B): {C1: object, C2: object}} ---``reduce``---> {(A,B): object}
`Ensemble <../reference/api.html#Ensemble>`_ can merge the objects in an ensemble.
For example: {C1: object, C2: object} ---``Ensemble``---> object
So the hierarchy is ``Collector``'s second step correspond to ``Group``. And ``Group``'s second step correspond to ``Ensemble``.
For more information, please see `Collector <../reference/api.html#Collector>`_, `Group <../reference/api.html#Group>`_ and `Ensemble <../reference/api.html#Ensemble>`_, or the `example <https://github.com/microsoft/qlib/tree/main/examples/model_rolling/task_manager_rolling.py>`_

41
docs/component/online.rst Normal file
View File

@@ -0,0 +1,41 @@
.. _online:
=================================
Online Serving
=================================
.. currentmodule:: qlib
Introduction
=============
In addition to backtesting, one way to test a model is effective is to make predictions in real market conditions or even do real trading based on those predictions.
``Online Serving`` is a set of module for online models using latest data,
which including `Online Manager <#Online Manager>`_, `Online Strategy <#Online Strategy>`_, `Online Tool <#Online Tool>`_, `Updater <#Updater>`_.
`Here <https://github.com/microsoft/qlib/tree/main/examples/online_srv>`_ are several examples for reference, which demonstrate different features of ``Online Serving``.
If you have many models or `task` need to be managed, please consider `Task Management <../advanced/task_management.html>`_.
The `examples <https://github.com/microsoft/qlib/tree/main/examples/online_srv>`_ maybe based on `Task Management <../advanced/task_management.html>`_ such as ``TrainerRM`` or ``Collector``.
Online Manager
=============
.. automodule:: qlib.workflow.online.manager
:members:
Online Strategy
=============
.. automodule:: qlib.workflow.online.strategy
:members:
Online Tool
=============
.. automodule:: qlib.workflow.online.utils
:members:
Updater
=============
.. automodule:: qlib.workflow.online.update
:members:

View File

@@ -42,6 +42,7 @@ Document Structure
Intraday Trading: Model&Strategy Testing <component/backtest.rst>
Qlib Recorder: Experiment Management <component/recorder.rst>
Analysis: Evaluation & Results Analysis <component/report.rst>
Online Serving: Online Management & Strategy & Tool <component/online.rst>
.. toctree::
:maxdepth: 3

View File

@@ -154,36 +154,71 @@ Record Template
.. automodule:: qlib.workflow.record_temp
:members:
Task Management
====================
RollingGen
TaskGen
--------------------
.. autoclass:: qlib.workflow.task.gen.RollingGen
.. automodule:: qlib.workflow.task.gen
:members:
TaskManager
--------------------
.. autoclass:: qlib.workflow.task.manage.TaskManager
.. automodule:: qlib.workflow.task.manage
:members:
TaskCollector
Trainer
--------------------
.. autoclass:: qlib.workflow.task.collect.TaskCollector
.. automodule:: qlib.model.trainer
:members:
ModelUpdater
Collector
--------------------
.. autoclass:: qlib.workflow.task.update.ModelUpdater
.. automodule:: qlib.workflow.task.collect
:members:
TimeAdjuster
Group
--------------------
.. autoclass:: qlib.workflow.task.utils.TimeAdjuster
.. automodule:: qlib.model.ens.group
:members:
Ensemble
--------------------
.. automodule:: qlib.model.ens.ensemble
:members:
Utils
--------------------
.. automodule:: qlib.workflow.task.utils
:members:
Online Serving
====================
Online Manager
--------------------
.. automodule:: qlib.workflow.online.manager
:members:
Online Strategy
--------------------
.. automodule:: qlib.workflow.online.strategy
:members:
Online Tool
--------------------
.. automodule:: qlib.workflow.online.utils
:members:
RecordUpdater
--------------------
.. automodule:: qlib.workflow.online.update
:members:
Utils
====================

View File

@@ -131,6 +131,8 @@ class OnlineSimulationExample:
self.rolling_online_manager.simulate(end_time=self.end_time)
print("========== collect results ==========")
print(self.rolling_online_manager.get_collector()())
print("========== signals ==========")
print(self.rolling_online_manager.get_signals())
print("========== online history ==========")
print(self.rolling_online_manager.get_online_history(self.exp_name))

View File

@@ -86,7 +86,7 @@ class RollingOnlineExample:
task_url="mongodb://10.0.0.4:27017/",
task_db_name="rolling_db",
rolling_step=550,
tasks=[task_xgboost_config, task_lgb_config],
tasks=[task_xgboost_config], # , task_lgb_config],
):
mongo_conf = {
"task_url": task_url, # your MongoDB url
@@ -148,6 +148,8 @@ class RollingOnlineExample:
self.rolling_online_manager.routine()
print("========== collect results ==========")
print(self.collector())
print("========== signals ==========")
print(self.rolling_online_manager.get_signals())
def main(self):
self.first_run()

View File

@@ -27,7 +27,7 @@ class Dataset(Serializable):
- setup data
- The data related attributes' names should start with '_' so that it will not be saved on disk when serializing.
The data could specify the info to caculate the essential data for preparation
The data could specify the info to calculate the essential data for preparation
"""
self.setup_data(**kwargs)
super().__init__()
@@ -92,7 +92,7 @@ class DatasetH(Dataset):
handler : Union[dict, DataHandler]
handler could be:
- insntance of `DataHandler`
- instance of `DataHandler`
- config of `DataHandler`. Please refer to `DataHandler`
@@ -114,7 +114,6 @@ class DatasetH(Dataset):
"""
self.handler: DataHandler = init_instance_by_config(handler, accept_types=DataHandler)
self.segments = segments.copy()
self.fetch_kwargs = {}
super().__init__(**kwargs)
def config(self, handler_kwargs: dict = None, **kwargs):
@@ -124,7 +123,7 @@ class DatasetH(Dataset):
Parameters
----------
handler_kwargs : dict
Config of DataHanlder, which could include the following arguments:
Config of DataHandler, which could include the following arguments:
- arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'.
@@ -148,11 +147,11 @@ class DatasetH(Dataset):
Parameters
----------
handler_kwargs : dict
init arguments of DataHanlder, which could include the following arguments:
init arguments of DataHandler, which could include the following arguments:
- init_type : Init Type of Handler
- enable_cache : wheter to enable cache
- enable_cache : whether to enable cache
"""
super().setup_data(**kwargs)
@@ -172,7 +171,7 @@ class DatasetH(Dataset):
----------
slc : slice
"""
return self.handler.fetch(slc, **kwargs, **self.fetch_kwargs)
return self.handler.fetch(slc, **kwargs)
def prepare(
self,
@@ -232,7 +231,7 @@ class TSDataSampler:
(T)ime-(S)eries DataSampler
This is the result of TSDatasetH
It works like `torch.data.utils.Dataset`, it provides a very convient interface for constructing time-series
It works like `torch.data.utils.Dataset`, it provides a very convenient interface for constructing time-series
dataset based on tabular data.
If user have further requirements for processing data, user could process them based on `TSDataSampler` or create
@@ -289,29 +288,12 @@ class TSDataSampler:
# the data type will be changed
# The index of usable data is between start_idx and end_idx
self.start_idx, self.end_idx = self.data.index.slice_locs(start=pd.Timestamp(start), end=pd.Timestamp(end))
self.idx_df, self.idx_map = self.build_index(self.data)
self.data_index = deepcopy(self.data.index)
if flt_data is not None:
self.flt_data = np.array(flt_data).reshape(-1)
self.idx_map = self.flt_idx_map(self.flt_data, self.idx_map)
self.data_index = self.data_index[np.where(self.flt_data == True)[0]]
self.start_idx, self.end_idx = self.data_index.slice_locs(start=pd.Timestamp(start), end=pd.Timestamp(end))
self.idx_arr = np.array(self.idx_df.values, dtype=np.float64) # for better performance
self.data_idx = deepcopy(self.data.index)
del self.data # save memory
@staticmethod
def flt_idx_map(flt_data, idx_map):
idx = 0
new_idx_map = {}
for i, exist in enumerate(flt_data):
if exist:
new_idx_map[idx] = idx_map[i]
idx += 1
return new_idx_map
def get_index(self):
"""
Get the pandas index of the data, it will be useful in following scenarios
@@ -461,7 +443,7 @@ class TSDatasetH(DatasetH):
(T)ime-(S)eries Dataset (H)andler
Covnert the tabular data to Time-Series data
Convert the tabular data to Time-Series data
Requirements analysis
@@ -505,19 +487,8 @@ class TSDatasetH(DatasetH):
"""
split the _prepare_raw_seg is to leave a hook for data preprocessing before creating processing data
"""
dtype = kwargs.pop("dtype")
dtype = kwargs.pop("dtype", None)
start, end = slc.start, slc.stop
flt_col = kwargs.pop("flt_col", None)
# TSDatasetH will retrieve more data for complete
data = self._prepare_raw_seg(slc, **kwargs)
flt_kwargs = deepcopy(kwargs)
if flt_col is not None:
flt_kwargs["col_set"] = flt_col
flt_data = self._prepare_raw_seg(slc, **flt_kwargs)
assert len(flt_data.columns) == 1
else:
flt_data = None
tsds = TSDataSampler(data=data, start=start, end=end, step_len=self.step_len, dtype=dtype, flt_data=flt_data)
data = self._prepare_raw_seg(slc=slc, **kwargs)
tsds = TSDataSampler(data=data, start=start, end=end, step_len=self.step_len, dtype=dtype)
return tsds

View File

@@ -36,7 +36,7 @@ class DataHandler(Serializable):
The data handler try to maintain a handler with 2 level.
`datetime` & `instruments`.
Any order of the index level can be suported (The order will be implied in the data).
Any order of the index level can be supported (The order will be implied in the data).
The order <`datetime`, `instruments`> will be used when the dataframe index name is missed.
Example of the data:
@@ -77,7 +77,7 @@ class DataHandler(Serializable):
data_loader : Tuple[dict, str, DataLoader]
data loader to load the data.
init_data :
intialize the original data in the constructor.
initialize the original data in the constructor.
fetch_orig : bool
Return the original data instead of copy if possible.
"""
@@ -128,7 +128,7 @@ class DataHandler(Serializable):
def setup_data(self, enable_cache: bool = False):
"""
Set Up the data in case of running intialization for multiple time
Set Up the data in case of running initialization for multiple time
It is responsible for maintaining following variable
1) self._data
@@ -453,7 +453,7 @@ class DataHandlerLP(DataHandler):
def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs):
"""
Set up the data in case of running intialization for multiple time
Set up the data in case of running initialization for multiple time
Parameters
----------

View File

@@ -5,6 +5,7 @@
Ensemble can merge the objects in an Ensemble. For example, if there are many submodels predictions, we may need to merge them in an ensemble predictions.
"""
from typing import Union
import pandas as pd
@@ -24,6 +25,30 @@ class Ensemble:
raise NotImplementedError(f"Please implement the `__call__` method.")
class SingleKeyEnsemble(Ensemble):
"""
Extract the object if there is only one key and value in dict. Make result more readable.
{Only key: Only value} -> Only value
If there are more than 1 key or less than 1 key, then do nothing.
Even you can run this recursively to make dict more readable.
NOTE: Default run recursively.
"""
def __call__(self, ensemble_dict: Union[dict, object], recursion: bool = True) -> object:
if not isinstance(ensemble_dict, dict):
return ensemble_dict
if recursion:
tmp_dict = {}
for k, v in ensemble_dict.items():
tmp_dict[k] = self(v, recursion)
ensemble_dict = tmp_dict
keys = list(ensemble_dict.keys())
if len(keys) == 1:
ensemble_dict = ensemble_dict[keys[0]]
return ensemble_dict
class RollingEnsemble(Ensemble):
"""Merge the rolling objects in an Ensemble"""
@@ -47,3 +72,24 @@ class RollingEnsemble(Ensemble):
artifact = artifact[~artifact.index.duplicated(keep="last")]
artifact = artifact.sort_index()
return artifact
class AverageEnsemble(Ensemble):
def __call__(self, ensemble_dict: dict):
"""
Average a dict of same shape dataframe like `prediction` or `IC` into an ensemble.
NOTE: The values of dict must be pd.DataFrame, and have the index "datetime"
Args:
ensemble_dict (dict): a dict like {"A": pd.DataFrame, "B": pd.DataFrame}.
The key of the dict will be ignored.
Returns:
pd.DataFrame: the complete result of averaging.
"""
values = list(ensemble_dict.values())
results = pd.concat(values, axis=1)
results = results.mean(axis=1).to_frame("score")
results = results.sort_index()
return results

View File

@@ -3,6 +3,13 @@
"""
Group can group a set of object based on `group_func` and change them to a dict.
After group, we provide a method to reduce them.
For example:
group: {(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}}
reduce: {(A,B): {C1: object, C2: object}} -> {(A,B): object}
"""
from qlib.model.ens.ensemble import Ensemble, RollingEnsemble

View File

@@ -3,12 +3,12 @@
"""
The Trainer will train a list of tasks and return a list of model recorder.
There are two steps in each Trainer including `train`(make model recorder) and `end_train`(modify model recorder).
There are two steps in each Trainer including ``train``(make model recorder) and ``end_train``(modify model recorder).
This is concept called "DelayTrainer", which can be used in online simulating to parallel training.
In "DelayTrainer", the first step is only to save some necessary info to model recorder, and the second step which will be finished in the end can do some concurrent and time-consuming operations such as model fitting.
This is concept called ``DelayTrainer``, which can be used in online simulating to parallel training.
In ``DelayTrainer``, the first step is only to save some necessary info to model recorder, and the second step which will be finished in the end can do some concurrent and time-consuming operations such as model fitting.
`Qlib` offer two kind of Trainer, TrainerR is simplest and TrainerRM is based on TaskManager to help manager tasks lifecycle automatically.
``Qlib`` offer two kind of Trainer, ``TrainerR`` is the simplest way and ``TrainerRM`` is based on TaskManager to help manager tasks lifecycle automatically.
"""
import socket
@@ -36,9 +36,6 @@ def begin_task_train(task_config: dict, experiment_name: str, recorder_name: str
Returns:
Recorder: the model recorder
"""
# FIXME: recorder_id
if recorder_name is None:
recorder_name = str(time.time())
with R.start(experiment_name=experiment_name, recorder_name=recorder_name):
R.log_params(**flatten_dict(task_config))
R.save_objects(**{"task": task_config}) # keep the original format and datatype
@@ -58,7 +55,7 @@ def end_task_train(rec: Recorder, experiment_name: str) -> Recorder:
Returns:
Recorder: the model recorder
"""
with R.start(experiment_name=experiment_name, recorder_name=rec.info["name"], resume=True):
with R.start(experiment_name=experiment_name, recorder_id=rec.info["id"], resume=True):
task_config = R.load_object("task")
# model & dataset initiation
model: Model = init_instance_by_config(task_config["model"])
@@ -314,7 +311,8 @@ class TrainerRM(Trainer):
def reset(self):
"""
NOTE: this method will delete all task in this task_pool!
.. note::
this method will delete all task in this task_pool!
"""
tm = TaskManager(task_pool=self.task_pool)
tm.remove()

View File

@@ -2,11 +2,14 @@
# Licensed under the MIT License.
"""
OnlineManager can manage a set of OnlineStrategy and run them dynamically.
OnlineManager can manage a set of `Online Strategy <#Online Strategy>`_ and run them dynamically.
With the change of time, the decisive models will be also changed. In this module, we call those contributing models as `online` models.
In every routine(such as everyday or every minutes), the `online` models maybe changed and the prediction of them need to be updated.
So this module provide a series methods to control this process.
This module also provide a method to simulate `Online Strategy <#Online Strategy>`_ in the history.
Which means you can verify your strategy or find a better one.
"""
from typing import Dict, List, Union
@@ -14,12 +17,18 @@ from typing import Dict, List, Union
import pandas as pd
from qlib import get_module_logger
from qlib.data.data import D
from qlib.model.ens.ensemble import AverageEnsemble, SingleKeyEnsemble
from qlib.utils.serial import Serializable
from qlib.workflow.online.strategy import OnlineStrategy
from qlib.workflow.task.collect import HyperCollector
class OnlineManager(Serializable):
"""
OnlineManager can manage online models with `Online Strategy <#Online Strategy>`_.
It also provide a history recording which models are onlined at what time.
"""
def __init__(
self,
strategy: Union[OnlineStrategy, List[OnlineStrategy]],
@@ -29,10 +38,11 @@ class OnlineManager(Serializable):
):
"""
Init OnlineManager.
One OnlineManager must have at least one OnlineStrategy.
Args:
strategy (Union[OnlineStrategy, List[OnlineStrategy]]): an instance of OnlineStrategy or a list of OnlineStrategy
begin_time (Union[str,pd.Timestamp], optional): the OnlineManager will begin at this time. Defaults to None.
begin_time (Union[str,pd.Timestamp], optional): the OnlineManager will begin at this time. Defaults to None for using latest date.
freq (str, optional): data frequency. Defaults to "day".
need_log (bool, optional): print log or not. Defaults to True.
"""
@@ -50,7 +60,7 @@ class OnlineManager(Serializable):
def first_train(self):
"""
Run every strategy first_train method and record the online history
Run every strategy first_train method and record the online history.
"""
for strategy in self.strategy:
self.logger.info(f"Strategy `{strategy.name_id}` begins first training...")
@@ -62,7 +72,7 @@ class OnlineManager(Serializable):
Run typical update process for every strategy and record the online history.
The typical update process after a routine, such as day by day or month by month.
update online prediction -> prepare signals -> prepare tasks -> prepare new models -> reset online models
The process is: Prepare signals -> Prepare tasks -> Prepare online models.
Args:
cur_time (Union[str,pd.Timestamp], optional): run routine method in this time. Defaults to None.
@@ -84,15 +94,15 @@ class OnlineManager(Serializable):
def get_collector(self) -> HyperCollector:
"""
Get the instance of HyperCollector to collect results from every strategy.
Get the instance of `Collector <../advanced/task_management.html#Task Collecting>`_ to collect results from every strategy.
Returns:
HyperCollector: the collector can collect other collectors.
HyperCollector: the collector to collect other collectors (using SingleKeyEnsemble() to make results more readable).
"""
collector_dict = {}
for strategy in self.strategy:
collector_dict[strategy.name_id] = strategy.get_collector()
return HyperCollector(collector_dict)
return HyperCollector(collector_dict, process_list=SingleKeyEnsemble())
def get_online_history(self, strategy_name_id: str) -> list:
"""
@@ -102,7 +112,7 @@ class OnlineManager(Serializable):
strategy_name_id (str): the name_id of strategy
Returns:
dict: a list like [(time, [online_models])]
list: a list like [(begin_time, [online_models])]
"""
history_dict = self.history[strategy_name_id]
history = []
@@ -121,10 +131,27 @@ class OnlineManager(Serializable):
for strategy in self.strategy:
strategy.delay_prepare(self.get_online_history(strategy.name_id), **delay_kwargs)
def get_signals(self) -> pd.DataFrame:
"""
Average all strategy signals as the online signals.
Assumption: the signals from every strategy is pd.DataFrame. Override this function to change.
Returns:
pd.DataFrame: signals
"""
signals_dict = {}
for strategy in self.strategy:
signals_dict[strategy.name_id] = strategy.get_signals()
return AverageEnsemble()(signals_dict)
def simulate(self, end_time, frequency="day", task_kwargs={}, model_kwargs={}, delay_kwargs={}) -> HyperCollector:
"""
Starting from cur time, this method will simulate every routine in OnlineManager.
NOTE: Considering the parallel training, the models and signals can be perpared after all routine simulating.
Starting from current time, this method will simulate every routine in OnlineManager until end time.
Considering the parallel training, the models and signals can be perpared after all routine simulating.
The delay training way can be ``DelayTrainer`` and the delay preparing signals way can be ``delay_prepare``.
Returns:
HyperCollector: the OnlineManager's collector
@@ -140,7 +167,9 @@ class OnlineManager(Serializable):
def reset(self):
"""
NOTE: This method will reset all strategy! Be careful to use it.
This method will reset all strategy!
**Be careful to use it.**
"""
self.cur_time = self.begin_time
self.history = {}

View File

@@ -2,8 +2,7 @@
# Licensed under the MIT License.
"""
OnlineStrategy is a set of strategy of online serving.
It is working with OnlineManager, responsing how the tasks are generated, the models are updated and signals are perpared.
OnlineStrategy is a set of strategy for online serving.
"""
from copy import deepcopy
@@ -12,6 +11,7 @@ from typing import List, Tuple, Union
import pandas as pd
from qlib.data.data import D
from qlib.log import get_module_logger
from qlib.model.ens.ensemble import AverageEnsemble, SingleKeyEnsemble
from qlib.model.ens.group import RollingGroup
from qlib.model.trainer import Trainer, TrainerR
from qlib.workflow import R
@@ -23,9 +23,14 @@ from qlib.workflow.task.utils import TimeAdjuster, list_recorders
class OnlineStrategy:
"""
OnlineStrategy is working with `Online Manager <#Online Manager>`_, responsing how the tasks are generated, the models are updated and signals are perpared.
"""
def __init__(self, name_id: str, trainer: Trainer = None, need_log=True):
"""
Init OnlineStrategy.
This module **MUST** use `Trainer <../reference/api.html#Trainer>`_ to finishing model training.
Args:
name_id (str): a unique name or id
@@ -43,6 +48,7 @@ class OnlineStrategy:
After perparing the data of last routine (a box in box-plot) which means the end of the routine, we can prepare trading signals for next routine.
NOTE: Given a set prediction, all signals before these prediction end time will be prepared well.
Args:
delay: bool
If this method was called by `delay_prepare`
@@ -52,7 +58,7 @@ class OnlineStrategy:
def prepare_tasks(self, *args, **kwargs):
"""
After the end of a routine, check whether we need to prepare and train some new tasks.
return the new tasks waiting for training.
Return the new tasks waiting for training.
You can find last online models by OnlineTool.online_models.
"""
@@ -66,10 +72,6 @@ class OnlineStrategy:
Args:
tasks (list): a list of tasks.
tag (str):
`ONLINE_TAG` for first train or additional train
`NEXT_ONLINE_TAG` for reset online model when calling `reset_online_tag`
`OFFLINE_TAG` for train but offline those models
check_func: the method to judge if a model can be online.
The parameter is the model record and return True for online.
None for online every models.
@@ -95,7 +97,8 @@ class OnlineStrategy:
def get_collector(self) -> Collector:
"""
Get the instance of collector to collect results of online serving.
Get the instance of `Collector <../advanced/task_management.html#Task Collecting>`_ to collect results of online serving.
For example:
1) collect predictions in Recorder
@@ -109,7 +112,8 @@ class OnlineStrategy:
def delay_prepare(self, history: list, **kwargs):
"""
Prepare all models and signals if there are something waiting for prepare.
NOTE: Assumption: the predictions of online models need less than next begin_time, or this method will work in a wrong way.
Assumption: the predictions of online models need less than next begin_time, or this method will work in a wrong way.
Args:
history (list): an online models list likes [begin_time:[online models]].
@@ -120,6 +124,12 @@ class OnlineStrategy:
self.tool.reset_online_tag(recs_list)
self.prepare_signals(delay=True)
def get_signals(self):
"""
Get prepared signals.
"""
raise NotImplementedError(f"Please implement the `get_signals` method.")
def reset(self):
"""
Delete all things and set them to default status. This method is convenient to explore the strategy for online simulation.
@@ -164,17 +174,20 @@ class RollingAverageStrategy(OnlineStrategy):
self.rg = rolling_gen
self.tool = OnlineToolR(self.exp_name)
self.ta = TimeAdjuster()
self.signal_rec = None # the recorder to record signals
with R.start(experiment_name=self.signal_exp_name, recorder_name=self.exp_name, resume=True):
self.signal_rec = R.get_recorder() # the recorder to record signals
self.signal_rec.save_objects(**{"signals": None})
def get_collector(self, rec_key_func=None, rec_filter_func=None):
def get_collector(self, process_list=[RollingGroup()], rec_key_func=None, rec_filter_func=None, artifacts_key=None):
"""
Get the instance of collector to collect results. The returned collector must can distinguish results in different models.
Get the instance of `Collector <../advanced/task_management.html#Task Collecting>`_ to collect results. The returned collector must can distinguish results in different models.
Assumption: the models can be distinguished based on model name and rolling test segments.
If you do not want this assumption, please implement your own method or use another rec_key_func.
Args:
rec_key_func (Callable): a function to get the key of a recorder. If None, use recorder id.
rec_filter_func (Callable, optional): filter the recorder by return True or False. Defaults to None.
artifacts_key (List[str], optional): the artifacts key you want to get. If None, get all artifacts.
"""
def rec_key(recorder):
@@ -188,18 +201,13 @@ class RollingAverageStrategy(OnlineStrategy):
artifacts_collector = RecorderCollector(
experiment=self.exp_name,
process_list=RollingGroup(),
process_list=process_list,
rec_key_func=rec_key_func,
rec_filter_func=rec_filter_func,
artifacts_key=artifacts_key,
)
signals_collector = RecorderCollector(
experiment=self.signal_exp_name,
rec_key_func=lambda rec: rec.info["name"],
rec_filter_func=lambda rec: rec.info["name"] == self.exp_name,
artifacts_path={"signals": "signals"},
)
return HyperCollector({"artifacts": artifacts_collector, "signals": signals_collector})
return artifacts_collector
def first_train(self) -> List[Recorder]:
"""
@@ -252,7 +260,11 @@ class RollingAverageStrategy(OnlineStrategy):
Average the predictions of online models and offer a trading signals every routine.
The signals will be saved to `signal` file of a recorder named self.exp_name of a experiment using the name of `SIGNAL_EXP`
Even if the latest signal already exists, the latest calculation result will be overwritten.
NOTE: Given a prediction of a certain time, all signals before this time will be prepared well.
.. note::
Given a prediction of a certain time, all signals before this time will be prepared well.
Args:
over_write (bool, optional): If True, the new signals will overwrite the file. If False, the new signals will append to the end of signals. Defaults to False.
Returns:
@@ -260,21 +272,17 @@ class RollingAverageStrategy(OnlineStrategy):
"""
if not delay:
self.tool.update_online_pred()
if self.signal_rec is None:
with R.start(experiment_name=self.signal_exp_name, recorder_name=self.exp_name, resume=True):
self.signal_rec = R.get_recorder()
pred = []
try:
old_signals = self.signal_rec.load_object("signals")
except OSError:
old_signals = None
# Get a collector to average online models predictions
online_collector = self.get_collector(
process_list=[AverageEnsemble()],
rec_filter_func=lambda x: True if self.tool.get_online_tag(x) == self.tool.ONLINE_TAG else False,
artifacts_key="pred",
)
online_results = online_collector()
signals = online_results["pred"]
for rec in self.tool.online_models():
pred.append(rec.load_object("pred.pkl"))
signals: pd.DataFrame = pd.concat(pred, axis=1).mean(axis=1).to_frame("score")
signals = signals.sort_index()
old_signals = self.get_signals()
if old_signals is not None and not over_write:
old_max = old_signals.index.get_level_values("datetime").max()
new_signals = signals.loc[old_max:]
@@ -288,18 +296,15 @@ class RollingAverageStrategy(OnlineStrategy):
self.signal_rec.save_objects(**{"signals": signals})
return signals
# def get_signals(self):
# """
# get signals from the recorder(named self.exp_name) of the experiment(named self.SIGNAL_EXP)
def get_signals(self) -> object:
"""
Get signals from the recorder(named self.exp_name) of the experiment(named self.SIGNAL_EXP)
# Returns:
# signals
# """
# if self.signal_rec is None:
# with R.start(experiment_name=self.signal_exp_name, recorder_name=self.exp_name, resume=True):
# self.signal_rec = R.get_recorder()
# signals = self.signal_rec.load_object("signals")
# return signals
Returns:
object: signals
"""
signals = self.signal_rec.load_object("signals")
return signals
def _list_latest(self, rec_list: List[Recorder]):
"""

View File

@@ -2,7 +2,7 @@
# Licensed under the MIT License.
"""
Update is a module to update artifacts such as predictions, when the stock data updating.
Updater is a module to update artifacts such as predictions, when the stock data is updating.
"""
from abc import ABCMeta, abstractmethod
@@ -89,9 +89,13 @@ class PredUpdater(RecordUpdater):
hist_ref : int
Sometimes, the dataset will have historical depends.
Leave the problem to user to set the length of historical dependency
NOTE: the start_time is not included in the hist_ref
# TODO: automate this step in the future.
.. note::
the start_time is not included in the hist_ref
"""
# TODO: automate this hist_ref in the future.
super().__init__(record=record, need_log=need_log)
self.to_date = to_date

View File

@@ -16,6 +16,9 @@ from qlib.workflow.task.utils import list_recorders
class OnlineTool:
"""
OnlineTool.
"""
ONLINE_KEY = "online_status" # the online status key in recorder
ONLINE_TAG = "online" # the 'online' model

View File

@@ -5,6 +5,7 @@
Collector can collect object from everywhere and process them such as merging, grouping, averaging and so on.
"""
from qlib.model.ens.ensemble import SingleKeyEnsemble
from qlib.workflow import R
import dill as pickle
@@ -81,7 +82,7 @@ class Collector:
filepath (str): the path of file
Returns:
bool: if successed
bool: if succeeded
"""
try:
with open(filepath, "wb") as f:
@@ -122,6 +123,8 @@ class HyperCollector(Collector):
Args:
collector_dict (dict): the dict like {collector_key, Collector}
process_list (list or Callable): the list of processors or the instance of processor to process dict.
NOTE: process_list = [SingleKeyEnsemble()] can ignore key and use value directly if there is only one {k,v} in a dict.
This can make result more readable. If you want to maintain as it should be, just give a empty process list.
"""
super().__init__(process_list=process_list)
self.collector_dict = collector_dict

View File

@@ -52,9 +52,13 @@ class TaskManager:
Assumption: the data in MongoDB was encoded and the data out of MongoDB was decoded
Here are four status which are:
STATUS_WAITING: waiting for train
STATUS_RUNNING: training
STATUS_PART_DONE: finished some step and waiting for next step.
STATUS_PART_DONE: finished some step and waiting for next step
STATUS_DONE: all work done
"""
@@ -393,9 +397,13 @@ def run_task(
While task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool
After running this method, here are 4 situations (before_status -> after_status):
STATUS_WAITING -> STATUS_DONE: use task["def"] as `task_func` param
STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as `task_func` param
STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as `task_func` param
STATUS_PART_DONE -> STATUS_DONE: use task["res"] as `task_func` param
Parameters