diff --git a/docs/advanced/task_management.rst b/docs/advanced/task_management.rst index 230a4e9d1..a68c12627 100644 --- a/docs/advanced/task_management.rst +++ b/docs/advanced/task_management.rst @@ -1,4 +1,4 @@ -.. _task_managment: +.. _task_management: ================================= Task Management @@ -10,15 +10,17 @@ Introduction ============= The `Workflow <../component/introduction.html>`_ part introduces how to run research workflow in a loosely-coupled way. But it can only execute one ``task`` when you use ``qrun``. -To automatically generate and execute different tasks, ``Task Management`` provides a whole process including `Task Generating`_, `Task Storing`_, `Task Running`_ and `Task Collecting`_. +To automatically generate and execute different tasks, ``Task Management`` provides a whole process including `Task Generating`_, `Task Storing`_, `Task Training`_ and `Task Collecting`_. With this module, users can run their ``task`` automatically at different periods, in different losses, or even by different models. -An example of the entire process is shown `here `_. +This whole process can be used in `Online Serving <../component/online.html>`_. + +An example of the entire process is shown `here `_. Task Generating =============== A ``task`` consists of `Model`, `Dataset`, `Record` or anything added by users. -The specific task template(/definition/config) can be viewed in +The specific task template can be viewed in `Task Section <../component/workflow.html#task-section>`_. Even though the task template is fixed, users can customize their ``TaskGen`` to generate different ``task`` by task template. @@ -27,15 +29,16 @@ Here is the base class of ``TaskGen``: .. autoclass:: qlib.workflow.task.gen.TaskGen :members: -``Qlib`` provider a class `RollingGen `_ to generate a list of ``task`` of the dataset in different date segments. -This class allows users to verify the effect of data from different periods on the model in one experiment. 
+``Qlib`` provides a class `RollingGen `_ to generate a list of ``task`` of the dataset in different date segments. +This class allows users to verify the effect of data from different periods on the model in one experiment. More information in `here <../reference/api.html#TaskGen>`_. Task Storing =============== To achieve higher efficiency and the possibility of cluster operation, ``Task Manager`` will store all tasks in `MongoDB `_. +``TaskManager`` can fetch undone tasks automatically and manage the lifecycle of a set of tasks with error handling. Users **MUST** finished the configuration of `MongoDB `_ when using this module. -Users need to provide the URL and database name of ``task`` storing like this. +Users need to provide the MongoDB URL and database name for using ``TaskManager`` in `initialization <../start/initialization.html#Parameters>`_ or make statement like this. .. code-block:: python @@ -45,13 +48,12 @@ Users need to provide the URL and database name of ``task`` storing like this. "task_db_name" : "rolling_db" # database name } -The CRUD methods of ``task`` can be found in TaskManager. -More methods can be seen in the `Github `_. - .. autoclass:: qlib.workflow.task.manage.TaskManager :members: -Task Running +More information of ``Task Manager`` can be found in `here <../reference/api.html#TaskManager>`_. + +Task Training =============== After generating and storing those ``task``, it's time to run the ``task`` which are in the *WAITING* status. ``Qlib`` provides a method called ``run_task`` to run those ``task`` in task pool, however, users can also customize how tasks are executed. @@ -60,14 +62,24 @@ It will run the whole workflow defined by ``task``, which includes *Model*, *Dat .. autofunction:: qlib.workflow.task.manage.run_task +Meanwhile, ``Qlib`` provides a module called ``Trainer``. +``Trainer`` will train a list of tasks and return a list of model recorder. 
+``Qlib`` offers two kinds of Trainer: ``TrainerR`` is the simplest way, and ``TrainerRM`` is based on ``TaskManager`` to help manage the lifecycle of tasks automatically. +If you do not want to use ``Task Manager`` to manage tasks, then using ``TrainerR`` to train a list of tasks generated by ``TaskGen`` is enough. +More information is available `here <../reference/api.html#Trainer>`_. + Task Collecting =============== -To see the results of ``task`` after running or to update something, ``Qlib`` provides a ``TaskCollector`` to collect the tasks by filter condition (optional). -Here are some methods in this class. +To collect the results of ``task`` after training, ``Qlib`` provides `Collector <../reference/api.html#Collector>`_, `Group <../reference/api.html#Group>`_ and `Ensemble <../reference/api.html#Ensemble>`_ to collect the results in a readable, expandable and loosely-coupled way. -.. autoclass:: qlib.workflow.task.collect.TaskCollector - :members: +`Collector <../reference/api.html#Collector>`_ can collect objects from anywhere and process them, for example by merging, grouping or averaging. It works in two steps: ``collect`` (collect anything into a dict) and ``process_collect`` (process the collected dict). -``Qlib`` provides a concrete `example `_, including a whole process of `Task Generating`_ (using `RollingGen `_), `Task Storing`_, `Task Running`_ and `Task Collecting`_. -Besides, the `example `_ uses a ``ModelUpdater`` inherited from ``TaskCollector``, which can update the inferences and retrain the model if it is out of date. -Actually, the model updating can be viewed as a subset of ``Online Serving``. \ No newline at end of file +`Group <../reference/api.html#Group>`_ also has two steps: ``group`` (group a set of objects based on `group_func` and turn them into a dict) and ``reduce`` (turn a dict into an ensemble based on some rule).
+For example: {(A,B,C1): object, (A,B,C2): object} ---``group``---> {(A,B): {C1: object, C2: object}} ---``reduce``---> {(A,B): object} + +`Ensemble <../reference/api.html#Ensemble>`_ can merge the objects in an ensemble. +For example: {C1: object, C2: object} ---``Ensemble``---> object + +So the hierarchy is: the second step of ``Collector`` corresponds to ``Group``, and the second step of ``Group`` corresponds to ``Ensemble``. + +For more information, please see `Collector <../reference/api.html#Collector>`_, `Group <../reference/api.html#Group>`_ and `Ensemble <../reference/api.html#Ensemble>`_, or the `example `_ \ No newline at end of file diff --git a/docs/component/online.rst b/docs/component/online.rst new file mode 100644 index 000000000..e25173153 --- /dev/null +++ b/docs/component/online.rst @@ -0,0 +1,41 @@ +.. _online: + +================================= +Online Serving +================================= +.. currentmodule:: qlib + + +Introduction +============= +In addition to backtesting, one way to test whether a model is effective is to make predictions in real market conditions or even do real trading based on those predictions. +``Online Serving`` is a set of modules for online models using the latest data, +which includes `Online Manager <#Online Manager>`_, `Online Strategy <#Online Strategy>`_, `Online Tool <#Online Tool>`_ and `Updater <#Updater>`_. + +`Here `_ are several examples for reference, which demonstrate different features of ``Online Serving``. +If you have many models or `task` that need to be managed, please consider `Task Management <../advanced/task_management.html>`_. +The `examples `_ may be based on `Task Management <../advanced/task_management.html>`_ such as ``TrainerRM`` or ``Collector``. + +Online Manager +============== + +.. automodule:: qlib.workflow.online.manager + :members: + +Online Strategy +=============== + +.. automodule:: qlib.workflow.online.strategy + :members: + +Online Tool +============= + +.. 
automodule:: qlib.workflow.online.utils + :members: + +Updater +============= + +.. automodule:: qlib.workflow.online.update + :members: \ No newline at end of file diff --git a/docs/index.rst b/docs/index.rst index 274dc8045..803aa97d2 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -42,6 +42,7 @@ Document Structure Intraday Trading: Model&Strategy Testing Qlib Recorder: Experiment Management Analysis: Evaluation & Results Analysis + Online Serving: Online Management & Strategy & Tool .. toctree:: :maxdepth: 3 diff --git a/docs/reference/api.rst b/docs/reference/api.rst index 691dff703..edba6228a 100644 --- a/docs/reference/api.rst +++ b/docs/reference/api.rst @@ -154,36 +154,71 @@ Record Template .. automodule:: qlib.workflow.record_temp :members: - Task Management ==================== -RollingGen +TaskGen -------------------- -.. autoclass:: qlib.workflow.task.gen.RollingGen +.. automodule:: qlib.workflow.task.gen :members: TaskManager -------------------- -.. autoclass:: qlib.workflow.task.manage.TaskManager +.. automodule:: qlib.workflow.task.manage :members: -TaskCollector +Trainer -------------------- -.. autoclass:: qlib.workflow.task.collect.TaskCollector +.. automodule:: qlib.model.trainer :members: -ModelUpdater +Collector -------------------- -.. autoclass:: qlib.workflow.task.update.ModelUpdater +.. automodule:: qlib.workflow.task.collect :members: -TimeAdjuster +Group -------------------- -.. autoclass:: qlib.workflow.task.utils.TimeAdjuster +.. automodule:: qlib.model.ens.group :members: +Ensemble +-------------------- +.. automodule:: qlib.model.ens.ensemble + :members: + +Utils +-------------------- +.. automodule:: qlib.workflow.task.utils + :members: + + +Online Serving +==================== + + +Online Manager +-------------------- +.. automodule:: qlib.workflow.online.manager + :members: + +Online Strategy +-------------------- +.. automodule:: qlib.workflow.online.strategy + :members: + +Online Tool +-------------------- +.. 
automodule:: qlib.workflow.online.utils + :members: + +RecordUpdater +-------------------- +.. automodule:: qlib.workflow.online.update + :members: + + Utils ==================== diff --git a/examples/online_srv/online_management_simulate.py b/examples/online_srv/online_management_simulate.py index 16e985ccd..7be46d999 100644 --- a/examples/online_srv/online_management_simulate.py +++ b/examples/online_srv/online_management_simulate.py @@ -131,6 +131,8 @@ class OnlineSimulationExample: self.rolling_online_manager.simulate(end_time=self.end_time) print("========== collect results ==========") print(self.rolling_online_manager.get_collector()()) + print("========== signals ==========") + print(self.rolling_online_manager.get_signals()) print("========== online history ==========") print(self.rolling_online_manager.get_online_history(self.exp_name)) diff --git a/examples/online_srv/rolling_online_management.py b/examples/online_srv/rolling_online_management.py index 950c9684d..25b6fc4da 100644 --- a/examples/online_srv/rolling_online_management.py +++ b/examples/online_srv/rolling_online_management.py @@ -86,7 +86,7 @@ class RollingOnlineExample: task_url="mongodb://10.0.0.4:27017/", task_db_name="rolling_db", rolling_step=550, - tasks=[task_xgboost_config, task_lgb_config], + tasks=[task_xgboost_config], # , task_lgb_config], ): mongo_conf = { "task_url": task_url, # your MongoDB url @@ -148,6 +148,8 @@ class RollingOnlineExample: self.rolling_online_manager.routine() print("========== collect results ==========") print(self.collector()) + print("========== signals ==========") + print(self.rolling_online_manager.get_signals()) def main(self): self.first_run() diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 4457dda5f..4ae73c670 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -27,7 +27,7 @@ class Dataset(Serializable): - setup data - The data related attributes' names should start with '_' so that it 
will not be saved on disk when serializing. - The data could specify the info to caculate the essential data for preparation + The data could specify the info to calculate the essential data for preparation """ self.setup_data(**kwargs) super().__init__() @@ -92,7 +92,7 @@ class DatasetH(Dataset): handler : Union[dict, DataHandler] handler could be: - - insntance of `DataHandler` + - instance of `DataHandler` - config of `DataHandler`. Please refer to `DataHandler` @@ -114,7 +114,6 @@ class DatasetH(Dataset): """ self.handler: DataHandler = init_instance_by_config(handler, accept_types=DataHandler) self.segments = segments.copy() - self.fetch_kwargs = {} super().__init__(**kwargs) def config(self, handler_kwargs: dict = None, **kwargs): @@ -124,7 +123,7 @@ class DatasetH(Dataset): Parameters ---------- handler_kwargs : dict - Config of DataHanlder, which could include the following arguments: + Config of DataHandler, which could include the following arguments: - arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'. 
@@ -148,11 +147,11 @@ class DatasetH(Dataset): Parameters ---------- handler_kwargs : dict - init arguments of DataHanlder, which could include the following arguments: + init arguments of DataHandler, which could include the following arguments: - init_type : Init Type of Handler - - enable_cache : wheter to enable cache + - enable_cache : whether to enable cache """ super().setup_data(**kwargs) @@ -172,7 +171,7 @@ class DatasetH(Dataset): ---------- slc : slice """ - return self.handler.fetch(slc, **kwargs, **self.fetch_kwargs) + return self.handler.fetch(slc, **kwargs) def prepare( self, @@ -232,7 +231,7 @@ class TSDataSampler: (T)ime-(S)eries DataSampler This is the result of TSDatasetH - It works like `torch.data.utils.Dataset`, it provides a very convient interface for constructing time-series + It works like `torch.data.utils.Dataset`, it provides a very convenient interface for constructing time-series dataset based on tabular data. If user have further requirements for processing data, user could process them based on `TSDataSampler` or create @@ -289,29 +288,12 @@ class TSDataSampler: # the data type will be changed # The index of usable data is between start_idx and end_idx + self.start_idx, self.end_idx = self.data.index.slice_locs(start=pd.Timestamp(start), end=pd.Timestamp(end)) self.idx_df, self.idx_map = self.build_index(self.data) - self.data_index = deepcopy(self.data.index) - - if flt_data is not None: - self.flt_data = np.array(flt_data).reshape(-1) - self.idx_map = self.flt_idx_map(self.flt_data, self.idx_map) - self.data_index = self.data_index[np.where(self.flt_data == True)[0]] - - self.start_idx, self.end_idx = self.data_index.slice_locs(start=pd.Timestamp(start), end=pd.Timestamp(end)) self.idx_arr = np.array(self.idx_df.values, dtype=np.float64) # for better performance - + self.data_idx = deepcopy(self.data.index) del self.data # save memory - @staticmethod - def flt_idx_map(flt_data, idx_map): - idx = 0 - new_idx_map = {} - for i, exist 
in enumerate(flt_data): - if exist: - new_idx_map[idx] = idx_map[i] - idx += 1 - return new_idx_map - def get_index(self): """ Get the pandas index of the data, it will be useful in following scenarios @@ -461,7 +443,7 @@ class TSDatasetH(DatasetH): (T)ime-(S)eries Dataset (H)andler - Covnert the tabular data to Time-Series data + Convert the tabular data to Time-Series data Requirements analysis @@ -505,19 +487,8 @@ class TSDatasetH(DatasetH): """ split the _prepare_raw_seg is to leave a hook for data preprocessing before creating processing data """ - dtype = kwargs.pop("dtype") + dtype = kwargs.pop("dtype", None) start, end = slc.start, slc.stop - flt_col = kwargs.pop("flt_col", None) - # TSDatasetH will retrieve more data for complete - data = self._prepare_raw_seg(slc, **kwargs) - - flt_kwargs = deepcopy(kwargs) - if flt_col is not None: - flt_kwargs["col_set"] = flt_col - flt_data = self._prepare_raw_seg(slc, **flt_kwargs) - assert len(flt_data.columns) == 1 - else: - flt_data = None - - tsds = TSDataSampler(data=data, start=start, end=end, step_len=self.step_len, dtype=dtype, flt_data=flt_data) + data = self._prepare_raw_seg(slc=slc, **kwargs) + tsds = TSDataSampler(data=data, start=start, end=end, step_len=self.step_len, dtype=dtype) return tsds diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index f1fa39c3b..63b49d78b 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -36,7 +36,7 @@ class DataHandler(Serializable): The data handler try to maintain a handler with 2 level. `datetime` & `instruments`. - Any order of the index level can be suported (The order will be implied in the data). + Any order of the index level can be supported (The order will be implied in the data). The order <`datetime`, `instruments`> will be used when the dataframe index name is missed. 
Example of the data: @@ -77,7 +77,7 @@ class DataHandler(Serializable): data_loader : Tuple[dict, str, DataLoader] data loader to load the data. init_data : - intialize the original data in the constructor. + initialize the original data in the constructor. fetch_orig : bool Return the original data instead of copy if possible. """ @@ -128,7 +128,7 @@ class DataHandler(Serializable): def setup_data(self, enable_cache: bool = False): """ - Set Up the data in case of running intialization for multiple time + Set Up the data in case of running initialization for multiple time It is responsible for maintaining following variable 1) self._data @@ -453,7 +453,7 @@ class DataHandlerLP(DataHandler): def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs): """ - Set up the data in case of running intialization for multiple time + Set up the data in case of running initialization for multiple time Parameters ---------- diff --git a/qlib/model/ens/ensemble.py b/qlib/model/ens/ensemble.py index 7ccf98ab2..1fb14a37b 100644 --- a/qlib/model/ens/ensemble.py +++ b/qlib/model/ens/ensemble.py @@ -5,6 +5,7 @@ Ensemble can merge the objects in an Ensemble. For example, if there are many submodels predictions, we may need to merge them in an ensemble predictions. """ +from typing import Union import pandas as pd @@ -24,6 +25,30 @@ class Ensemble: raise NotImplementedError(f"Please implement the `__call__` method.") +class SingleKeyEnsemble(Ensemble): + + """ + Extract the object if there is only one key and value in dict. Make result more readable. + {Only key: Only value} -> Only value + If there are more than 1 key or less than 1 key, then do nothing. + Even you can run this recursively to make dict more readable. + NOTE: Default run recursively. 
+ """ + + def __call__(self, ensemble_dict: Union[dict, object], recursion: bool = True) -> object: + if not isinstance(ensemble_dict, dict): + return ensemble_dict + if recursion: + tmp_dict = {} + for k, v in ensemble_dict.items(): + tmp_dict[k] = self(v, recursion) + ensemble_dict = tmp_dict + keys = list(ensemble_dict.keys()) + if len(keys) == 1: + ensemble_dict = ensemble_dict[keys[0]] + return ensemble_dict + + class RollingEnsemble(Ensemble): """Merge the rolling objects in an Ensemble""" @@ -47,3 +72,24 @@ class RollingEnsemble(Ensemble): artifact = artifact[~artifact.index.duplicated(keep="last")] artifact = artifact.sort_index() return artifact + + +class AverageEnsemble(Ensemble): + def __call__(self, ensemble_dict: dict): + """ + Average a dict of same shape dataframe like `prediction` or `IC` into an ensemble. + + NOTE: The values of dict must be pd.DataFrame, and have the index "datetime" + + Args: + ensemble_dict (dict): a dict like {"A": pd.DataFrame, "B": pd.DataFrame}. + The key of the dict will be ignored. + + Returns: + pd.DataFrame: the complete result of averaging. + """ + values = list(ensemble_dict.values()) + results = pd.concat(values, axis=1) + results = results.mean(axis=1).to_frame("score") + results = results.sort_index() + return results diff --git a/qlib/model/ens/group.py b/qlib/model/ens/group.py index d53a55f4c..d8f174105 100644 --- a/qlib/model/ens/group.py +++ b/qlib/model/ens/group.py @@ -3,6 +3,13 @@ """ Group can group a set of object based on `group_func` and change them to a dict. +After group, we provide a method to reduce them. 
+ +For example: + +group: {(A,B,C1): object, (A,B,C2): object} -> {(A,B): {C1: object, C2: object}} +reduce: {(A,B): {C1: object, C2: object}} -> {(A,B): object} + """ from qlib.model.ens.ensemble import Ensemble, RollingEnsemble diff --git a/qlib/model/trainer.py b/qlib/model/trainer.py index a0d252ab4..7680674a6 100644 --- a/qlib/model/trainer.py +++ b/qlib/model/trainer.py @@ -3,12 +3,12 @@ """ The Trainer will train a list of tasks and return a list of model recorder. -There are two steps in each Trainer including `train`(make model recorder) and `end_train`(modify model recorder). +There are two steps in each Trainer: ``train`` (make the model recorder) and ``end_train`` (modify the model recorder). -This is concept called "DelayTrainer", which can be used in online simulating to parallel training. -In "DelayTrainer", the first step is only to save some necessary info to model recorder, and the second step which will be finished in the end can do some concurrent and time-consuming operations such as model fitting. +This is a concept called ``DelayTrainer``, which can be used in online simulation to parallelize training. +In ``DelayTrainer``, the first step only saves some necessary info to the model recorder, and the second step, which is finished at the end, can do concurrent and time-consuming operations such as model fitting. -`Qlib` offer two kind of Trainer, TrainerR is simplest and TrainerRM is based on TaskManager to help manager tasks lifecycle automatically. +``Qlib`` offers two kinds of Trainer: ``TrainerR`` is the simplest way, and ``TrainerRM`` is based on ``TaskManager`` to help manage the lifecycle of tasks automatically.
""" import socket @@ -36,9 +36,6 @@ def begin_task_train(task_config: dict, experiment_name: str, recorder_name: str Returns: Recorder: the model recorder """ - # FIXME: recorder_id - if recorder_name is None: - recorder_name = str(time.time()) with R.start(experiment_name=experiment_name, recorder_name=recorder_name): R.log_params(**flatten_dict(task_config)) R.save_objects(**{"task": task_config}) # keep the original format and datatype @@ -58,7 +55,7 @@ def end_task_train(rec: Recorder, experiment_name: str) -> Recorder: Returns: Recorder: the model recorder """ - with R.start(experiment_name=experiment_name, recorder_name=rec.info["name"], resume=True): + with R.start(experiment_name=experiment_name, recorder_id=rec.info["id"], resume=True): task_config = R.load_object("task") # model & dataset initiation model: Model = init_instance_by_config(task_config["model"]) @@ -314,7 +311,8 @@ class TrainerRM(Trainer): def reset(self): """ - NOTE: this method will delete all task in this task_pool! + .. note:: + this method will delete all task in this task_pool! """ tm = TaskManager(task_pool=self.task_pool) tm.remove() diff --git a/qlib/workflow/online/manager.py b/qlib/workflow/online/manager.py index 4e9290096..6c62fbce9 100644 --- a/qlib/workflow/online/manager.py +++ b/qlib/workflow/online/manager.py @@ -2,11 +2,14 @@ # Licensed under the MIT License. """ -OnlineManager can manage a set of OnlineStrategy and run them dynamically. +OnlineManager can manage a set of `Online Strategy <#Online Strategy>`_ and run them dynamically. With the change of time, the decisive models will be also changed. In this module, we call those contributing models as `online` models. In every routine(such as everyday or every minutes), the `online` models maybe changed and the prediction of them need to be updated. So this module provide a series methods to control this process. + +This module also provide a method to simulate `Online Strategy <#Online Strategy>`_ in the history. 
+This means you can verify your strategy or find a better one. """ from typing import Dict, List, Union @@ -14,12 +17,18 @@ from typing import Dict, List, Union import pandas as pd from qlib import get_module_logger from qlib.data.data import D +from qlib.model.ens.ensemble import AverageEnsemble, SingleKeyEnsemble from qlib.utils.serial import Serializable from qlib.workflow.online.strategy import OnlineStrategy from qlib.workflow.task.collect import HyperCollector class OnlineManager(Serializable): + """ + OnlineManager can manage online models with `Online Strategy <#Online Strategy>`_. + It also provides a history recording which models were online at what time. + """ + def __init__( self, strategy: Union[OnlineStrategy, List[OnlineStrategy]], @@ -29,10 +38,11 @@ class OnlineManager(Serializable): ): """ Init OnlineManager. + One OnlineManager must have at least one OnlineStrategy. Args: strategy (Union[OnlineStrategy, List[OnlineStrategy]]): an instance of OnlineStrategy or a list of OnlineStrategy - begin_time (Union[str,pd.Timestamp], optional): the OnlineManager will begin at this time. Defaults to None. + begin_time (Union[str,pd.Timestamp], optional): the OnlineManager will begin at this time. Defaults to None, which means using the latest date. freq (str, optional): data frequency. Defaults to "day". need_log (bool, optional): print log or not. Defaults to True. """ @@ -50,7 +60,7 @@ class OnlineManager(Serializable): def first_train(self): """ - Run every strategy first_train method and record the online history + Run every strategy first_train method and record the online history. """ for strategy in self.strategy: self.logger.info(f"Strategy `{strategy.name_id}` begins first training...") @@ -62,7 +72,7 @@ class OnlineManager(Serializable): Run typical update process for every strategy and record the online history. The typical update process after a routine, such as day by day or month by month. 
- update online prediction -> prepare signals -> prepare tasks -> prepare new models -> reset online models + The process is: Prepare signals -> Prepare tasks -> Prepare online models. Args: cur_time (Union[str,pd.Timestamp], optional): run routine method in this time. Defaults to None. @@ -84,15 +94,15 @@ class OnlineManager(Serializable): def get_collector(self) -> HyperCollector: """ - Get the instance of HyperCollector to collect results from every strategy. + Get the instance of `Collector <../advanced/task_management.html#Task Collecting>`_ to collect results from every strategy. Returns: - HyperCollector: the collector can collect other collectors. + HyperCollector: the collector to collect other collectors (using SingleKeyEnsemble() to make results more readable). """ collector_dict = {} for strategy in self.strategy: collector_dict[strategy.name_id] = strategy.get_collector() - return HyperCollector(collector_dict) + return HyperCollector(collector_dict, process_list=SingleKeyEnsemble()) def get_online_history(self, strategy_name_id: str) -> list: """ @@ -102,7 +112,7 @@ class OnlineManager(Serializable): strategy_name_id (str): the name_id of strategy Returns: - dict: a list like [(time, [online_models])] + list: a list like [(begin_time, [online_models])] """ history_dict = self.history[strategy_name_id] history = [] @@ -121,10 +131,27 @@ class OnlineManager(Serializable): for strategy in self.strategy: strategy.delay_prepare(self.get_online_history(strategy.name_id), **delay_kwargs) + def get_signals(self) -> pd.DataFrame: + """ + Average all strategy signals as the online signals. + + Assumption: the signals from every strategy is pd.DataFrame. Override this function to change. 
+ + Returns: + pd.DataFrame: signals + """ + signals_dict = {} + for strategy in self.strategy: + signals_dict[strategy.name_id] = strategy.get_signals() + return AverageEnsemble()(signals_dict) + def simulate(self, end_time, frequency="day", task_kwargs={}, model_kwargs={}, delay_kwargs={}) -> HyperCollector: """ - Starting from cur time, this method will simulate every routine in OnlineManager. - NOTE: Considering the parallel training, the models and signals can be perpared after all routine simulating. + Starting from the current time, this method will simulate every routine in OnlineManager until the end time. + + Considering parallel training, the models and signals can be prepared after all routines have been simulated. + + Delayed training can be done with ``DelayTrainer``, and delayed signal preparation with ``delay_prepare``. Returns: HyperCollector: the OnlineManager's collector @@ -140,7 +167,9 @@ class OnlineManager(Serializable): def reset(self): """ - NOTE: This method will reset all strategy! Be careful to use it. + This method will reset all strategies! + + **Use it with care.** """ self.cur_time = self.begin_time self.history = {} diff --git a/qlib/workflow/online/strategy.py b/qlib/workflow/online/strategy.py index 3782ee652..0cae11b7f 100644 --- a/qlib/workflow/online/strategy.py +++ b/qlib/workflow/online/strategy.py @@ -2,8 +2,7 @@ # Licensed under the MIT License. """ -OnlineStrategy is a set of strategy of online serving. -It is working with OnlineManager, responsing how the tasks are generated, the models are updated and signals are perpared. +OnlineStrategy is a set of strategies for online serving.
""" from copy import deepcopy @@ -12,6 +11,7 @@ from typing import List, Tuple, Union import pandas as pd from qlib.data.data import D from qlib.log import get_module_logger +from qlib.model.ens.ensemble import AverageEnsemble, SingleKeyEnsemble from qlib.model.ens.group import RollingGroup from qlib.model.trainer import Trainer, TrainerR from qlib.workflow import R @@ -23,9 +23,14 @@ from qlib.workflow.task.utils import TimeAdjuster, list_recorders class OnlineStrategy: + """ + OnlineStrategy is working with `Online Manager <#Online Manager>`_, responsing how the tasks are generated, the models are updated and signals are perpared. + """ + def __init__(self, name_id: str, trainer: Trainer = None, need_log=True): """ Init OnlineStrategy. + This module **MUST** use `Trainer <../reference/api.html#Trainer>`_ to finishing model training. Args: name_id (str): a unique name or id @@ -43,6 +48,7 @@ class OnlineStrategy: After perparing the data of last routine (a box in box-plot) which means the end of the routine, we can prepare trading signals for next routine. NOTE: Given a set prediction, all signals before these prediction end time will be prepared well. + Args: delay: bool If this method was called by `delay_prepare` @@ -52,7 +58,7 @@ class OnlineStrategy: def prepare_tasks(self, *args, **kwargs): """ After the end of a routine, check whether we need to prepare and train some new tasks. - return the new tasks waiting for training. + Return the new tasks waiting for training. You can find last online models by OnlineTool.online_models. """ @@ -66,10 +72,6 @@ class OnlineStrategy: Args: tasks (list): a list of tasks. - tag (str): - `ONLINE_TAG` for first train or additional train - `NEXT_ONLINE_TAG` for reset online model when calling `reset_online_tag` - `OFFLINE_TAG` for train but offline those models check_func: the method to judge if a model can be online. The parameter is the model record and return True for online. None for online every models. 
@@ -95,7 +97,8 @@ class OnlineStrategy: def get_collector(self) -> Collector: """ - Get the instance of collector to collect results of online serving. + Get the instance of `Collector <../advanced/task_management.html#Task Collecting>`_ to collect results of online serving. + For example: 1) collect predictions in Recorder @@ -109,7 +112,8 @@ class OnlineStrategy: def delay_prepare(self, history: list, **kwargs): """ Prepare all models and signals if there are something waiting for prepare. - NOTE: Assumption: the predictions of online models need less than next begin_time, or this method will work in a wrong way. + + Assumption: the predictions of online models need less than next begin_time, or this method will work in a wrong way. Args: history (list): an online models list likes [begin_time:[online models]]. @@ -120,6 +124,12 @@ class OnlineStrategy: self.tool.reset_online_tag(recs_list) self.prepare_signals(delay=True) + def get_signals(self): + """ + Get prepared signals. + """ + raise NotImplementedError(f"Please implement the `get_signals` method.") + def reset(self): """ Delete all things and set them to default status. This method is convenient to explore the strategy for online simulation. @@ -164,17 +174,20 @@ class RollingAverageStrategy(OnlineStrategy): self.rg = rolling_gen self.tool = OnlineToolR(self.exp_name) self.ta = TimeAdjuster() - self.signal_rec = None # the recorder to record signals + with R.start(experiment_name=self.signal_exp_name, recorder_name=self.exp_name, resume=True): + self.signal_rec = R.get_recorder() # the recorder to record signals + self.signal_rec.save_objects(**{"signals": None}) - def get_collector(self, rec_key_func=None, rec_filter_func=None): + def get_collector(self, process_list=[RollingGroup()], rec_key_func=None, rec_filter_func=None, artifacts_key=None): """ - Get the instance of collector to collect results. The returned collector must can distinguish results in different models. 
+        Get the instance of `Collector <../advanced/task_management.html#Task Collecting>`_ to collect results. The returned collector must can distinguish results in different models.
         Assumption: the models can be distinguished based on model name and rolling test segments.
         If you do not want this assumption, please implement your own method or use another rec_key_func.
         Args:
             rec_key_func (Callable): a function to get the key of a recorder. If None, use recorder id.
             rec_filter_func (Callable, optional): filter the recorder by return True or False. Defaults to None.
+            artifacts_key (List[str], optional): the artifacts key you want to get. If None, get all artifacts.
         """
         def rec_key(recorder):
@@ -188,18 +201,13 @@ class RollingAverageStrategy(OnlineStrategy):
         artifacts_collector = RecorderCollector(
             experiment=self.exp_name,
-            process_list=RollingGroup(),
+            process_list=process_list,
             rec_key_func=rec_key_func,
             rec_filter_func=rec_filter_func,
+            artifacts_key=artifacts_key,
         )
-        signals_collector = RecorderCollector(
-            experiment=self.signal_exp_name,
-            rec_key_func=lambda rec: rec.info["name"],
-            rec_filter_func=lambda rec: rec.info["name"] == self.exp_name,
-            artifacts_path={"signals": "signals"},
-        )
-        return HyperCollector({"artifacts": artifacts_collector, "signals": signals_collector})
+        return artifacts_collector
     def first_train(self) -> List[Recorder]:
         """
@@ -252,7 +260,11 @@ class RollingAverageStrategy(OnlineStrategy):
         Average the predictions of online models and offer a trading signals every routine.
         The signals will be saved to `signal` file of a recorder named self.exp_name of a experiment using the name of `SIGNAL_EXP`
         Even if the latest signal already exists, the latest calculation result will be overwritten.
-        NOTE: Given a prediction of a certain time, all signals before this time will be prepared well.
+
+        .. note::
+
+            Given a prediction of a certain time, all signals before this time will be prepared well.
+
         Args:
             over_write (bool, optional): If True, the new signals will overwrite the file. If False, the new signals will append to the end of signals. Defaults to False.
         Returns:
@@ -260,21 +272,17 @@ class RollingAverageStrategy(OnlineStrategy):
         """
         if not delay:
             self.tool.update_online_pred()
-        if self.signal_rec is None:
-            with R.start(experiment_name=self.signal_exp_name, recorder_name=self.exp_name, resume=True):
-                self.signal_rec = R.get_recorder()
-        pred = []
-        try:
-            old_signals = self.signal_rec.load_object("signals")
-        except OSError:
-            old_signals = None
+        # Get a collector to average online models predictions
+        online_collector = self.get_collector(
+            process_list=[AverageEnsemble()],
+            rec_filter_func=lambda x: True if self.tool.get_online_tag(x) == self.tool.ONLINE_TAG else False,
+            artifacts_key="pred",
+        )
+        online_results = online_collector()
+        signals = online_results["pred"]
-        for rec in self.tool.online_models():
-            pred.append(rec.load_object("pred.pkl"))
-
-        signals: pd.DataFrame = pd.concat(pred, axis=1).mean(axis=1).to_frame("score")
-        signals = signals.sort_index()
+        old_signals = self.get_signals()
         if old_signals is not None and not over_write:
             old_max = old_signals.index.get_level_values("datetime").max()
             new_signals = signals.loc[old_max:]
@@ -288,18 +296,15 @@ class RollingAverageStrategy(OnlineStrategy):
         self.signal_rec.save_objects(**{"signals": signals})
         return signals
-    # def get_signals(self):
-    #     """
-    #     get signals from the recorder(named self.exp_name) of the experiment(named self.SIGNAL_EXP)
+    def get_signals(self) -> object:
+        """
+        Get signals from the recorder(named self.exp_name) of the experiment(named self.SIGNAL_EXP)
-    #     Returns:
-    #         signals
-    #     """
-    #     if self.signal_rec is None:
-    #         with R.start(experiment_name=self.signal_exp_name, recorder_name=self.exp_name, resume=True):
-    #             self.signal_rec = R.get_recorder()
-    #     signals = self.signal_rec.load_object("signals")
-    #     return signals
+        Returns:
+            object: signals
+        """
+        signals = self.signal_rec.load_object("signals")
+        return signals
     def _list_latest(self, rec_list: List[Recorder]):
         """
diff --git a/qlib/workflow/online/update.py b/qlib/workflow/online/update.py
index 69ad55324..ab910ba8d 100644
--- a/qlib/workflow/online/update.py
+++ b/qlib/workflow/online/update.py
@@ -2,7 +2,7 @@
 # Licensed under the MIT License.
 """
-Update is a module to update artifacts such as predictions, when the stock data updating.
+Updater is a module to update artifacts such as predictions, when the stock data is updating.
 """
 from abc import ABCMeta, abstractmethod
@@ -89,9 +89,13 @@ class PredUpdater(RecordUpdater):
         hist_ref : int
             Sometimes, the dataset will have historical depends.
             Leave the problem to user to set the length of historical dependency
-            NOTE: the start_time is not included in the hist_ref
-            # TODO: automate this step in the future.
+
+            .. note::
+
+                the start_time is not included in the hist_ref
+
         """
+        # TODO: automate this hist_ref in the future.
         super().__init__(record=record, need_log=need_log)
         self.to_date = to_date
diff --git a/qlib/workflow/online/utils.py b/qlib/workflow/online/utils.py
index 4d630a665..296ca3ea6 100644
--- a/qlib/workflow/online/utils.py
+++ b/qlib/workflow/online/utils.py
@@ -16,6 +16,9 @@ from qlib.workflow.task.utils import list_recorders
 class OnlineTool:
+    """
+    OnlineTool.
+    """
     ONLINE_KEY = "online_status"  # the online status key in recorder
     ONLINE_TAG = "online"  # the 'online' model
diff --git a/qlib/workflow/task/collect.py b/qlib/workflow/task/collect.py
index d74d08184..28320e2ce 100644
--- a/qlib/workflow/task/collect.py
+++ b/qlib/workflow/task/collect.py
@@ -5,6 +5,7 @@
 Collector can collect object from everywhere and process them such as merging, grouping, averaging and so on.
 """
+from qlib.model.ens.ensemble import SingleKeyEnsemble
 from qlib.workflow import R
 import dill as pickle
@@ -81,7 +82,7 @@ class Collector:
             filepath (str): the path of file
         Returns:
-            bool: if successed
+            bool: if succeeded
         """
         try:
             with open(filepath, "wb") as f:
@@ -122,6 +123,8 @@ class HyperCollector(Collector):
         Args:
             collector_dict (dict): the dict like {collector_key, Collector}
             process_list (list or Callable): the list of processors or the instance of processor to process dict.
+                NOTE: process_list = [SingleKeyEnsemble()] can ignore key and use value directly if there is only one {k,v} in a dict.
+                This can make result more readable. If you want to maintain as it should be, just give a empty process list.
         """
         super().__init__(process_list=process_list)
         self.collector_dict = collector_dict
diff --git a/qlib/workflow/task/manage.py b/qlib/workflow/task/manage.py
index 3c3144fe8..c71be7d39 100644
--- a/qlib/workflow/task/manage.py
+++ b/qlib/workflow/task/manage.py
@@ -52,9 +52,13 @@ class TaskManager:
     Assumption: the data in MongoDB was encoded and the data out of MongoDB was decoded
     Here are four status which are:
+
         STATUS_WAITING: waiting for train
+
         STATUS_RUNNING: training
-        STATUS_PART_DONE: finished some step and waiting for next step.
+
+        STATUS_PART_DONE: finished some step and waiting for next step
+
         STATUS_DONE: all work done
     """
@@ -393,9 +397,13 @@ def run_task(
     While task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool
     After running this method, here are 4 situations (before_status -> after_status):
+
         STATUS_WAITING -> STATUS_DONE: use task["def"] as `task_func` param
+
         STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as `task_func` param
+
         STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as `task_func` param
+
         STATUS_PART_DONE -> STATUS_DONE: use task["res"] as `task_func` param
     Parameters