1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-05 12:00:58 +08:00

Merge pull request #374 from bxdd/qlib_loaderhandler

Add DataLoader Based on DataHandler & Add Rolling Process Example & Restructure the Config & Setup_data
This commit is contained in:
you-n-g
2021-03-30 13:37:55 +08:00
committed by GitHub
8 changed files with 378 additions and 77 deletions

View File

@@ -27,12 +27,11 @@ from qlib.tests.data import GetData
from highfreq_ops import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut
class HighfreqWorkflow(object):
class HighfreqWorkflow:
SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None}
MARKET = "all"
BENCHMARK = "SH000300"
start_time = "2020-09-15 00:00:00"
end_time = "2021-01-18 16:00:00"
@@ -146,35 +145,40 @@ class HighfreqWorkflow(object):
self._prepare_calender_cache()
##=============reinit dataset=============
dataset.init(
dataset.config(
handler_kwargs={
"start_time": "2021-01-19 00:00:00",
"end_time": "2021-01-25 16:00:00",
},
segments={
"test": (
"2021-01-19 00:00:00",
"2021-01-25 16:00:00",
),
},
)
dataset.setup_data(
handler_kwargs={
"init_type": DataHandlerLP.IT_LS,
"start_time": "2021-01-19 00:00:00",
"end_time": "2021-01-25 16:00:00",
},
segment_kwargs={
"test": (
"2021-01-19 00:00:00",
"2021-01-25 16:00:00",
),
},
)
dataset_backtest.init(
dataset_backtest.config(
handler_kwargs={
"start_time": "2021-01-19 00:00:00",
"end_time": "2021-01-25 16:00:00",
},
segment_kwargs={
segments={
"test": (
"2021-01-19 00:00:00",
"2021-01-25 16:00:00",
),
},
)
dataset_backtest.setup_data(handler_kwargs={})
##=============get data=============
xtest = dataset.prepare(["test"])
backtest_test = dataset_backtest.prepare(["test"])
xtest = dataset.prepare("test")
backtest_test = dataset_backtest.prepare("test")
print(xtest, backtest_test)
return

View File

@@ -0,0 +1,17 @@
# Rolling Process Data
This workflow is an example for `Rolling Process Data`.
## Background
When rolling train the models, data also needs to be generated in the different rolling windows. When the rolling window moves, the training data will change, and the processor's learnable state (such as standard deviation, mean, etc.) will also change.
In order to avoid regenerating data, this example uses the `DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then used Processors to generate processed-features related to the rolling window.
## Run the Code
Run the example by running the following command:
```bash
python workflow.py rolling_process
```

View File

@@ -0,0 +1,32 @@
from qlib.data.dataset.handler import DataHandlerLP
from qlib.data.dataset.loader import DataLoaderDH
from qlib.contrib.data.handler import check_transform_proc
class RollingDataHandler(DataHandlerLP):
def __init__(
self,
start_time=None,
end_time=None,
infer_processors=[],
learn_processors=[],
fit_start_time=None,
fit_end_time=None,
data_loader_kwargs={},
):
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
data_loader = {
"class": "DataLoaderDH",
"kwargs": {**data_loader_kwargs},
}
super().__init__(
instruments=None,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
infer_processors=infer_processors,
learn_processors=learn_processors,
)

View File

@@ -0,0 +1,141 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import qlib
import fire
import pickle
import pandas as pd
from datetime import datetime
from qlib.config import REG_CN
from qlib.data.dataset.handler import DataHandlerLP
from qlib.contrib.data.handler import Alpha158
from qlib.utils import exists_qlib_data, init_instance_by_config
from qlib.tests.data import GetData
class RollingDataWorkflow:
MARKET = "csi300"
start_time = "2010-01-01"
end_time = "2019-12-31"
rolling_cnt = 5
def _init_qlib(self):
"""initialize qlib"""
# use yahoo_cn_1min data
provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir
if not exists_qlib_data(provider_uri):
print(f"Qlib data is not found in {provider_uri}")
GetData().qlib_data(target_dir=provider_uri, region=REG_CN)
qlib.init(provider_uri=provider_uri, region=REG_CN)
def _dump_pre_handler(self, path):
handler_config = {
"class": "Alpha158",
"module_path": "qlib.contrib.data.handler",
"kwargs": {
"start_time": self.start_time,
"end_time": self.end_time,
"instruments": self.MARKET,
"infer_processors": [],
"learn_processors": [],
},
}
pre_handler = init_instance_by_config(handler_config)
pre_handler.config(dump_all=True)
pre_handler.to_pickle(path)
def _load_pre_handler(self, path):
with open(path, "rb") as file_dataset:
pre_handler = pickle.load(file_dataset)
return pre_handler
def rolling_process(self):
self._init_qlib()
self._dump_pre_handler("pre_handler.pkl")
pre_handler = self._load_pre_handler("pre_handler.pkl")
train_start_time = (2010, 1, 1)
train_end_time = (2012, 12, 31)
valid_start_time = (2013, 1, 1)
valid_end_time = (2013, 12, 31)
test_start_time = (2014, 1, 1)
test_end_time = (2014, 12, 31)
dataset_config = {
"class": "DatasetH",
"module_path": "qlib.data.dataset",
"kwargs": {
"handler": {
"class": "RollingDataHandler",
"module_path": "rolling_handler",
"kwargs": {
"start_time": datetime(*train_start_time),
"end_time": datetime(*test_end_time),
"fit_start_time": datetime(*train_start_time),
"fit_end_time": datetime(*train_end_time),
"infer_processors": [
{"class": "RobustZScoreNorm", "kwargs": {"fields_group": "feature"}},
],
"learn_processors": [
{"class": "DropnaLabel"},
{"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}},
],
"data_loader_kwargs": {
"handler_config": pre_handler,
},
},
},
"segments": {
"train": (datetime(*train_start_time), datetime(*train_end_time)),
"valid": (datetime(*valid_start_time), datetime(*valid_end_time)),
"test": (datetime(*test_start_time), datetime(*test_end_time)),
},
},
}
dataset = init_instance_by_config(dataset_config)
for rolling_offset in range(self.rolling_cnt):
print(f"===========rolling{rolling_offset} start===========")
if rolling_offset:
dataset.config(
handler_kwargs={
"start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
"end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]),
"processor_kwargs": {
"fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
"fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]),
},
},
segments={
"train": (
datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]),
datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]),
),
"valid": (
datetime(valid_start_time[0] + rolling_offset, *valid_start_time[1:]),
datetime(valid_end_time[0] + rolling_offset, *valid_end_time[1:]),
),
"test": (
datetime(test_start_time[0] + rolling_offset, *test_start_time[1:]),
datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]),
),
},
)
dataset.setup_data(
handler_kwargs={
"init_type": DataHandlerLP.IT_FIT_SEQ,
}
)
dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"])
print(dtrain, dvalid, dtest)
## print or dump data
print(f"===========rolling{rolling_offset} end===========")
if __name__ == "__main__":
fire.Fire(RollingDataWorkflow)

View File

@@ -3,6 +3,7 @@ from typing import Union, List, Tuple, Dict, Text, Optional
from ...utils import init_instance_by_config, np_ffill
from ...log import get_module_logger
from .handler import DataHandler, DataHandlerLP
from copy import deepcopy
from inspect import getfullargspec
import pandas as pd
import numpy as np
@@ -16,22 +17,28 @@ class Dataset(Serializable):
Preparing data for model training and inferencing.
"""
def __init__(self, *args, **kwargs):
def __init__(self, **kwargs):
"""
init is designed to finish following steps:
- init the sub instance and the state of the dataset(info to prepare the data)
- The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing.
- setup data
- The data related attributes' names should start with '_' so that it will not be saved on disk when serializing.
- initialize the state of the dataset(info to prepare the data)
- The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing.
The data could specify the info to caculate the essential data for preparation
"""
self.setup_data(*args, **kwargs)
self.setup_data(**kwargs)
super().__init__()
def setup_data(self, *args, **kwargs):
def config(self, **kwargs):
"""
config is designed to configure and parameters that cannot be learned from the data
"""
super().config(**kwargs)
def setup_data(self, **kwargs):
"""
Setup the data.
@@ -39,7 +46,7 @@ class Dataset(Serializable):
- User have a Dataset object with learned status on disk.
- User load the Dataset object from the disk(Note the init function is skiped).
- User load the Dataset object from the disk.
- User call `setup_data` to load new data.
@@ -47,7 +54,7 @@ class Dataset(Serializable):
"""
pass
def prepare(self, *args, **kwargs) -> object:
def prepare(self, **kwargs) -> object:
"""
The type of dataset depends on the model. (It could be pd.DataFrame, pytorch.DataLoader, etc.)
The parameters should specify the scope for the prepared data
@@ -76,44 +83,7 @@ class DatasetH(Dataset):
- The processing is related to data split.
"""
def init(self, handler_kwargs: dict = None, segment_kwargs: dict = None):
"""
Initialize the DatasetH
Parameters
----------
handler_kwargs : dict
Config of DataHanlder, which could include the following arguments:
- arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'.
- arguments of DataHandler.init, such as 'enable_cache', etc.
segment_kwargs : dict
Config of segments which is same as 'segments' in DatasetH.setup_data
"""
if handler_kwargs:
if not isinstance(handler_kwargs, dict):
raise TypeError(f"param handler_kwargs must be type dict, not {type(handler_kwargs)}")
kwargs_init = {}
kwargs_conf_data = {}
conf_data_arg = {"instruments", "start_time", "end_time"}
for k, v in handler_kwargs.items():
if k in conf_data_arg:
kwargs_conf_data.update({k: v})
else:
kwargs_init.update({k: v})
self.handler.conf_data(**kwargs_conf_data)
self.handler.init(**kwargs_init)
if segment_kwargs:
if not isinstance(segment_kwargs, dict):
raise TypeError(f"param handler_kwargs must be type dict, not {type(segment_kwargs)}")
self.segments = segment_kwargs.copy()
def setup_data(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple]):
def __init__(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple], **kwargs):
"""
Setup the underlying data.
@@ -144,6 +114,49 @@ class DatasetH(Dataset):
"""
self.handler = init_instance_by_config(handler, accept_types=DataHandler)
self.segments = segments.copy()
super().__init__(**kwargs)
def config(self, handler_kwargs: dict = None, **kwargs):
"""
Initialize the DatasetH
Parameters
----------
handler_kwargs : dict
Config of DataHanlder, which could include the following arguments:
- arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'.
kwargs : dict
Config of DatasetH, such as
- segments : dict
Config of segments which is same as 'segments' in self.__init__
"""
if handler_kwargs is not None:
self.handler.config(**handler_kwargs)
if "segments" in kwargs:
self.segments = deepcopy(kwargs.pop("segments"))
super().config(**kwargs)
def setup_data(self, handler_kwargs: dict = None, **kwargs):
"""
Setup the Data
Parameters
----------
handler_kwargs : dict
init arguments of DataHanlder, which could include the following arguments:
- init_type : Init Type of Handler
- enable_cache : wheter to enable cache
"""
super().setup_data(**kwargs)
if handler_kwargs is not None:
self.handler.setup_data(**handler_kwargs)
def __repr__(self):
return "{name}(handler={handler}, segments={segments})".format(
@@ -433,15 +446,19 @@ class TSDatasetH(DatasetH):
- The dimension of a batch of data <batch_idx, feature, timestep>
"""
def __init__(self, step_len=30, *args, **kwargs):
def __init__(self, step_len=30, **kwargs):
self.step_len = step_len
super().__init__(*args, **kwargs)
super().__init__(**kwargs)
def setup_data(self, *args, **kwargs):
super().setup_data(*args, **kwargs)
def config(self, **kwargs):
if "step_len" in kwargs:
self.step_len = kwargs.pop("step_len")
super().config(**kwargs)
def setup_data(self, **kwargs):
super().setup_data(**kwargs)
cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique()
cal = sorted(cal)
# Get the datatime index for building timestamp
self.cal = cal
def _prepare_seg(self, slc: slice, **kwargs) -> TSDataSampler:

View File

@@ -6,6 +6,7 @@ import abc
import bisect
import logging
import warnings
from inspect import getfullargspec
from typing import Union, Tuple, List, Iterator, Optional
import pandas as pd
@@ -16,7 +17,7 @@ from ...data import D
from ...config import C
from ...utils import parse_config, transform_end_date, init_instance_by_config
from ...utils.serial import Serializable
from .utils import get_level_index, fetch_df_by_index
from .utils import fetch_df_by_index
from pathlib import Path
from .loader import DataLoader
@@ -99,10 +100,10 @@ class DataHandler(Serializable):
self.fetch_orig = fetch_orig
if init_data:
with TimeInspector.logt("Init data"):
self.init()
self.setup_data()
super().__init__()
def conf_data(self, **kwargs):
def config(self, **kwargs):
"""
configuration of data.
# what data to be loaded from data source
@@ -115,13 +116,16 @@ class DataHandler(Serializable):
for k, v in kwargs.items():
if k in attr_list:
setattr(self, k, v)
else:
raise KeyError("Such config is not supported.")
def init(self, enable_cache: bool = False):
for attr in attr_list:
if attr in kwargs:
kwargs.pop(attr)
super().config(**kwargs)
def setup_data(self, enable_cache: bool = False):
"""
initialize the data.
In case of running intialization for multiple time, it will do nothing for the second time.
Set Up the data in case of running intialization for multiple time
It is responsible for maintaining following variable
1) self._data
@@ -405,14 +409,28 @@ class DataHandlerLP(DataHandler):
if self.drop_raw:
del self._data
def config(self, processor_kwargs: dict = None, **kwargs):
"""
configuration of data.
# what data to be loaded from data source
This method will be used when loading pickled handler from dataset.
The data will be initialized with different time range.
"""
super().config(**kwargs)
if processor_kwargs is not None:
for processor in self.get_all_processors():
processor.config(**processor_kwargs)
# init type
IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor
IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df
IT_LS = "load_state" # The state of the object has been load by pickle
def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False):
def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs):
"""
Initialize the data of Qlib
Set up the data in case of running intialization for multiple time
Parameters
----------
@@ -427,7 +445,7 @@ class DataHandlerLP(DataHandler):
when we call `init` next time
"""
# init raw data
super().init(enable_cache=enable_cache)
super().setup_data(**kwargs)
with TimeInspector.logt("fit & process data"):
if init_type == DataHandlerLP.IT_FIT_IND:

View File

@@ -217,3 +217,64 @@ class StaticDataLoader(DataLoader):
join=self.join,
)
self._data.sort_index(inplace=True)
class DataLoaderDH(DataLoader):
"""DataLoaderDH
DataLoader based on (D)ata (H)andler
It is designed to load multiple data from data handler
- If you just want to load data from single datahandler, you can write them in single data handler
"""
def __init__(self, handler_config: dict, fetch_kwargs: dict = {}, is_group=False):
"""
Parameters
----------
handler_config : dict
handler_config will be used to describe the handlers
.. code-block::
<handler_config> := {
"group_name1": <handler>
"group_name2": <handler>
}
or
<handler_config> := <handler>
<handler> := DataHandler Instance | DataHandler Config
fetch_kwargs : dict
fetch_kwargs will be used to describe the different arguments of fetch method, such as col_set, squeeze, data_key, etc.
is_group: bool
is_group will be used to describe whether the key of handler_config is group
"""
from qlib.data.dataset.handler import DataHandler
if is_group:
self.handlers = {
grp: init_instance_by_config(config, accept_types=DataHandler) for grp, config in handler_config.items()
}
else:
self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler)
self.is_group = is_group
self.fetch_kwargs = {"col_set": DataHandler.CS_RAW}
self.fetch_kwargs.update(fetch_kwargs)
def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
if instruments is not None:
LOG.warning(f"instruments[{instruments}] is ignored")
if self.is_group:
df = pd.concat(
{
grp: dh.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs)
for grp, dh in self.handlers.items()
},
axis=1,
)
else:
df = self.handlers.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs)
return df

View File

@@ -72,6 +72,17 @@ class Processor(Serializable):
"""
return True
def config(self, **kwargs):
attr_list = {"fit_start_time", "fit_end_time"}
for k, v in kwargs.items():
if k in attr_list and hasattr(self, k):
setattr(self, k, v)
for attr in attr_list:
if attr in kwargs:
kwargs.pop(attr)
super().config(**kwargs)
class DropnaProcessor(Processor):
def __init__(self, fields_group=None):