mirror of https://github.com/microsoft/qlib.git synced 2026-07-02 18:40:58 +08:00

Update docs and delete estimator

This commit is contained in:
Jactus
2020-11-26 19:40:41 +08:00
parent 0f8f9453bd
commit 2fd982a98f
14 changed files with 245 additions and 1878 deletions


@@ -159,6 +159,9 @@ Data Loader
``Data Loader`` in ``Qlib`` is designed to load raw data from the original data source. It will be loaded and used in the ``Data Handler`` module.
QlibDataLoader
---------------
The ``QlibDataLoader`` class in ``Qlib`` is such an interface that allows users to load raw data from the data source.
Interface
@@ -166,33 +169,8 @@ Interface
Here are some interfaces of the ``QlibDataLoader`` class:
- `load(instruments, start_time=None, end_time=None)`
- This method loads the data as pd.DataFrame
- Parameters:
- `instruments` \: str or dict
it can either be the market name or the config file of instruments generated by InstrumentProvider.
- `start_time` \: str
start of the time range.
- `end_time` \: str
end of the time range.
- Returns:
- The data being loaded with type `pd.DataFrame`
- `load_group_df(instruments, exprs: list, names: list, start_time=None, end_time=None)`
- This method loads the dataframe for specific group.
- Parameters:
- `instruments` \: str or dict
it can either be the market name or the config file of instruments generated by InstrumentProvider.
- `exprs` \: list
the expressions to describe the content of the data.
- `names` \: list
the name of the data.
- `start_time` \: str
start of the time range.
- `end_time` \: str
end of the time range.
- Returns:
- The queried data in type `pd.DataFrame`.
.. autoclass:: qlib.data.dataset.loader.QlibDataLoader
:members: load, load_group_df
API
-----------
@@ -207,74 +185,24 @@ The ``Data Handler`` module in ``Qlib`` is designed to handler those common data
Users can use ``Data Handler`` in an automatic workflow by ``qrun``, refer to `Workflow: Workflow Management <workflow.html>`_ for more details.
Base Class & Interface
----------------------
DataHandlerLP
--------------
In addition to using ``Data Handler`` in an automatic workflow with ``qrun``, users can use ``Data Handler`` as an independent module to preprocess data (standardization, removing NaN, etc.) and build datasets.
To support this, ``Qlib`` provides a base class `qlib.data.dataset.DataHandlerLP <../reference/api.html#qlib.data.dataset.handler.DataHandlerLP>`_. The core idea of this class is that some learnable ``Processors`` learn the parameters of data processing. When new data comes in, these `trained` ``Processors`` can be applied to it directly, so real-time data is processed efficiently. More information about ``Processors`` is given in the next subsection.
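The learnable-processor idea described above can be illustrated with a minimal sketch. It is not Qlib's implementation: ``MinMaxNormSketch`` and its ``fit`` method are hypothetical names, and plain Python lists stand in for the ``pd.DataFrame`` objects that Qlib works with.

```python
from typing import List, Optional


class MinMaxNormSketch:
    """Hypothetical learnable processor: learns min/max once, reuses them later."""

    def __init__(self) -> None:
        self.lo: Optional[float] = None
        self.hi: Optional[float] = None

    def fit(self, values: List[float]) -> "MinMaxNormSketch":
        # Learn the processing parameters from historical (training) data.
        self.lo = min(values)
        self.hi = max(values)
        return self

    def __call__(self, values: List[float]) -> List[float]:
        # Apply the already-learned parameters to any data, old or new.
        if self.lo is None or self.hi is None:
            raise RuntimeError("fit() must be called before processing data")
        span = self.hi - self.lo
        if span == 0:
            return [0.0 for _ in values]
        return [(v - self.lo) / span for v in values]


history = [10.0, 20.0, 30.0]
norm = MinMaxNormSketch().fit(history)

# New data is scaled with the parameters learned from `history`;
# values outside the fitted range are not clipped in this sketch.
new = norm([15.0, 40.0])
```

Because the parameters are fixed after ``fit``, new data can be processed without looking at the training window again.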
Interface
----------------------
Here are some important interfaces that ``DataHandlerLP`` provides:
- `__init__(instruments=None, start_time=None, end_time=None, data_loader: Tuple[dict, str, DataLoader] = None, infer_processors=[], learn_processors=[], process_type=PTYPE_A, **kwargs)`
- Initialization of the class.
- Parameters:
- `infer_processors` \: list
- list of <description info> of processors to generate data for inference
- example of <description info>:
.. code-block::
1) classname & kwargs:
{
"class": "MinMaxNorm",
"kwargs": {
"fit_start_time": "20080101",
"fit_end_time": "20121231"
}
}
2) Only classname:
"DropnaFeature"
3) object instance of Processor
- `learn_processors` \: list
similar to infer_processors, but for generating data for learning models
- `process_type`: str
- PTYPE_I = 'independent'
- self._infer will be processed by infer_processors
- self._learn will be processed by learn_processors
- PTYPE_A = 'append'
- self._infer will be processed by infer_processors
- self._learn will be processed by infer_processors + learn_processors
- (e.g. self._infer processed by learn_processors )
- `fetch(selector: Union[pd.Timestamp, slice, str] = slice(None, None), level: Union[str, int] = "datetime", col_set=DataHandler.CS_ALL, data_key: str = DK_I)`
- This method fetches data from underlying data source
- Parameters:
- `selector` \: Union[pd.Timestamp, slice, str]
describe how to select data by index.
- `level` \: Union[str, int]
which index level to select the data.
- `col_set` \: str
select a set of meaningful columns (e.g. features, columns).
- `data_key` \: str
The data to fetch: DK_*.
- Returns:
- The retrieved results in the type: `pd.DataFrame`.
- `get_cols(col_set=DataHandler.CS_ALL, data_key: str = DK_I)`
- This method gets the column names.
- Parameters:
- `col_set` \: str
select a set of meaningful columns (e.g. features, columns).
- `data_key` \: str
the data to fetch: DK_*.
- Returns:
- A list of column names.
.. autoclass:: qlib.data.dataset.handler.DataHandlerLP
:members: __init__, fetch, get_cols
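The difference between the two ``process_type`` modes can be sketched as follows. This is an illustrative model only, assuming processors are plain callables on lists; the function names ``run`` and ``process`` and the toy processors are hypothetical and not part of Qlib.

```python
from typing import Callable, List, Sequence, Tuple

PTYPE_I = "independent"
PTYPE_A = "append"

Processor = Callable[[List[float]], List[float]]


def run(processors: Sequence[Processor], data: List[float]) -> List[float]:
    # Apply each processor in order, feeding the output of one into the next.
    for proc in processors:
        data = proc(data)
    return data


def process(
    raw: List[float],
    infer_processors: Sequence[Processor],
    learn_processors: Sequence[Processor],
    process_type: str = PTYPE_A,
) -> Tuple[List[float], List[float]]:
    infer = run(infer_processors, raw)
    if process_type == PTYPE_I:
        # Independent: the learning data is built from the raw data only.
        learn = run(learn_processors, raw)
    elif process_type == PTYPE_A:
        # Append: learn_processors run on top of the inference data.
        learn = run(learn_processors, infer)
    else:
        raise ValueError(f"unknown process_type: {process_type}")
    return infer, learn


def add_one(xs: List[float]) -> List[float]:
    return [x + 1 for x in xs]


def double(xs: List[float]) -> List[float]:
    return [2 * x for x in xs]


raw = [1.0, 2.0]
infer_a, learn_a = process(raw, [add_one], [double], PTYPE_A)
infer_i, learn_i = process(raw, [add_one], [double], PTYPE_I)
```

With ``PTYPE_A`` the learning data has gone through both processor lists, while with ``PTYPE_I`` it has gone through ``learn_processors`` only.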
If users want to load features and labels by config, they can inherit from ``qlib.data.dataset.handler.ConfigDataHandler``; ``Qlib`` also provides some preprocessing methods in this subclass.
If users want to use qlib data, `QLibDataHandler` is recommended. Users can inherit their custom class from `QLibDataHandler`, which is also a subclass of `ConfigDataHandler`.
@@ -353,23 +281,8 @@ The motivation of this module is that we want to maximize the flexibility of of
The ``DatasetH`` class is the `dataset` with `Data Handler`. Here is the most important interface of the class:
- `prepare(segments: Union[List[str], Tuple[str], str, slice], col_set=DataHandler.CS_ALL, data_key=DataHandlerLP.DK_I, **kwargs)`
- This method prepares the data for learning and inference.
- Parameters:
- `segments` \: Union[List[str], Tuple[str], str, slice]
Describe the scope of the data to be prepared
Here are some examples:
- 'train'
- ['train', 'valid']
- `col_set` \: str
The col_set will be passed to self._handler when fetching data.
- `data_key` \: str
The data to fetch: DK_*
Default is DK_I, which indicates fetching data for **inference**.
.. autoclass:: qlib.data.dataset.__init__.DatasetH
:members:
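How ``segments`` selects data can be sketched as below. This is a simplified model, not the ``DatasetH`` implementation: ``DatasetSketch`` is a hypothetical class, rows are ``(date, value)`` tuples instead of a ``pd.DataFrame``, and it assumes a single segment name yields one result while a list of names yields a list of results.

```python
from typing import Dict, List, Sequence, Tuple, Union

Row = Tuple[str, float]  # (ISO date string, value)


class DatasetSketch:
    """Hypothetical stand-in for a dataset with named time segments."""

    def __init__(self, rows: List[Row], segments: Dict[str, Tuple[str, str]]) -> None:
        self._rows = sorted(rows)
        self._segments = segments  # name -> (start date, end date), inclusive

    def _slice(self, name: str) -> List[Row]:
        start, end = self._segments[name]
        # ISO date strings compare correctly as plain strings.
        return [row for row in self._rows if start <= row[0] <= end]

    def prepare(self, segments: Union[str, Sequence[str]]):
        if isinstance(segments, str):
            return self._slice(segments)
        return [self._slice(name) for name in segments]


ds = DatasetSketch(
    rows=[("2020-01-01", 1.0), ("2020-01-02", 2.0), ("2020-01-03", 3.0)],
    segments={
        "train": ("2020-01-01", "2020-01-02"),
        "valid": ("2020-01-03", "2020-01-03"),
    },
)

train = ds.prepare("train")                      # one segment -> one result
train_and_valid = ds.prepare(["train", "valid"])  # list -> list of results
```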
API
---------


@@ -50,312 +50,17 @@ Qlib Recorder
Here are the available interfaces of ``QlibRecorder``:
- `__init__(exp_manager)`
- Initialization.
- It takes in an input: `exp_manager`, which is an `ExperimentManager` instance. The instance will be created during ``qlib.init``.
- `start(experiment_name=None, recorder_name=None)`
- High-level API to start an experiment. This method can only be called within a Python `with` statement.
- Parameters:
- `experiment_name` : str
name of the experiment one wants to start.
- `recorder_name` : str
name of the recorder under the experiment one wants to start.
- Use case:
.. code-block:: Python
with R.start('test', 'recorder_1'):
model.fit(dataset)
R.log...
... # further operations
- `start_exp(experiment_name=None, recorder_name=None, uri=None)`
- Lower-level method for starting an experiment. When using this method, one should end the experiment manually, and the status of the recorder may not be handled properly.
- Parameters:
- `experiment_name` : str
the name of the experiment to be started
- `recorder_name` : str
name of the recorder under the experiment one wants to start.
- `uri` : str
the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored.
The default uri is set in the qlib.config.
- Returns:
- an experiment instance being started.
- Use case:
.. code-block:: Python
R.start_exp(experiment_name='test', recorder_name='recorder_1')
... # further operations
R.end_exp('FINISHED')  # or: R.end_exp(Recorder.STATUS_FI)
- `end_exp(recorder_status=Recorder.STATUS_FI)`
- Method for ending an experiment manually. It will end the current active experiment, as well as its active recorder with the specified `status` type.
- Parameters:
- `recorder_status` : str
The status of a recorder, which can be '`SCHEDULED`', '`RUNNING`', '`FINISHED`', '`FAILED`'.
- Use case:
.. code-block:: Python
R.start_exp(experiment_name='test')
... # further operations
R.end_exp('FINISHED')  # or: R.end_exp(Recorder.STATUS_FI)
- `search_records(experiment_ids, **kwargs)`
- Get a pandas DataFrame of all the records that have been stored with the given search criteria. This method is highly correlated with MLFlow's ``search_runs`` method (`link <https://www.mlflow.org/docs/latest/python_api/mlflow.html#mlflow.search_runs>`_).
- Parameters:
- `experiment_ids` : list
list of experiment IDs.
- `filter_string` : str
filter query string, defaults to searching all runs.
- `run_view_type` : int
one of enum values ACTIVE_ONLY (1), DELETED_ONLY (2), or ALL (3).
- `max_results` : int
the maximum number of runs to put in the dataframe.
- `order_by` : list
list of columns to order by (e.g., “metrics.rmse”).
- Returns:
- A pandas.DataFrame of records, where each metric, parameter, and tag is expanded into its own column named metrics.*, params.*, and tags.* respectively. For records that don't have a particular metric, parameter, or tag, the value will be (NumPy) NaN, None, or None respectively.
- Use case:
.. code-block:: Python
R.log_metrics(m=2.50, step=0)
records = R.search_records([experiment_id], order_by=["metrics.m DESC"])
- `list_experiments()`
- Method for listing all the existing experiments (except those that have been deleted).
- Returns:
- A dictionary (name -> experiment) of the stored experiment information.
- Use case:
.. code-block:: Python
exps = R.list_experiments()
- `list_recorders(experiment_id=None, experiment_name=None)`
- Method for listing all the recorders of experiment with given id or name. If user doesn't provide the id or name of the experiment, this method will try to retrieve the default experiment and list all the recorders of the default experiment. If the default experiment doesn't exist, the method will first create the default experiment, and then create a new recorder under it.
- Parameters:
- `experiment_id` : str
id of the experiment.
- `experiment_name` : str
name of the experiment.
- Returns:
- A dictionary (id -> recorder) of the stored recorder information.
- Use case:
.. code-block:: Python
recorders = R.list_recorders(experiment_name='test')
- `get_exp(experiment_id=None, experiment_name=None, create: bool = True)`
- Method for retrieving an experiment with given id or name. If the '`create`' argument is True and no valid experiment is found, this method will create one for the user. Otherwise, it will only retrieve a specific experiment or raise an Error.
- If '`create`' is True:
- If ``R``'s running:
- no id or name specified, return the active experiment.
- if id or name is specified, return the specified experiment. If no such exp found, create a new experiment with given id or name, and the experiment is set to be running.
- If ``R``'s not running:
- no id or name specified, create a default experiment, and the experiment is set to be running.
- if id or name is specified, return the specified experiment. If no such exp found, create a new experiment with given name or the default experiment, and the experiment is set to be running.
- Else If '`create`' is False:
- If ``R``'s running:
- no id or name specified, return the active experiment.
- if id or name is specified, return the specified experiment. If no such exp found, raise Error.
- If ``R``'s not running:
- no id or name specified. If the default experiment exists, return it, otherwise, raise Error.
- if id or name is specified, return the specified experiment. If no such exp found, raise Error.
- Parameters:
- `experiment_id` : str
id of the experiment.
- `experiment_name` : str
name of the experiment.
- `create` : boolean
an argument that determines whether the method will automatically create a new experiment according to the user's specification if the experiment hasn't been created before.
- Returns:
- An experiment instance with given id or name.
- Use case:
.. code-block:: Python
# Case 1
with R.start('test'):
exp = R.get_exp()
recorders = exp.list_recorders()
# Case 2
with R.start('test'):
exp = R.get_exp('test1')
# Case 3
exp = R.get_exp()  # -> a default experiment.
# Case 4
exp = R.get_exp(experiment_name='test')
# Case 5
exp = R.get_exp(create=False)  # -> the default experiment if it exists.
- `delete_exp(experiment_id=None, experiment_name=None)`
- Method for deleting the experiment with given id or name. At least one of id or name must be given; otherwise, an error will occur.
- Parameters:
- `experiment_id` : str
id of the experiment.
- `experiment_name` : str
name of the experiment.
- Use case:
.. code-block:: Python
R.delete_exp(experiment_name='test')
- `get_uri()`
- Method for retrieving the uri of current experiment manager.
- Returns:
- The uri of current experiment manager.
- Use case:
.. code-block:: Python
uri = R.get_uri()
- `get_recorder(recorder_id=None, recorder_name=None, experiment_name=None)`
- Method for retrieving a recorder. The recorder can be used for further process such as ``save_objects``, ``load_object``, ``log_params``, ``log_metrics``, etc.
- If ``R``'s running:
- no id or name specified, return the active recorder.
- if id or name is specified, return the specified recorder.
- If ``R``'s not running:
- no id or name specified, raise Error.
- if id or name is specified, the corresponding experiment_name must also be given; the specified recorder is then returned. Otherwise, raise Error.
- Parameters:
- `recorder_id` : str
id of the recorder.
- `recorder_name` : str
name of the recorder.
- `experiment_name` : str
name of the experiment.
- Returns:
- A recorder instance.
- Use case:
.. code-block:: Python
# Case 1
with R.start('test'):
recorder = R.get_recorder()
# Case 2
with R.start('test'):
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')
# Case 3
recorder = R.get_recorder()  # -> Error
# Case 4
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')  # -> Error
# Case 5
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d', experiment_name='test')
- `delete_recorder(recorder_id=None, recorder_name=None)`
- Method for deleting the recorders with given id or name. At least one of id or name must be given; otherwise, an error will occur.
- Parameters:
- `recorder_id` : str
id of the recorder.
- `recorder_name` : str
name of the recorder.
- Use case:
.. code-block:: Python
R.delete_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')
- `save_objects(local_path=None, artifact_path=None, **kwargs)`
- Method for saving objects as artifacts in the experiment to the uri. It supports either saving from a local file/directory, or directly saving objects. Users can use valid Python keyword arguments to specify the object to be saved as well as its name (name: value).
- If R's running: it will save the objects through the running recorder.
- If R's not running: the system will create a default experiment, and a new recorder and save objects under it.
.. note::
If one wants to save objects with a specific recorder, it is recommended to first get that recorder through the `get_recorder` API and then use the recorder to save objects. The supported arguments are the same as for this method.
- Parameters:
- `local_path` : str
if provided, then save the file or directory to the artifact URI.
- `artifact_path` : str
the relative path for the artifact to be stored in the URI.
- Use case:
.. code-block:: Python
# Case 1
with R.start('test'):
pred = model.predict(dataset)
R.save_objects(**{"pred.pkl": pred}, artifact_path='prediction')
# Case 2
with R.start('test'):
R.save_objects(local_path='results/pred.pkl')
- `log_params(**kwargs)`
- Method for logging parameters during an experiment. In addition to using ``R``, one can also log to a specific recorder after getting it with `get_recorder` API.
- If R's running: it will log parameters through the running recorder.
- If R's not running: the system will create a default experiment as well as a new recorder, and log parameters under it.
- Parameters:
- `keyword argument`:
name1=value1, name2=value2, ...
- Use case:
.. code-block:: Python
# Case 1
with R.start('test'):
R.log_params(learning_rate=0.01)
# Case 2
R.log_params(learning_rate=0.01)
- `log_metrics(step=None, **kwargs)`
- Method for logging metrics during an experiment. In addition to using ``R``, one can also log to a specific recorder after getting it with `get_recorder` API.
- If R's running: it will log metrics through the running recorder.
- If R's not running: the system will create a default experiment as well as a new recorder, and log metrics under it.
- Parameters:
- `step`: int
a single integer step at which to log the specified Metrics. If unspecified, each metric is logged at step zero.
- `keyword argument`:
name1=value1, name2=value2, ...
- `set_tags(**kwargs)`
- Method for setting tags for a recorder. In addition to using ``R``, one can also set the tag to a specific recorder after getting it with `get_recorder` API.
- If R's running: it will set tags through the running recorder.
- If R's not running: the system will create a default experiment as well as a new recorder, and set the tags under it.
- Parameters:
- `keyword argument`:
name1=value1, name2=value2, ...
- Use case:
.. code-block:: Python
# Case 1
with R.start('test'):
R.set_tags(release_version="2.2.0")
# Case 2
R.set_tags(release_version="2.2.0")
.. autoclass:: qlib.workflow.__init__.QlibRecorder
:members:
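The relationship between the high-level ``start`` and the lower-level ``start_exp`` / ``end_exp`` pair can be sketched with a context manager. This is a minimal illustration, not ``QlibRecorder`` itself: ``RecorderSketch`` is a hypothetical class, and marking the recorder as failed when the body raises is an assumption made for the sketch.

```python
from contextlib import contextmanager

STATUS_R = "RUNNING"
STATUS_FI = "FINISHED"
STATUS_FA = "FAILED"


class RecorderSketch:
    """Hypothetical stand-in for an experiment recorder."""

    def __init__(self) -> None:
        self.experiment_name = None
        self.status = None
        self.params = {}

    def start_exp(self, experiment_name: str = "default") -> None:
        # Lower-level start: the caller is responsible for calling end_exp().
        self.experiment_name = experiment_name
        self.status = STATUS_R

    def end_exp(self, recorder_status: str = STATUS_FI) -> None:
        self.status = recorder_status

    def log_params(self, **kwargs) -> None:
        self.params.update(kwargs)

    @contextmanager
    def start(self, experiment_name: str = "default"):
        # High-level start: the experiment is always ended, with a status
        # that reflects whether the body of the `with` block succeeded.
        self.start_exp(experiment_name)
        try:
            yield self
        except Exception:
            self.end_exp(STATUS_FA)  # assumption: failures are marked FAILED
            raise
        else:
            self.end_exp(STATUS_FI)


ok = RecorderSketch()
with ok.start("test"):
    ok.log_params(learning_rate=0.01)

failed = RecorderSketch()
try:
    with failed.start("test"):
        raise RuntimeError("training crashed")
except RuntimeError:
    pass
```

The ``with`` form is the safer default because the status is updated even when the body raises, which the manual pair cannot guarantee.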
Experiment Manager
===================
The ``ExpManager`` module in ``Qlib`` is responsible for managing different experiments. Most of the APIs of ``ExpManager`` are similar to those of ``QlibRecorder``, and the most important API is the ``get_exp`` method. Users can refer to the documents above for detailed information about how to use the ``get_exp`` method.
.. autoclass:: qlib.workflow.expm.ExpManager
:members: get_exp, list_experiments
For other interfaces such as `create_exp`, `delete_exp`, please refer to `Experiment Manager API <../reference/api.html#experiment-manager>`_.
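The get-or-create behaviour of ``get_exp`` can be sketched as below. This is a simplified model that looks experiments up by name only: ``ExpManagerSketch``, the default name ``"Experiment"``, and the dictionary-based experiment objects are all hypothetical, and the real ``ExpManager`` may differ in details.

```python
from typing import Dict, Optional


class ExpManagerSketch:
    """Hypothetical manager illustrating the `create` flag of get_exp."""

    DEFAULT_NAME = "Experiment"  # assumed default name, for illustration only

    def __init__(self) -> None:
        self.experiments: Dict[str, dict] = {}
        self.active: Optional[dict] = None  # None means nothing is running

    def get_exp(self, experiment_name: Optional[str] = None, create: bool = True) -> dict:
        if experiment_name is None:
            if self.active is not None:
                # Running and nothing specified: return the active experiment.
                return self.active
            # Not running and nothing specified: fall back to the default.
            experiment_name = self.DEFAULT_NAME
        if experiment_name in self.experiments:
            return self.experiments[experiment_name]
        if not create:
            raise ValueError(f"experiment {experiment_name!r} does not exist")
        # Create the missing experiment and mark it as running.
        exp = {"name": experiment_name}
        self.experiments[experiment_name] = exp
        self.active = exp
        return exp


manager = ExpManagerSketch()

try:
    manager.get_exp(create=False)  # no default experiment exists yet
    missing_raised = False
except ValueError:
    missing_raised = True

default_exp = manager.get_exp()      # creates the default experiment
test_exp = manager.get_exp("test")   # creates and activates "test"
current = manager.get_exp(create=False)  # returns the active experiment
```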
Experiment
@@ -363,6 +68,9 @@ Experiment
The ``Experiment`` class is solely responsible for a single experiment, and it handles any operations related to that experiment. Basic methods for starting and ending an experiment (`start`, `end`) are included. Methods related to `recorders` are also available, including `get_recorder` and `list_recorders`.
.. autoclass:: qlib.workflow.exp.Experiment
:members: get_recorder, list_recorders
For other interfaces such as `search_records`, `delete_recorder`, please refer to `Experiment API <../reference/api.html#experiment>`_.
Recorder
@@ -372,28 +80,8 @@ The ``Recorder`` class is responsible for a single recorder. It will handle some
Here are some important APIs that are not included in the ``QlibRecorder``:
- `list_artifacts(artifact_path: str = None)`
- List all the artifacts of a recorder.
- Parameters:
- `artifact_path` : str
the relative path for the artifact to be stored in the URI.
- Returns:
- A list of the stored artifact information (name, path, etc.).
- `list_metrics()`
- List all the metrics of a recorder.
- Returns:
- A dictionary of the stored metrics.
- `list_params()`
- List all the params of a recorder.
- Returns:
- A dictionary of the stored params.
- `list_tags()`
- List all the tags of a recorder.
- Returns:
- A dictionary of the stored tags.
.. autoclass:: qlib.workflow.recorder.Recorder
:members: list_artifacts, list_metrics, list_params, list_tags
For other interfaces such as `save_objects`, `load_object`, please refer to `Recorder API <../reference/api.html#recorder>`_.


@@ -124,7 +124,7 @@ html_theme_options = {
"logo_only": True,
"collapse_navigation": False,
"display_version": False,
"navigation_depth": 3,
"navigation_depth": 4,
}
# Add any paths that contain custom static files (such as style sheets) here,


@@ -23,16 +23,13 @@ Filter
.. automodule:: qlib.data.filter
:members:
Feature
--------------------
Class
~~~~~~~~~~~~~~~~~~~~
--------------------
.. automodule:: qlib.data.base
:members:
Operator
~~~~~~~~~~~~~~~~~~~~
--------------------
.. automodule:: qlib.data.ops
:members:
@@ -56,29 +53,32 @@ Cache
.. autoclass:: qlib.data.cache.DiskDatasetCache
:members:
Dataset
---------------
Contrib
====================
Dataset Class
~~~~~~~~~~~~~~~~~~~~
.. automodule:: qlib.data.dataset.__init__
:members:
Data Loader
---------------
~~~~~~~~~~~~~~~~~~~~
.. automodule:: qlib.data.dataset.loader
:members:
Data Handler
---------------
~~~~~~~~~~~~~~~~~~~~
.. automodule:: qlib.data.dataset.handler
:members:
Processor
---------------
~~~~~~~~~~~~~~~~~~~~
.. automodule:: qlib.data.dataset.processor
:members:
Dataset
---------------
.. automodule:: qlib.data.dataset.__init__
:members:
Contrib
====================
Model
--------------------


@@ -1,176 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import yaml
import copy
import os
import json
import tempfile
from pathlib import Path
from ...config import REG_CN
class EstimatorConfigManager(object):
def __init__(self, config_path):
if not config_path:
raise ValueError("Config path is invalid.")
self.config_path = config_path
with open(config_path) as fp:
config = yaml.load(fp, Loader=yaml.FullLoader)
self.config = copy.deepcopy(config)
self.ex_config = ExperimentConfig(config.get("experiment", dict()), self)
self.data_config = DataConfig(config.get("data", dict()), self)
self.model_config = ModelConfig(config.get("model", dict()), self)
self.trainer_config = TrainerConfig(config.get("trainer", dict()), self)
self.strategy_config = StrategyConfig(config.get("strategy", dict()), self)
self.backtest_config = BacktestConfig(config.get("backtest", dict()), self)
self.qlib_data_config = QlibDataConfig(config.get("qlib_data", dict()), self)
# If the start_date and end_date are not given in data_config, they will be referred from the trainer_config.
handler_start_date = self.data_config.handler_parameters.get("start_date", None)
handler_end_date = self.data_config.handler_parameters.get("end_date", None)
if handler_start_date is None:
self.data_config.handler_parameters["start_date"] = self.trainer_config.parameters["train_start_date"]
if handler_end_date is None:
self.data_config.handler_parameters["end_date"] = self.trainer_config.parameters["test_end_date"]
class ExperimentConfig(object):
TRAIN_MODE = "train"
TEST_MODE = "test"
OBSERVER_FILE_STORAGE = "file_storage"
OBSERVER_MONGO = "mongo"
def __init__(self, config, CONFIG_MANAGER):
"""__init__
:param config: The config dict for experiment
:param CONFIG_MANAGER: The estimator config manager
"""
self.name = config.get("name", "test_experiment")
# The dir of the result of all the experiments
self.global_dir = config.get("dir", os.path.dirname(CONFIG_MANAGER.config_path))
# The dir of the result of current experiment
self.ex_dir = os.path.join(self.global_dir, self.name)
if not os.path.exists(self.ex_dir):
os.makedirs(self.ex_dir)
self.tmp_run_dir = tempfile.mkdtemp(dir=self.ex_dir)
self.mode = config.get("mode", ExperimentConfig.TRAIN_MODE)
self.sacred_dir = os.path.join(self.ex_dir, "sacred")
self.observer_type = config.get("observer_type", ExperimentConfig.OBSERVER_FILE_STORAGE)
self.mongo_url = config.get("mongo_url", None)
self.db_name = config.get("db_name", None)
self.finetune = config.get("finetune", False)
# The path of the experiment id of the experiment
self.exp_info_path = config.get("exp_info_path", os.path.join(self.ex_dir, "exp_info.json"))
exp_info_dir = Path(self.exp_info_path).parent
exp_info_dir.mkdir(parents=True, exist_ok=True)
# Test mode config
loader_args = config.get("loader", dict())
if self.mode == ExperimentConfig.TEST_MODE or self.finetune:
loader_exp_info_path = loader_args.get("exp_info_path", None)
self.loader_model_index = loader_args.get("model_index", None)
if (loader_exp_info_path is not None) and (os.path.exists(loader_exp_info_path)):
with open(loader_exp_info_path) as fp:
loader_dict = json.load(fp)
for k, v in loader_dict.items():
setattr(self, "loader_{}".format(k), v)
# Check loader experiment id
assert hasattr(self, "loader_id"), "If mode is test or finetune is True, loader must contain id."
else:
self.loader_id = loader_args.get("id", None)
if self.loader_id is None:
raise ValueError("If mode is test or finetune is True, loader must contain id.")
self.loader_observer_type = loader_args.get("observer_type", self.observer_type)
self.loader_name = loader_args.get("name", self.name)
self.loader_dir = loader_args.get("dir", self.global_dir)
self.loader_mongo_url = loader_args.get("mongo_url", self.mongo_url)
self.loader_db_name = loader_args.get("db_name", self.db_name)
class DataConfig(object):
def __init__(self, config, CONFIG_MANAGER):
"""__init__
:param config: The config dict for data
:param CONFIG_MANAGER: The estimator config manager
"""
self.handler_module_path = config.get("module_path", "qlib.contrib.data.handler")
self.handler_class = config.get("class", "ALPHA360")
self.handler_parameters = config.get("args", dict())
self.handler_filter = config.get("filter", dict())
# Update provider uri.
class ModelConfig(object):
def __init__(self, config, CONFIG_MANAGER):
"""__init__
:param config: The config dict for model
:param CONFIG_MANAGER: The estimator config manager
"""
self.model_class = config.get("class", "Model")
self.model_module_path = config.get("module_path", "qlib.model")
self.save_dir = os.path.join(CONFIG_MANAGER.ex_config.tmp_run_dir, "model")
self.save_path = config.get("save_path", os.path.join(self.save_dir, "model.bin"))
self.parameters = config.get("args", dict())
# Make dir if need.
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)
class TrainerConfig(object):
def __init__(self, config, CONFIG_MANAGER):
"""__init__
:param config: The config dict for trainer
:param CONFIG_MANAGER: The estimator config manager
"""
self.trainer_class = config.get("class", "StaticTrainer")
self.trainer_module_path = config.get("module_path", "qlib.contrib.estimator.trainer")
self.parameters = config.get("args", dict())
class StrategyConfig(object):
def __init__(self, config, CONFIG_MANAGER):
"""__init__
:param config: The config dict for strategy
:param CONFIG_MANAGER: The estimator config manager
"""
self.strategy_class = config.get("class", "TopkDropoutStrategy")
self.strategy_module_path = config.get("module_path", "qlib.contrib.strategy.strategy")
self.parameters = config.get("args", dict())
class BacktestConfig(object):
def __init__(self, config, CONFIG_MANAGE):
"""__init__
:param config: The config dict for strategy
:param CONFIG_MANAGE: The estimator config manager
"""
self.normal_backtest_parameters = config.get("normal_backtest_args", dict())
self.long_short_backtest_parameters = config.get("long_short_backtest_args", dict())
class QlibDataConfig(object):
def __init__(self, config, CONFIG_MANAGE):
"""__init__
:param config: The config dict for qlib_client
:param CONFIG_MANAGE: The estimator config manager
"""
self.provider_uri = config.pop("provider_uri", "~/.qlib/qlib_data/cn_data")
self.auto_mount = config.pop("auto_mount", False)
self.mount_path = config.pop("mount_path", "~/.qlib/qlib_data/cn_data")
self.region = config.pop("region", REG_CN)
self.args = config


@@ -1,328 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# coding=utf-8
import pandas as pd
import os
import copy
import json
import yaml
import pickle
import qlib
from ..evaluate import risk_analysis
from ..evaluate import backtest as normal_backtest
from ..evaluate import long_short_backtest
from .config import ExperimentConfig
from .fetcher import create_fetcher_with_config
from ...log import get_module_logger, TimeInspector
from ...utils import get_module_by_module_path, compare_dict_value
class Estimator(object):
def __init__(self, config_manager, sacred_ex):
# Set logger.
self.logger = get_module_logger("Estimator")
# 1. Set config manager.
self.config_manager = config_manager
# 2. Set configs.
self.ex_config = config_manager.ex_config
self.data_config = config_manager.data_config
self.model_config = config_manager.model_config
self.trainer_config = config_manager.trainer_config
self.strategy_config = config_manager.strategy_config
self.backtest_config = config_manager.backtest_config
# If experiment.mode is test or experiment.finetune is True, load the experimental results in the loader
if self.ex_config.mode == self.ex_config.TEST_MODE or self.ex_config.finetune:
self.compare_config_with_config_manger(self.config_manager)
# 3. Set sacred_experiment.
self.ex = sacred_ex
# 4. Init data handler.
self.data_handler = None
self._init_data_handler()
# 5. Init trainer.
self.trainer = None
self._init_trainer()
# 6. Init strategy.
self.strategy = None
self._init_strategy()
def _init_data_handler(self):
handler_module = get_module_by_module_path(self.data_config.handler_module_path)
# Set market
market = self.data_config.handler_filter.get("market", None)
if market is None:
if "market" in self.data_config.handler_parameters:
self.logger.warning(
"Warning: The market in data.args section is deprecated. "
"It only works when market is not set in data.filter section. "
"It will be overridden by market in the data.filter section."
)
market = self.data_config.handler_parameters["market"]
else:
market = "csi500"
self.data_config.handler_parameters["market"] = market
data_filter_list = []
handler_filters = self.data_config.handler_filter.get("filter_pipeline", list())
for h_filter in handler_filters:
filter_module_path = h_filter.get("module_path", "qlib.data.filter")
filter_class_name = h_filter.get("class", "")
filter_parameters = h_filter.get("args", {})
filter_module = get_module_by_module_path(filter_module_path)
filter_class = getattr(filter_module, filter_class_name)
data_filter = filter_class(**filter_parameters)
data_filter_list.append(data_filter)
self.data_config.handler_parameters["data_filter_list"] = data_filter_list
handler_class = getattr(handler_module, self.data_config.handler_class)
self.data_handler = handler_class(**self.data_config.handler_parameters)
def _init_trainer(self):
model_module = get_module_by_module_path(self.model_config.model_module_path)
trainer_module = get_module_by_module_path(self.trainer_config.trainer_module_path)
model_class = getattr(model_module, self.model_config.model_class)
trainer_class = getattr(trainer_module, self.trainer_config.trainer_class)
self.trainer = trainer_class(
model_class,
self.model_config.save_path,
self.model_config.parameters,
self.data_handler,
self.ex,
**self.trainer_config.parameters
)
def _init_strategy(self):
module = get_module_by_module_path(self.strategy_config.strategy_module_path)
strategy_class = getattr(module, self.strategy_config.strategy_class)
self.strategy = strategy_class(**self.strategy_config.parameters)
def run(self):
if self.ex_config.mode == ExperimentConfig.TRAIN_MODE:
self.trainer.train()
elif self.ex_config.mode == ExperimentConfig.TEST_MODE:
self.trainer.load()
else:
raise ValueError("unexpected mode: %s" % self.ex_config.mode)
analysis = self.backtest()
print(analysis)
self.logger.info(
"experiment id: {}, experiment name: {}".format(self.ex.experiment.current_run._id, self.ex_config.name)
)
# Remove temp dir
# shutil.rmtree(self.ex_config.tmp_run_dir)
def backtest(self):
TimeInspector.set_time_mark()
# 1. Get pred and prediction score of model(s).
pred = self.trainer.get_test_score()
try:
performance = self.trainer.get_test_performance()
except NotImplementedError:
performance = None
# 2. Normal Backtest.
report_normal, positions_normal = self._normal_backtest(pred)
# 3. Long-Short Backtest.
# Deprecated
# long_short_reports = self._long_short_backtest(pred)
# 4. Analyze
analysis_df = self._analyze(report_normal)
# 5. Save.
self._save_backtest_result(
pred,
analysis_df,
positions_normal,
report_normal,
# long_short_reports,
performance,
)
return analysis_df
def _normal_backtest(self, pred):
TimeInspector.set_time_mark()
if "account" not in self.backtest_config.normal_backtest_parameters:
if "account" in self.strategy_config.parameters:
self.logger.warning(
"Warning: The account in strategy section is deprecated. "
"It only works when account is not set in backtest section. "
"It will be overridden by account in the backtest section."
)
self.backtest_config.normal_backtest_parameters["account"] = self.strategy_config.parameters["account"]
report_normal, positions_normal = normal_backtest(
pred, strategy=self.strategy, **self.backtest_config.normal_backtest_parameters
)
TimeInspector.log_cost_time("Finished normal backtest.")
return report_normal, positions_normal
def _long_short_backtest(self, pred):
TimeInspector.set_time_mark()
long_short_reports = long_short_backtest(pred, **self.backtest_config.long_short_backtest_parameters)
TimeInspector.log_cost_time("Finished long-short backtest.")
return long_short_reports
@staticmethod
def _analyze(report_normal):
TimeInspector.set_time_mark()
analysis = dict()
# analysis["pred_long"] = risk_analysis(long_short_reports["long"])
# analysis["pred_short"] = risk_analysis(long_short_reports["short"])
# analysis["pred_long_short"] = risk_analysis(long_short_reports["long_short"])
analysis["excess_return_without_cost"] = risk_analysis(report_normal["return"] - report_normal["bench"])
analysis["excess_return_with_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"] - report_normal["cost"]
)
analysis_df = pd.concat(analysis) # type: pd.DataFrame
TimeInspector.log_cost_time(
"Finished generating analysis," " average turnover is: {0:.4f}.".format(report_normal["turnover"].mean())
)
return analysis_df
def _save_backtest_result(self, pred, analysis, positions, report_normal, performance):
# 1. Result dir.
result_dir = os.path.join(self.config_manager.ex_config.tmp_run_dir, "result")
if not os.path.exists(result_dir):
os.makedirs(result_dir)
self.ex.add_info(
"task_config",
json.loads(json.dumps(self.config_manager.config, default=str)),
)
# 2. Pred.
TimeInspector.set_time_mark()
pred_pkl_path = os.path.join(result_dir, "pred.pkl")
pred.to_pickle(pred_pkl_path)
self.ex.add_artifact(pred_pkl_path)
TimeInspector.log_cost_time("Finished saving pred.pkl to: {}".format(pred_pkl_path))
# 3. Ana.
TimeInspector.set_time_mark()
analysis_pkl_path = os.path.join(result_dir, "analysis.pkl")
analysis.to_pickle(analysis_pkl_path)
self.ex.add_artifact(analysis_pkl_path)
TimeInspector.log_cost_time("Finished saving analysis.pkl to: {}".format(analysis_pkl_path))
# 4. Pos.
TimeInspector.set_time_mark()
positions_pkl_path = os.path.join(result_dir, "positions.pkl")
with open(positions_pkl_path, "wb") as fp:
pickle.dump(positions, fp)
self.ex.add_artifact(positions_pkl_path)
TimeInspector.log_cost_time("Finished saving positions.pkl to: {}".format(positions_pkl_path))
# 5. Report normal.
TimeInspector.set_time_mark()
report_normal_pkl_path = os.path.join(result_dir, "report_normal.pkl")
report_normal.to_pickle(report_normal_pkl_path)
self.ex.add_artifact(report_normal_pkl_path)
TimeInspector.log_cost_time("Finished saving report_normal.pkl to: {}".format(report_normal_pkl_path))
# 6. Report long short.
# Deprecated
# for k, name in zip(
# ["long", "short", "long_short"],
# ["report_long.pkl", "report_short.pkl", "report_long_short.pkl"],
# ):
# TimeInspector.set_time_mark()
# pkl_path = os.path.join(result_dir, name)
# long_short_reports[k].to_pickle(pkl_path)
# self.ex.add_artifact(pkl_path)
# TimeInspector.log_cost_time("Finished saving {} to: {}".format(name, pkl_path))
# 7. Origin test label.
TimeInspector.set_time_mark()
label_pkl_path = os.path.join(result_dir, "label.pkl")
self.data_handler.get_origin_test_label_with_date(
self.trainer_config.parameters["test_start_date"],
self.trainer_config.parameters["test_end_date"],
).to_pickle(label_pkl_path)
self.ex.add_artifact(label_pkl_path)
TimeInspector.log_cost_time("Finished saving label.pkl to: {}".format(label_pkl_path))
# 8. Experiment info, save the model(s) performance here.
TimeInspector.set_time_mark()
cur_ex_id = self.ex.experiment.current_run._id
exp_info = {
"id": cur_ex_id,
"name": self.ex_config.name,
"performance": performance,
"observer_type": self.ex_config.observer_type,
}
if self.ex_config.observer_type == ExperimentConfig.OBSERVER_MONGO:
exp_info.update(
{
"mongo_url": self.ex_config.mongo_url,
"db_name": self.ex_config.db_name,
}
)
else:
exp_info.update({"dir": self.ex_config.global_dir})
with open(self.ex_config.exp_info_path, "w") as fp:
json.dump(exp_info, fp, indent=4, sort_keys=True)
self.ex.add_artifact(self.ex_config.exp_info_path)
TimeInspector.log_cost_time("Finished saving ex_info to: {}".format(self.ex_config.exp_info_path))
@staticmethod
def compare_config_with_config_manger(config_manager):
"""Compare loader model args and current config with ConfigManage
:param config_manager: ConfigManager
:return:
"""
fetcher = create_fetcher_with_config(config_manager, load_form_loader=True)
loader_mode_config = fetcher.get_experiment(
exp_name=config_manager.ex_config.loader_name,
exp_id=config_manager.ex_config.loader_id,
fields=["task_config"],
)["task_config"]
with open(config_manager.config_path) as fp:
current_config = yaml.safe_load(fp)
current_config = json.loads(json.dumps(current_config, default=str))
logger = get_module_logger("Estimator")
loader_mode_config = copy.deepcopy(loader_mode_config)
current_config = copy.deepcopy(current_config)
# Require test_mode_config.test_start_date <= current_config.test_start_date
loader_trainer_args = loader_mode_config.get("trainer", {}).get("args", {})
cur_trainer_args = current_config.get("trainer", {}).get("args", {})
loader_start_date = loader_trainer_args.pop("test_start_date")
cur_test_start_date = cur_trainer_args.pop("test_start_date")
assert (
loader_start_date <= cur_test_start_date
), "Require: loader_mode_config.test_start_date <= current_config.test_start_date"
# TODO: For the user's own extended `Trainer`, the support is not very good
if "RollingTrainer" == current_config.get("trainer", {}).get("class", None):
loader_period = loader_trainer_args.pop("rolling_period")
cur_period = cur_trainer_args.pop("rolling_period")
assert (
loader_period == cur_period
), "Require: loader_mode_config.rolling_period == current_config.rolling_period"
compare_section = ["trainer", "model", "data"]
for section in compare_section:
changes = compare_dict_value(loader_mode_config.get(section, {}), current_config.get(section, {}))
if changes:
logger.warning("Warning: Loader mode config and current config, `{}` are different:\n".format(section))


@@ -1,290 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# coding=utf-8
import copy
import json
import yaml
import pickle
import gridfs
import pymongo
from pathlib import Path
from abc import abstractmethod
from .config import EstimatorConfigManager, ExperimentConfig
class Fetcher(object):
"""Sacred Experiments Fetcher"""
@abstractmethod
def _get_experiment(self, exp_name, exp_id):
"""Get experiment basic info with experiment and experiment id
:param exp_name: experiment name
:param exp_id: experiment id
:return: dict
Must contain keys: _id, experiment, info, stop_time.
Here is an example below for FileFetcher.
exp = {
'_id': exp_id, # experiment id
'path': path, # experiment result path
'experiment': {'name': exp_name}, # experiment
'info': info, # experiment config info
'stop_time': run.get('stop_time', None) # The time the experiment ended
}
"""
pass
@abstractmethod
def _list_experiments(self, exp_name=None):
"""Get experiment basic info list with experiment name
:param exp_name: experiment name
:return: list
"""
pass
@abstractmethod
def _iter_artifacts(self, experiment):
"""Get information about the data in the experiment results
:param experiment: `self._get_experiment` method result
:return: iterable
Each element contains two elements.
first element : data name
second element : data uri
"""
pass
@abstractmethod
def _load_data(self, uri):
"""Load data with uri
:param uri: data uri
:return: bytes
"""
pass
@staticmethod
def model_dict_to_buffer_list(model_dict):
"""
:param model_dict:
:return:
"""
model_list = []
is_static_model = False
if len(model_dict) == 1 and list(model_dict.keys())[0] == "model.bin":
is_static_model = True
model_list.append(list(model_dict.values())[0])
else:
sep = "model.bin_"
model_ids = list(map(lambda x: int(x.split(sep)[1]), model_dict.keys()))
min_id, max_id = min(model_ids), max(model_ids)
for i in range(min_id, max_id + 1):
model_key = sep + str(i)
model = model_dict.get(model_key, None)
if model is None:
print(
"WARNING: In Fetcher, {} is missing when the get model is in the get_experiment function.".format(
model_key
)
)
break
else:
model_list.append(model)
if is_static_model:
return model_list[0]
return model_list
def get_experiments(self, exp_name=None):
"""Get experiments with name.
:param exp_name: str
If `exp_name` is set to None, then all experiments will return.
:return: dict
Experiments info dict(Including experiment id and task_config to run the
experiment). Here is an example below.
{
'a_experiment': [
{
'id': '1',
'task_config': {...}
},
...
]
...
}
"""
res = dict()
for ex in self._list_experiments(exp_name):
name = ex["experiment"]["name"]
tmp = {
"id": ex["_id"],
"task_config": ex["info"].get("task_config", {}),
"ex_run_stop_time": ex.get("stop_time", None),
}
res.setdefault(name, []).append(tmp)
return res
def get_experiment(self, exp_name, exp_id, fields=None):
"""
:param exp_name:
:param exp_id:
:param fields: list
Experiment result fields, if fields is None, will get all fields.
Currently supported fields:
['model', 'analysis', 'positions', 'report_normal', 'pred', 'task_config', 'label']
:return: dict
"""
fields = copy.copy(fields)
ex = self._get_experiment(exp_name, exp_id)
results = dict()
model_dict = dict()
for name, uri in self._iter_artifacts(ex):
# When saving, use `sacred.experiment.add_artifact(filename)` , so `name` is os.path.basename(filename)
prefix = name.split(".")[0]
if fields and prefix not in fields:
continue
data = self._load_data(uri)
if prefix == "model":
model_dict[name] = data
else:
results[prefix] = pickle.loads(data)
# Sort model
if model_dict:
results["model"] = self.model_dict_to_buffer_list(model_dict)
# Info
results["task_config"] = ex["info"].get("task_config", {})
return results
def estimator_config_to_dict(self, exp_name, exp_id):
"""Save configuration to file
:param exp_name:
:param exp_id:
:return: config dict
"""
return self.get_experiment(exp_name, exp_id, fields=["task_config"])["task_config"]
class FileFetcher(Fetcher):
"""File Fetcher"""
def __init__(self, experiments_dir):
self.experiments_dir = Path(experiments_dir)
def _get_experiment(self, exp_name, exp_id):
path = self.experiments_dir / exp_name / "sacred" / str(exp_id)
info_path = path / "info.json"
run_path = path / "run.json"
if info_path.exists():
with info_path.open("r") as f:
info = json.load(f)
else:
info = {}
if run_path.exists():
with run_path.open("r") as f:
run = json.load(f)
else:
run = {}
exp = {
"_id": exp_id,
"path": path,
"experiment": {"name": exp_name},
"info": info,
"stop_time": run.get("stop_time", None),
}
return exp
def _list_experiments(self, exp_name=None):
runs = []
for path in self.experiments_dir.glob("{}/sacred/[!_]*".format(exp_name or "*")):
exp_name, exp_id = path.parents[1].name, path.name
runs.append(self._get_experiment(exp_name, exp_id))
return runs
def _iter_artifacts(self, experiment):
if experiment is None:
return []
for fname in experiment["path"].iterdir():
if fname.suffix == ".pkl" or ".bin" in fname.suffix:
name, uri = fname.name, str(fname)
yield name, uri
def _load_data(self, uri):
with open(uri, "rb") as f:
data = f.read()
return data
class MongoFetcher(Fetcher):
"""MongoDB Fetcher"""
def __init__(self, mongo_url, db_name):
self.mongo_url = mongo_url
self.db_name = db_name
self.client = None
self.db = None
self.runs = None
self.fs = None
self._setup_mongo_client()
def _setup_mongo_client(self):
self.client = pymongo.MongoClient(self.mongo_url)
self.db = self.client[self.db_name]
self.runs = self.db.runs
self.fs = gridfs.GridFS(self.db)
def _get_experiment(self, exp_name, exp_id):
return self.runs.find_one({"_id": exp_id})
def _list_experiments(self, exp_name=None):
if exp_name is None:
return self.runs.find()
return self.runs.find({"experiment.name": exp_name})
def _iter_artifacts(self, experiment):
if experiment is None:
return []
for artifact in experiment.get("artifacts", []):
name, uri = artifact["name"], artifact["file_id"]
yield name, uri
def _load_data(self, uri):
data = self.fs.get(uri).read()
return data
def create_fetcher_with_config(config_manager: EstimatorConfigManager, load_form_loader: bool = False):
"""Create fetcher with loader config
:param config_manager:
:param load_form_loader:
:return:
"""
flag = ""
if load_form_loader:
flag = "loader_"
if config_manager.ex_config.observer_type == ExperimentConfig.OBSERVER_FILE_STORAGE:
return FileFetcher(getattr(config_manager.ex_config, "{}_dir".format("loader" if load_form_loader else "global")))
elif config_manager.ex_config.observer_type == ExperimentConfig.OBSERVER_MONGO:
return MongoFetcher(
mongo_url=getattr(config_manager.ex_config, "{}mongo_url".format(flag)),
db_name=getattr(config_manager.ex_config, "{}db_name".format(flag)),
)
else:
raise NotImplementedError("Unknown Backend")
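
The ordering rule in `model_dict_to_buffer_list` can be sketched on its own: a single `model.bin` artifact is returned as-is, while rolling artifacts named `model.bin_<i>` are returned in index order, stopping at the first gap. The function name `order_model_buffers` is hypothetical, and unlike the original this sketch does not print a warning when an index is missing:

```python
def order_model_buffers(model_dict):
    """Order model artifacts by their numeric suffix (illustrative sketch)."""
    if list(model_dict) == ["model.bin"]:
        # Static model: return the single buffer itself, not a list.
        return model_dict["model.bin"]
    sep = "model.bin_"
    ids = [int(name.split(sep)[1]) for name in model_dict]
    ordered = []
    for i in range(min(ids), max(ids) + 1):
        buf = model_dict.get(sep + str(i))
        if buf is None:
            # A gap ends the sequence; models after the gap are dropped.
            break
        ordered.append(buf)
    return ordered


static = order_model_buffers({"model.bin": b"m"})
rolling = order_model_buffers({"model.bin_1": b"b", "model.bin_0": b"a", "model.bin_3": b"d"})
print(static, rolling)
```

In the rolling call, index 2 is missing, so the buffer stored under `model.bin_3` is not returned even though it exists.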


@@ -1,115 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import argparse
import importlib
from ... import init
from .config import EstimatorConfigManager
from ...log import get_module_logger
from sacred import Experiment
from sacred.observers import FileStorageObserver
from sacred.observers import MongoObserver
args_parser = argparse.ArgumentParser(prog="estimator")
args_parser.add_argument(
"-c",
"--config_path",
required=True,
type=str,
help="json config path indicates where to load config.",
)
args = args_parser.parse_args()
class SacredExperiment(object):
def __init__(
self,
experiment_name,
experiment_dir,
observer_type="file_storage",
mongo_url=None,
db_name=None,
):
"""__init__
:param experiment_name: The name of the experiments.
:param experiment_dir: The directory to store all the results of the experiments(This is for file_storage).
:param observer_type: The observer to record the results: the `file_storage` or `mongo`
:param mongo_url: The mongo url(for mongo observer)
:param db_name: The mongo database name (for mongo observer)
"""
self.experiment_name = experiment_name
self.experiment = Experiment(self.experiment_name)
self.experiment_dir = experiment_dir
self.experiment.logger = get_module_logger("Sacred")
self.observer_type = observer_type
self.mongo_db_url = mongo_url
self.mongo_db_name = db_name
self._setup_experiment()
def _setup_experiment(self):
if self.observer_type == "file_storage":
file_storage_observer = FileStorageObserver.create(basedir=self.experiment_dir)
self.experiment.observers.append(file_storage_observer)
elif self.observer_type == "mongo":
mongo_observer = MongoObserver.create(url=self.mongo_db_url, db_name=self.mongo_db_name)
self.experiment.observers.append(mongo_observer)
else:
raise NotImplementedError("Unsupported observer type: {}".format(self.observer_type))
def add_artifact(self, filename):
self.experiment.add_artifact(filename)
def add_info(self, key, value):
self.experiment.info[key] = value
def main_wrapper(self, func):
return self.experiment.main(func)
def config_wrapper(self, func):
return self.experiment.config(func)
CONFIG_MANAGER = EstimatorConfigManager(args.config_path)
ex = SacredExperiment(
CONFIG_MANAGER.ex_config.name,
CONFIG_MANAGER.ex_config.sacred_dir,
observer_type=CONFIG_MANAGER.ex_config.observer_type,
mongo_url=CONFIG_MANAGER.ex_config.mongo_url,
db_name=CONFIG_MANAGER.ex_config.db_name,
)
# qlib init
init(
provider_uri=CONFIG_MANAGER.qlib_data_config.provider_uri,
mount_path=CONFIG_MANAGER.qlib_data_config.mount_path,
auto_mount=CONFIG_MANAGER.qlib_data_config.auto_mount,
region=CONFIG_MANAGER.qlib_data_config.region,
**CONFIG_MANAGER.qlib_data_config.args
)
@ex.main_wrapper
def _main():
# 1. Get estimator class.
estimator_class = getattr(
importlib.import_module(".estimator", package="qlib.contrib.estimator"),
"Estimator",
)
# 2. Init estimator.
estimator = estimator_class(CONFIG_MANAGER, ex)
estimator.run()
def run():
ex.experiment.run()
if __name__ == "__main__":
run()


@@ -1,317 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# coding=utf-8
from abc import abstractmethod
import pandas as pd
import numpy as np
from scipy.stats import pearsonr
from ...log import get_module_logger, TimeInspector
from ...data.dataset.handler import DataHandlerLP
from .launcher import CONFIG_MANAGER
from .fetcher import create_fetcher_with_config
from ...utils import drop_nan_by_y_index, transform_end_date
class BaseTrainer(object):
def __init__(self, model_class, model_save_path, model_args, data_handler: DataHandlerLP, sacred_ex, **kwargs):
# 1. Model.
self.model_class = model_class
self.model_save_path = model_save_path
self.model_args = model_args
# 2. Data handler.
self.data_handler = data_handler
# 3. Sacred ex.
self.ex = sacred_ex
# 4. Logger.
self.logger = get_module_logger("Trainer")
# 5. Data time
self.train_start_date = kwargs.get("train_start_date", None)
self.train_end_date = kwargs.get("train_end_date", None)
self.validate_start_date = kwargs.get("validate_start_date", None)
self.validate_end_date = kwargs.get("validate_end_date", None)
self.test_start_date = kwargs.get("test_start_date", None)
self.test_end_date = transform_end_date(kwargs.get("test_end_date", None))
@abstractmethod
def train(self):
"""
Implement this method indicating how to train a model.
"""
pass
@abstractmethod
def load(self):
"""
Implement this method indicating how to restore a model and the data.
"""
pass
@abstractmethod
def get_test_pred(self):
"""
Implement this method indicating how to get prediction result(s) from a model.
"""
pass
def get_test_performance(self):
"""
Implement this method indicating how to get the performance of the model.
"""
raise NotImplementedError(f"Please implement `get_test_performance`")
def get_test_score(self):
"""
Override this method to transform the prediction result(s) into a score for each stock.
Note: For multi-label training, you need to combine the predicted labels into one score.
For single-label training, you can use the result of `get_test_pred()` directly (optionally post-processed).
By default, the first column of the result of `get_test_pred()` is used (i.e. single-label training is assumed).
"""
pred = self.get_test_pred()
pred_score = pd.DataFrame(index=pred.index)
pred_score["score"] = pred.iloc(axis=1)[0]
return pred_score
class StaticTrainer(BaseTrainer):
def __init__(self, model_class, model_save_path, model_args, data_handler, sacred_ex, **kwargs):
super(StaticTrainer, self).__init__(model_class, model_save_path, model_args, data_handler, sacred_ex, **kwargs)
self.model = None
split_data = self.data_handler.get_split_data(
self.train_start_date,
self.train_end_date,
self.validate_start_date,
self.validate_end_date,
self.test_start_date,
self.test_end_date,
)
(
self.x_train,
self.y_train,
self.x_validate,
self.y_validate,
self.x_test,
self.y_test,
) = split_data
def train(self):
TimeInspector.set_time_mark()
model = self.model_class(**self.model_args)
if CONFIG_MANAGER.ex_config.finetune:
fetcher = create_fetcher_with_config(CONFIG_MANAGER, load_form_loader=True)
loader_model = fetcher.get_experiment(
exp_name=CONFIG_MANAGER.ex_config.loader_name,
exp_id=CONFIG_MANAGER.ex_config.loader_id,
fields=["model"],
)["model"]
if isinstance(loader_model, list):
model_index = (
-1
if CONFIG_MANAGER.ex_config.loader_model_index is None
else CONFIG_MANAGER.ex_config.loader_model_index
)
loader_model = loader_model[model_index]
model.load(loader_model)
model.finetune(self.x_train, self.y_train, self.x_validate, self.y_validate)
else:
model.fit(self.x_train, self.y_train, self.x_validate, self.y_validate)
model.save(self.model_save_path)
self.ex.add_artifact(self.model_save_path)
self.model = model
TimeInspector.log_cost_time("Finished training model.")
def load(self):
model = self.model_class(**self.model_args)
# Load model
fetcher = create_fetcher_with_config(CONFIG_MANAGER, load_form_loader=True)
loader_model = fetcher.get_experiment(
exp_name=CONFIG_MANAGER.ex_config.loader_name,
exp_id=CONFIG_MANAGER.ex_config.loader_id,
fields=["model"],
)["model"]
if isinstance(loader_model, list):
model_index = (
-1
if CONFIG_MANAGER.ex_config.loader_model_index is None
else CONFIG_MANAGER.ex_config.loader_model_index
)
loader_model = loader_model[model_index]
model.load(loader_model)
# Save the model after loading it; otherwise this experiment's results will not contain a model.
model.save(self.model_save_path)
self.ex.add_artifact(self.model_save_path)
self.model = model
def get_test_pred(self):
pred = self.model.predict(self.x_test)
pred = pd.DataFrame(pred, index=self.x_test.index, columns=self.y_test.columns)
return pred
def get_test_performance(self):
try:
model_score = self.model.score(self.x_test, self.y_test)
except NotImplementedError:
model_score = None
# Remove rows from x, y and w, which contain Nan in any columns in y_test.
x_test, y_test, __ = drop_nan_by_y_index(self.x_test, self.y_test)
pred_test = self.model.predict(x_test)
model_pearsonr = pearsonr(np.ravel(pred_test), np.ravel(y_test.values))[0]
performance = {"model_score": model_score, "model_pearsonr": model_pearsonr}
return performance
class RollingTrainer(BaseTrainer):
def __init__(self, model_class, model_save_path, model_args, data_handler, sacred_ex, **kwargs):
super(RollingTrainer, self).__init__(
model_class, model_save_path, model_args, data_handler, sacred_ex, **kwargs
)
self.rolling_period = kwargs.get("rolling_period", 60)
self.models = []
self.rolling_data = []
self.all_x_test = []
self.all_y_test = []
for data in self.data_handler.get_rolling_data(
self.train_start_date,
self.train_end_date,
self.validate_start_date,
self.validate_end_date,
self.test_start_date,
self.test_end_date,
self.rolling_period,
):
self.rolling_data.append(data)
__, __, __, __, x_test, y_test = data
self.all_x_test.append(x_test)
self.all_y_test.append(y_test)
def train(self):
# 1. Get total data parts.
# total_data_parts = self.data_handler.total_data_parts
# self.logger.warning('Total numbers of model are: {}, start training models...'.format(total_data_parts))
if CONFIG_MANAGER.ex_config.finetune:
fetcher = create_fetcher_with_config(CONFIG_MANAGER, load_form_loader=True)
loader_model = fetcher.get_experiment(
exp_name=CONFIG_MANAGER.ex_config.loader_name,
exp_id=CONFIG_MANAGER.ex_config.loader_id,
fields=["model"],
)["model"]
loader_model_index = CONFIG_MANAGER.ex_config.loader_model_index
previous_model_path = ""
# 2. Rolling train.
for (
index,
(x_train, y_train, x_validate, y_validate, x_test, y_test),
) in enumerate(self.rolling_data):
TimeInspector.set_time_mark()
model = self.model_class(**self.model_args)
if CONFIG_MANAGER.ex_config.finetune:
# Finetune model
if loader_model_index is None and isinstance(loader_model, list):
try:
model.load(loader_model[index])
except IndexError:
# Load model by previous_model_path
with open(previous_model_path, "rb") as fp:
model.load(fp)
model.finetune(x_train, y_train, x_validate, y_validate)
else:
if index == 0:
loader_model = (
loader_model[loader_model_index] if isinstance(loader_model, list) else loader_model
)
model.load(loader_model)
else:
with open(previous_model_path, "rb") as fp:
model.load(fp)
model.finetune(x_train, y_train, x_validate, y_validate)
else:
model.fit(x_train, y_train, x_validate, y_validate)
model_save_path = "{}_{}".format(self.model_save_path, index)
model.save(model_save_path)
previous_model_path = model_save_path
self.ex.add_artifact(model_save_path)
self.models.append(model)
TimeInspector.log_cost_time("Finished training model: {}.".format(index + 1))
def load(self):
"""
Load the data and the model
"""
fetcher = create_fetcher_with_config(CONFIG_MANAGER, load_form_loader=True)
loader_model = fetcher.get_experiment(
exp_name=CONFIG_MANAGER.ex_config.loader_name,
exp_id=CONFIG_MANAGER.ex_config.loader_id,
fields=["model"],
)["model"]
for index in range(len(self.all_x_test)):
model = self.model_class(**self.model_args)
model.load(loader_model[index])
# Save model
model_save_path = "{}_{}".format(self.model_save_path, index)
model.save(model_save_path)
self.ex.add_artifact(model_save_path)
self.models.append(model)
def get_test_pred(self):
"""
Predict the score on test data with the models.
Please ensure the models and data are loaded before calling this method.
:return: the predicted scores for the pred
"""
pred_df_list = []
y_test_columns = self.all_y_test[0].columns
# Start iteration.
for model, x_test in zip(self.models, self.all_x_test):
pred = model.predict(x_test)
pred_df = pd.DataFrame(pred, index=x_test.index, columns=y_test_columns)
pred_df_list.append(pred_df)
return pd.concat(pred_df_list)
def get_test_performance(self):
"""
Get the performances of the models
:return: the performances of models
"""
pred_test_list = []
y_test_list = []
scorer = self.models[0]._scorer
for model, x_test, y_test in zip(self.models, self.all_x_test, self.all_y_test):
# Remove rows from x, y and w, which contain Nan in any columns in y_test.
x_test, y_test, __ = drop_nan_by_y_index(x_test, y_test)
pred_test_list.append(model.predict(x_test))
y_test_list.append(np.squeeze(y_test.values))
pred_test_array = np.concatenate(pred_test_list, axis=0)
y_test_array = np.concatenate(y_test_list, axis=0)
model_score = scorer(y_test_array, pred_test_array)
model_pearsonr = pearsonr(np.ravel(y_test_array), np.ravel(pred_test_array))[0]
performance = {"model_score": model_score, "model_pearsonr": model_pearsonr}
return performance
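
The fine-tuning branch of `RollingTrainer.train` decides, for each rolling window, whether to start from a model stored in the loader experiment or from the model saved for the previous window. A compact sketch of that decision, assuming only the choice of source matters here. The function `finetune_source` and its tuple return value are hypothetical and only mirror the branch structure shown above:

```python
def finetune_source(index, loader_models, loader_model_index=None):
    """Return ("loader", i) or ("previous", i) for the rolling window `index`.

    ("loader", i) means: load loader_models[i] (i is None for a single model).
    ("previous", i) means: load the model saved for rolling window i.
    """
    if loader_model_index is None and isinstance(loader_models, list):
        # One loader model per window while they last, then chain from the previous window.
        if index < len(loader_models):
            return ("loader", index)
        return ("previous", index - 1)
    if index == 0:
        # Only the first window starts from the loader experiment.
        if isinstance(loader_models, list):
            return ("loader", loader_model_index)
        return ("loader", None)
    return ("previous", index - 1)


for window in range(3):
    print(window, finetune_source(window, ["m0", "m1"]))
```

With two loader models and three windows, the third window falls back to the model saved for the second one, which corresponds to the `IndexError` branch in the original code.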


@@ -95,6 +95,7 @@ class DatasetH(Dataset):
- instance of `DataHandler`
- config of `DataHandler`. Please refer to `DataHandler`
segments : list
Describe the options to segment the data.
Here are some examples:


@@ -265,30 +265,40 @@ class DataHandlerLP(DataHandler):
Parameters
----------
infer_processors : list
list of <description info> of processors to generate data for inference
example of <description info>:
1) classname & kwargs:
{
"class": "MinMaxNorm",
"kwargs": {
"fit_start_time": "20080101",
"fit_end_time": "20121231"
- list of <description info> of processors to generate data for inference
- example of <description info>:
.. code-block::
1) classname & kwargs:
{
"class": "MinMaxNorm",
"kwargs": {
"fit_start_time": "20080101",
"fit_end_time": "20121231"
}
}
}
2) Only classname:
"DropnaFeature"
3) object instance of Processor
2) Only classname:
"DropnaFeature"
3) object instance of Processor
learn_processors : list
similar to infer_processors, but for generating data for learning models
process_type: str
PTYPE_I = 'independent'
- self._infer will be processed by infer_processors
- self._learn will be processed by learn_processors
PTYPE_A = 'append'
- self._infer will be processed by infer_processors
- self._learn will be processed by infer_processors + learn_processors
- (e.g. self._infer processed by learn_processors )
"""


@@ -23,6 +23,18 @@ class DataLoader(abc.ABC):
"""
load the data as pd.DataFrame.
Example of the data (The multi-index of the columns is optional.):
.. code-block:: python
feature label
$close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0
datetime instrument
2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032
SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042
SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
Parameters
----------
instruments : str or dict
@@ -36,17 +48,6 @@ class DataLoader(abc.ABC):
-------
pd.DataFrame:
data loaded from the underlying source
Example of the data (The multi-index of the columns is optional.):
.. code-block::
feature label
$close $volume Ref($close, 1) Mean($close, 3) $high-$low LABEL0
datetime instrument
2010-01-04 SH600000 81.807068 17145150.0 83.737389 83.016739 2.741058 0.0032
SH600004 13.313329 11800983.0 13.313329 13.317701 0.183632 0.0042
SH600005 37.796539 12231662.0 38.258602 37.919757 0.970325 0.0289
"""
pass
@@ -65,7 +66,7 @@ class DLWParser(DataLoader):
config : Tuple[list, tuple, dict]
Config will be used to describe the fields and column names
.. code-block:: YAML
.. code-block::
<config> := {
"group_name1": <fields_info1>


@@ -10,22 +10,6 @@ from ..utils import Wrapper
class QlibRecorder:
"""
A global system that helps to manage the experiments.
The components of the system:
1) ExperimentManager: a class managing experiments.
2) Experiment: a class of experiment, and each instance of it is responsible for a single experiment.
3) Recorder: a class of recorder, and each instance of it is responsible for a single run.
The general structure of the system:
ExperimentManager
- Experiment 1
- Recorder 1
- Recorder 2
- ...
- Experiment 2
- ...
- ...
"""
def __init__(self, exp_manager):
@@ -34,16 +18,14 @@ class QlibRecorder:
@contextmanager
def start(self, experiment_name=None, recorder_name=None):
"""
Method to start an experiment. This method can only be called within a Python's `with` statement.
Method to start an experiment. This method can only be called within a Python's `with` statement. Here is the example code:
Use case:
---------
```
with R.start('test', 'recorder_1'):
model.fit(dataset)
R.log...
... # further operations
```
.. code-block:: Python
with R.start('test', 'recorder_1'):
model.fit(dataset)
R.log...
... # further operations
Parameters
----------
@@ -63,15 +45,14 @@ class QlibRecorder:
def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
"""
Lower level method for starting an experiment. When using this method, one should end the experiment manually
and the status of the recorder may not be handled properly.
and the status of the recorder may not be handled properly. Here is the example code:
.. code-block:: Python
R.start_exp(experiment_name='test', recorder_name='recorder_1')
... # further operations
R.end_exp('FINISHED') or R.end_exp(Recorder.STATUS_S)
Use case:
---------
```
R.start_exp(experiment_name='test', recorder_name='recorder_1')
... # further operations
R.end_exp('FINISHED') or R.end_exp(Recorder.STATUS_S)
```
Parameters
----------
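
The relationship between `start` (used in a `with` statement) and the manual `start_exp` / `end_exp` pair described in these docstrings can be sketched with `contextlib.contextmanager`. This is a toy model, not `QlibRecorder` itself: the class `TinyRecorder` is hypothetical, and the `"RUNNING"` and `"FAILED"` status strings and the re-raise on error are assumptions (only `'FINISHED'` appears in the docstrings above):

```python
from contextlib import contextmanager


class TinyRecorder:
    """Hypothetical stand-in for a recorder; it only tracks a status string."""

    def __init__(self):
        self.status = None

    def start_exp(self, experiment_name=None, recorder_name=None):
        self.status = "RUNNING"

    def end_exp(self, recorder_status="FINISHED"):
        self.status = recorder_status

    @contextmanager
    def start(self, experiment_name=None, recorder_name=None):
        self.start_exp(experiment_name, recorder_name)
        try:
            yield self
        except Exception:
            # Assumption: a failing block is recorded as "FAILED" and the error is re-raised.
            self.end_exp("FAILED")
            raise
        self.end_exp("FINISHED")


recorder = TinyRecorder()
with recorder.start("test", "recorder_1"):
    pass  # model.fit(dataset), logging, and further operations would go here
print(recorder.status)
```

The point of the pattern is that the `with` form always ends the experiment with a definite status, whereas code that calls `start_exp` directly has to remember to call `end_exp` itself.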
@@ -92,15 +73,13 @@ class QlibRecorder:
def end_exp(self, recorder_status=Recorder.STATUS_FI):
"""
Method for ending an experiment manually. It will end the current active experiment, as well as its
active recorder with the specified `status` type.
active recorder with the specified `status` type. Here is the example code of the method:
Use case:
---------
```
R.start_exp(experiment_name='test')
... # further operations
R.end_exp('FINISHED') or R.end_exp(Recorder.STATUS_S)
```
.. code-block:: Python
R.start_exp(experiment_name='test')
... # further operations
R.end_exp('FINISHED') or R.end_exp(Recorder.STATUS_S)
Parameters
----------
@@ -111,14 +90,12 @@ class QlibRecorder:
def search_records(self, experiment_ids, **kwargs):
"""
Get a pandas DataFrame of records that fit the search criteria.
Get a pandas DataFrame of records that fit the search criteria. Here is the example code of the method:
Use case:
---------
```
R.log_metrics(m=2.50, step=0)
records = R.search_records([experiment_id], order_by=["metrics.m DESC"])
```
.. code-block:: Python
R.log_metrics(m=2.50, step=0)
records = R.search_records([experiment_id], order_by=["metrics.m DESC"])
Parameters
----------
@@ -146,11 +123,9 @@ class QlibRecorder:
"""
Method for listing all the existing experiments (except for those being deleted.)
Use case:
---------
```
exps = R.list_experiments()
```
.. code-block:: Python
exps = R.list_experiments()
Returns
-------
@@ -166,11 +141,11 @@ class QlibRecorder:
list all the recorders of the default experiment. If the default experiment doesn't exist, the method will first
create the default experiment, and then create a new recorder under it.
Use case:
---------
```
recorders = R.list_recorders(experiment_name='test')
```
Here is the example code:
.. code-block:: Python
recorders = R.list_recorders(experiment_name='test')
Parameters
----------
@@ -191,46 +166,55 @@ class QlibRecorder:
True, if no valid experiment is found, this method will create one for you. Otherwise, it will
only retrieve a specific experiment or raise an Error.
If `create` is True:
If R's running:
1) no id or name specified, return the active experiment.
2) if id or name is specified, return the specified experiment. If no such exp found,
create a new experiment with given id or name, and the experiment is set to be running.
If R's not running:
1) no id or name specified, create a default experiment, and the experiment is set to be running.
2) if id or name is specified, return the specified experiment. If no such exp found,
create a new experiment with given name or the default experiment, and the experiment is set to be running.
Else If `create` is False:
If R's running:
1) no id or name specified, return the active experiment.
2) if id or name is specified, return the specified experiment. If no such exp found,
raise Error.
If R's not running:
1) no id or name specified. If the default experiment exists, return it, otherwise, raise Error.
2) if id or name is specified, return the specified experiment. If no such exp found,
raise Error.
- If `create` is True:
Use case:
---------
```
# Case 1
with R.start('test'):
exp = R.get_exp()
recorders = exp.list_recorders()
- If ``R``'s running:
# Case 2
with R.start('test'):
exp = R.get_exp('test1')
- no id or name specified, return the active experiment.
# Case 3
exp = R.get_exp()  # returns a default experiment
- if id or name is specified, return the specified experiment. If no such experiment is found, create a new experiment with the given id or name and set it to be running.
# Case 4
exp = R.get_exp(experiment_name='test')
- If ``R``'s not running:
# Case 5
exp = R.get_exp(create=False)  # returns the default experiment if it exists
```
- no id or name specified, create a default experiment, and the experiment is set to be running.
- if id or name is specified, return the specified experiment. If no such experiment is found, create a new experiment with the given name or the default experiment, and set it to be running.
- Else, if `create` is False:
- If ``R``'s running:
- no id or name specified, return the active experiment.
- if id or name is specified, return the specified experiment. If no such exp found, raise Error.
- If ``R``'s not running:
- no id or name specified. If the default experiment exists, return it, otherwise, raise Error.
- if id or name is specified, return the specified experiment. If no such exp found, raise Error.
Here are some use cases:
.. code-block:: Python
# Case 1
with R.start('test'):
exp = R.get_exp()
recorders = exp.list_recorders()
# Case 2
with R.start('test'):
exp = R.get_exp('test1')
# Case 3
exp = R.get_exp()  # returns a default experiment
# Case 4
exp = R.get_exp(experiment_name='test')
# Case 5
exp = R.get_exp(create=False)  # returns the default experiment if it exists
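The lookup rules above can be mirrored in a small toy model. This is only a sketch of the documented decision table, not Qlib's implementation: `ToyManager`, `DEFAULT_NAME`, and the dict-based experiments are hypothetical stand-ins, lookup by id is omitted, and an existing default experiment is simply returned rather than re-activated.

```python
# Toy model of the documented get_exp rules; NOT Qlib's implementation.
DEFAULT_NAME = "Experiment"  # assumed default name, for illustration only


class ToyManager:
    def __init__(self):
        self.experiments = {}  # experiment name -> dict standing in for an experiment
        self.active = None     # name of the running experiment, or None

    def _create(self, name):
        # A newly created experiment is set to be running.
        self.experiments[name] = {"name": name}
        self.active = name
        return self.experiments[name]

    def get_exp(self, experiment_name=None, create=True):
        # No name given while running: return the active experiment.
        if experiment_name is None and self.active is not None:
            return self.experiments[self.active]
        # Otherwise look up the given name, falling back to the default name.
        name = DEFAULT_NAME if experiment_name is None else experiment_name
        if name in self.experiments:
            return self.experiments[name]
        if create:
            return self._create(name)
        raise ValueError(f"no experiment named {name!r}")
```

With `create=False` the toy model never creates anything, so a missing experiment surfaces as an exception instead of a silently created one.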
Parameters
----------
@@ -253,11 +237,11 @@ class QlibRecorder:
Method for deleting the experiment with given id or name. At least one of id or name must be given,
otherwise an error will occur.
Use case:
---------
```
R.delete_exp(experiment_name='test')
```
Here is the example code:
.. code-block:: Python
R.delete_exp(experiment_name='test')
Parameters
----------
@@ -272,11 +256,11 @@ class QlibRecorder:
"""
Method for retrieving the uri of current experiment manager.
Use case:
---------
```
uri = R.get_uri()
```
Here is the example code:
.. code-block:: Python
uri = R.get_uri()
Returns
-------
@@ -288,35 +272,41 @@ class QlibRecorder:
"""
Method for retrieving a recorder.
If R's running: 1) no id or name specified, return the active recorder. 2) if id or name is
specified, return the specified recorder.
If R's not running: 1) no id or name specified, raise Error. 2) if id or name is specified,
and the corresponding experiment_name must be given, return the specified recorder. Otherwise,
raise Error.
- If ``R``'s running:
- no id or name specified, return the active recorder.
- if id or name is specified, return the specified recorder.
- If ``R``'s not running:
- no id or name specified, raise Error.
- if id or name is specified, the corresponding experiment_name must also be given; the specified recorder is then returned. Otherwise, raise Error.
The recorder can be used for further processing, such as `save_object`, `load_object`, `log_params`,
`log_metrics`, etc.
Use case:
---------
```
# Case 1
with R.start('test'):
recorder = R.get_recorder()
Here are some use cases:
# Case 2
with R.start('test'):
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')
.. code-block:: Python
# Case 3
recorder = R.get_recorder()  # raises an Error
# Case 1
with R.start('test'):
recorder = R.get_recorder()
# Case 4
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')  # raises an Error
# Case 2
with R.start('test'):
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')
# Case 5
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d', experiment_name='test')
```
# Case 3
recorder = R.get_recorder()  # raises an Error
# Case 4
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')  # raises an Error
# Case 5
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d', experiment_name='test')
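The five cases can be traced with a toy resolver. This is a hedged sketch of the documented rules only: `ToyRecorderStore` and its attributes are hypothetical, recorders are plain strings, and lookup by recorder name is left out.

```python
# Toy model of the documented get_recorder rules; NOT Qlib's implementation.
class ToyRecorderStore:
    def __init__(self):
        self.recorders = {}  # experiment name -> {recorder id -> recorder}
        self.active = None   # (experiment name, recorder id) while running

    def get_recorder(self, recorder_id=None, experiment_name=None):
        if self.active is not None:
            # Running: default to the active recorder of the active experiment.
            exp_name, active_id = self.active
            return self.recorders[exp_name][recorder_id or active_id]
        # Not running: both the recorder id and its experiment name are required.
        if recorder_id is None or experiment_name is None:
            raise ValueError("recorder_id and experiment_name are both required")
        return self.recorders[experiment_name][recorder_id]
```

Requiring the experiment name when nothing is running reflects that, in this model, a recorder id is only looked up within one experiment.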
Parameters
----------
@@ -340,11 +330,11 @@ class QlibRecorder:
Method for deleting the recorders with given id or name. At least one of id or name must be given,
otherwise an error will occur.
Use case:
---------
```
R.delete_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')
```
Here is the example code:
.. code-block:: Python
R.delete_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')
Parameters
----------
@@ -361,26 +351,25 @@ class QlibRecorder:
from a local file/directory, or directly saving objects. Users can use valid Python keyword arguments
to specify the object to be saved as well as its name (name: value).
If R's running: it will save the objects through the running recorder.
If R's not running: the system will create a default experiment, and a new recorder and
save objects under it.
- If R's running: it will save the objects through the running recorder.
- If R's not running: the system will create a default experiment, and a new recorder and save objects under it.
To save objects with a specific recorder, it is recommended to first
get that recorder through the `get_recorder` API and then use it to save the objects.
The supported arguments are the same as for this method.
.. note::
Use case:
---------
```
# Case 1
with R.start('test'):
pred = model.predict(dataset)
R.save_objects(**{"pred.pkl": pred}, artifact_path='prediction')
To save objects with a specific recorder, it is recommended to first get that recorder through the `get_recorder` API and then use it to save the objects. The supported arguments are the same as for this method.
# Case 2
with R.start('test'):
R.save_objects(local_path='results/pred.pkl')
```
Here are some use cases:
.. code-block:: Python
# Case 1
with R.start('test'):
pred = model.predict(dataset)
R.save_objects(**{"pred.pkl": pred}, artifact_path='prediction')
# Case 2
with R.start('test'):
R.save_objects(local_path='results/pred.pkl')
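Case 1 passes `**{"pred.pkl": pred}` because a name such as `pred.pkl` contains a dot and so cannot be written as an ordinary keyword (`pred.pkl=pred` is a syntax error). The sketch below illustrates that calling convention with plain `pickle`; `save_objects_sketch` is a hypothetical helper, not part of Qlib, and it writes to a temporary directory rather than to a recorder.

```python
import pickle
import tempfile
from pathlib import Path


def save_objects_sketch(target_dir, **kwargs):
    """Pickle each keyword argument to target_dir/<name>; hypothetical helper."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    for name, value in kwargs.items():
        with open(target / name, "wb") as f:
            pickle.dump(value, f)
    # Report the file names that now exist in the target directory.
    return sorted(p.name for p in target.iterdir())


with tempfile.TemporaryDirectory() as tmp:
    # Unpacking a dict lets a non-identifier string act as the keyword name.
    saved = save_objects_sketch(Path(tmp) / "prediction", **{"pred.pkl": [0.1, 0.2]})
    with open(Path(tmp) / "prediction" / "pred.pkl", "rb") as f:
        restored = pickle.load(f)
```

Here `saved` holds the written file names and `restored` is the object read back from disk.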
Parameters
----------
@@ -393,25 +382,22 @@ class QlibRecorder:
def log_params(self, **kwargs):
"""
Method for logging parameters during an experiment.
Method for logging parameters during an experiment. In addition to using ``R``, one can also log to a specific recorder after getting it with `get_recorder` API.
If R's running: it will log parameters through the running recorder.
If R's not running: the system will create a default experiment as well as a new recorder, and
log parameters under it.
- If R's running: it will log parameters through the running recorder.
- If R's not running: the system will create a default experiment as well as a new recorder, and log parameters under it.
One can also log to a specific recorder after getting it with `get_recorder` API.
Here are some use cases:
Use case:
---------
```
# Case 1
with R.start('test'):
.. code-block:: Python
# Case 1
with R.start('test'):
R.log_params(learning_rate=0.01)
# Case 2
R.log_params(learning_rate=0.01)
# Case 2
R.log_params(learning_rate=0.01)
```
Parameters
----------
keyword argument:
@@ -421,25 +407,22 @@ class QlibRecorder:
def log_metrics(self, step=None, **kwargs):
"""
Method for logging metrics during an experiment.
Method for logging metrics during an experiment. In addition to using ``R``, one can also log to a specific recorder after getting it with `get_recorder` API.
If R's running: it will log metrics through the running recorder.
If R's not running: the system will create a default experiment as well as a new recorder, and
log metrics under it.
- If R's running: it will log metrics through the running recorder.
- If R's not running: the system will create a default experiment as well as a new recorder, and log metrics under it.
One can also log to a specific recorder after getting it with `get_recorder` API.
Here are some use cases:
Use case:
---------
```
# Case 1
with R.start('test'):
.. code-block:: Python
# Case 1
with R.start('test'):
R.log_metrics(train_loss=0.33, step=1)
# Case 2
R.log_metrics(train_loss=0.33, step=1)
# Case 2
R.log_metrics(train_loss=0.33, step=1)
```
Parameters
----------
keyword argument:
@@ -449,25 +432,22 @@ class QlibRecorder:
def set_tags(self, **kwargs):
"""
Method for setting tags for a recorder.
Method for setting tags for a recorder. In addition to using ``R``, one can also set the tag to a specific recorder after getting it with `get_recorder` API.
If R's running: it will set tags through the running recorder.
If R's not running: the system will create a default experiment as well as a new recorder, and
set the tags under it.
- If R's running: it will set tags through the running recorder.
- If R's not running: the system will create a default experiment as well as a new recorder, and set the tags under it.
One can also set the tag to a specific recorder after getting it with `get_recorder` API.
Here are some use cases:
Use case:
---------
```
# Case 1
with R.start('test'):
.. code-block:: Python
# Case 1
with R.start('test'):
R.set_tags(release_version="2.2.0")
# Case 2
R.set_tags(release_version="2.2.0")
# Case 2
R.set_tags(release_version="2.2.0")
```
Parameters
----------
keyword argument: