1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

Add high-frequency feature engineering code (#1022)

* highfreq data processing

* lint

* lint

* lint
This commit is contained in:
Yuchen Fang
2022-04-10 10:41:22 +08:00
committed by GitHub
parent 2952c443ca
commit 655ed982cf
4 changed files with 737 additions and 1 deletions

View File

@@ -0,0 +1,164 @@
from qlib.data.dataset.handler import DataHandler, DataHandlerLP
EPSILON = 1e-4
class HighFreqHandler(DataHandlerLP):
    """Learn/infer data handler producing 1-minute price and volume features.

    The handler configures a ``QlibDataLoader`` (``freq="1min"``,
    ``swap_level=False``) with the expressions built by
    :meth:`get_feature_config` and hands everything to ``DataHandlerLP``.
    """

    def __init__(
        self,
        instruments="csi300",
        start_time=None,
        end_time=None,
        infer_processors=None,
        learn_processors=None,
        fit_start_time=None,
        fit_end_time=None,
        drop_raw=True,
    ):
        """Build the data loader config and initialise the parent handler.

        Parameters
        ----------
        instruments :
            instrument universe forwarded to the parent handler.
        start_time, end_time :
            data range forwarded to the parent handler.
        infer_processors, learn_processors : list of dict, optional
            processor configs; ``None`` (the default) means no processors.
            ``fit_start_time`` / ``fit_end_time`` are injected into the
            ``"kwargs"`` entry of every config.
        fit_start_time, fit_end_time :
            fitting range injected into each processor config.
        drop_raw : bool
            forwarded to the parent handler.
        """

        def check_transform_proc(proc_l):
            # Work on copies so the caller's config dicts are never modified;
            # a config without a "kwargs" entry simply gets a fresh one.
            new_l = []
            for p in proc_l or []:
                proc = dict(p)
                proc_kwargs = dict(proc.get("kwargs") or {})
                proc_kwargs.update(
                    {
                        "fit_start_time": fit_start_time,
                        "fit_end_time": fit_end_time,
                    }
                )
                proc["kwargs"] = proc_kwargs
                new_l.append(proc)
            return new_l

        infer_processors = check_transform_proc(infer_processors)
        learn_processors = check_transform_proc(learn_processors)

        data_loader = {
            "class": "QlibDataLoader",
            "kwargs": {
                "config": self.get_feature_config(),
                "swap_level": False,
                "freq": "1min",
            },
        }
        super().__init__(
            instruments=instruments,
            start_time=start_time,
            end_time=end_time,
            data_loader=data_loader,
            infer_processors=infer_processors,
            learn_processors=learn_processors,
            drop_raw=drop_raw,
        )

    def get_feature_config(self):
        """Return the expression strings and their column names.

        Returns
        -------
        tuple of (list of str, list of str)
            ``fields`` holds one expression per feature, ``names`` the matching
            column names: five price fields, the same five shifted by 240 bars
            (suffix ``_1``), ``$volume`` and ``$volume_1``.
        """
        fields = []
        names = []

        template_if = "If(IsNull({1}), {0}, {1})"
        template_paused = "Select(Gt($hx_paused_num, 1.001), {0})"

        def get_normalized_price_feature(price_field, shift=0):
            # Prices are divided by DayLast(Ref(ffilled close, 243)).
            # NOTE(review): the original comment described this as "the close
            # price of 237th minute of yesterday" while the offset used is 243;
            # confirm which one is intended.
            if shift == 0:
                template_norm = "{0}/DayLast(Ref({1}, 243))"
            else:
                template_norm = "Ref({0}, " + str(shift) + ")/DayLast(Ref({1}, 243))"

            template_fillnan = "FFillNan({0})"
            # calculate -> ffill -> remove paused
            # A missing price falls back to $close before normalisation.
            raw_price = template_if.format("$close", price_field)
            normalized = template_norm.format(raw_price, template_fillnan.format("$close"))
            return template_paused.format(template_fillnan.format(normalized))

        price_fields = ["$open", "$high", "$low", "$close", "$vwap"]

        # Current-bar prices.
        fields += [get_normalized_price_feature(field, 0) for field in price_fields]
        names += list(price_fields)

        # Prices shifted by 240 bars.
        fields += [get_normalized_price_feature(field, 240) for field in price_fields]
        names += [field + "_1" for field in price_fields]

        # calculate and fill nan with 0; negative results are clamped to 0
        template_gzero = "If(Ge({0}, 0), {0}, 0)"
        template_zero_if_null = "If(IsNull({0}), 0, {0})"
        volume_templates = [
            ("$volume", "{0}/Ref(DayLast(Mean({0}, 7200)), 240)"),
            ("$volume_1", "Ref({0}, 240)/Ref(DayLast(Mean({0}, 7200)), 240)"),
        ]
        for column_name, volume_template in volume_templates:
            ratio = volume_template.format("$volume")
            fields.append(template_gzero.format(template_paused.format(template_zero_if_null.format(ratio))))
            names.append(column_name)

        return fields, names
class HighFreqBacktestHandler(DataHandler):
    """Data handler exposing raw 1-minute close, vwap, volume and factor columns."""

    def __init__(
        self,
        instruments="csi300",
        start_time=None,
        end_time=None,
    ):
        """Configure a 1-minute ``QlibDataLoader`` and initialise the parent handler."""
        loader_kwargs = {
            "config": self.get_feature_config(),
            "swap_level": False,
            "freq": "1min",
        }
        super().__init__(
            instruments=instruments,
            start_time=start_time,
            end_time=end_time,
            data_loader={"class": "QlibDataLoader", "kwargs": loader_kwargs},
        )

    def get_feature_config(self):
        """Return ``(fields, names)``: the expression strings and their column names."""
        template_if = "If(IsNull({1}), {0}, {1})"
        template_paused = "Select(Gt($hx_paused_num, 1.001), {0})"
        template_fillnan = "FFillNan({0})"
        template_zero_if_null = "If(IsNull({0}), 0, {0})"

        # (column name, expression) pairs, in output order.
        columns = [
            # forward-filled close
            ("$close0", template_fillnan.format(template_paused.format("$close"))),
            # vwap, falling back to the forward-filled close when it is null
            ("$vwap0", template_paused.format(template_if.format(template_fillnan.format("$close"), "$vwap"))),
            # volume / factor with nulls replaced by 0
            ("$volume0", template_paused.format(template_zero_if_null.format("$volume"))),
            ("$factor0", template_paused.format(template_zero_if_null.format("$factor"))),
        ]

        fields = [expression for _, expression in columns]
        names = [column_name for column_name, _ in columns]
        return fields, names

View File

@@ -0,0 +1,81 @@
import os
import numpy as np
import pandas as pd
from qlib.data.dataset.processor import Processor
from qlib.data.dataset.utils import fetch_df_by_index
from typing import Dict
class HighFreqTrans(Processor):
    """Processor that casts the whole frame to a compact dtype.

    ``dtype == "bool"`` casts to ``np.int8``; any other value casts to
    ``np.float32``.
    """

    def __init__(self, dtype: str = "bool"):
        self.dtype = dtype

    def fit(self, df_features):
        """Nothing to fit; the cast is stateless."""

    def __call__(self, df_features):
        target_dtype = np.int8 if self.dtype == "bool" else np.float32
        return df_features.astype(target_dtype)
class HighFreqNorm(Processor):
    """Group-wise normalisation whose statistics are persisted as ``.npy`` files.

    Columns are split positionally into consecutive groups described by
    ``norm_groups`` (group name -> number of columns).  For every group a single
    mean and std are computed over all values of the group; groups whose name
    ends with ``"volume"`` are passed through ``np.log1p`` first.
    """

    def __init__(
        self,
        fit_start_time: pd.Timestamp,
        fit_end_time: pd.Timestamp,
        feature_save_dir: str,
        norm_groups: Dict[str, int],
    ):
        """
        Parameters
        ----------
        fit_start_time, fit_end_time : pd.Timestamp
            datetime range used to select the rows the statistics are fitted on.
        feature_save_dir : str
            directory in which the per-group statistic files are stored.
        norm_groups : Dict[str, int]
            ordered mapping of group name to the number of consecutive columns
            belonging to that group.
        """
        self.fit_start_time = fit_start_time
        self.fit_end_time = fit_end_time
        self.feature_save_dir = feature_save_dir
        self.norm_groups = norm_groups

    def _group_slices(self) -> Dict[str, slice]:
        """Map every group name to the positional column slice it covers."""
        slices = {}
        offset = 0
        for name, dim in self.norm_groups.items():
            slices[name] = slice(offset, offset + dim)
            offset += dim
        return slices

    def _stat_path(self, name: str, stat: str) -> str:
        """Return the file path of statistic ``stat`` for group ``name``."""
        # os.path.join gives the same result as plain concatenation when the
        # directory already ends with a separator, and keeps the files inside
        # the directory when it does not.
        return os.path.join(self.feature_save_dir, name + "_" + stat + ".npy")

    def fit(self, df_features) -> None:
        """Compute and save the statistics unless the directory already has files.

        For each group the files ``<name>_mean.npy``, ``<name>_std.npy``,
        ``<name>_vmax.npy`` and ``<name>_vmin.npy`` are written.
        """
        if os.path.exists(self.feature_save_dir) and len(os.listdir(self.feature_save_dir)) != 0:
            return
        # The directory may exist but be empty (that case passes the guard
        # above), so creation must tolerate an existing directory.
        os.makedirs(self.feature_save_dir, exist_ok=True)
        fetch_df = fetch_df_by_index(df_features, slice(self.fit_start_time, self.fit_end_time), level="datetime")
        del df_features

        for name, columns in self._group_slices().items():
            df_values = fetch_df.iloc[:, columns].values
            if name.endswith("volume"):
                df_values = np.log1p(df_values)
            self.feature_mean = np.nanmean(df_values)
            np.save(self._stat_path(name, "mean"), self.feature_mean)
            df_values = df_values - self.feature_mean
            # The std is taken over the absolute centred values, as in the
            # original implementation.
            self.feature_std = np.nanstd(np.absolute(df_values))
            np.save(self._stat_path(name, "std"), self.feature_std)
            df_values = df_values / self.feature_std
            np.save(self._stat_path(name, "vmax"), np.nanmax(df_values))
            np.save(self._stat_path(name, "vmin"), np.nanmin(df_values))
        return

    def __call__(self, df_features):
        """Normalise every group with its saved mean/std and fill NaN with 0.

        Returns a new DataFrame with the same index and columns as the input.
        """
        if "date" in df_features:
            # DataFrame.droplevel has no ``inplace`` argument and returns a new
            # frame, so the result has to be re-bound.
            # NOTE(review): the membership test looks at column labels while
            # droplevel acts on the index; confirm which one is meant.
            df_features = df_features.droplevel("date")
        df_values = df_features.values

        for name, columns in self._group_slices().items():
            feature_mean = np.load(self._stat_path(name, "mean"))
            feature_std = np.load(self._stat_path(name, "std"))

            if name.endswith("volume"):
                df_values[:, columns] = np.log1p(df_values[:, columns])
            df_values[:, columns] -= feature_mean
            df_values[:, columns] /= feature_std
        df_features = pd.DataFrame(data=df_values, index=df_features.index, columns=df_features.columns)
        return df_features.fillna(0)

View File

@@ -0,0 +1,301 @@
import os
import time
import datetime
from typing import Optional
import qlib
from qlib.data import D
from qlib.config import REG_CN
from qlib.utils import init_instance_by_config
from qlib.data.dataset.handler import DataHandlerLP
from qlib.data.data import Cal
from qlib.contrib.ops.high_freq import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull, IsInf, Cut
import pickle as pkl
from joblib import Parallel, delayed
from utilsd.logging import print_log
class HighFreqProvider:
    """Generate and cache high-frequency datasets described by qlib configs.

    Every ``*_conf`` dict is a qlib instance config with an extra ``"path"``
    entry telling where the generated data is cached on disk.
    """

    _SEGMENTS = ("train", "valid", "test")
    _MISSING_PATH_MSG = "Must specify the path to save the dataset."

    def __init__(
        self,
        start_time: str,
        end_time: str,
        train_end_time: str,
        valid_start_time: str,
        valid_end_time: str,
        test_start_time: str,
        qlib_conf: dict,
        feature_conf: dict,
        label_conf: Optional[dict] = None,
        backtest_conf: Optional[dict] = None,
        **kwargs,
    ) -> None:
        """Store the time ranges and configs and initialise qlib with ``qlib_conf``."""
        self.start_time = start_time
        self.end_time = end_time
        self.test_start_time = test_start_time
        self.train_end_time = train_end_time
        self.valid_start_time = valid_start_time
        self.valid_end_time = valid_end_time
        self._init_qlib(qlib_conf)
        self.feature_conf = feature_conf
        self.label_conf = label_conf
        self.backtest_conf = backtest_conf
        self.qlib_conf = qlib_conf

    # ------------------------------------------------------------------ helpers

    @classmethod
    def _pop_path(cls, config):
        """Return ``(config_without_path, path)`` without touching ``config``.

        Raises:
            ValueError: if ``config`` is None or has no ``"path"`` entry.
        """
        if config is None or "path" not in config:
            raise ValueError(cls._MISSING_PATH_MSG)
        # Copy first: the stored configs are read again on later calls.
        remaining = dict(config)
        path = remaining.pop("path")
        return remaining, path

    @staticmethod
    def _ensure_parent_dir(path):
        """Create the directory that will contain ``path`` if it is missing."""
        parent = os.path.dirname(path)
        # A bare file name has an empty dirname; there is nothing to create.
        if parent:
            os.makedirs(parent, exist_ok=True)

    @staticmethod
    def _stem(path):
        """Return ``path`` without its extension (``a/b.pkl`` -> ``a/b``)."""
        return os.path.splitext(path)[0]

    @classmethod
    def _segment_paths(cls, path):
        """Map each segment name to ``<stem>_<segment>.pkl``."""
        stem = cls._stem(path)
        return {segment: stem + "_" + segment + ".pkl" for segment in cls._SEGMENTS}

    @staticmethod
    def _load_cached_segments(path, datasets):
        """Load a cached pickle and return the requested segments.

        The pickle holds either a dict keyed by segment name or an object
        exposing ``prepare``.
        """
        # NOTE: pickle.load executes arbitrary code; only point ``path`` at
        # cache files this class has written itself.
        with open(path, "rb") as f:
            data = pkl.load(f)
        if isinstance(data, dict):
            return [data[i] for i in datasets]
        return data.prepare(datasets)

    # --------------------------------------------------------------- public API

    def get_pre_datasets(self):
        """Make sure the per-segment feature and label pickles exist.

        Missing files are generated through :meth:`_gen_data` and written with
        ``to_pickle``.

        Returns:
            Tuple[dict, dict]: ``(feature, label)``; each maps ``"train"``,
            ``"valid"`` and ``"test"`` to the path of the corresponding pickle.

        Raises:
            ValueError: if ``label_conf`` was not provided.
        """
        if self.label_conf is None:
            raise ValueError("label_conf must be provided to generate the label datasets.")

        feature = self._segment_paths(self.feature_conf["path"])
        label = self._segment_paths(self.label_conf["path"])

        if not all(os.path.isfile(p) for p in feature.values()):
            xtrain, xvalid, xtest = self._gen_data(self.feature_conf)
            xtrain.to_pickle(feature["train"])
            xvalid.to_pickle(feature["valid"])
            xtest.to_pickle(feature["test"])
            del xtrain, xvalid, xtest

        if not all(os.path.isfile(p) for p in label.values()):
            ytrain, yvalid, ytest = self._gen_data(self.label_conf)
            ytrain.to_pickle(label["train"])
            yvalid.to_pickle(label["valid"])
            ytest.to_pickle(label["test"])
            del ytrain, yvalid, ytest

        return feature, label

    def get_backtest(self, **kwargs) -> None:
        """Generate (or load) the data described by ``backtest_conf``."""
        self._gen_data(self.backtest_conf)

    def _init_qlib(self, qlib_conf):
        """initialize qlib"""
        qlib.init(
            region=REG_CN,
            auto_mount=False,
            custom_ops=[DayLast, FFillNan, BFillNan, Date, Select, IsNull, IsInf, Cut],
            expression_cache=None,
            **qlib_conf,
        )

    def _prepare_calender_cache(self):
        """preload the calendar for cache"""
        # This code uses the copy-on-write feature of Linux
        # to avoid calculating the calendar multiple times in the subprocess.
        # It may speed things up, but may not be useful on Windows and macOS.
        Cal.calendar(freq="1min")
        get_calendar_day(freq="1min")

    def _gen_dataframe(self, config, datasets=None):
        """Return the requested segments, caching them as plain pickles.

        On a cache miss the three segments are prepared, dumped together as a
        dict to ``path`` and individually to ``<stem>train.pkl``,
        ``<stem>valid.pkl`` and ``<stem>test.pkl``.

        Raises:
            ValueError: if ``config`` has no ``"path"`` entry.
        """
        datasets = list(self._SEGMENTS) if datasets is None else datasets
        config, path = self._pop_path(config)

        if os.path.isfile(path):
            start = time.time()
            print_log("Dataset exists, load from disk.", __name__)
            res = self._load_cached_segments(path, datasets)
            print_log(f"Data loaded, time cost: {time.time() - start:.2f}", __name__)
        else:
            self._ensure_parent_dir(path)
            print_log("Generating dataset", __name__)
            start_time = time.time()
            self._prepare_calender_cache()
            dataset = init_instance_by_config(config)
            trainset, validset, testset = dataset.prepare(["train", "valid", "test"])
            data = {
                "train": trainset,
                "valid": validset,
                "test": testset,
            }
            with open(path, "wb") as f:
                pkl.dump(data, f)
            # NOTE(review): these names have no "_" before the segment, unlike
            # the "<stem>_train.pkl" files used by get_pre_datasets; kept as is.
            stem = self._stem(path)
            for segment in self._SEGMENTS:
                with open(stem + segment + ".pkl", "wb") as f:
                    pkl.dump(data[segment], f)
            res = [data[i] for i in datasets]
            print_log(f"Data generated, time cost: {(time.time() - start_time):.2f}", __name__)
        return res

    def _gen_data(self, config, datasets=None):
        """Return the requested segments, caching the whole dataset object.

        On a cache miss the dataset is instantiated from ``config``, dumped to
        ``path`` with ``dump_all=True`` and then prepared.

        Raises:
            ValueError: if ``config`` is None or has no ``"path"`` entry.
        """
        datasets = list(self._SEGMENTS) if datasets is None else datasets
        config, path = self._pop_path(config)

        if os.path.isfile(path):
            start = time.time()
            print_log("Dataset exists, load from disk.", __name__)
            res = self._load_cached_segments(path, datasets)
            print_log(f"Data loaded, time cost: {time.time() - start:.2f}", __name__)
        else:
            self._ensure_parent_dir(path)
            print_log("Generating dataset", __name__)
            start_time = time.time()
            self._prepare_calender_cache()
            dataset = init_instance_by_config(config)
            dataset.config(dump_all=True, recursive=True)
            dataset.to_pickle(path)
            res = dataset.prepare(datasets)
            print_log(f"Data generated, time cost: {(time.time() - start_time):.2f}", __name__)
        return res

    def _gen_dataset(self, config):
        """Return the dataset object itself, loading it from ``path`` when cached.

        Raises:
            ValueError: if ``config`` has no ``"path"`` entry.
        """
        config, path = self._pop_path(config)

        start = time.time()
        if os.path.isfile(path):
            print_log("Dataset exists, load from disk.", __name__)
            # NOTE: pickle.load executes arbitrary code; ``path`` must be a
            # trusted cache file.
            with open(path, "rb") as f:
                dataset = pkl.load(f)
            print_log(f"Data loaded, time cost: {time.time() - start:.2f}", __name__)
        else:
            self._ensure_parent_dir(path)
            print_log("Generating dataset", __name__)
            self._prepare_calender_cache()
            dataset = init_instance_by_config(config)
            print_log(f"Dataset init, time cost: {time.time() - start:.2f}", __name__)
            dataset.prepare(["train", "valid", "test"])
            print_log(f"Dataset prepared, time cost: {time.time() - start:.2f}", __name__)
            dataset.config(dump_all=True, recursive=True)
            dataset.to_pickle(path)
        return dataset

    def _load_template_dataset(self, config, path):
        """Create ``<path>tmp_dataset.pkl`` if needed and load it back."""
        template_path = path + "tmp_dataset.pkl"
        if os.path.isfile(template_path):
            print_log("Dataset exists, load from disk.", __name__)
        else:
            start = time.time()
            self._ensure_parent_dir(path)
            print_log("Generating dataset", __name__)
            self._prepare_calender_cache()
            dataset = init_instance_by_config(config)
            print_log(f"Dataset init, time cost: {time.time() - start:.2f}", __name__)
            dataset.config(dump_all=False, recursive=True)
            dataset.to_pickle(template_path)

        # NOTE: pickle.load executes arbitrary code; the template is a file
        # written by this class.
        with open(template_path, "rb") as f:
            return pkl.load(f)

    def _gen_day_dataset(self, config, conf_type):
        """Dump one dataset pickle per calendar step under the ``path`` prefix.

        ``path`` is used as a string prefix: output files are named
        ``<path><YYYY-MM-DD>.pkl``.  Start times are every 240th entry of the
        1-minute calendar; existing files are skipped.

        Raises:
            ValueError: if ``config`` has no ``"path"`` entry.
        """
        config, path = self._pop_path(config)
        new_dataset = self._load_template_dataset(config, path)

        time_list = D.calendar(start_time=self.start_time, end_time=self.end_time, freq="1min")[::240]

        def generate_dataset(times):
            target = path + times.strftime("%Y-%m-%d") + ".pkl"
            if os.path.isfile(target):
                print("exist " + times.strftime("%Y-%m-%d"))
                return
            # qlib is initialised again because this runs in a joblib worker.
            self._init_qlib(self.qlib_conf)
            end_times = times + datetime.timedelta(days=1)
            new_dataset.handler.config(**{"start_time": times, "end_time": end_times})
            if conf_type == "backtest":
                new_dataset.handler.setup_data()
            else:
                new_dataset.handler.setup_data(init_type=DataHandlerLP.IT_LS)
            new_dataset.config(dump_all=True, recursive=True)
            new_dataset.to_pickle(target)

        Parallel(n_jobs=8)(delayed(generate_dataset)(times) for times in time_list)

    def _gen_stock_dataset(self, config, conf_type):
        """Dump one dataset pickle per instrument under the ``path`` prefix.

        ``path`` is used as a string prefix: output files are named
        ``<path><instrument>.pkl``; existing files are skipped.

        Raises:
            ValueError: if ``config`` has no ``"path"`` entry.
        """
        config, path = self._pop_path(config)
        new_dataset = self._load_template_dataset(config, path)

        instruments = D.instruments(market="all")
        stock_list = D.list_instruments(
            instruments=instruments, start_time=self.start_time, end_time=self.end_time, freq="1min", as_list=True
        )

        def generate_dataset(stock):
            target = path + stock + ".pkl"
            if os.path.isfile(target):
                print("exist " + stock)
                return
            # qlib is initialised again because this runs in a joblib worker.
            self._init_qlib(self.qlib_conf)
            new_dataset.handler.config(**{"instruments": [stock]})
            if conf_type == "backtest":
                new_dataset.handler.setup_data()
            else:
                new_dataset.handler.setup_data(init_type=DataHandlerLP.IT_LS)
            new_dataset.config(dump_all=True, recursive=True)
            new_dataset.to_pickle(target)

        Parallel(n_jobs=32)(delayed(generate_dataset)(stock) for stock in stock_list)

View File

@@ -1,11 +1,12 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import numpy as np
import pandas as pd
from datetime import datetime
from qlib.data.cache import H
from qlib.data.data import Cal
from qlib.data.ops import ElemOperator
from qlib.data.ops import ElemOperator, PairOperator
from qlib.utils.time import time_to_day_index
@@ -35,6 +36,17 @@ def get_calendar_day(freq="1min", future=False):
return _calendar
def get_calendar_minute(freq="day", future=False):
    """Load High-Freq Calendar Minute Using Memcache

    Returns an array with ``minute // 30`` for every entry of the calendar
    loaded through ``Cal.load_calendar(freq, future)``.  The result is cached
    in ``H["c"]``.
    """
    # The key previously ended with "_day".  That suffix presumably matches the
    # key used by get_calendar_day (its body is not visible here), in which case
    # the two functions would return each other's cached arrays.  A dedicated
    # suffix keeps the caches apart.
    flag = f"{freq}_future_{future}_minute"
    if flag in H["c"]:
        _calendar = H["c"][flag]
    else:
        _calendar = np.array([x.minute // 30 for x in Cal.load_calendar(freq, future)])
        H["c"][flag] = _calendar
    return _calendar
class DayCumsum(ElemOperator):
"""DayCumsum Operator during start time and end time.
@@ -83,3 +95,181 @@ class DayCumsum(ElemOperator):
_calendar = get_calendar_day(freq=freq)
series = self.feature.load(instrument, start_index, end_index, freq)
return series.groupby(_calendar[series.index]).transform(self.period_cusum)
class DayLast(ElemOperator):
    """DayLast Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        a series in which every value is replaced by the last value of the
        group it belongs to, groups being given by ``get_calendar_day``
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        day_calendar = get_calendar_day(freq=freq)
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        # One group key per row, looked up through the series' integer index.
        group_keys = day_calendar[loaded.index]
        return loaded.groupby(group_keys).transform("last")
class FFillNan(ElemOperator):
    """FFillNan Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        the feature with NaN values forward-filled
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        series = self.feature.load(instrument, start_index, end_index, freq)
        # Series.ffill() is the supported spelling of fillna(method="ffill"),
        # which recent pandas versions deprecate.
        return series.ffill()
class BFillNan(ElemOperator):
    """BFillNan Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        the feature with NaN values backward-filled
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        series = self.feature.load(instrument, start_index, end_index, freq)
        # Series.bfill() is the supported spelling of fillna(method="bfill"),
        # which recent pandas versions deprecate.
        return series.bfill()
class Date(ElemOperator):
    """Date Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        a series holding, for every entry of feature.index, the matching
        entry of the ``get_calendar_day`` calendar
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        day_calendar = get_calendar_day(freq=freq)
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        row_index = loaded.index
        return pd.Series(day_calendar[row_index], index=row_index)
class Select(PairOperator):
    """Select Operator

    Parameters
    ----------
    feature_left : Expression
        feature instance, select condition
    feature_right : Expression
        feature instance, select value

    Returns
    ----------
    feature:
        the entries of feature_right selected by feature_left via ``.loc``
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        load_args = (instrument, start_index, end_index, freq)
        condition = self.feature_left.load(*load_args)
        values = self.feature_right.load(*load_args)
        return values.loc[condition]
class IsNull(ElemOperator):
    """IsNull Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        a boolean series that is True where the feature is null
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        null_mask = loaded.isnull()
        return null_mask
class IsInf(ElemOperator):
    """IsInf Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        the result of ``np.isinf`` applied to the feature
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        inf_mask = np.isinf(loaded)
        return inf_mask
class Cut(ElemOperator):
    """Cut Operator

    Parameters
    ----------
    feature : Expression
        feature instance
    left : int
        left > 0, delete the first ``left`` elements of feature (default is None, which means 0)
    right : int
        right < 0, delete the last ``-right`` elements of feature (default is None, which means 0)

    Returns
    ----------
    feature:
        A series with the first ``left`` and last ``-right`` elements deleted from the feature.
        Note: It is deleted from the raw data, not the sliced data
    """

    def __init__(self, feature, left=None, right=None):
        """Validate the bounds, then initialise the operator.

        Raises
        ------
        ValueError
            if ``left`` is given and not positive, or ``right`` is given and
            not negative.
        """
        self.left = left
        self.right = right
        if (self.left is not None and self.left <= 0) or (self.right is not None and self.right >= 0):
            raise ValueError("Cut operator left should be > 0 and right should be < 0")

        super().__init__(feature)

    def _load_internal(self, instrument, start_index, end_index, freq):
        series = self.feature.load(instrument, start_index, end_index, freq)
        # A None bound leaves that side of the slice open.
        return series.iloc[self.left : self.right]

    def get_extended_window_size(self):
        """Return the wrapped feature's window extended by the cut sizes.

        The left extension grows by ``left`` and the right extension by
        ``abs(right)``; a bound that is None contributes 0.
        """
        ll = 0 if self.left is None else self.left
        rr = 0 if self.right is None else abs(self.right)
        lft_etd, rght_etd = self.feature.get_extended_window_size()
        lft_etd = lft_etd + ll
        rght_etd = rght_etd + rr
        return lft_etd, rght_etd