1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-29 09:01:18 +08:00

Compare commits

..

2 Commits

Author SHA1 Message Date
you-n-g
32c3070b73 Refine DDG-DA (#1472)
* Run ddg-da successfully

* Support include valid; More parameters

* Support L2 reg & visualization

* Blackformat

* Enable fill_method

* Support specify handler & optim dataset

* Fix Pylint
2023-04-07 15:00:21 +08:00
you-n-g
40de67265a Update Docs about some concepts in DataHandler (#1485) 2023-04-07 10:02:16 +08:00
17 changed files with 457 additions and 39 deletions

View File

@@ -0,0 +1,107 @@
import pickle
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(color_codes=True)
plt.rcParams["font.sans-serif"] = "SimHei"
plt.rcParams["axes.unicode_minus"] = False
from tqdm.auto import tqdm
# tqdm.pandas() # for progress_apply
# %matplotlib inline
# %load_ext autoreload
# # Meta Input
# +
with open("./internal_data_s20.pkl", "rb") as f:
data = pickle.load(f)
data.data_ic_df.columns.names = ["start_date", "end_date"]
data_sim = data.data_ic_df.droplevel(axis=1, level="end_date")
data_sim.index.name = "test datetime"
# -
plt.figure(figsize=(40, 20))
sns.heatmap(data_sim)
plt.figure(figsize=(40, 20))
sns.heatmap(data_sim.rolling(20).mean())
# # Meta Model
from qlib import auto_init
auto_init()
from qlib.workflow import R
exp = R.get_exp(experiment_name="DDG-DA")
meta_rec = exp.list_recorders(rtype="list", max_results=1)[0]
meta_m = meta_rec.load_object("model")
pd.DataFrame(meta_m.tn.twm.linear.weight.detach().numpy()).T[0].plot()
pd.DataFrame(meta_m.tn.twm.linear.weight.detach().numpy()).T[0].rolling(5).mean().plot()
# # Meta Output
# +
with open("./tasks_s20.pkl", "rb") as f:
tasks = pickle.load(f)
task_df = {}
for t in tasks:
test_seg = t["dataset"]["kwargs"]["segments"]["test"]
if None not in test_seg:
# The last rolling is skipped.
task_df[test_seg] = t["reweighter"].time_weight
task_df = pd.concat(task_df)
task_df.index.names = ["OS_start", "OS_end", "IS_start", "IS_end"]
task_df = task_df.droplevel(["OS_end", "IS_end"])
task_df = task_df.unstack("OS_start")
# -
plt.figure(figsize=(40, 20))
sns.heatmap(task_df.T)
plt.figure(figsize=(40, 20))
sns.heatmap(task_df.rolling(10).mean().T)
# # Sub Models
#
# NOTE:
# - this section assumes that the model is Linear model!!
# - Other models does not support this analysis
exp = R.get_exp(experiment_name="rolling_ds")
def show_linear_weight(exp):
coef_df = {}
for r in exp.list_recorders("list"):
t = r.load_object("task")
if None in t["dataset"]["kwargs"]["segments"]["test"]:
continue
m = r.load_object("params.pkl")
coef_df[t["dataset"]["kwargs"]["segments"]["test"]] = pd.Series(m.coef_)
coef_df = pd.concat(coef_df)
coef_df.index.names = ["test_start", "test_end", "coef_idx"]
coef_df = coef_df.droplevel("test_end").unstack("coef_idx").T
plt.figure(figsize=(40, 20))
sns.heatmap(coef_df)
plt.show()
show_linear_weight(R.get_exp(experiment_name="rolling_ds"))
show_linear_weight(R.get_exp(experiment_name="rolling_models"))

View File

@@ -10,8 +10,10 @@ import pandas as pd
import fire
import sys
import pickle
from typing import Optional
from qlib import auto_init
from qlib.model.trainer import TrainerR
from qlib.typehint import Literal
from qlib.utils import init_instance_by_config
from qlib.workflow import R
from qlib.tests.data import GetData
@@ -30,7 +32,33 @@ class DDGDA:
- `rm -r mlruns`
"""
def __init__(self, sim_task_model="linear", forecast_model="linear"):
def __init__(
self,
sim_task_model: Literal["linear", "gbdt"] = "linear",
forecast_model: Literal["linear", "gbdt"] = "linear",
h_path: Optional[str] = None,
test_end: Optional[str] = None,
train_start: Optional[str] = None,
meta_1st_train_end: Optional[str] = None,
task_ext_conf: Optional[dict] = None,
alpha: float = 0.0,
proxy_hd: str = "handler_proxy.pkl",
):
"""
Parameters
----------
train_start: Optional[str]
the start datetime for data. It is used in training start time (for both tasks & meta learing)
test_end: Optional[str]
the end datetime for data. It is used in test end time
meta_1st_train_end: Optional[str]
the datetime of training end of the first meta_task
alpha: float
Setting the L2 regularization for ridge
The `alpha` is only passed to MetaModelDS (it is not passed to sim_task_model currently..)
"""
self.step = 20
# NOTE:
# the horizon must match the meaning in the base task template
@@ -38,10 +66,19 @@ class DDGDA:
self.meta_exp_name = "DDG-DA"
self.sim_task_model = sim_task_model # The model to capture the distribution of data.
self.forecast_model = forecast_model # downstream forecasting models' type
self.rb_kwargs = {
"h_path": h_path,
"test_end": test_end,
"train_start": train_start,
"task_ext_conf": task_ext_conf,
}
self.alpha = alpha
self.meta_1st_train_end = meta_1st_train_end
self.proxy_hd = proxy_hd
def get_feature_importance(self):
# this must be lightGBM, because it needs to get the feature importance
rb = RollingBenchmark(model_type="gbdt")
rb = RollingBenchmark(model_type="gbdt", **self.rb_kwargs)
task = rb.basic_task()
with R.start(experiment_name="feature_importance"):
@@ -69,7 +106,7 @@ class DDGDA:
fi = self.get_feature_importance()
col_selected = fi.nlargest(topk)
rb = RollingBenchmark(model_type=self.sim_task_model)
rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs)
task = rb.basic_task()
dataset = init_instance_by_config(task["dataset"])
prep_ds = dataset.prepare(slice(None), col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
@@ -96,7 +133,7 @@ class DDGDA:
"kwargs": {"config": DIRNAME / "fea_label_df.pkl"},
}
)
handler.to_pickle(DIRNAME / "handler_proxy.pkl", dump_all=True)
handler.to_pickle(DIRNAME / self.proxy_hd, dump_all=True)
@property
def _internal_data_path(self):
@@ -108,7 +145,7 @@ class DDGDA:
This function will dump the input data for meta model
"""
# According to the experiments, the choice of the model type is very important for achieving good results
rb = RollingBenchmark(model_type=self.sim_task_model)
rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs)
sim_task = rb.basic_task()
if self.sim_task_model == "gbdt":
@@ -122,24 +159,27 @@ class DDGDA:
with self._internal_data_path.open("wb") as f:
pickle.dump(internal_data, f)
def train_meta_model(self):
def train_meta_model(self, fill_method="max"):
"""
training a meta model based on a simplified linear proxy model;
"""
# 1) leverage the simplified proxy forecasting model to train meta model.
# - Only the dataset part is important, in current version of meta model will integrate the
rb = RollingBenchmark(model_type=self.sim_task_model)
rb = RollingBenchmark(model_type=self.sim_task_model, **self.rb_kwargs)
sim_task = rb.basic_task()
train_start = self.rb_kwargs.get("train_start", "2008-01-01")
train_end = "2010-12-31" if self.meta_1st_train_end is None else self.meta_1st_train_end
test_start = (pd.Timestamp(train_end) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
proxy_forecast_model_task = {
# "model": "qlib.contrib.model.linear.LinearModel",
"dataset": {
"class": "qlib.data.dataset.DatasetH",
"kwargs": {
"handler": f"file://{(DIRNAME / 'handler_proxy.pkl').absolute()}",
"handler": f"file://{(DIRNAME / self.proxy_hd).absolute()}",
"segments": {
"train": ("2008-01-01", "2010-12-31"),
"test": ("2011-01-01", sim_task["dataset"]["kwargs"]["segments"]["test"][1]),
"train": (train_start, train_end),
"test": (test_start, sim_task["dataset"]["kwargs"]["segments"]["test"][1]),
},
},
},
@@ -156,7 +196,7 @@ class DDGDA:
segments=0.62, # keep test period consistent with the dataset yaml
trunc_days=1 + self.horizon,
hist_step_n=30,
fill_method="max",
fill_method=fill_method,
rolling_ext_days=0,
)
# NOTE:
@@ -165,12 +205,15 @@ class DDGDA:
# So the misalignment will not affect the effectiveness of the method.
with self._internal_data_path.open("rb") as f:
internal_data = pickle.load(f)
md = MetaDatasetDS(exp_name=internal_data, **kwargs)
# 3) train and logging meta model
with R.start(experiment_name=self.meta_exp_name):
R.log_params(**kwargs)
mm = MetaModelDS(step=self.step, hist_step_n=kwargs["hist_step_n"], lr=0.001, max_epoch=100, seed=43)
mm = MetaModelDS(
step=self.step, hist_step_n=kwargs["hist_step_n"], lr=0.001, max_epoch=100, seed=43, alpha=self.alpha
)
mm.fit(md)
R.save_objects(model=mm)
@@ -203,7 +246,7 @@ class DDGDA:
hist_step_n = int(param["hist_step_n"])
fill_method = param.get("fill_method", "max")
rb = RollingBenchmark(model_type=self.forecast_model)
rb = RollingBenchmark(model_type=self.forecast_model, **self.rb_kwargs)
task_l = rb.create_rolling_tasks()
# 2.2) create meta dataset for final dataset
@@ -233,13 +276,13 @@ class DDGDA:
"""
with self._task_path.open("rb") as f:
tasks = pickle.load(f)
rb = RollingBenchmark(rolling_exp="rolling_ds", model_type=self.forecast_model)
rb = RollingBenchmark(rolling_exp="rolling_ds", model_type=self.forecast_model, **self.rb_kwargs)
rb.train_rolling_tasks(tasks)
rb.ens_rolling()
rb.update_rolling_rec()
def run_all(self):
# 1) file: handler_proxy.pkl
# 1) file: handler_proxy.pkl (self.proxy_hd)
self.dump_data_for_proxy_model()
# 2)
# file: internal_data_s20.pkl

View File

@@ -1,13 +1,17 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from typing import Optional
from qlib.model.ens.ensemble import RollingEnsemble
from qlib.utils import init_instance_by_config
import fire
import yaml
import pandas as pd
from qlib import auto_init
from pathlib import Path
from tqdm.auto import tqdm
from qlib.model.trainer import TrainerR
from qlib.log import get_module_logger
from qlib.utils.data import update_config
from qlib.workflow import R
from qlib.tests.data import GetData
@@ -25,11 +29,40 @@ class RollingBenchmark:
"""
def __init__(self, rolling_exp="rolling_models", model_type="linear") -> None:
def __init__(
self,
rolling_exp: str = "rolling_models",
model_type: str = "linear",
h_path: Optional[str] = None,
train_start: Optional[str] = None,
test_end: Optional[str] = None,
task_ext_conf: Optional[dict] = None,
) -> None:
"""
Parameters
----------
rolling_exp : str
The name for the experiments for rolling
model_type : str
The model to be boosted.
h_path : Optional[str]
the dumped data handler;
test_end : Optional[str]
the test end for the data. It is typically used together with the handler
train_start : Optional[str]
the train start for the data. It is typically used together with the handler.
task_ext_conf : Optional[dict]
some option to update the
"""
self.step = 20
self.horizon = 20
self.rolling_exp = rolling_exp
self.model_type = model_type
self.h_path = h_path
self.train_start = train_start
self.test_end = test_end
self.logger = get_module_logger("RollingBenchmark")
self.task_ext_conf = task_ext_conf
def basic_task(self):
"""For fast training rolling"""
@@ -42,6 +75,10 @@ class RollingBenchmark:
h_path = DIRNAME / "linear_alpha158_handler_horizon{}.pkl".format(self.horizon)
else:
raise AssertionError("Model type is not supported!")
if self.h_path is not None:
h_path = Path(self.h_path)
with conf_path.open("r") as f:
conf = yaml.safe_load(f)
@@ -52,6 +89,9 @@ class RollingBenchmark:
task = conf["task"]
if self.task_ext_conf is not None:
task = update_config(task, self.task_ext_conf)
if not h_path.exists():
h_conf = task["dataset"]["kwargs"]["handler"]
h = init_instance_by_config(h_conf)
@@ -59,6 +99,15 @@ class RollingBenchmark:
task["dataset"]["kwargs"]["handler"] = f"file://{h_path}"
task["record"] = ["qlib.workflow.record_temp.SignalRecord"]
if self.train_start is not None:
seg = task["dataset"]["kwargs"]["segments"]["train"]
task["dataset"]["kwargs"]["segments"]["train"] = pd.Timestamp(self.train_start), seg[1]
if self.test_end is not None:
seg = task["dataset"]["kwargs"]["segments"]["test"]
task["dataset"]["kwargs"]["segments"]["test"] = seg[0], pd.Timestamp(self.test_end)
self.logger.info(task)
return task
def create_rolling_tasks(self):
@@ -93,7 +142,7 @@ class RollingBenchmark:
"""
Evaluate the combined rolling results
"""
for rid, rec in R.list_recorders(experiment_name=self.COMB_EXP).items():
for _, rec in R.list_recorders(experiment_name=self.COMB_EXP).items():
for rt_cls in SigAnaRecord, PortAnaRecord:
rt = rt_cls(recorder=rec, skip_existing=True)
rt.generate()

View File

@@ -55,8 +55,10 @@ class InternalData:
# The handler is initialized for only once.
if not trainer.has_worker():
self.dh = init_task_handler(perf_task_tpl)
self.dh.config(dump_all=False) # in some cases, the data handler are saved to disk with `dump_all=True`
else:
self.dh = init_instance_by_config(perf_task_tpl["dataset"]["kwargs"]["handler"])
assert self.dh.dump_all is False # otherwise, it will save all the detailed data
seg = perf_task_tpl["dataset"]["kwargs"]["segments"]
@@ -77,7 +79,7 @@ class InternalData:
get_module_logger("Internal Data").info("the data has been initialized")
else:
# train new models
assert 0 == len(recorders), "An empty experiment is required for setup `InternalData``"
assert 0 == len(recorders), "An empty experiment is required for setup `InternalData`"
trainer.train(gen_task)
# 2) extract the similarity matrix
@@ -119,6 +121,7 @@ class MetaTaskDS(MetaTask):
def __init__(self, task: dict, meta_info: pd.DataFrame, mode: str = MetaTask.PROC_MODE_FULL, fill_method="max"):
"""
The description of the processed data
time_perf: A array with shape <hist_step_n * step, data pieces> -> data piece performance
@@ -132,6 +135,10 @@ class MetaTaskDS(MetaTask):
[0., 0., 0., ..., 0., 0., 1.],
[0., 0., 0., ..., 0., 0., 1.]])
Parameters
----------
meta_info: pd.DataFrame
please refer to the docs of _prepare_meta_ipt for detailed explanation.
"""
super().__init__(task, meta_info)
self.fill_method = fill_method
@@ -180,12 +187,41 @@ class MetaTaskDS(MetaTask):
self.processed_meta_input = data_to_tensor(self.processed_meta_input)
def _get_processed_meta_info(self):
meta_info_norm = self.meta_info.sub(self.meta_info.mean(axis=1), axis=0) # .fillna(0.)
if self.fill_method == "max":
meta_info_norm = meta_info_norm.T.fillna(
meta_info_norm.max(axis=1)
).T # fill it with row max to align with previous implementation
meta_info_norm = self.meta_info.sub(self.meta_info.mean(axis=1), axis=0)
if self.fill_method.startswith("max"):
suffix = self.fill_method.lstrip("max")
if suffix == "seg":
fill_value = {}
for col in meta_info_norm.columns:
fill_value[col] = meta_info_norm.loc[meta_info_norm[col].isna(), :].dropna(axis=1).mean().max()
fill_value = pd.Series(fill_value).sort_index()
# The NaN Values are filled segment-wise. Below is an exampleof fill_value
# 2009-01-05 2009-02-06 0.145809
# 2009-02-09 2009-03-06 0.148005
# 2009-03-09 2009-04-03 0.090385
# 2009-04-07 2009-05-05 0.114318
# 2009-05-06 2009-06-04 0.119328
# ...
meta_info_norm = meta_info_norm.fillna(fill_value)
else:
if len(suffix) > 0:
get_module_logger("MetaTaskDS").warning(
f"fill_method={self.fill_method}; the info after can't be correctly parsed. Please check your parameters."
)
fill_value = meta_info_norm.max(axis=1)
# fill it with row max to align with previous implementation
# This will magnify the data similarity when data is in daily freq
# the fill value corresponds to data like this
# It get a performance value for each day.
# The performance value are get from other models on this day
# 2009-01-16 0.276320
# 2009-01-19 0.280603
# ...
# 2011-06-27 0.203773
meta_info_norm = meta_info_norm.T.fillna(fill_value).T
elif self.fill_method == "zero":
# It will fillna(0.0) at the end.
pass
else:
raise NotImplementedError(f"This type of input is not supported")
@@ -286,7 +322,33 @@ class MetaDatasetDS(MetaTaskDataset):
logger.warning(f"ValueError: {e}")
assert len(self.meta_task_l) > 0, "No meta tasks found. Please check the data and setting"
def _prepare_meta_ipt(self, task):
def _prepare_meta_ipt(self, task) -> pd.DataFrame:
"""
Please refer to `self.internal_data.setup` for detailed information about `self.internal_data.data_ic_df`
Indices with format below can be successfully sliced by `ic_df.loc[:end, pd.IndexSlice[:, :end]]`
2021-06-21 2021-06-04 .. 2021-03-22 2021-03-08
2021-07-02 2021-06-18 .. 2021-04-02 None
Returns
-------
a pd.DataFrame with similar content below.
- each column corresponds to a trained model named by the training data range
- each row corresponds to a day of data tested by the models of the columns
- The rows cells that overlaps with the data used by columns are masked
2009-01-05 2009-02-09 ... 2011-04-27 2011-05-26
2009-02-06 2009-03-06 ... 2011-05-25 2011-06-23
datetime ...
2009-01-13 NaN 0.310639 ... -0.169057 0.137792
2009-01-14 NaN 0.261086 ... -0.143567 0.082581
... ... ... ... ... ...
2011-06-30 -0.054907 -0.020219 ... -0.023226 NaN
2011-07-01 -0.075762 -0.026626 ... -0.003167 NaN
"""
ic_df = self.internal_data.data_ic_df
segs = task["dataset"]["kwargs"]["segments"]
@@ -294,15 +356,19 @@ class MetaDatasetDS(MetaTaskDataset):
ic_df_avail = ic_df.loc[:end, pd.IndexSlice[:, :end]]
# meta data set focus on the **information** instead of preprocess
# 1) filter the future info
def mask_future(s):
"""mask future information"""
# from qlib.utils import get_date_by_shift
# 1) filter the overlap info
def mask_overlap(s):
"""
mask overlap information
data after self.name[end] with self.trunc_days that contains future info are also considered as overlap info
Approximately the diagnal + horizon length of data are masked.
"""
start, end = s.name
end = get_date_by_shift(trading_date=end, shift=self.trunc_days - 1, future=True)
return s.mask((s.index >= start) & (s.index <= end))
ic_df_avail = ic_df_avail.apply(mask_future) # apply to each col
ic_df_avail = ic_df_avail.apply(mask_overlap) # apply to each col
# 2) filter the info with too long periods
total_len = self.step * self.hist_step_n

View File

@@ -52,6 +52,7 @@ class MetaModelDS(MetaTaskModel):
lr=0.0001,
max_epoch=100,
seed=43,
alpha=0.0,
):
self.step = step
self.hist_step_n = hist_step_n
@@ -61,6 +62,7 @@ class MetaModelDS(MetaTaskModel):
self.lr = lr
self.max_epoch = max_epoch
self.fitted = False
self.alpha = alpha
torch.manual_seed(seed)
def run_epoch(self, phase, task_list, epoch, opt, loss_l, ignore_weight=False):
@@ -144,7 +146,11 @@ class MetaModelDS(MetaTaskModel):
) # debug: record when the test phase starts
self.tn = PredNet(
step=self.step, hist_step_n=self.hist_step_n, clip_weight=self.clip_weight, clip_method=self.clip_method
step=self.step,
hist_step_n=self.hist_step_n,
clip_weight=self.clip_weight,
clip_method=self.clip_method,
alpha=self.alpha,
)
opt = optim.Adam(self.tn.parameters(), lr=self.lr)

