From 1ca3c6a61c11cff9adf79b1657af555cf68a365a Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 01:29:59 +0800 Subject: [PATCH 01/23] add DataHandlerDL --- qlib/data/dataset/loader.py | 58 +++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 921bf01c5..faabe2c02 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -217,3 +217,61 @@ class StaticDataLoader(DataLoader): join=self.join, ) self._data.sort_index(inplace=True) + +class DataHandlerDL(DataLoader): + '''DataHandlerDL + DataHandler-based (D)ata (L)oader + It is designed to load multiple data from data handler + - If you just want to load data from single datahandler, you can write them in single data handler + ''' + + def __init__(self, handler_config:dict, fetch_config:dict = {}, is_group=False): + """ + Parameters + ---------- + handler_config : dict + handler_config will be used to describe the handlers + + .. code-block:: + + := { + "group_name1": + "group_name2": + } + or + := + := DataHandler Instance | DataHandler Config + + fetch_config : dict + fetch_config will be used to describe the different arguments of fetch method, such as squeeze, data_key, etc. 
+ + is_group: bool + is_group will be used to describe whether the key of handler_config is group + + """ + if self.is_group: + self.handlers = { + grp: init_instance_by_config(config, accept_types=DataHandler) + for grp, config in handler_config.items() + } + else: + self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler) + + self.is_group = is_group + self.fetch_config = fetch_config + + def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: + if instruments is not None: + LOG.warning(f"instruments[{instruments}] is ignored") + + if self.is_group: + df = pd.concat( + { + grp: dh.fetch(slice(start_time, end_time), col_set=DataHandler.CS_RAW, **fetch_config) + for grp, dh in self.handlers.items() + }, + axis=1, + ) + else: + df = self.handler.fetch(slice(start_time, end_time), col_set=DataHandler.CS_RAW, **fetch_config) + return df From b1a28358adb9b9e15abd09fe59f7ff4544e399ed Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 01:30:31 +0800 Subject: [PATCH 02/23] black format --- qlib/data/dataset/loader.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index faabe2c02..884d15635 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -218,14 +218,15 @@ class StaticDataLoader(DataLoader): ) self._data.sort_index(inplace=True) + class DataHandlerDL(DataLoader): - '''DataHandlerDL + """DataHandlerDL DataHandler-based (D)ata (L)oader It is designed to load multiple data from data handler - If you just want to load data from single datahandler, you can write them in single data handler - ''' + """ - def __init__(self, handler_config:dict, fetch_config:dict = {}, is_group=False): + def __init__(self, handler_config: dict, fetch_config: dict = {}, is_group=False): """ Parameters ---------- @@ -251,12 +252,11 @@ class DataHandlerDL(DataLoader): """ if self.is_group: self.handlers = { - grp: 
init_instance_by_config(config, accept_types=DataHandler) - for grp, config in handler_config.items() + grp: init_instance_by_config(config, accept_types=DataHandler) for grp, config in handler_config.items() } else: self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler) - + self.is_group = is_group self.fetch_config = fetch_config From 1fcfe8e4ba6e655ba59ae95180c491ea3fe85c8e Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 01:37:17 +0800 Subject: [PATCH 03/23] add rolling process data --- examples/rolling_process_data/README.md | 2 ++ examples/rolling_process_data/workflow.py | 0 2 files changed, 2 insertions(+) create mode 100644 examples/rolling_process_data/README.md create mode 100644 examples/rolling_process_data/workflow.py diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md new file mode 100644 index 000000000..3f1c8768d --- /dev/null +++ b/examples/rolling_process_data/README.md @@ -0,0 +1,2 @@ +# Rolling Process Data + diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py new file mode 100644 index 000000000..e69de29bb From f6dc25b22982d5e80b4cd2f9c2fc823ed98d244b Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 16:14:22 +0800 Subject: [PATCH 04/23] update rolling process --- examples/highfreq/workflow.py | 1 - .../rolling_process_data/rolling_handler.py | 34 ++++ examples/rolling_process_data/workflow.py | 145 ++++++++++++++++++ qlib/data/dataset/handler.py | 2 +- qlib/data/dataset/loader.py | 21 +-- 5 files changed, 192 insertions(+), 11 deletions(-) create mode 100644 examples/rolling_process_data/rolling_handler.py diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index 01de59c0e..c2ca36db3 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -32,7 +32,6 @@ class HighfreqWorkflow(object): SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, 
IsNull, Cut], "expression_cache": None} MARKET = "all" - BENCHMARK = "SH000300" start_time = "2020-09-15 00:00:00" end_time = "2021-01-18 16:00:00" diff --git a/examples/rolling_process_data/rolling_handler.py b/examples/rolling_process_data/rolling_handler.py new file mode 100644 index 000000000..50a36f219 --- /dev/null +++ b/examples/rolling_process_data/rolling_handler.py @@ -0,0 +1,34 @@ +from qlib.data.dataset.handler import DataHandlerLP +from qlib.data.dataset.loader import DataLoaderDH +from qlib.contrib.data.handler import check_transform_proc + + +class RollingDataHandler(DataHandlerLP): + def __init__( + self, + start_time=None, + end_time=None, + infer_processors=[], + learn_processors=[], + fit_start_time=None, + fit_end_time=None, + data_loader_kwargs={} + ): + infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) + learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) + + data_loader = { + "class": "DataLoaderDH", + "kwargs": { + **data_loader_kwargs + }, + } + + super().__init__( + instruments=None, + start_time=start_time, + end_time=end_time, + data_loader=data_loader, + infer_processors=infer_processors, + learn_processors=learn_processors, + ) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index e69de29bb..8581f149b 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -0,0 +1,145 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +import qlib +import pickle +import datetime +import pandas as pd +from qlib.config import REG_CN +from qlib.data.dataset.handler import DataHandlerLP +from qlib.contrib.data.handler import Alpha158 +from qlib.utils import exists_qlib_data, init_instance_by_config +from qlib.tests.data import GetData + +class RollingDataWorkflow(object): + + MARKET = "csi300" + + start_time = "2010-01-01" + end_time = "2019-12-31" + rolling_cnt = 5 + + def _init_qlib(self): + """initialize qlib""" + # use yahoo_cn_1min data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + GetData().qlib_data(target_dir=provider_uri, region=REG_CN) + qlib.init(provider_uri=provider_uri, region=REG_CN) + + def _dump_pre_handler(self, path): + handler_config = { + "class": "Alpha158", + "module_path": "qlib.contrib.data.handler", + "kwargs": { + "start_time": start_time, + "end_time": end_time, + "instruments": MARKET, + }, + } + pre_handler = init_instance_by_config(handler_config) + pre_handler.to_pickle(path) + + def _load_pre_handler(self, path): + with open(path, "rb") as file_dataset: + pre_handler = pickle.load(file_dataset) + return pre_handler + + def rolling_process(self): + self._init_qlib() + self._dump_pre_handler("pre_handler.py") + pre_handler = self._load_pre_handler("pre_handler.py") + + init_start_time = datetime.datetime(2010,1,1) + init_end_time = datetime.datetime(2014,12,31) + init_fit_end_time = datetime.datetime(2012,12,31) + + dataset_config = { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "RollingDataHandler", + "module_path": "rolling_handler", + "kwargs": { + "start_time": init_start_time, + "end_time": init_start_time, + "fit_start_time": init_fit_start_time, + "fit_end_time": init_fit_end_time, + "data_loader_kwargs":{ + "handler_config": pre_handler, + } + }, + }, + "segments": { + "train": (init_start_time, 
init_fit_end_time), + "valid": (init_start_time, "2013-12-31"), + "test": (init_start_time, init_end_time), + }, + }, + } + + dataset = init_instance_by_config(dataset_config) + + for rolling_offset in range(rolling_cnt): + if rolling_offset: + dataset.init( + handler_kwargs={ + "init_type": DataHandlerLP.IT_FIT_IND, + "start_time": "2021-01-19 00:00:00", + "end_time": "2021-01-25 16:00:00", + }, + segment_kwargs={ + "train": ("2010-01-01", "2012-12-31"), + "valid": ("2013-01-01", "2013-12-31"), + "test": ("2014-01-01", "2014-12-31"), + }, + ) + + +if __name__ == "__main__": + + # use default data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + GetData().qlib_data(target_dir=provider_uri, region=REG_CN) + + qlib.init(provider_uri=provider_uri, region=REG_CN) + + market = "csi300" + benchmark = "SH000300" + + ################################### + # train model + ################################### + data_handler_config = { + "start_time": "2008-01-01", + "end_time": "2020-08-01", + "fit_start_time": "2008-01-01", + "fit_end_time": "2014-12-31", + "instruments": market, + } + + task = { + "dataset": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "Alpha158", + "module_path": "qlib.contrib.data.handler", + "kwargs": data_handler_config, + }, + "segments": { + "train": ("2008-01-01", "2014-12-31"), + "valid": ("2015-01-01", "2016-12-31"), + "test": ("2017-01-01", "2020-08-01"), + }, + }, + }, + } + + dataset = init_instance_by_config(task["dataset"]) + diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 050043ba6..f4795c566 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -16,7 +16,7 @@ from ...data import D from ...config import C from ...utils import parse_config, transform_end_date, init_instance_by_config from ...utils.serial import Serializable 
-from .utils import get_level_index, fetch_df_by_index +from .utils import fetch_df_by_index from pathlib import Path from .loader import DataLoader diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 884d15635..f88aaf05e 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -219,14 +219,14 @@ class StaticDataLoader(DataLoader): self._data.sort_index(inplace=True) -class DataHandlerDL(DataLoader): - """DataHandlerDL - DataHandler-based (D)ata (L)oader +class DataLoaderDH(DataLoader): + """DataLoaderDH + DataLoader based on (D)ata (H)andler It is designed to load multiple data from data handler - If you just want to load data from single datahandler, you can write them in single data handler """ - def __init__(self, handler_config: dict, fetch_config: dict = {}, is_group=False): + def __init__(self, handler_config: dict, fetch_kwargs: dict = {}, is_group=False): """ Parameters ---------- @@ -243,8 +243,8 @@ class DataHandlerDL(DataLoader): := := DataHandler Instance | DataHandler Config - fetch_config : dict - fetch_config will be used to describe the different arguments of fetch method, such as squeeze, data_key, etc. + fetch_kwargs : dict + fetch_kwargs will be used to describe the different arguments of fetch method, such as col_set, squeeze, data_key, etc. 
is_group: bool is_group will be used to describe whether the key of handler_config is group @@ -258,7 +258,10 @@ class DataHandlerDL(DataLoader): self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler) self.is_group = is_group - self.fetch_config = fetch_config + self.fetch_kwargs = { + "col_set":DataHandler.CS_RAW + } + self.fetch_kwargs = {**self.fetch_kwargs, **fetch_kwargs} def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: if instruments is not None: @@ -267,11 +270,11 @@ class DataHandlerDL(DataLoader): if self.is_group: df = pd.concat( { - grp: dh.fetch(slice(start_time, end_time), col_set=DataHandler.CS_RAW, **fetch_config) + grp: dh.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) for grp, dh in self.handlers.items() }, axis=1, ) else: - df = self.handler.fetch(slice(start_time, end_time), col_set=DataHandler.CS_RAW, **fetch_config) + df = self.handler.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) return df From 4ec300787efc87900db522145f43e20d52402bc1 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:54:52 +0800 Subject: [PATCH 05/23] update rolling workflow --- examples/rolling_process_data/workflow.py | 49 +++++++++++++---------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 8581f149b..62523aefd 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -3,8 +3,9 @@ import qlib import pickle -import datetime import pandas as pd + +from datetime import datetime from qlib.config import REG_CN from qlib.data.dataset.handler import DataHandlerLP from qlib.contrib.data.handler import Alpha158 @@ -14,7 +15,6 @@ from qlib.tests.data import GetData class RollingDataWorkflow(object): MARKET = "csi300" - start_time = "2010-01-01" end_time = "2019-12-31" rolling_cnt = 5 @@ 
-33,9 +33,9 @@ class RollingDataWorkflow(object): "class": "Alpha158", "module_path": "qlib.contrib.data.handler", "kwargs": { - "start_time": start_time, - "end_time": end_time, - "instruments": MARKET, + "start_time": self.start_time, + "end_time": self.end_time, + "instruments": self.MARKET, }, } pre_handler = init_instance_by_config(handler_config) @@ -51,10 +51,13 @@ class RollingDataWorkflow(object): self._dump_pre_handler("pre_handler.py") pre_handler = self._load_pre_handler("pre_handler.py") - init_start_time = datetime.datetime(2010,1,1) - init_end_time = datetime.datetime(2014,12,31) - init_fit_end_time = datetime.datetime(2012,12,31) - + train_start_time = (2010,1,1) + train_end_time = (2012,12,31) + valid_start_time = (2013,1,1) + valid_end_time = (2013,12,31) + test_start_time = (2014,1,1) + test_end_time = (2014,12,31) + dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -63,19 +66,19 @@ class RollingDataWorkflow(object): "class": "RollingDataHandler", "module_path": "rolling_handler", "kwargs": { - "start_time": init_start_time, - "end_time": init_start_time, - "fit_start_time": init_fit_start_time, - "fit_end_time": init_fit_end_time, + "start_time": datetime(*train_start_time), + "end_time": datetime(*test_end_time), + "fit_start_time": datetime(*train_start_time), + "fit_end_time": datetime(*train_end_time), "data_loader_kwargs":{ "handler_config": pre_handler, } }, }, "segments": { - "train": (init_start_time, init_fit_end_time), - "valid": (init_start_time, "2013-12-31"), - "test": (init_start_time, init_end_time), + "train": (datetime(*train_start_time), datetime(*train_end_time)), + "valid": (datetime(*valid_start_time), datetime(*valid_end_time)), + "test": (datetime(*test_start_time), datetime(*test_end_time)), }, }, } @@ -86,17 +89,19 @@ class RollingDataWorkflow(object): if rolling_offset: dataset.init( handler_kwargs={ - "init_type": DataHandlerLP.IT_FIT_IND, - "start_time": "2021-01-19 00:00:00", - 
"end_time": "2021-01-25 16:00:00", + "init_type": DataHandlerLP.IT_FIT_SEQ, + "start_time": datetime(train_start_time[0] + 1, *train_start_time[1:]), + "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), }, segment_kwargs={ - "train": ("2010-01-01", "2012-12-31"), - "valid": ("2013-01-01", "2013-12-31"), - "test": ("2014-01-01", "2014-12-31"), + "train": (datetime(train_start_time[0] + 1, *train_start_time[1:]), datetime(train_end_time[0], *train_end_time[1:])), + "valid": (datetime(valid_start_time[0] + 1, *valid_start_time[1:]), datetime(valid_end_time[0], *valid_end_time[1:])), + "test": (datetime(test_start_time[0] + 1, *test_start_time[1:]), datetime(test_end_time[0], *test_end_time[1:])), }, ) + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + if __name__ == "__main__": From efe134e9f4f5445055f9c1cd30576bf5f6b42217 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:56:04 +0800 Subject: [PATCH 06/23] update workflow --- examples/rolling_process_data/rolling_handler.py | 8 +++----- examples/rolling_process_data/workflow.py | 2 +- qlib/data/dataset/loader.py | 4 +--- 3 files changed, 5 insertions(+), 9 deletions(-) diff --git a/examples/rolling_process_data/rolling_handler.py b/examples/rolling_process_data/rolling_handler.py index 50a36f219..13b399afd 100644 --- a/examples/rolling_process_data/rolling_handler.py +++ b/examples/rolling_process_data/rolling_handler.py @@ -12,17 +12,15 @@ class RollingDataHandler(DataHandlerLP): learn_processors=[], fit_start_time=None, fit_end_time=None, - data_loader_kwargs={} + data_loader_kwargs={}, ): infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) data_loader = { "class": "DataLoaderDH", - "kwargs": { - **data_loader_kwargs - }, - } + "kwargs": {**data_loader_kwargs}, + } super().__init__( instruments=None, diff --git 
a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 62523aefd..9b61af47e 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -101,7 +101,7 @@ class RollingDataWorkflow(object): ) dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) - + if __name__ == "__main__": diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index f88aaf05e..539b930ec 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -258,9 +258,7 @@ class DataLoaderDH(DataLoader): self.handlers = init_instance_by_config(handler_config, accept_types=DataHandler) self.is_group = is_group - self.fetch_kwargs = { - "col_set":DataHandler.CS_RAW - } + self.fetch_kwargs = {"col_set": DataHandler.CS_RAW} self.fetch_kwargs = {**self.fetch_kwargs, **fetch_kwargs} def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: From a04c6bd6c941027d1beab07d65be8712d41e2406 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:56:22 +0800 Subject: [PATCH 07/23] balck format --- examples/rolling_process_data/workflow.py | 43 ++++++++++++++--------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 9b61af47e..9dd4285da 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -12,11 +12,12 @@ from qlib.contrib.data.handler import Alpha158 from qlib.utils import exists_qlib_data, init_instance_by_config from qlib.tests.data import GetData + class RollingDataWorkflow(object): MARKET = "csi300" start_time = "2010-01-01" - end_time = "2019-12-31" + end_time = "2019-12-31" rolling_cnt = 5 def _init_qlib(self): @@ -27,7 +28,7 @@ class RollingDataWorkflow(object): print(f"Qlib data is not found in {provider_uri}") GetData().qlib_data(target_dir=provider_uri, region=REG_CN) 
qlib.init(provider_uri=provider_uri, region=REG_CN) - + def _dump_pre_handler(self, path): handler_config = { "class": "Alpha158", @@ -51,13 +52,13 @@ class RollingDataWorkflow(object): self._dump_pre_handler("pre_handler.py") pre_handler = self._load_pre_handler("pre_handler.py") - train_start_time = (2010,1,1) - train_end_time = (2012,12,31) - valid_start_time = (2013,1,1) - valid_end_time = (2013,12,31) - test_start_time = (2014,1,1) - test_end_time = (2014,12,31) - + train_start_time = (2010, 1, 1) + train_end_time = (2012, 12, 31) + valid_start_time = (2013, 1, 1) + valid_end_time = (2013, 12, 31) + test_start_time = (2014, 1, 1) + test_end_time = (2014, 12, 31) + dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -70,9 +71,9 @@ class RollingDataWorkflow(object): "end_time": datetime(*test_end_time), "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), - "data_loader_kwargs":{ + "data_loader_kwargs": { "handler_config": pre_handler, - } + }, }, }, "segments": { @@ -94,14 +95,23 @@ class RollingDataWorkflow(object): "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), }, segment_kwargs={ - "train": (datetime(train_start_time[0] + 1, *train_start_time[1:]), datetime(train_end_time[0], *train_end_time[1:])), - "valid": (datetime(valid_start_time[0] + 1, *valid_start_time[1:]), datetime(valid_end_time[0], *valid_end_time[1:])), - "test": (datetime(test_start_time[0] + 1, *test_start_time[1:]), datetime(test_end_time[0], *test_end_time[1:])), + "train": ( + datetime(train_start_time[0] + 1, *train_start_time[1:]), + datetime(train_end_time[0], *train_end_time[1:]), + ), + "valid": ( + datetime(valid_start_time[0] + 1, *valid_start_time[1:]), + datetime(valid_end_time[0], *valid_end_time[1:]), + ), + "test": ( + datetime(test_start_time[0] + 1, *test_start_time[1:]), + datetime(test_end_time[0], *test_end_time[1:]), + ), }, ) - dtrain, dvalid, dtest = dataset.prepare(["train", "valid", 
"test"]) - + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + if __name__ == "__main__": @@ -147,4 +157,3 @@ if __name__ == "__main__": } dataset = init_instance_by_config(task["dataset"]) - From 68246b3b6d7037f3134ceb6e59aef869e96f1d8f Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:58:55 +0800 Subject: [PATCH 08/23] update workflow --- examples/rolling_process_data/workflow.py | 87 +++++------------------ 1 file changed, 18 insertions(+), 69 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 9dd4285da..2f48662bd 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. import qlib +import fire import pickle import pandas as pd @@ -12,12 +13,11 @@ from qlib.contrib.data.handler import Alpha158 from qlib.utils import exists_qlib_data, init_instance_by_config from qlib.tests.data import GetData - class RollingDataWorkflow(object): MARKET = "csi300" start_time = "2010-01-01" - end_time = "2019-12-31" + end_time = "2019-12-31" rolling_cnt = 5 def _init_qlib(self): @@ -28,7 +28,7 @@ class RollingDataWorkflow(object): print(f"Qlib data is not found in {provider_uri}") GetData().qlib_data(target_dir=provider_uri, region=REG_CN) qlib.init(provider_uri=provider_uri, region=REG_CN) - + def _dump_pre_handler(self, path): handler_config = { "class": "Alpha158", @@ -52,13 +52,13 @@ class RollingDataWorkflow(object): self._dump_pre_handler("pre_handler.py") pre_handler = self._load_pre_handler("pre_handler.py") - train_start_time = (2010, 1, 1) - train_end_time = (2012, 12, 31) - valid_start_time = (2013, 1, 1) - valid_end_time = (2013, 12, 31) - test_start_time = (2014, 1, 1) - test_end_time = (2014, 12, 31) - + train_start_time = (2010,1,1) + train_end_time = (2012,12,31) + valid_start_time = (2013,1,1) + valid_end_time = (2013,12,31) + test_start_time = (2014,1,1) + test_end_time = 
(2014,12,31) + dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -71,9 +71,9 @@ class RollingDataWorkflow(object): "end_time": datetime(*test_end_time), "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), - "data_loader_kwargs": { + "data_loader_kwargs":{ "handler_config": pre_handler, - }, + } }, }, "segments": { @@ -95,65 +95,14 @@ class RollingDataWorkflow(object): "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), }, segment_kwargs={ - "train": ( - datetime(train_start_time[0] + 1, *train_start_time[1:]), - datetime(train_end_time[0], *train_end_time[1:]), - ), - "valid": ( - datetime(valid_start_time[0] + 1, *valid_start_time[1:]), - datetime(valid_end_time[0], *valid_end_time[1:]), - ), - "test": ( - datetime(test_start_time[0] + 1, *test_start_time[1:]), - datetime(test_end_time[0], *test_end_time[1:]), - ), + "train": (datetime(train_start_time[0] + 1, *train_start_time[1:]), datetime(train_end_time[0], *train_end_time[1:])), + "valid": (datetime(valid_start_time[0] + 1, *valid_start_time[1:]), datetime(valid_end_time[0], *valid_end_time[1:])), + "test": (datetime(test_start_time[0] + 1, *test_start_time[1:]), datetime(test_end_time[0], *test_end_time[1:])), }, ) - dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) - + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + if __name__ == "__main__": - - # use default data - provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir - if not exists_qlib_data(provider_uri): - print(f"Qlib data is not found in {provider_uri}") - GetData().qlib_data(target_dir=provider_uri, region=REG_CN) - - qlib.init(provider_uri=provider_uri, region=REG_CN) - - market = "csi300" - benchmark = "SH000300" - - ################################### - # train model - ################################### - data_handler_config = { - "start_time": "2008-01-01", - "end_time": "2020-08-01", - "fit_start_time": "2008-01-01", - 
"fit_end_time": "2014-12-31", - "instruments": market, - } - - task = { - "dataset": { - "class": "DatasetH", - "module_path": "qlib.data.dataset", - "kwargs": { - "handler": { - "class": "Alpha158", - "module_path": "qlib.contrib.data.handler", - "kwargs": data_handler_config, - }, - "segments": { - "train": ("2008-01-01", "2014-12-31"), - "valid": ("2015-01-01", "2016-12-31"), - "test": ("2017-01-01", "2020-08-01"), - }, - }, - }, - } - - dataset = init_instance_by_config(task["dataset"]) + fire.Fire(RollingDataWorkflow) From e119c8576c78f7729364358ce1a3515ca682177a Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 19:59:22 +0800 Subject: [PATCH 09/23] black format --- examples/rolling_process_data/workflow.py | 42 ++++++++++++++--------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 2f48662bd..d5f7fec10 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -13,11 +13,12 @@ from qlib.contrib.data.handler import Alpha158 from qlib.utils import exists_qlib_data, init_instance_by_config from qlib.tests.data import GetData + class RollingDataWorkflow(object): MARKET = "csi300" start_time = "2010-01-01" - end_time = "2019-12-31" + end_time = "2019-12-31" rolling_cnt = 5 def _init_qlib(self): @@ -28,7 +29,7 @@ class RollingDataWorkflow(object): print(f"Qlib data is not found in {provider_uri}") GetData().qlib_data(target_dir=provider_uri, region=REG_CN) qlib.init(provider_uri=provider_uri, region=REG_CN) - + def _dump_pre_handler(self, path): handler_config = { "class": "Alpha158", @@ -52,13 +53,13 @@ class RollingDataWorkflow(object): self._dump_pre_handler("pre_handler.py") pre_handler = self._load_pre_handler("pre_handler.py") - train_start_time = (2010,1,1) - train_end_time = (2012,12,31) - valid_start_time = (2013,1,1) - valid_end_time = (2013,12,31) - test_start_time = (2014,1,1) - test_end_time 
= (2014,12,31) - + train_start_time = (2010, 1, 1) + train_end_time = (2012, 12, 31) + valid_start_time = (2013, 1, 1) + valid_end_time = (2013, 12, 31) + test_start_time = (2014, 1, 1) + test_end_time = (2014, 12, 31) + dataset_config = { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -71,9 +72,9 @@ class RollingDataWorkflow(object): "end_time": datetime(*test_end_time), "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), - "data_loader_kwargs":{ + "data_loader_kwargs": { "handler_config": pre_handler, - } + }, }, }, "segments": { @@ -95,14 +96,23 @@ class RollingDataWorkflow(object): "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), }, segment_kwargs={ - "train": (datetime(train_start_time[0] + 1, *train_start_time[1:]), datetime(train_end_time[0], *train_end_time[1:])), - "valid": (datetime(valid_start_time[0] + 1, *valid_start_time[1:]), datetime(valid_end_time[0], *valid_end_time[1:])), - "test": (datetime(test_start_time[0] + 1, *test_start_time[1:]), datetime(test_end_time[0], *test_end_time[1:])), + "train": ( + datetime(train_start_time[0] + 1, *train_start_time[1:]), + datetime(train_end_time[0], *train_end_time[1:]), + ), + "valid": ( + datetime(valid_start_time[0] + 1, *valid_start_time[1:]), + datetime(valid_end_time[0], *valid_end_time[1:]), + ), + "test": ( + datetime(test_start_time[0] + 1, *test_start_time[1:]), + datetime(test_end_time[0], *test_end_time[1:]), + ), }, ) - dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) - + dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + if __name__ == "__main__": fire.Fire(RollingDataWorkflow) From 9cc3b18e4e9cd61f7745271a01d628063b1b48a3 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 20:36:07 +0800 Subject: [PATCH 10/23] fix but --- examples/rolling_process_data/README.md | 1 - examples/rolling_process_data/workflow.py | 19 ++++++++++++++++--- qlib/data/dataset/loader.py | 6 ++++-- 3 files 
changed, 20 insertions(+), 6 deletions(-) diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md index 3f1c8768d..6a6af0d3d 100644 --- a/examples/rolling_process_data/README.md +++ b/examples/rolling_process_data/README.md @@ -1,2 +1 @@ # Rolling Process Data - diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index d5f7fec10..29b1c19f8 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -38,9 +38,12 @@ class RollingDataWorkflow(object): "start_time": self.start_time, "end_time": self.end_time, "instruments": self.MARKET, + "infer_processors": [], + "learn_processors": [], }, } pre_handler = init_instance_by_config(handler_config) + pre_handler.config(dump_all=True) pre_handler.to_pickle(path) def _load_pre_handler(self, path): @@ -50,8 +53,8 @@ class RollingDataWorkflow(object): def rolling_process(self): self._init_qlib() - self._dump_pre_handler("pre_handler.py") - pre_handler = self._load_pre_handler("pre_handler.py") + self._dump_pre_handler("pre_handler.pkl") + pre_handler = self._load_pre_handler("pre_handler.pkl") train_start_time = (2010, 1, 1) train_end_time = (2012, 12, 31) @@ -72,6 +75,13 @@ class RollingDataWorkflow(object): "end_time": datetime(*test_end_time), "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), + "infer_processors": [ + {"class":"RobustZScoreNorm", "kwargs": {"fields_group": "feature"}}, + ], + "learn_processors": [ + {"class": "DropnaLabel"}, + {"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}}, + ], "data_loader_kwargs": { "handler_config": pre_handler, }, @@ -87,7 +97,8 @@ class RollingDataWorkflow(object): dataset = init_instance_by_config(dataset_config) - for rolling_offset in range(rolling_cnt): + for rolling_offset in range(self.rolling_cnt): + print(f"===========rolling{rolling_offset} start===========") if rolling_offset: 
dataset.init( handler_kwargs={ @@ -112,6 +123,8 @@ class RollingDataWorkflow(object): ) dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + ## print or dump data + print(f"===========rolling{rolling_offset} end===========") if __name__ == "__main__": diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 539b930ec..1cda5c025 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -250,7 +250,9 @@ class DataLoaderDH(DataLoader): is_group will be used to describe whether the key of handler_config is group """ - if self.is_group: + from qlib.data.dataset.handler import DataHandler + + if is_group: self.handlers = { grp: init_instance_by_config(config, accept_types=DataHandler) for grp, config in handler_config.items() } @@ -274,5 +276,5 @@ class DataLoaderDH(DataLoader): axis=1, ) else: - df = self.handler.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) + df = self.handlers.fetch(selector=slice(start_time, end_time), level="datetime", **self.fetch_kwargs) return df From d6ff764bb270017b74099205dcfb78ade161a9e7 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 20:36:45 +0800 Subject: [PATCH 11/23] black format --- examples/rolling_process_data/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 29b1c19f8..3b38faa31 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -76,7 +76,7 @@ class RollingDataWorkflow(object): "fit_start_time": datetime(*train_start_time), "fit_end_time": datetime(*train_end_time), "infer_processors": [ - {"class":"RobustZScoreNorm", "kwargs": {"fields_group": "feature"}}, + {"class": "RobustZScoreNorm", "kwargs": {"fields_group": "feature"}}, ], "learn_processors": [ {"class": "DropnaLabel"}, From 194217fb07696530d5b575567c5bb664d479948d Mon Sep 17 00:00:00 2001 From: bxdd 
Date: Thu, 25 Mar 2021 21:47:17 +0800 Subject: [PATCH 12/23] fix bug --- examples/rolling_process_data/workflow.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 3b38faa31..719d93a1b 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -103,21 +103,21 @@ class RollingDataWorkflow(object): dataset.init( handler_kwargs={ "init_type": DataHandlerLP.IT_FIT_SEQ, - "start_time": datetime(train_start_time[0] + 1, *train_start_time[1:]), - "end_time": datetime(test_end_time[0] + 1, *test_end_time[1:]), + "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), }, segment_kwargs={ "train": ( - datetime(train_start_time[0] + 1, *train_start_time[1:]), - datetime(train_end_time[0], *train_end_time[1:]), + datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), ), "valid": ( - datetime(valid_start_time[0] + 1, *valid_start_time[1:]), - datetime(valid_end_time[0], *valid_end_time[1:]), + datetime(valid_start_time[0] + rolling_offset, *valid_start_time[1:]), + datetime(valid_end_time[0] + rolling_offset, *valid_end_time[1:]), ), "test": ( - datetime(test_start_time[0] + 1, *test_start_time[1:]), - datetime(test_end_time[0], *test_end_time[1:]), + datetime(test_start_time[0] + rolling_offset, *test_start_time[1:]), + datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), ), }, ) From 5f60d18dfe2fa71d341ee7e8128f0f4c1f79c119 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 22:08:23 +0800 Subject: [PATCH 13/23] fix config_data bug --- examples/rolling_process_data/workflow.py | 4 ++++ qlib/data/dataset/__init__.py | 2 +- qlib/data/dataset/handler.py | 28 ++++++++++++++++++++--- 3 files changed, 30 
insertions(+), 4 deletions(-) diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 719d93a1b..0be88dddc 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -98,6 +98,7 @@ class RollingDataWorkflow(object): dataset = init_instance_by_config(dataset_config) for rolling_offset in range(self.rolling_cnt): + print(f"===========rolling{rolling_offset} start===========") if rolling_offset: dataset.init( @@ -105,6 +106,8 @@ class RollingDataWorkflow(object): "init_type": DataHandlerLP.IT_FIT_SEQ, "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), + "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), }, segment_kwargs={ "train": ( @@ -123,6 +126,7 @@ class RollingDataWorkflow(object): ) dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) + print(dtrain, dvalid, dtest) ## print or dump data print(f"===========rolling{rolling_offset} end===========") diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 0f5d2baba..518b8eecd 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -98,7 +98,7 @@ class DatasetH(Dataset): raise TypeError(f"param handler_kwargs must be type dict, not {type(handler_kwargs)}") kwargs_init = {} kwargs_conf_data = {} - conf_data_arg = {"instruments", "start_time", "end_time"} + conf_data_arg = {"instruments", "start_time", "end_time", "fit_start_time", "fit_end_time"} for k, v in handler_kwargs.items(): if k in conf_data_arg: kwargs_conf_data.update({k: v}) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index f4795c566..40db5e4f3 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -115,8 +115,7 @@ 
class DataHandler(Serializable): for k, v in kwargs.items(): if k in attr_list: setattr(self, k, v) - else: - raise KeyError("Such config is not supported.") + def init(self, enable_cache: bool = False): """ @@ -405,11 +404,34 @@ class DataHandlerLP(DataHandler): if self.drop_raw: del self._data + + def conf_data(self, **kwargs): + """ + configuration of data. + # what data to be loaded from data source + + This method will be used when loading pickled handler from dataset. + The data will be initialized with different time range. + + """ + attr_list = {"fit_start_time", "fit_end_time"} + for k, v in kwargs.items(): + if k in attr_list: + for infer_processor in self.infer_processors: + if getattr(infer_processor, k, None): + setattr(infer_processor, k, v) + + for learn_processor in self.learn_processors: + if getattr(learn_processor, k, None): + setattr(learn_processor, k, v) + + super().conf_data(**kwargs) + # init type IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df IT_LS = "load_state" # The state of the object has been load by pickle - + def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): """ Initialize the data of Qlib From 4ee0240c2483383a28099d97e5688bce8ea030b1 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 25 Mar 2021 22:08:39 +0800 Subject: [PATCH 14/23] black format --- qlib/data/dataset/handler.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 40db5e4f3..9aa05b9b9 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -116,7 +116,6 @@ class DataHandler(Serializable): if k in attr_list: setattr(self, k, v) - def init(self, enable_cache: bool = False): """ initialize the data. 
@@ -404,7 +403,6 @@ class DataHandlerLP(DataHandler): if self.drop_raw: del self._data - def conf_data(self, **kwargs): """ configuration of data. @@ -431,7 +429,7 @@ class DataHandlerLP(DataHandler): IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df IT_LS = "load_state" # The state of the object has been load by pickle - + def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): """ Initialize the data of Qlib From 31bc85bf867ba2161c638b819b41e3cb7e863ce1 Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 29 Mar 2021 19:49:30 +0800 Subject: [PATCH 15/23] restructure data layer config & setup --- examples/highfreq/highfreq_processor.py | 7 ++ examples/highfreq/workflow.py | 33 ++++--- qlib/data/dataset/__init__.py | 116 ++++++++++++++---------- qlib/data/dataset/handler.py | 46 +++++----- qlib/data/dataset/loader.py | 1 - qlib/data/dataset/processor.py | 22 +++++ 6 files changed, 138 insertions(+), 87 deletions(-) diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py index f0ab0dec2..4ec8f3dd2 100644 --- a/examples/highfreq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -70,3 +70,10 @@ class HighFreqNorm(Processor): columns=["FEATURE_%d" % i for i in range(12 * 240)], ).sort_index() return df_new_features + + def config(fit_start_time=None, fit_end_time=None, **kwargs): + if fit_start_time: + self.fit_start_time = fit_start_time + if fit_end_time: + self.fit_end_time = fit_end_time + super().config(**kwargs) diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index c2ca36db3..0b48b971f 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -31,7 +31,7 @@ class HighfreqWorkflow(object): SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None} - MARKET = "all" + MARKET = "csi300" 
start_time = "2020-09-15 00:00:00" end_time = "2021-01-18 16:00:00" @@ -145,35 +145,40 @@ class HighfreqWorkflow(object): self._prepare_calender_cache() ##=============reinit dataset============= - dataset.init( + dataset.config( + handler_kwargs={ + "start_time": "2021-01-19 00:00:00", + "end_time": "2021-01-25 16:00:00", + }, + segments={ + "test": ( + "2021-01-19 00:00:00", + "2021-01-25 16:00:00", + ), + }, + ) + dataset.setup_data( handler_kwargs={ "init_type": DataHandlerLP.IT_LS, - "start_time": "2021-01-19 00:00:00", - "end_time": "2021-01-25 16:00:00", - }, - segment_kwargs={ - "test": ( - "2021-01-19 00:00:00", - "2021-01-25 16:00:00", - ), }, ) - dataset_backtest.init( + dataset_backtest.config( handler_kwargs={ "start_time": "2021-01-19 00:00:00", "end_time": "2021-01-25 16:00:00", }, - segment_kwargs={ + segments={ "test": ( "2021-01-19 00:00:00", "2021-01-25 16:00:00", ), }, ) + dataset_backtest.setup_data(handler_kwargs={}) ##=============get data============= - xtest = dataset.prepare(["test"]) - backtest_test = dataset_backtest.prepare(["test"]) + xtest, = dataset.prepare(["test"]) + backtest_test, = dataset_backtest.prepare(["test"]) print(xtest, backtest_test) return diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 518b8eecd..aa90cee2f 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -20,17 +20,25 @@ class Dataset(Serializable): """ init is designed to finish following steps: + - init instance + + - config the state of the dataset(info to prepare the data) + - The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing. + - setup data - The data related attributes' names should start with '_' so that it will not be saved on disk when serializing. 
- - initialize the state of the dataset(info to prepare the data) - - The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing. - The data could specify the info to caculate the essential data for preparation """ self.setup_data(*args, **kwargs) super().__init__() + def config(self, *arg, **kwargs): + """ + config is designed to configure and parameters that cannot be learned from the data + """ + super().config(*arg, **kwargs) + def setup_data(self, *args, **kwargs): """ Setup the data. @@ -39,7 +47,7 @@ class Dataset(Serializable): - User have a Dataset object with learned status on disk. - - User load the Dataset object from the disk(Note the init function is skiped). + - User load the Dataset object from the disk. - User call `setup_data` to load new data. @@ -76,44 +84,7 @@ class DatasetH(Dataset): - The processing is related to data split. """ - def init(self, handler_kwargs: dict = None, segment_kwargs: dict = None): - """ - Initialize the DatasetH - - Parameters - ---------- - handler_kwargs : dict - Config of DataHanlder, which could include the following arguments: - - - arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'. - - - arguments of DataHandler.init, such as 'enable_cache', etc. 
- - segment_kwargs : dict - Config of segments which is same as 'segments' in DatasetH.setup_data - - """ - if handler_kwargs: - if not isinstance(handler_kwargs, dict): - raise TypeError(f"param handler_kwargs must be type dict, not {type(handler_kwargs)}") - kwargs_init = {} - kwargs_conf_data = {} - conf_data_arg = {"instruments", "start_time", "end_time", "fit_start_time", "fit_end_time"} - for k, v in handler_kwargs.items(): - if k in conf_data_arg: - kwargs_conf_data.update({k: v}) - else: - kwargs_init.update({k: v}) - - self.handler.conf_data(**kwargs_conf_data) - self.handler.init(**kwargs_init) - - if segment_kwargs: - if not isinstance(segment_kwargs, dict): - raise TypeError(f"param handler_kwargs must be type dict, not {type(segment_kwargs)}") - self.segments = segment_kwargs.copy() - - def setup_data(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple]): + def __init__(self, handler: Union[Dict, DataHandler], segments: Dict[Text, Tuple], **kwargs): """ Setup the underlying data. @@ -144,6 +115,52 @@ class DatasetH(Dataset): """ self.handler = init_instance_by_config(handler, accept_types=DataHandler) self.segments = segments.copy() + super().__init__(**kwargs) + + def config(self, handler_kwargs:dict = None, segments:dict = None, **kwargs): + """ + Initialize the DatasetH + + Parameters + ---------- + handler_kwargs : dict + Config of DataHanlder, which could include the following arguments: + + - arguments of DataHandler.conf_data, such as 'instruments', 'start_time' and 'end_time'. 
+ + kwargs : dict + Config of DatasetH, such as + + - segments : dict + Config of segments which is same as 'segments' in self.__init__ + + """ + super().config(**kwargs) + if handler_kwargs is not None: + self.handler.config(**handler_kwargs) + if segments is not None: + self.segments = segments.copy() + + + + def setup_data(self, handler_kwargs: dict = None, **kwargs): + """ + Setup the Data + + Parameters + ---------- + handler_kwargs : dict + init arguments of DataHanlder, which could include the following arguments: + + - init_type : Init Type of Handler + + - enable_cache : wheter to enable cache + + """ + super().setup_data(**kwargs) + if handler_kwargs is not None: + self.handler.setup_data(**handler_kwargs) + def __repr__(self): return "{name}(handler={handler}, segments={segments})".format( @@ -433,16 +450,21 @@ class TSDatasetH(DatasetH): - The dimension of a batch of data """ - def __init__(self, step_len=30, *args, **kwargs): + def __init__(self, step_len=30, **kwargs): self.step_len = step_len - super().__init__(*args, **kwargs) + super().__init__(**kwargs) - def setup_data(self, *args, **kwargs): - super().setup_data(*args, **kwargs) + def config(self, step_len=None, **kwargs): + super().config(**kwargs) + if step_len: + self.step_len = step_len + + def setup_data(self, **kwargs): + super().setup_data(**kwargs) cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique() cal = sorted(cal) - # Get the datatime index for building timestamp self.cal = cal + def _prepare_seg(self, slc: slice, **kwargs) -> TSDataSampler: # Dataset decide how to slice data(Get more data for timeseries). 
diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 9aa05b9b9..712cd6232 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -6,6 +6,7 @@ import abc import bisect import logging import warnings +from inspect import getfullargspec from typing import Union, Tuple, List, Iterator, Optional import pandas as pd @@ -99,10 +100,10 @@ class DataHandler(Serializable): self.fetch_orig = fetch_orig if init_data: with TimeInspector.logt("Init data"): - self.init() + self.setup_data() super().__init__() - def conf_data(self, **kwargs): + def config(self, instruments=None, start_time=None, end_time=None, **kwargs): """ configuration of data. # what data to be loaded from data source @@ -111,14 +112,17 @@ class DataHandler(Serializable): The data will be initialized with different time range. """ - attr_list = {"instruments", "start_time", "end_time"} - for k, v in kwargs.items(): - if k in attr_list: - setattr(self, k, v) - - def init(self, enable_cache: bool = False): + super().config(**kwargs) + if instruments: + self.instruments = instruments + if start_time: + self.start_time = start_time + if end_time: + self.end_time = end_time + + def setup_data(self, enable_cache: bool = False): """ - initialize the data. + Set Up the data. In case of running intialization for multiple time, it will do nothing for the second time. It is responsible for maintaining following variable @@ -403,7 +407,7 @@ class DataHandlerLP(DataHandler): if self.drop_raw: del self._data - def conf_data(self, **kwargs): + def config(self, processors_kwargs:dict = None, **kwargs): """ configuration of data. # what data to be loaded from data source @@ -412,27 +416,19 @@ class DataHandlerLP(DataHandler): The data will be initialized with different time range. 
""" - attr_list = {"fit_start_time", "fit_end_time"} - for k, v in kwargs.items(): - if k in attr_list: - for infer_processor in self.infer_processors: - if getattr(infer_processor, k, None): - setattr(infer_processor, k, v) - - for learn_processor in self.learn_processors: - if getattr(learn_processor, k, None): - setattr(learn_processor, k, v) - - super().conf_data(**kwargs) + super().config(**kwargs) + if processors_kwargs is not None: + for processor in self.get_all_processors(): + processor.config(**processor_kwargs) # init type IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df IT_LS = "load_state" # The state of the object has been load by pickle - def init(self, init_type: str = IT_FIT_SEQ, enable_cache: bool = False): + def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs): """ - Initialize the data of Qlib + Set up the data of Qlib Parameters ---------- @@ -447,7 +443,7 @@ class DataHandlerLP(DataHandler): when we call `init` next time """ # init raw data - super().init(enable_cache=enable_cache) + super().setup_data(**kwargs) with TimeInspector.logt("fit & process data"): if init_type == DataHandlerLP.IT_FIT_IND: diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 1cda5c025..a58bca5e8 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -53,7 +53,6 @@ class DataLoader(abc.ABC): """ pass - class DLWParser(DataLoader): """ (D)ata(L)oader (W)ith (P)arser for features and names diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index 5a06f66be..e14e85831 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -72,6 +72,9 @@ class Processor(Serializable): """ return True + def config(**kwargs): + super().config(kwargs.get("dump_all", None), kwargs.get("exclude", None)) + class DropnaProcessor(Processor): def __init__(self, fields_group=None): @@ 
-192,6 +195,12 @@ class MinMaxNorm(Processor): df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df + def config(fit_start_time=None, fit_end_time=None, **kwargs): + if fit_start_time: + self.fit_start_time = fit_start_time + if fit_end_time: + self.fit_end_time = fit_end_time + super().config(**kwargs) class ZScoreNorm(Processor): """ZScore Normalization""" @@ -220,6 +229,13 @@ class ZScoreNorm(Processor): df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df + + def config(fit_start_time=None, fit_end_time=None, **kwargs): + if fit_start_time: + self.fit_start_time = fit_start_time + if fit_end_time: + self.fit_end_time = fit_end_time + super().config(**kwargs) class RobustZScoreNorm(Processor): @@ -257,6 +273,12 @@ class RobustZScoreNorm(Processor): df.clip(-3, 3, inplace=True) return df + def config(fit_start_time=None, fit_end_time=None, **kwargs): + if fit_start_time: + self.fit_start_time = fit_start_time + if fit_end_time: + self.fit_end_time = fit_end_time + super().config(**kwargs) class CSZScoreNorm(Processor): """Cross Sectional ZScore Normalization""" From fb7f84f31e4e3b6a6e76cf496d97b6a62fe2fe04 Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 29 Mar 2021 20:15:42 +0800 Subject: [PATCH 16/23] fix ubg --- examples/highfreq/highfreq_processor.py | 2 +- examples/highfreq/workflow.py | 2 +- examples/rolling_process_data/workflow.py | 14 +++++++++----- qlib/data/dataset/handler.py | 4 ++-- qlib/data/dataset/processor.py | 8 ++++---- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py index 4ec8f3dd2..6ed68ff38 100644 --- a/examples/highfreq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -71,7 +71,7 @@ class HighFreqNorm(Processor): ).sort_index() return df_new_features - def config(fit_start_time=None, fit_end_time=None, **kwargs): + def config(self, fit_start_time=None, fit_end_time=None, **kwargs): if 
fit_start_time: self.fit_start_time = fit_start_time if fit_end_time: diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index 0b48b971f..97762f182 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -31,7 +31,7 @@ class HighfreqWorkflow(object): SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None} - MARKET = "csi300" + MARKET = "all" start_time = "2020-09-15 00:00:00" end_time = "2021-01-18 16:00:00" diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 0be88dddc..ffdd8329a 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -101,15 +101,16 @@ class RollingDataWorkflow(object): print(f"===========rolling{rolling_offset} start===========") if rolling_offset: - dataset.init( + dataset.config( handler_kwargs={ - "init_type": DataHandlerLP.IT_FIT_SEQ, "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), - "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), - "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), + "processor_kwargs":{ + "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), + "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), + }, }, - segment_kwargs={ + segments={ "train": ( datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), @@ -124,6 +125,9 @@ class RollingDataWorkflow(object): ), }, ) + dataset.setup_data( + handler_kwargs={"init_type": DataHandlerLP.IT_FIT_SEQ,} + ) dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) print(dtrain, dvalid, dtest) diff --git a/qlib/data/dataset/handler.py 
b/qlib/data/dataset/handler.py index 712cd6232..4adef23a0 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -407,7 +407,7 @@ class DataHandlerLP(DataHandler): if self.drop_raw: del self._data - def config(self, processors_kwargs:dict = None, **kwargs): + def config(self, processor_kwargs:dict = None, **kwargs): """ configuration of data. # what data to be loaded from data source @@ -417,7 +417,7 @@ class DataHandlerLP(DataHandler): """ super().config(**kwargs) - if processors_kwargs is not None: + if processor_kwargs is not None: for processor in self.get_all_processors(): processor.config(**processor_kwargs) diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index e14e85831..5be178c5c 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -72,7 +72,7 @@ class Processor(Serializable): """ return True - def config(**kwargs): + def config(self, **kwargs): super().config(kwargs.get("dump_all", None), kwargs.get("exclude", None)) @@ -195,7 +195,7 @@ class MinMaxNorm(Processor): df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df - def config(fit_start_time=None, fit_end_time=None, **kwargs): + def config(self, fit_start_time=None, fit_end_time=None, **kwargs): if fit_start_time: self.fit_start_time = fit_start_time if fit_end_time: @@ -230,7 +230,7 @@ class ZScoreNorm(Processor): df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df - def config(fit_start_time=None, fit_end_time=None, **kwargs): + def config(self, fit_start_time=None, fit_end_time=None, **kwargs): if fit_start_time: self.fit_start_time = fit_start_time if fit_end_time: @@ -273,7 +273,7 @@ class RobustZScoreNorm(Processor): df.clip(-3, 3, inplace=True) return df - def config(fit_start_time=None, fit_end_time=None, **kwargs): + def config(self, fit_start_time=None, fit_end_time=None, **kwargs): if fit_start_time: self.fit_start_time = fit_start_time if fit_end_time: From 
8743576f7238003530ae55e78fa50554d8d6ba33 Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 29 Mar 2021 20:16:00 +0800 Subject: [PATCH 17/23] black format --- examples/highfreq/highfreq_processor.py | 2 +- examples/highfreq/workflow.py | 4 ++-- examples/rolling_process_data/workflow.py | 6 ++++-- qlib/data/dataset/__init__.py | 14 +++++--------- qlib/data/dataset/handler.py | 4 ++-- qlib/data/dataset/loader.py | 1 + qlib/data/dataset/processor.py | 4 +++- 7 files changed, 18 insertions(+), 17 deletions(-) diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py index 6ed68ff38..d843c6ac0 100644 --- a/examples/highfreq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -70,7 +70,7 @@ class HighFreqNorm(Processor): columns=["FEATURE_%d" % i for i in range(12 * 240)], ).sort_index() return df_new_features - + def config(self, fit_start_time=None, fit_end_time=None, **kwargs): if fit_start_time: self.fit_start_time = fit_start_time diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index 97762f182..94c9b689f 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -177,8 +177,8 @@ class HighfreqWorkflow(object): dataset_backtest.setup_data(handler_kwargs={}) ##=============get data============= - xtest, = dataset.prepare(["test"]) - backtest_test, = dataset_backtest.prepare(["test"]) + (xtest,) = dataset.prepare(["test"]) + (backtest_test,) = dataset_backtest.prepare(["test"]) print(xtest, backtest_test) return diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index ffdd8329a..02f43889d 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -105,7 +105,7 @@ class RollingDataWorkflow(object): handler_kwargs={ "start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), "end_time": datetime(test_end_time[0] + rolling_offset, *test_end_time[1:]), - 
"processor_kwargs":{ + "processor_kwargs": { "fit_start_time": datetime(train_start_time[0] + rolling_offset, *train_start_time[1:]), "fit_end_time": datetime(train_end_time[0] + rolling_offset, *train_end_time[1:]), }, @@ -126,7 +126,9 @@ class RollingDataWorkflow(object): }, ) dataset.setup_data( - handler_kwargs={"init_type": DataHandlerLP.IT_FIT_SEQ,} + handler_kwargs={ + "init_type": DataHandlerLP.IT_FIT_SEQ, + } ) dtrain, dvalid, dtest = dataset.prepare(["train", "valid", "test"]) diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index aa90cee2f..d8a9e0209 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -35,7 +35,7 @@ class Dataset(Serializable): def config(self, *arg, **kwargs): """ - config is designed to configure and parameters that cannot be learned from the data + config is designed to configure and parameters that cannot be learned from the data """ super().config(*arg, **kwargs) @@ -117,7 +117,7 @@ class DatasetH(Dataset): self.segments = segments.copy() super().__init__(**kwargs) - def config(self, handler_kwargs:dict = None, segments:dict = None, **kwargs): + def config(self, handler_kwargs: dict = None, segments: dict = None, **kwargs): """ Initialize the DatasetH @@ -130,7 +130,7 @@ class DatasetH(Dataset): kwargs : dict Config of DatasetH, such as - + - segments : dict Config of segments which is same as 'segments' in self.__init__ @@ -141,8 +141,6 @@ class DatasetH(Dataset): if segments is not None: self.segments = segments.copy() - - def setup_data(self, handler_kwargs: dict = None, **kwargs): """ Setup the Data @@ -151,16 +149,15 @@ class DatasetH(Dataset): ---------- handler_kwargs : dict init arguments of DataHanlder, which could include the following arguments: - + - init_type : Init Type of Handler - + - enable_cache : wheter to enable cache """ super().setup_data(**kwargs) if handler_kwargs is not None: self.handler.setup_data(**handler_kwargs) - def __repr__(self): return 
"{name}(handler={handler}, segments={segments})".format( @@ -464,7 +461,6 @@ class TSDatasetH(DatasetH): cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique() cal = sorted(cal) self.cal = cal - def _prepare_seg(self, slc: slice, **kwargs) -> TSDataSampler: # Dataset decide how to slice data(Get more data for timeseries). diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 4adef23a0..2190deeb1 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -119,7 +119,7 @@ class DataHandler(Serializable): self.start_time = start_time if end_time: self.end_time = end_time - + def setup_data(self, enable_cache: bool = False): """ Set Up the data. @@ -407,7 +407,7 @@ class DataHandlerLP(DataHandler): if self.drop_raw: del self._data - def config(self, processor_kwargs:dict = None, **kwargs): + def config(self, processor_kwargs: dict = None, **kwargs): """ configuration of data. # what data to be loaded from data source diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index a58bca5e8..1cda5c025 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -53,6 +53,7 @@ class DataLoader(abc.ABC): """ pass + class DLWParser(DataLoader): """ (D)ata(L)oader (W)ith (P)arser for features and names diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index 5be178c5c..d25d36c88 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -202,6 +202,7 @@ class MinMaxNorm(Processor): self.fit_end_time = fit_end_time super().config(**kwargs) + class ZScoreNorm(Processor): """ZScore Normalization""" @@ -229,7 +230,7 @@ class ZScoreNorm(Processor): df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df - + def config(self, fit_start_time=None, fit_end_time=None, **kwargs): if fit_start_time: self.fit_start_time = fit_start_time @@ -280,6 +281,7 @@ class RobustZScoreNorm(Processor): 
self.fit_end_time = fit_end_time super().config(**kwargs) + class CSZScoreNorm(Processor): """Cross Sectional ZScore Normalization""" From d18c3674974dfa3593424418e53d167247dadf74 Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 29 Mar 2021 20:34:36 +0800 Subject: [PATCH 18/23] update README --- examples/rolling_process_data/README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md index 6a6af0d3d..b04f5ed7f 100644 --- a/examples/rolling_process_data/README.md +++ b/examples/rolling_process_data/README.md @@ -1 +1,17 @@ # Rolling Process Data + +This workflow is an example of `Rolling Process Data`. + +## Background + +When models are trained in a rolling manner, the data also needs to be generated for each rolling window. When the rolling window moves, the training data changes, and the processors' learnable state (such as standard deviation, mean, etc.) changes as well. + +In order to avoid regenerating the data, this example uses the `DataHandler-based DataLoader` to load the raw features, which do not depend on the rolling window, and then uses Processors to generate the processed features for each rolling window. 
+ + +### Run the Code + +Run the example by running the following command: +```bash + python workflow.py rolling_process +``` \ No newline at end of file From 1074284666113389cbcb6c5707f59e5c69f07f99 Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 29 Mar 2021 20:38:09 +0800 Subject: [PATCH 19/23] fix docstring --- qlib/data/dataset/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index d8a9e0209..668ea833b 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -20,9 +20,7 @@ class Dataset(Serializable): """ init is designed to finish following steps: - - init instance - - - config the state of the dataset(info to prepare the data) + - init the sub instance and the state of the dataset(info to prepare the data) - The name of essential state for preparing data should not start with '_' so that it could be serialized on disk when serializing. - setup data From 136830bc2bf8281838d96c22fb0cdd45e93ae16b Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 30 Mar 2021 00:38:15 +0800 Subject: [PATCH 20/23] update comments --- examples/highfreq/highfreq_processor.py | 7 ----- examples/highfreq/workflow.py | 6 ++--- examples/rolling_process_data/workflow.py | 2 +- qlib/data/dataset/__init__.py | 27 ++++++++++---------- qlib/data/dataset/handler.py | 17 ++++++++----- qlib/data/dataset/loader.py | 2 +- qlib/data/dataset/processor.py | 31 +++++++---------------- 7 files changed, 38 insertions(+), 54 deletions(-) diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py index d843c6ac0..f0ab0dec2 100644 --- a/examples/highfreq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -70,10 +70,3 @@ class HighFreqNorm(Processor): columns=["FEATURE_%d" % i for i in range(12 * 240)], ).sort_index() return df_new_features - - def config(self, fit_start_time=None, fit_end_time=None, **kwargs): - if fit_start_time: - 
self.fit_start_time = fit_start_time - if fit_end_time: - self.fit_end_time = fit_end_time - super().config(**kwargs) diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index 94c9b689f..5660ab2e9 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -27,7 +27,7 @@ from qlib.tests.data import GetData from highfreq_ops import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut -class HighfreqWorkflow(object): +class HighfreqWorkflow: SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None} @@ -177,8 +177,8 @@ class HighfreqWorkflow(object): dataset_backtest.setup_data(handler_kwargs={}) ##=============get data============= - (xtest,) = dataset.prepare(["test"]) - (backtest_test,) = dataset_backtest.prepare(["test"]) + xtest = dataset.prepare("test") + backtest_test = dataset_backtest.prepare("test") print(xtest, backtest_test) return diff --git a/examples/rolling_process_data/workflow.py b/examples/rolling_process_data/workflow.py index 02f43889d..5757aaa87 100644 --- a/examples/rolling_process_data/workflow.py +++ b/examples/rolling_process_data/workflow.py @@ -14,7 +14,7 @@ from qlib.utils import exists_qlib_data, init_instance_by_config from qlib.tests.data import GetData -class RollingDataWorkflow(object): +class RollingDataWorkflow: MARKET = "csi300" start_time = "2010-01-01" diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 668ea833b..b3eaac7a3 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -3,6 +3,7 @@ from typing import Union, List, Tuple, Dict, Text, Optional from ...utils import init_instance_by_config, np_ffill from ...log import get_module_logger from .handler import DataHandler, DataHandlerLP +from copy import deepcopy from inspect import getfullargspec import pandas as pd import numpy as np @@ -16,7 +17,7 @@ class Dataset(Serializable): Preparing data for model 
training and inferencing. """ - def __init__(self, *args, **kwargs): + def __init__(self, **kwargs): """ init is designed to finish following steps: @@ -28,16 +29,16 @@ class Dataset(Serializable): The data could specify the info to caculate the essential data for preparation """ - self.setup_data(*args, **kwargs) + self.setup_data(**kwargs) super().__init__() - def config(self, *arg, **kwargs): + def config(self, **kwargs): """ config is designed to configure and parameters that cannot be learned from the data """ - super().config(*arg, **kwargs) + super().config(**kwargs) - def setup_data(self, *args, **kwargs): + def setup_data(self, **kwargs): """ Setup the data. @@ -53,7 +54,7 @@ class Dataset(Serializable): """ pass - def prepare(self, *args, **kwargs) -> object: + def prepare(self, **kwargs) -> object: """ The type of dataset depends on the model. (It could be pd.DataFrame, pytorch.DataLoader, etc.) The parameters should specify the scope for the prepared data @@ -115,7 +116,7 @@ class DatasetH(Dataset): self.segments = segments.copy() super().__init__(**kwargs) - def config(self, handler_kwargs: dict = None, segments: dict = None, **kwargs): + def config(self, handler_kwargs: dict = None, **kwargs): """ Initialize the DatasetH @@ -133,11 +134,11 @@ class DatasetH(Dataset): Config of segments which is same as 'segments' in self.__init__ """ - super().config(**kwargs) if handler_kwargs is not None: self.handler.config(**handler_kwargs) - if segments is not None: - self.segments = segments.copy() + if "segments" in kwargs: + self.segments = deepcopy(kwargs.pop("segments")) + super().config(**kwargs) def setup_data(self, handler_kwargs: dict = None, **kwargs): """ @@ -449,10 +450,10 @@ class TSDatasetH(DatasetH): self.step_len = step_len super().__init__(**kwargs) - def config(self, step_len=None, **kwargs): + def config(self, **kwargs): + if "step_len" in kwargs: + self.step_len = kwargs.pop("step_len") super().config(**kwargs) - if step_len: - self.step_len = 
step_len def setup_data(self, **kwargs): super().setup_data(**kwargs) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 2190deeb1..7fb7090d2 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -103,7 +103,7 @@ class DataHandler(Serializable): self.setup_data() super().__init__() - def config(self, instruments=None, start_time=None, end_time=None, **kwargs): + def config(self, **kwargs): """ configuration of data. # what data to be loaded from data source @@ -112,13 +112,16 @@ class DataHandler(Serializable): The data will be initialized with different time range. """ + attr_list = {"instruments", "start_time", "end_time"} + for k, v in kwargs.items(): + if k in attr_list: + setattr(self, k, v) + + for attr in attr_list: + if attr in kwargs: + kwargs.pop(attr) + super().config(**kwargs) - if instruments: - self.instruments = instruments - if start_time: - self.start_time = start_time - if end_time: - self.end_time = end_time def setup_data(self, enable_cache: bool = False): """ diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 1cda5c025..58aca1d4f 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -261,7 +261,7 @@ class DataLoaderDH(DataLoader): self.is_group = is_group self.fetch_kwargs = {"col_set": DataHandler.CS_RAW} - self.fetch_kwargs = {**self.fetch_kwargs, **fetch_kwargs} + self.fetch_kwargs.update(fetch_kwargs) def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: if instruments is not None: diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index d25d36c88..8f69a5dff 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -73,7 +73,15 @@ class Processor(Serializable): return True def config(self, **kwargs): - super().config(kwargs.get("dump_all", None), kwargs.get("exclude", None)) + attr_list = {"fit_start_time", "fit_end_time"} + for k, v in kwargs.items(): + if k 
in attr_list and getattr(self, k, None) is not None: + setattr(self, k, v) + + for attr in attr_list: + if attr in kwargs: + kwargs.pop(attr) + super().config(**kwargs) class DropnaProcessor(Processor): @@ -195,13 +203,6 @@ class MinMaxNorm(Processor): df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df - def config(self, fit_start_time=None, fit_end_time=None, **kwargs): - if fit_start_time: - self.fit_start_time = fit_start_time - if fit_end_time: - self.fit_end_time = fit_end_time - super().config(**kwargs) - class ZScoreNorm(Processor): """ZScore Normalization""" @@ -231,13 +232,6 @@ class ZScoreNorm(Processor): df.loc(axis=1)[self.cols] = normalize(df[self.cols].values) return df - def config(self, fit_start_time=None, fit_end_time=None, **kwargs): - if fit_start_time: - self.fit_start_time = fit_start_time - if fit_end_time: - self.fit_end_time = fit_end_time - super().config(**kwargs) - class RobustZScoreNorm(Processor): """Robust ZScore Normalization @@ -274,13 +268,6 @@ class RobustZScoreNorm(Processor): df.clip(-3, 3, inplace=True) return df - def config(self, fit_start_time=None, fit_end_time=None, **kwargs): - if fit_start_time: - self.fit_start_time = fit_start_time - if fit_end_time: - self.fit_end_time = fit_end_time - super().config(**kwargs) - class CSZScoreNorm(Processor): """Cross Sectional ZScore Normalization""" From f8da79b802d617234f6ae20bea2ae2bc771c39a9 Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 30 Mar 2021 00:54:00 +0800 Subject: [PATCH 21/23] fix readme --- examples/rolling_process_data/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md index b04f5ed7f..c84eaac20 100644 --- a/examples/rolling_process_data/README.md +++ b/examples/rolling_process_data/README.md @@ -9,7 +9,7 @@ When rolling train the models, data also needs to be generated in the different In order to avoid regenerating data, this example uses the 
`DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then used Processors to generate processed-features related to the sliding window. -### Run the Code +## Run the Code Run the example by running the following command: ```bash From 023603479c5e451671d2c68fcec65574ec847fe7 Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 30 Mar 2021 01:00:12 +0800 Subject: [PATCH 22/23] fix readme --- examples/rolling_process_data/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/rolling_process_data/README.md b/examples/rolling_process_data/README.md index c84eaac20..315fe2eed 100644 --- a/examples/rolling_process_data/README.md +++ b/examples/rolling_process_data/README.md @@ -4,9 +4,9 @@ This workflow is an example for `Rolling Process Data`. ## Background -When rolling train the models, data also needs to be generated in the different rolling windows. When the rolling window moves, the training data will also change, and the processor's learnable state (such as standard deviation, mean, etc.) will also be changed. +When training the models in a rolling manner, data also needs to be generated in the different rolling windows. When the rolling window moves, the training data will change, and the processor's learnable state (such as standard deviation, mean, etc.) will also change. -In order to avoid regenerating data, this example uses the `DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then used Processors to generate processed-features related to the sliding window. +In order to avoid regenerating data, this example uses the `DataHandler-based DataLoader` to load the raw features that are not related to the rolling window, and then uses Processors to generate processed-features related to the rolling window.
## Run the Code From 7a2203f116bd79338481ffe439ad389b247c0e03 Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 30 Mar 2021 11:03:07 +0800 Subject: [PATCH 23/23] update comments --- qlib/data/dataset/handler.py | 5 ++--- qlib/data/dataset/processor.py | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 7fb7090d2..201d2459d 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -125,8 +125,7 @@ class DataHandler(Serializable): def setup_data(self, enable_cache: bool = False): """ - Set Up the data. - In case of running intialization for multiple time, it will do nothing for the second time. + Set Up the data in case of running initialization for multiple times It is responsible for maintaining following variable 1) self._data @@ -431,7 +430,7 @@ class DataHandlerLP(DataHandler): def setup_data(self, init_type: str = IT_FIT_SEQ, **kwargs): """ - Set up the data of Qlib + Set up the data in case of running initialization for multiple times Parameters ---------- diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index 8f69a5dff..e035f5624 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -75,7 +75,7 @@ class Processor(Serializable): def config(self, **kwargs): attr_list = {"fit_start_time", "fit_end_time"} for k, v in kwargs.items(): - if k in attr_list and getattr(self, k, None) is not None: + if k in attr_list and hasattr(self, k): setattr(self, k, v) for attr in attr_list: