From 32c3070b73feea6432609393761df75582c6f9b6 Mon Sep 17 00:00:00 2001 From: you-n-g Date: Fri, 7 Apr 2023 15:00:21 +0800 Subject: [PATCH] Refine DDG-DA (#1472) * Run ddg-da successfully * Support include valid; More parameters * Support L2 reg & visualization * Blackformat * Enable fill_method * Support specify handler & optim dataset * Fix Pylint --- .../benchmarks_dynamic/DDG-DA/vis_data.py | 107 ++++++++++++++++++ .../benchmarks_dynamic/DDG-DA/workflow.py | 73 +++++++++--- .../baseline/rolling_benchmark.py | 53 ++++++++- qlib/contrib/meta/data_selection/dataset.py | 90 +++++++++++++-- qlib/contrib/meta/data_selection/model.py | 8 +- qlib/contrib/meta/data_selection/net.py | 11 +- qlib/contrib/meta/data_selection/utils.py | 16 ++- qlib/contrib/model/linear.py | 13 ++- qlib/data/dataset/handler.py | 23 ++++ qlib/data/dataset/utils.py | 8 +- qlib/utils/__init__.py | 1 + qlib/utils/data.py | 51 ++++++++- tests/data_mid_layer_tests/README.md | 5 + .../test_dataset.py | 0 tests/data_mid_layer_tests/test_handler.py | 37 ++++++ .../test_handler_storage.py | 0 .../test_processor.py | 0 17 files changed, 457 insertions(+), 39 deletions(-) create mode 100644 examples/benchmarks_dynamic/DDG-DA/vis_data.py create mode 100644 tests/data_mid_layer_tests/README.md rename tests/{ => data_mid_layer_tests}/test_dataset.py (100%) create mode 100644 tests/data_mid_layer_tests/test_handler.py rename tests/{ => data_mid_layer_tests}/test_handler_storage.py (100%) rename tests/{ => data_mid_layer_tests}/test_processor.py (100%) diff --git a/examples/benchmarks_dynamic/DDG-DA/vis_data.py b/examples/benchmarks_dynamic/DDG-DA/vis_data.py new file mode 100644 index 000000000..45a49ac62 --- /dev/null +++ b/examples/benchmarks_dynamic/DDG-DA/vis_data.py @@ -0,0 +1,107 @@ +import pickle +import numpy as np +import pandas as pd +import matplotlib.pyplot as plt +import seaborn as sns + +sns.set(color_codes=True) +plt.rcParams["font.sans-serif"] = "SimHei" +plt.rcParams["axes.unicode_minus"] = False +from tqdm.auto import tqdm + +# tqdm.pandas() # for progress_apply +# %matplotlib inline +# %load_ext autoreload + + +# # Meta Input + +# + +with open("./internal_data_s20.pkl", "rb") as f: + data = pickle.load(f) + +data.data_ic_df.columns.names = ["start_date", "end_date"] + +data_sim = data.data_ic_df.droplevel(axis=1, level="end_date") + +data_sim.index.name = "test datetime" +# - + +plt.figure(figsize=(40, 20)) +sns.heatmap(data_sim) + +plt.figure(figsize=(40, 20)) +sns.heatmap(data_sim.rolling(20).mean()) + +# # Meta Model + +from qlib import auto_init + +auto_init() +from qlib.workflow import R + +exp = R.get_exp(experiment_name="DDG-DA") +meta_rec = exp.list_recorders(rtype="list", max_results=1)[0] +meta_m = meta_rec.load_object("model") + +pd.DataFrame(meta_m.tn.twm.linear.weight.detach().numpy()).T[0].plot() + +pd.DataFrame(meta_m.tn.twm.linear.weight.detach().numpy()).T[0].rolling(5).mean().plot() + +# # Meta Output + +# + +with open("./tasks_s20.pkl", "rb") as f: + tasks = pickle.load(f) + +task_df = {} +for t in tasks: + test_seg = t["dataset"]["kwargs"]["segments"]["test"] + if None not in test_seg: + # The last rolling is skipped. + task_df[test_seg] = t["reweighter"].time_weight +task_df = pd.concat(task_df) + +task_df.index.names = ["OS_start", "OS_end", "IS_start", "IS_end"] +task_df = task_df.droplevel(["OS_end", "IS_end"]) +task_df = task_df.unstack("OS_start") +# - + +plt.figure(figsize=(40, 20)) +sns.heatmap(task_df.T) + +plt.figure(figsize=(40, 20)) +sns.heatmap(task_df.rolling(10).mean().T) + +# # Sub Models +# +# NOTE: +# - this section assumes that the model is Linear model!! +# - Other models does not support this analysis + +exp = R.get_exp(experiment_name="rolling_ds") + + +def show_linear_weight(exp): + coef_df = {} + for r in exp.list_recorders("list"): + t = r.load_object("task") + if None in t["dataset"]["kwargs"]["segments"]["test"]: + continue + m = r.load_object("params.pkl") + coef_df[t["dataset"]["kwargs"]["segments"]["test"]] = pd.Series(m.coef_) + + coef_df = pd.concat(coef_df) + + coef_df.index.names = ["test_start", "test_end", "coef_idx"] + + coef_df = coef_df.droplevel("test_end").unstack("coef_idx").T + + plt.figure(figsize=(40, 20)) + sns.heatmap(coef_df) + plt.show() + + +show_linear_weight(R.get_exp(experiment_name="rolling_ds")) + +show_linear_weight(R.get_exp(experiment_name="rolling_models")) diff --git a/examples/benchmarks_dynamic/DDG-DA/workflow.py b/examples/benchmarks_dynamic/DDG-DA/workflow.py index 48ea9bdb3..b69107549 100644 --- a/examples/benchmarks_dynamic/DDG-DA/workflow.py +++ b/examples/benchmarks_dynamic/DDG-DA/workflow.py @@ -10,8 +10,10 @@ import pandas as pd import fire import sys import pickle +from typing import Optional from qlib import auto_init from qlib.model.trainer import TrainerR +from qlib.typehint import Literal from qlib.utils import init_instance_by_config from qlib.workflow import R from qlib.tests.data import GetData @@ -30,7 +32,33 @@ class DDGDA: - `rm -r mlruns` """ - def __init__(self, sim_task_model="linear", forecast_model="linear"): + def __init__( + self, + sim_task_model: Literal["linear", "gbdt"] = "linear", + forecast_model: Literal["linear", "gbdt"] = "linear", + h_path: Optional[str] = None, + test_end: Optional[str] = None, + train_start: Optional[str] = None, + meta_1st_train_end: Optional[str] = None, + task_ext_conf: Optional[dict] = None, + alpha: float = 0.0, + proxy_hd: str = "handler_proxy.pkl", + ): + """ + + Parameters + ---------- + + train_start: Optional[str] + the start datetime for data. It is used in training start time (for both tasks & meta learing) + test_end: Optional[str] + the end datetime for data. It is used in test end time + meta_1st_train_end: Optional[str] + the datetime of training end of the first meta_task + alpha: float + Setting the L2 regularization for ridge + The `alpha` is only passed to MetaModelDS (it is not passed to sim_task_model currently..) + """ self.step = 20 # NOTE: # the horizon must match the meaning in the base task template @@ -38,10 +66,19 @@ class DDGDA: self.meta_exp_name = "DDG-DA" self.sim_task_model = sim_task_model # The model to capture the distribution of data. self.forecast_model = forecast_model # downstream forecasting models' type + self.rb_kwargs = { + "h_path": h_path, + "test_end": test_end, + "train_start": train_start, + "task_ext_conf": task_ext_conf, + } + self.alpha = alpha + self.meta_1st_train_end = meta_1st_train_end + self.proxy_hd = proxy_hd def get_feature_importance(self): # this must be lightGBM, because it needs to get the feature importance - rb = RollingBenchmark(model_type="gbdt") + rb = RollingBenchmark(model_type="gbdt", **self.rb_kwargs) task = rb.basic_task() with R.start(experiment_name="feature_importance"): @@ -69,7 +106,7 @@ class DDGDA: fi = self.get_feature_importance() col_selected = fi.nlargest(topk) - rb = RollingBenchmark(model_type=self.sim_task_model) + rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs) task = rb.basic_task() dataset = init_instance_by_config(task["dataset"]) prep_ds = dataset.prepare(slice(None), col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) @@ -96,7 +133,7 @@ class DDGDA: "kwargs": {"config": DIRNAME / "fea_label_df.pkl"}, } ) - handler.to_pickle(DIRNAME / "handler_proxy.pkl", dump_all=True) + handler.to_pickle(DIRNAME / self.proxy_hd, dump_all=True) @property def _internal_data_path(self): @@ -108,7 +145,7 @@ class DDGDA: This function will dump the input data for meta model """ # According to the experiments, the choice of the model type is very important for achieving good results - rb = RollingBenchmark(model_type=self.sim_task_model) + rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs) sim_task = rb.basic_task() if self.sim_task_model == "gbdt": @@ -122,24 +159,27 @@ class DDGDA: with self._internal_data_path.open("wb") as f: pickle.dump(internal_data, f) - def train_meta_model(self): + def train_meta_model(self, fill_method="max"): """ training a meta model based on a simplified linear proxy model; """ # 1) leverage the simplified proxy forecasting model to train meta model. # - Only the dataset part is important, in current version of meta model will integrate the - rb = RollingBenchmark(model_type=self.sim_task_model) + rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs) sim_task = rb.basic_task() + train_start = self.rb_kwargs.get("train_start", "2008-01-01") + train_end = "2010-12-31" if self.meta_1st_train_end is None else self.meta_1st_train_end + test_start = (pd.Timestamp(train_end) + pd.Timedelta(days=1)).strftime("%Y-%m-%d") proxy_forecast_model_task = { # "model": "qlib.contrib.model.linear.LinearModel", "dataset": { "class": "qlib.data.dataset.DatasetH", "kwargs": { - "handler": f"file://{(DIRNAME / 'handler_proxy.pkl').absolute()}", + "handler": f"file://{(DIRNAME / self.proxy_hd).absolute()}", "segments": { - "train": ("2008-01-01", "2010-12-31"), - "test": ("2011-01-01", sim_task["dataset"]["kwargs"]["segments"]["test"][1]), + "train": (train_start, train_end), + "test": (test_start, sim_task["dataset"]["kwargs"]["segments"]["test"][1]), }, }, }, @@ -156,7 +196,7 @@ class DDGDA: segments=0.62, # keep test period consistent with the dataset yaml trunc_days=1 + self.horizon, hist_step_n=30, - fill_method="max", + fill_method=fill_method, rolling_ext_days=0, ) # NOTE: @@ -165,12 +205,15 @@ class DDGDA: # So the misalignment will not affect the effectiveness of the method. with self._internal_data_path.open("rb") as f: internal_data = pickle.load(f) + md = MetaDatasetDS(exp_name=internal_data, **kwargs) # 3) train and logging meta model with R.start(experiment_name=self.meta_exp_name): R.log_params(**kwargs) - mm = MetaModelDS(step=self.step, hist_step_n=kwargs["hist_step_n"], lr=0.001, max_epoch=100, seed=43) + mm = MetaModelDS( + step=self.step, hist_step_n=kwargs["hist_step_n"], lr=0.001, max_epoch=100, seed=43, alpha=self.alpha + ) mm.fit(md) R.save_objects(model=mm) @@ -203,7 +246,7 @@ class DDGDA: hist_step_n = int(param["hist_step_n"]) fill_method = param.get("fill_method", "max") - rb = RollingBenchmark(model_type=self.forecast_model) + rb = RollingBenchmark(model_type=self.forecast_model, **self.rb_kwargs) task_l = rb.create_rolling_tasks() # 2.2) create meta dataset for final dataset @@ -233,13 +276,13 @@ class DDGDA: """ with self._task_path.open("rb") as f: tasks = pickle.load(f) - rb = RollingBenchmark(rolling_exp="rolling_ds", model_type=self.forecast_model) + rb = RollingBenchmark(rolling_exp="rolling_ds", model_type=self.forecast_model, **self.rb_kwargs) rb.train_rolling_tasks(tasks) rb.ens_rolling() rb.update_rolling_rec() def run_all(self): - # 1) file: handler_proxy.pkl + # 1) file: handler_proxy.pkl (self.proxy_hd) self.dump_data_for_proxy_model() # 2) # file: internal_data_s20.pkl diff --git a/examples/benchmarks_dynamic/baseline/rolling_benchmark.py b/examples/benchmarks_dynamic/baseline/rolling_benchmark.py index c192cd4cd..d452957d4 100644 --- a/examples/benchmarks_dynamic/baseline/rolling_benchmark.py +++ b/examples/benchmarks_dynamic/baseline/rolling_benchmark.py @@ -1,13 +1,17 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from typing import Optional from qlib.model.ens.ensemble import RollingEnsemble from qlib.utils import init_instance_by_config import fire import yaml +import pandas as pd from qlib import auto_init from pathlib import Path from tqdm.auto import tqdm from qlib.model.trainer import TrainerR +from qlib.log import get_module_logger +from qlib.utils.data import update_config from qlib.workflow import R from qlib.tests.data import GetData @@ -25,11 +29,40 @@ class RollingBenchmark: """ - def __init__(self, rolling_exp="rolling_models", model_type="linear") -> None: + def __init__( + self, + rolling_exp: str = "rolling_models", + model_type: str = "linear", + h_path: Optional[str] = None, + train_start: Optional[str] = None, + test_end: Optional[str] = None, + task_ext_conf: Optional[dict] = None, + ) -> None: + """ + Parameters + ---------- + rolling_exp : str + The name for the experiments for rolling + model_type : str + The model to be boosted. + h_path : Optional[str] + the dumped data handler; + test_end : Optional[str] + the test end for the data. It is typically used together with the handler + train_start : Optional[str] + the train start for the data. It is typically used together with the handler. + task_ext_conf : Optional[dict] + some option to update the + """ self.step = 20 self.horizon = 20 self.rolling_exp = rolling_exp self.model_type = model_type + self.h_path = h_path + self.train_start = train_start + self.test_end = test_end + self.logger = get_module_logger("RollingBenchmark") + self.task_ext_conf = task_ext_conf def basic_task(self): """For fast training rolling""" @@ -42,6 +75,10 @@ class RollingBenchmark: h_path = DIRNAME / "linear_alpha158_handler_horizon{}.pkl".format(self.horizon) else: raise AssertionError("Model type is not supported!") + + if self.h_path is not None: + h_path = Path(self.h_path) + with conf_path.open("r") as f: conf = yaml.safe_load(f) @@ -52,6 +89,9 @@ class RollingBenchmark: task = conf["task"] + if self.task_ext_conf is not None: + task = update_config(task, self.task_ext_conf) + if not h_path.exists(): h_conf = task["dataset"]["kwargs"]["handler"] h = init_instance_by_config(h_conf) @@ -59,6 +99,15 @@ class RollingBenchmark: task["dataset"]["kwargs"]["handler"] = f"file://{h_path}" task["record"] = ["qlib.workflow.record_temp.SignalRecord"] + + if self.train_start is not None: + seg = task["dataset"]["kwargs"]["segments"]["train"] + task["dataset"]["kwargs"]["segments"]["train"] = pd.Timestamp(self.train_start), seg[1] + + if self.test_end is not None: + seg = task["dataset"]["kwargs"]["segments"]["test"] + task["dataset"]["kwargs"]["segments"]["test"] = seg[0], pd.Timestamp(self.test_end) + self.logger.info(task) return task def create_rolling_tasks(self): @@ -93,7 +142,7 @@ class RollingBenchmark: """ Evaluate the combined rolling results """ - for rid, rec in R.list_recorders(experiment_name=self.COMB_EXP).items(): + for _, rec in R.list_recorders(experiment_name=self.COMB_EXP).items(): for rt_cls in SigAnaRecord, PortAnaRecord: rt = rt_cls(recorder=rec, skip_existing=True) rt.generate() diff --git a/qlib/contrib/meta/data_selection/dataset.py b/qlib/contrib/meta/data_selection/dataset.py index 2c6fe2eae..e3689d964 100644 --- a/qlib/contrib/meta/data_selection/dataset.py +++ b/qlib/contrib/meta/data_selection/dataset.py @@ -55,8 +55,10 @@ class InternalData: # The handler is initialized for only once. if not trainer.has_worker(): self.dh = init_task_handler(perf_task_tpl) + self.dh.config(dump_all=False) # in some cases, the data handler are saved to disk with `dump_all=True` else: self.dh = init_instance_by_config(perf_task_tpl["dataset"]["kwargs"]["handler"]) + assert self.dh.dump_all is False # otherwise, it will save all the detailed data seg = perf_task_tpl["dataset"]["kwargs"]["segments"] @@ -77,7 +79,7 @@ class InternalData: get_module_logger("Internal Data").info("the data has been initialized") else: # train new models - assert 0 == len(recorders), "An empty experiment is required for setup `InternalData``" + assert 0 == len(recorders), "An empty experiment is required for setup `InternalData`" trainer.train(gen_task) # 2) extract the similarity matrix @@ -119,6 +121,7 @@ class MetaTaskDS(MetaTask): def __init__(self, task: dict, meta_info: pd.DataFrame, mode: str = MetaTask.PROC_MODE_FULL, fill_method="max"): """ + The description of the processed data time_perf: A array with shape -> data piece performance @@ -132,6 +135,10 @@ class MetaTaskDS(MetaTask): [0., 0., 0., ..., 0., 0., 1.], [0., 0., 0., ..., 0., 0., 1.]]) + Parameters + ---------- + meta_info: pd.DataFrame + please refer to the docs of _prepare_meta_ipt for detailed explanation. """ super().__init__(task, meta_info) self.fill_method = fill_method @@ -180,12 +187,41 @@ class MetaTaskDS(MetaTask): self.processed_meta_input = data_to_tensor(self.processed_meta_input) def _get_processed_meta_info(self): - meta_info_norm = self.meta_info.sub(self.meta_info.mean(axis=1), axis=0) # .fillna(0.) - if self.fill_method == "max": - meta_info_norm = meta_info_norm.T.fillna( - meta_info_norm.max(axis=1) - ).T # fill it with row max to align with previous implementation + meta_info_norm = self.meta_info.sub(self.meta_info.mean(axis=1), axis=0) + if self.fill_method.startswith("max"): + suffix = self.fill_method.lstrip("max") + if suffix == "seg": + fill_value = {} + for col in meta_info_norm.columns: + fill_value[col] = meta_info_norm.loc[meta_info_norm[col].isna(), :].dropna(axis=1).mean().max() + fill_value = pd.Series(fill_value).sort_index() + # The NaN Values are filled segment-wise. Below is an exampleof fill_value + # 2009-01-05 2009-02-06 0.145809 + # 2009-02-09 2009-03-06 0.148005 + # 2009-03-09 2009-04-03 0.090385 + # 2009-04-07 2009-05-05 0.114318 + # 2009-05-06 2009-06-04 0.119328 + # ... + meta_info_norm = meta_info_norm.fillna(fill_value) + else: + if len(suffix) > 0: + get_module_logger("MetaTaskDS").warning( + f"fill_method={self.fill_method}; the info after can't be correctly parsed. Please check your parameters." + ) + fill_value = meta_info_norm.max(axis=1) + # fill it with row max to align with previous implementation + # This will magnify the data similarity when data is in daily freq + + # the fill value corresponds to data like this + # It get a performance value for each day. + # The performance value are get from other models on this day + # 2009-01-16 0.276320 + # 2009-01-19 0.280603 + # ... + # 2011-06-27 0.203773 + meta_info_norm = meta_info_norm.T.fillna(fill_value).T elif self.fill_method == "zero": + # It will fillna(0.0) at the end. pass else: raise NotImplementedError(f"This type of input is not supported") @@ -286,7 +322,33 @@ class MetaDatasetDS(MetaTaskDataset): logger.warning(f"ValueError: {e}") assert len(self.meta_task_l) > 0, "No meta tasks found. Please check the data and setting" - def _prepare_meta_ipt(self, task): + def _prepare_meta_ipt(self, task) -> pd.DataFrame: + """ + Please refer to `self.internal_data.setup` for detailed information about `self.internal_data.data_ic_df` + + Indices with format below can be successfully sliced by `ic_df.loc[:end, pd.IndexSlice[:, :end]]` + + 2021-06-21 2021-06-04 .. 2021-03-22 2021-03-08 + 2021-07-02 2021-06-18 .. 2021-04-02 None + + Returns + ------- + a pd.DataFrame with similar content below. + - each column corresponds to a trained model named by the training data range + - each row corresponds to a day of data tested by the models of the columns + - The rows cells that overlaps with the data used by columns are masked + + + 2009-01-05 2009-02-09 ... 2011-04-27 2011-05-26 + 2009-02-06 2009-03-06 ... 2011-05-25 2011-06-23 + datetime ... + 2009-01-13 NaN 0.310639 ... -0.169057 0.137792 + 2009-01-14 NaN 0.261086 ... -0.143567 0.082581 + ... ... ... ... ... ... + 2011-06-30 -0.054907 -0.020219 ... -0.023226 NaN + 2011-07-01 -0.075762 -0.026626 ... -0.003167 NaN + + """ ic_df = self.internal_data.data_ic_df segs = task["dataset"]["kwargs"]["segments"] @@ -294,15 +356,19 @@ class MetaDatasetDS(MetaTaskDataset): ic_df_avail = ic_df.loc[:end, pd.IndexSlice[:, :end]] # meta data set focus on the **information** instead of preprocess - # 1) filter the future info - def mask_future(s): - """mask future information""" - # from qlib.utils import get_date_by_shift + # 1) filter the overlap info + def mask_overlap(s): + """ + mask overlap information + data after self.name[end] with self.trunc_days that contains future info are also considered as overlap info + + Approximately the diagnal + horizon length of data are masked. + """ start, end = s.name end = get_date_by_shift(trading_date=end, shift=self.trunc_days - 1, future=True) return s.mask((s.index >= start) & (s.index <= end)) - ic_df_avail = ic_df_avail.apply(mask_future) # apply to each col + ic_df_avail = ic_df_avail.apply(mask_overlap) # apply to each col # 2) filter the info with too long periods total_len = self.step * self.hist_step_n diff --git a/qlib/contrib/meta/data_selection/model.py b/qlib/contrib/meta/data_selection/model.py index 7b8b81101..068f15f9d 100644 --- a/qlib/contrib/meta/data_selection/model.py +++ b/qlib/contrib/meta/data_selection/model.py @@ -52,6 +52,7 @@ class MetaModelDS(MetaTaskModel): lr=0.0001, max_epoch=100, seed=43, + alpha=0.0, ): self.step = step self.hist_step_n = hist_step_n @@ -61,6 +62,7 @@ class MetaModelDS(MetaTaskModel): self.lr = lr self.max_epoch = max_epoch self.fitted = False + self.alpha = alpha torch.manual_seed(seed) def run_epoch(self, phase, task_list, epoch, opt, loss_l, ignore_weight=False): @@ -144,7 +146,11 @@ class MetaModelDS(MetaTaskModel): ) # debug: record when the test phase starts self.tn = PredNet( - step=self.step, hist_step_n=self.hist_step_n, clip_weight=self.clip_weight, clip_method=self.clip_method + step=self.step, + hist_step_n=self.hist_step_n, + clip_weight=self.clip_weight, + clip_method=self.clip_method, + alpha=self.alpha, ) opt = optim.Adam(self.tn.parameters(), lr=self.lr) diff --git a/qlib/contrib/meta/data_selection/net.py b/qlib/contrib/meta/data_selection/net.py index 0aa8845cf..fce19df3e 100644 --- a/qlib/contrib/meta/data_selection/net.py +++ b/qlib/contrib/meta/data_selection/net.py @@ -41,11 +41,18 @@ class TimeWeightMeta(SingleMetaBase): class PredNet(nn.Module): - def __init__(self, step, hist_step_n, clip_weight=None, clip_method="tanh"): + def __init__(self, step, hist_step_n, clip_weight=None, clip_method="tanh", alpha: float = 0.0): + """ + Parameters + ---------- + alpha : float + the regularization for sub model (useful when align meta model with linear submodel) + """ super().__init__() self.step = step self.twm = TimeWeightMeta(hist_step_n=hist_step_n, clip_weight=clip_weight, clip_method=clip_method) self.init_paramters(hist_step_n) + self.alpha = alpha def get_sample_weights(self, X, time_perf, time_belong, ignore_weight=False): weights = torch.from_numpy(np.ones(X.shape[0])).float().to(X.device) @@ -59,7 +66,7 @@ class PredNet(nn.Module): """Please refer to the docs of MetaTaskDS for the description of the variables""" weights = self.get_sample_weights(X, time_perf, time_belong, ignore_weight=ignore_weight) X_w = X.T * weights.view(1, -1) - theta = torch.inverse(X_w @ X) @ X_w @ y + theta = torch.inverse(X_w @ X + self.alpha * torch.eye(X_w.shape[0])) @ X_w @ y return X_test @ theta, weights def init_paramters(self, hist_step_n): diff --git a/qlib/contrib/meta/data_selection/utils.py b/qlib/contrib/meta/data_selection/utils.py index 6b680548e..7da502808 100644 --- a/qlib/contrib/meta/data_selection/utils.py +++ b/qlib/contrib/meta/data_selection/utils.py @@ -5,6 +5,9 @@ import numpy as np import torch from torch import nn +from qlib.constant import EPS +from qlib.log import get_module_logger + class ICLoss(nn.Module): def forward(self, pred, y, idx, skip_size=50): @@ -24,6 +27,7 @@ class ICLoss(nn.Module): diff_point.append(i) prev = date diff_point.append(None) + # The lengths of diff_point will be one more larger then diff_point ic_all = 0.0 skip_n = 0 @@ -34,13 +38,23 @@ class ICLoss(nn.Module): skip_n += 1 continue y_focus = y[start_i:end_i] + if pred_focus.std() < EPS or y_focus.std() < EPS: + # These cases often happend at the end of test data. + # Usually caused by fillna(0.) + skip_n += 1 + continue + ic_day = torch.dot( (pred_focus - pred_focus.mean()) / np.sqrt(pred_focus.shape[0]) / pred_focus.std(), (y_focus - y_focus.mean()) / np.sqrt(y_focus.shape[0]) / y_focus.std(), ) ic_all += ic_day if len(diff_point) - 1 - skip_n <= 0: - raise ValueError("No enough data for calculating iC") + raise ValueError("No enough data for calculating IC") + if skip_n > 0: + get_module_logger("ICLoss").info( + f"{skip_n} days are skipped due to zero std or small scale of valid samples." + ) ic_mean = ic_all / (len(diff_point) - 1 - skip_n) return -ic_mean # ic loss diff --git a/qlib/contrib/model/linear.py b/qlib/contrib/model/linear.py index b7f584a9d..7fd3d156b 100644 --- a/qlib/contrib/model/linear.py +++ b/qlib/contrib/model/linear.py @@ -4,6 +4,7 @@ import numpy as np import pandas as pd from typing import Text, Union +from qlib.log import get_module_logger from qlib.data.dataset.weight import Reweighter from scipy.optimize import nnls from sklearn.linear_model import LinearRegression, Ridge, Lasso @@ -29,7 +30,7 @@ class LinearModel(Model): RIDGE = "ridge" LASSO = "lasso" - def __init__(self, estimator="ols", alpha=0.0, fit_intercept=False): + def __init__(self, estimator="ols", alpha=0.0, fit_intercept=False, include_valid: bool = False): """ Parameters ---------- @@ -39,6 +40,9 @@ class LinearModel(Model): l1 or l2 regularization parameter fit_intercept : bool whether fit intercept + include_valid: bool + Should the validation data be included for training? + The validation data should be included """ assert estimator in [self.OLS, self.NNLS, self.RIDGE, self.LASSO], f"unsupported estimator `{estimator}`" self.estimator = estimator @@ -49,9 +53,16 @@ class LinearModel(Model): self.fit_intercept = fit_intercept self.coef_ = None + self.include_valid = include_valid def fit(self, dataset: DatasetH, reweighter: Reweighter = None): df_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + if self.include_valid: + try: + df_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + df_train = pd.concat([df_train, df_valid]) + except KeyError: + get_module_logger("LinearModel").info("include_valid=True, but valid does not exist") if df_train.empty: raise ValueError("Empty data from dataset, please check your dataset config.") if reweighter is not None: diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 7ccd64fd9..ad4178d34 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -720,3 +720,26 @@ class DataHandlerLP(DataHandler): ]: setattr(new_hd, key, getattr(handler, key, None)) return new_hd + + @classmethod + def from_df(cls, df: pd.DataFrame) -> "DataHandlerLP": + """ + Motivation: + - When user want to get a quick data handler. + + The created data handler will have only one shared Dataframe without processors. + After creating the handler, user may often want to dump the handler for reuse + Here is a typical use case + + .. code-block:: python + + from qlib.data.dataset import DataHandlerLP + dh = DataHandlerLP.from_df(df) + dh.to_pickle(fname, dump_all=True) + + TODO: + - The StaticDataLoader is quite slow. It don't have to copy the data again... + + """ + loader = data_loader_module.StaticDataLoader(df) + return cls(data_loader=loader) diff --git a/qlib/data/dataset/utils.py b/qlib/data/dataset/utils.py index c01d8070d..4761fb383 100644 --- a/qlib/data/dataset/utils.py +++ b/qlib/data/dataset/utils.py @@ -2,9 +2,8 @@ # Licensed under the MIT License. from __future__ import annotations import pandas as pd -from typing import Union, List +from typing import Union, List, TYPE_CHECKING from qlib.utils import init_instance_by_config -from typing import TYPE_CHECKING if TYPE_CHECKING: from qlib.data.dataset import DataHandler @@ -121,7 +120,7 @@ def convert_index_format(df: Union[pd.DataFrame, pd.Series], level: str = "datet return df -def init_task_handler(task: dict) -> Union[DataHandler, None]: +def init_task_handler(task: dict) -> DataHandler: """ initialize the handler part of the task **inplace** @@ -142,5 +141,6 @@ def init_task_handler(task: dict) -> Union[DataHandler, None]: if h_conf is not None: handler = init_instance_by_config(h_conf, accept_types=DataHandler) task["dataset"]["kwargs"]["handler"] = handler - return handler + else: + raise ValueError("The task does not contains a handler part.") diff --git a/qlib/utils/__init__.py b/qlib/utils/__init__.py index 4908f438f..edf2459bd 100644 --- a/qlib/utils/__init__.py +++ b/qlib/utils/__init__.py @@ -1,6 +1,7 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +# TODO: this utils covers too much utilities, please seperat it into sub modules from __future__ import division from __future__ import print_function diff --git a/qlib/utils/data.py b/qlib/utils/data.py index 82b69127b..6c62f7558 100644 --- a/qlib/utils/data.py +++ b/qlib/utils/data.py @@ -1,6 +1,10 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from typing import Union +""" +This module covers some utility functions that operate on data or basic object +""" +from copy import deepcopy +from typing import List, Union import pandas as pd import numpy as np @@ -54,3 +58,48 @@ def deepcopy_basic_type(obj: object) -> object: return {k: deepcopy_basic_type(v) for k, v in obj.items()} else: return obj + + +S_DROP = "__DROP__" # this is a symbol which indicates drop the value + + +def update_config(base_config: dict, ext_config: Union[dict, List[dict]]): + """ + supporting adding base config based on the ext_config + + >>> bc = {"a": "xixi"} + >>> ec = {"b": "haha"} + >>> new_bc = update_config(bc, ec) + >>> print(new_bc) + {'a': 'xixi', 'b': 'haha'} + >>> print(bc) # base config should not be changed + {'a': 'xixi'} + >>> print(update_config(bc, {"b": S_DROP})) + {'a': 'xixi'} + >>> print(update_config(new_bc, {"b": S_DROP})) + {'a': 'xixi'} + """ + + base_config = deepcopy(base_config) # in case of modifying base config + + for ec in ext_config if isinstance(ext_config, (list, tuple)) else [ext_config]: + for key in ec: + if key not in base_config: + # if it is not in the default key, then replace it. + # ADD if not drop + if ec[key] != S_DROP: + base_config[key] = ec[key] + + else: + if isinstance(base_config[key], dict) and isinstance(ec[key], dict): + # Recursive + # Both of them are dict, then update it nested + base_config[key] = update_config(base_config[key], ec[key]) + elif ec[key] == S_DROP: + # DROP + del base_config[key] + else: + # REPLACE + # one of then are not dict. Then replace + base_config[key] = ec[key] + return base_config diff --git a/tests/data_mid_layer_tests/README.md b/tests/data_mid_layer_tests/README.md new file mode 100644 index 000000000..9f43b17e1 --- /dev/null +++ b/tests/data_mid_layer_tests/README.md @@ -0,0 +1,5 @@ +# Introduction +The middle layers of data, which mainly includes +- Handler + - processors +- Datasets diff --git a/tests/test_dataset.py b/tests/data_mid_layer_tests/test_dataset.py similarity index 100% rename from tests/test_dataset.py rename to tests/data_mid_layer_tests/test_dataset.py diff --git a/tests/data_mid_layer_tests/test_handler.py b/tests/data_mid_layer_tests/test_handler.py new file mode 100644 index 000000000..3ac813f5b --- /dev/null +++ b/tests/data_mid_layer_tests/test_handler.py @@ -0,0 +1,37 @@ +import os +import pickle +import shutil +import unittest +from qlib.tests import TestAutoData +from qlib.data import D +from qlib.data.dataset.handler import DataHandlerLP + + +class HandlerTests(TestAutoData): + def to_str(self, obj): + return "".join(str(obj).split()) + + def test_handler_df(self): + df = D.features(["sh600519"], start_time="20190101", end_time="20190201", fields=["$close"]) + dh = DataHandlerLP.from_df(df) + print(dh.fetch()) + self.assertTrue(dh._data.equals(df)) + self.assertTrue(dh._infer is dh._data) + self.assertTrue(dh._learn is dh._data) + self.assertTrue(dh.data_loader._data is dh._data) + fname = "_handler_test.pkl" + dh.to_pickle(fname, dump_all=True) + + with open(fname, "rb") as f: + dh_d = pickle.load(f) + + self.assertTrue(dh_d._data.equals(df)) + self.assertTrue(dh_d._infer is dh_d._data) + self.assertTrue(dh_d._learn is dh_d._data) + # Data loader will no longer be useful + self.assertTrue("_data" not in dh_d.data_loader.__dict__.keys()) + os.remove(fname) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_handler_storage.py b/tests/data_mid_layer_tests/test_handler_storage.py similarity index 100% rename from tests/test_handler_storage.py rename to tests/data_mid_layer_tests/test_handler_storage.py diff --git a/tests/test_processor.py b/tests/data_mid_layer_tests/test_processor.py similarity index 100% rename from tests/test_processor.py rename to tests/data_mid_layer_tests/test_processor.py