View File

@@ -41,11 +41,18 @@ class TimeWeightMeta(SingleMetaBase):
class PredNet(nn.Module):
def __init__(self, step, hist_step_n, clip_weight=None, clip_method="tanh"):
def __init__(self, step, hist_step_n, clip_weight=None, clip_method="tanh", alpha: float = 0.0):
"""
Parameters
----------
alpha : float
the regularization for sub model (useful when align meta model with linear submodel)
"""
super().__init__()
self.step = step
self.twm = TimeWeightMeta(hist_step_n=hist_step_n, clip_weight=clip_weight, clip_method=clip_method)
self.init_paramters(hist_step_n)
self.alpha = alpha
def get_sample_weights(self, X, time_perf, time_belong, ignore_weight=False):
weights = torch.from_numpy(np.ones(X.shape[0])).float().to(X.device)
@@ -59,7 +66,7 @@ class PredNet(nn.Module):
"""Please refer to the docs of MetaTaskDS for the description of the variables"""
weights = self.get_sample_weights(X, time_perf, time_belong, ignore_weight=ignore_weight)
X_w = X.T * weights.view(1, -1)
theta = torch.inverse(X_w @ X) @ X_w @ y
theta = torch.inverse(X_w @ X + self.alpha * torch.eye(X_w.shape[0])) @ X_w @ y
return X_test @ theta, weights
def init_paramters(self, hist_step_n):

View File

@@ -5,6 +5,9 @@ import numpy as np
import torch
from torch import nn
from qlib.constant import EPS
from qlib.log import get_module_logger
class ICLoss(nn.Module):
def forward(self, pred, y, idx, skip_size=50):
@@ -24,6 +27,7 @@ class ICLoss(nn.Module):
diff_point.append(i)
prev = date
diff_point.append(None)
# The lengths of diff_point will be one more larger then diff_point
ic_all = 0.0
skip_n = 0
@@ -34,13 +38,23 @@ class ICLoss(nn.Module):
skip_n += 1
continue
y_focus = y[start_i:end_i]
if pred_focus.std() < EPS or y_focus.std() < EPS:
# These cases often happend at the end of test data.
# Usually caused by fillna(0.)
skip_n += 1
continue
ic_day = torch.dot(
(pred_focus - pred_focus.mean()) / np.sqrt(pred_focus.shape[0]) / pred_focus.std(),
(y_focus - y_focus.mean()) / np.sqrt(y_focus.shape[0]) / y_focus.std(),
)
ic_all += ic_day
if len(diff_point) - 1 - skip_n <= 0:
raise ValueError("No enough data for calculating iC")
raise ValueError("No enough data for calculating IC")
if skip_n > 0:
get_module_logger("ICLoss").info(
f"{skip_n} days are skipped due to zero std or small scale of valid samples."
)
ic_mean = ic_all / (len(diff_point) - 1 - skip_n)
return -ic_mean # ic loss

View File

@@ -4,6 +4,7 @@
import numpy as np
import pandas as pd
from typing import Text, Union
from qlib.log import get_module_logger
from qlib.data.dataset.weight import Reweighter
from scipy.optimize import nnls
from sklearn.linear_model import LinearRegression, Ridge, Lasso
@@ -29,7 +30,7 @@ class LinearModel(Model):
RIDGE = "ridge"
LASSO = "lasso"
def __init__(self, estimator="ols", alpha=0.0, fit_intercept=False):
def __init__(self, estimator="ols", alpha=0.0, fit_intercept=False, include_valid: bool = False):
"""
Parameters
----------
@@ -39,6 +40,9 @@ class LinearModel(Model):
l1 or l2 regularization parameter
fit_intercept : bool
whether fit intercept
include_valid: bool
Should the validation data be included for training?
The validation data should be included
"""
assert estimator in [self.OLS, self.NNLS, self.RIDGE, self.LASSO], f"unsupported estimator `{estimator}`"
self.estimator = estimator
@@ -49,9 +53,16 @@ class LinearModel(Model):
self.fit_intercept = fit_intercept
self.coef_ = None
self.include_valid = include_valid
def fit(self, dataset: DatasetH, reweighter: Reweighter = None):
df_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
if self.include_valid:
try:
df_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
df_train = pd.concat([df_train, df_valid])
except KeyError:
get_module_logger("LinearModel").info("include_valid=True, but valid does not exist")
if df_train.empty:
raise ValueError("Empty data from dataset, please check your dataset config.")
if reweighter is not None:

View File

@@ -720,3 +720,26 @@ class DataHandlerLP(DataHandler):
]:
setattr(new_hd, key, getattr(handler, key, None))
return new_hd
@classmethod
def from_df(cls, df: pd.DataFrame) -> "DataHandlerLP":
"""
Motivation:
- When user want to get a quick data handler.
The created data handler will have only one shared Dataframe without processors.
After creating the handler, user may often want to dump the handler for reuse
Here is a typical use case
.. code-block:: python
from qlib.data.dataset import DataHandlerLP
dh = DataHandlerLP.from_df(df)
dh.to_pickle(fname, dump_all=True)
TODO:
- The StaticDataLoader is quite slow. It don't have to copy the data again...
"""
loader = data_loader_module.StaticDataLoader(df)
return cls(data_loader=loader)

View File

@@ -2,9 +2,8 @@
# Licensed under the MIT License.
from __future__ import annotations
import pandas as pd
from typing import Union, List
from typing import Union, List, TYPE_CHECKING
from qlib.utils import init_instance_by_config
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from qlib.data.dataset import DataHandler
@@ -121,7 +120,7 @@ def convert_index_format(df: Union[pd.DataFrame, pd.Series], level: str = "datet
return df
def init_task_handler(task: dict) -> Union[DataHandler, None]:
def init_task_handler(task: dict) -> DataHandler:
"""
initialize the handler part of the task **inplace**
@@ -142,5 +141,6 @@ def init_task_handler(task: dict) -> Union[DataHandler, None]:
if h_conf is not None:
handler = init_instance_by_config(h_conf, accept_types=DataHandler)
task["dataset"]["kwargs"]["handler"] = handler
return handler
else:
raise ValueError("The task does not contains a handler part.")

View File

@@ -1,6 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# TODO: this utils covers too much utilities, please seperat it into sub modules
from __future__ import division
from __future__ import print_function

View File

@@ -1,6 +1,10 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from typing import Union
"""
This module covers some utility functions that operate on data or basic object
"""
from copy import deepcopy
from typing import List, Union
import pandas as pd
import numpy as np
@@ -54,3 +58,48 @@ def deepcopy_basic_type(obj: object) -> object:
return {k: deepcopy_basic_type(v) for k, v in obj.items()}
else:
return obj
S_DROP = "__DROP__" # this is a symbol which indicates drop the value
def update_config(base_config: dict, ext_config: Union[dict, List[dict]]):
"""
supporting adding base config based on the ext_config
>>> bc = {"a": "xixi"}
>>> ec = {"b": "haha"}
>>> new_bc = update_config(bc, ec)
>>> print(new_bc)
{'a': 'xixi', 'b': 'haha'}
>>> print(bc) # base config should not be changed
{'a': 'xixi'}
>>> print(update_config(bc, {"b": S_DROP}))
{'a': 'xixi'}
>>> print(update_config(new_bc, {"b": S_DROP}))
{'a': 'xixi'}
"""
base_config = deepcopy(base_config) # in case of modifying base config
for ec in ext_config if isinstance(ext_config, (list, tuple)) else [ext_config]:
for key in ec:
if key not in base_config:
# if it is not in the default key, then replace it.
# ADD if not drop
if ec[key] != S_DROP:
base_config[key] = ec[key]
else:
if isinstance(base_config[key], dict) and isinstance(ec[key], dict):
# Recursive
# Both of them are dict, then update it nested
base_config[key] = update_config(base_config[key], ec[key])
elif ec[key] == S_DROP:
# DROP
del base_config[key]
else:
# REPLACE
# one of then are not dict. Then replace
base_config[key] = ec[key]
return base_config

View File

@@ -0,0 +1,5 @@
# Introduction
The middle layers of data, which mainly includes
- Handler
- processors
- Datasets

View File

@@ -0,0 +1,37 @@
import os
import pickle
import shutil
import unittest
from qlib.tests import TestAutoData
from qlib.data import D
from qlib.data.dataset.handler import DataHandlerLP
class HandlerTests(TestAutoData):
def to_str(self, obj):
return "".join(str(obj).split())
def test_handler_df(self):
df = D.features(["sh600519"], start_time="20190101", end_time="20190201", fields=["$close"])
dh = DataHandlerLP.from_df(df)
print(dh.fetch())
self.assertTrue(dh._data.equals(df))
self.assertTrue(dh._infer is dh._data)
self.assertTrue(dh._learn is dh._data)
self.assertTrue(dh.data_loader._data is dh._data)
fname = "_handler_test.pkl"
dh.to_pickle(fname, dump_all=True)
with open(fname, "rb") as f:
dh_d = pickle.load(f)
self.assertTrue(dh_d._data.equals(df))
self.assertTrue(dh_d._infer is dh_d._data)
self.assertTrue(dh_d._learn is dh_d._data)
# Data loader will no longer be useful
self.assertTrue("_data" not in dh_d.data_loader.__dict__.keys())
os.remove(fname)
if __name__ == "__main__":
unittest.main()