diff --git a/qlib/contrib/data/highfreq_handler.py b/qlib/contrib/data/highfreq_handler.py
new file mode 100644
index 000000000..f7f8054ff
--- /dev/null
+++ b/qlib/contrib/data/highfreq_handler.py
@@ -0,0 +1,172 @@
+from qlib.data.dataset.handler import DataHandler, DataHandlerLP
+
+EPSILON = 1e-4
+
+
+class HighFreqHandler(DataHandlerLP):
+    """Learn/infer handler that loads normalized 1-minute price and volume expressions."""
+
+    def __init__(
+        self,
+        instruments="csi300",
+        start_time=None,
+        end_time=None,
+        infer_processors=None,
+        learn_processors=None,
+        fit_start_time=None,
+        fit_end_time=None,
+        drop_raw=True,
+    ):
+        def check_transform_proc(proc_l):
+            # Inject the fit window into copies of the processor configs, so neither the
+            # caller's dicts nor a shared default list is ever mutated.
+            new_l = []
+            for p in proc_l or []:
+                p = dict(p)
+                p["kwargs"] = {
+                    **(p.get("kwargs") or {}),
+                    "fit_start_time": fit_start_time,
+                    "fit_end_time": fit_end_time,
+                }
+                new_l.append(p)
+            return new_l
+
+        infer_processors = check_transform_proc(infer_processors)
+        learn_processors = check_transform_proc(learn_processors)
+
+        data_loader = {
+            "class": "QlibDataLoader",
+            "kwargs": {
+                "config": self.get_feature_config(),
+                "swap_level": False,
+                "freq": "1min",
+            },
+        }
+        super().__init__(
+            instruments=instruments,
+            start_time=start_time,
+            end_time=end_time,
+            data_loader=data_loader,
+            infer_processors=infer_processors,
+            learn_processors=learn_processors,
+            drop_raw=drop_raw,
+        )
+
+    def get_feature_config(self):
+        """Return ``(fields, names)``: expression strings and the column name for each."""
+        fields = []
+        names = []
+
+        template_if = "If(IsNull({1}), {0}, {1})"
+        template_paused = "Select(Gt($hx_paused_num, 1.001), {0})"
+
+        def get_normalized_price_feature(price_field, shift=0):
+            # Divide by DayLast(Ref(FFillNan($close), 243)).
+            # NOTE(review): this comment used to say "237th minute of yesterday"; the shift is 243 - confirm.
+            if shift == 0:
+                template_norm = "{0}/DayLast(Ref({1}, 243))"
+            else:
+                template_norm = "Ref({0}, " + str(shift) + ")/DayLast(Ref({1}, 243))"
+
+            template_fillnan = "FFillNan({0})"
+            # calculate -> ffill -> remove paused
+            feature_ops = template_paused.format(
+                template_fillnan.format(
+                    template_norm.format(template_if.format("$close", price_field), template_fillnan.format("$close"))
+                )
+            )
+            return feature_ops
+
+        fields += [get_normalized_price_feature("$open", 0)]
+        fields += [get_normalized_price_feature("$high", 0)]
+        fields += [get_normalized_price_feature("$low", 0)]
+        fields += [get_normalized_price_feature("$close", 0)]
+        fields += [get_normalized_price_feature("$vwap", 0)]
+        names += ["$open", "$high", "$low", "$close", "$vwap"]
+
+        fields += [get_normalized_price_feature("$open", 240)]
+        fields += [get_normalized_price_feature("$high", 240)]
+        fields += [get_normalized_price_feature("$low", 240)]
+        fields += [get_normalized_price_feature("$close", 240)]
+        fields += [get_normalized_price_feature("$vwap", 240)]
+        names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"]
+
+        # calculate and fill nan with 0
+        template_gzero = "If(Ge({0}, 0), {0}, 0)"
+        fields += [
+            template_gzero.format(
+                template_paused.format(
+                    "If(IsNull({0}), 0, {0})".format("{0}/Ref(DayLast(Mean({0}, 7200)), 240)".format("$volume"))
+                )
+            )
+        ]
+        names += ["$volume"]
+
+        fields += [
+            template_gzero.format(
+                template_paused.format(
+                    "If(IsNull({0}), 0, {0})".format(
+                        "Ref({0}, 240)/Ref(DayLast(Mean({0}, 7200)), 240)".format("$volume")
+                    )
+                )
+            )
+        ]
+        names += ["$volume_1"]
+
+        return fields, names
+
+
+class HighFreqBacktestHandler(DataHandler):
+    """Handler that loads 1-minute close/vwap/volume/factor expressions filtered by ``$hx_paused_num``."""
+
+    def __init__(
+        self,
+        instruments="csi300",
+        start_time=None,
+        end_time=None,
+    ):
+        data_loader = {
+            "class": "QlibDataLoader",
+            "kwargs": {
+                "config": self.get_feature_config(),
+                "swap_level": False,
+                "freq": "1min",
+            },
+        }
+        super().__init__(
+            instruments=instruments,
+            start_time=start_time,
+            end_time=end_time,
+            data_loader=data_loader,
+        )
+
+    def get_feature_config(self):
+        """Return ``(fields, names)``: expression strings and the column name for each."""
+        fields = []
+        names = []
+
+        template_if = "If(IsNull({1}), {0}, {1})"
+        template_paused = "Select(Gt($hx_paused_num, 1.001), {0})"
+        template_fillnan = "FFillNan({0})"
+        fields += [
+            template_fillnan.format(template_paused.format("$close")),
+        ]
+        names += ["$close0"]
+
+        fields += [
+            template_paused.format(
+                template_if.format(
+                    template_fillnan.format("$close"),
+                    "$vwap",
+                )
+            )
+        ]
+        names += ["$vwap0"]
+
+        fields += [template_paused.format("If(IsNull({0}), 0, {0})".format("$volume"))]
+        names += ["$volume0"]
+
+        fields += [template_paused.format("If(IsNull({0}), 0, {0})".format("$factor"))]
+        names += ["$factor0"]
+
+        return fields, names
diff --git a/qlib/contrib/data/highfreq_processor.py b/qlib/contrib/data/highfreq_processor.py
new file mode 100644
index 000000000..f7041e9f4
--- /dev/null
+++ b/qlib/contrib/data/highfreq_processor.py
@@ -0,0 +1,89 @@
+import os
+
+import numpy as np
+import pandas as pd
+from qlib.data.dataset.processor import Processor
+from qlib.data.dataset.utils import fetch_df_by_index
+from typing import Dict
+
+
+class HighFreqTrans(Processor):
+    """Cast every feature to ``int8`` when ``dtype == "bool"``, otherwise to ``float32``."""
+
+    def __init__(self, dtype: str = "bool"):
+        self.dtype = dtype
+
+    def fit(self, df_features):
+        pass
+
+    def __call__(self, df_features):
+        if self.dtype == "bool":
+            return df_features.astype(np.int8)
+        else:
+            return df_features.astype(np.float32)
+
+
+class HighFreqNorm(Processor):
+    """Normalize column groups with statistics cached as ``.npy`` files in ``feature_save_dir``."""
+
+    def __init__(
+        self,
+        fit_start_time: pd.Timestamp,
+        fit_end_time: pd.Timestamp,
+        feature_save_dir: str,
+        norm_groups: Dict[str, int],
+    ):
+
+        self.fit_start_time = fit_start_time
+        self.fit_end_time = fit_end_time
+        self.feature_save_dir = feature_save_dir
+        self.norm_groups = norm_groups
+
+    def fit(self, df_features) -> None:
+        """Save per-group mean/std/vmax/vmin computed on the fit window; skipped if the cache dir is non-empty."""
+        if os.path.exists(self.feature_save_dir) and len(os.listdir(self.feature_save_dir)) != 0:
+            return
+        # The directory may already exist but be empty; a plain makedirs would raise in that case.
+        os.makedirs(self.feature_save_dir, exist_ok=True)
+        fetch_df = fetch_df_by_index(df_features, slice(self.fit_start_time, self.fit_end_time), level="datetime")
+        del df_features
+        index = 0
+        names = {}
+        for name, dim in self.norm_groups.items():
+            names[name] = slice(index, index + dim)
+            index += dim
+        for name, name_val in names.items():
+            df_values = fetch_df.iloc(axis=1)[name_val].values
+            if name.endswith("volume"):
+                df_values = np.log1p(df_values)
+            self.feature_mean = np.nanmean(df_values)
+            np.save(os.path.join(self.feature_save_dir, name + "_mean.npy"), self.feature_mean)
+            df_values = df_values - self.feature_mean
+            self.feature_std = np.nanstd(np.absolute(df_values))
+            np.save(os.path.join(self.feature_save_dir, name + "_std.npy"), self.feature_std)
+            df_values = df_values / self.feature_std
+            np.save(os.path.join(self.feature_save_dir, name + "_vmax.npy"), np.nanmax(df_values))
+            np.save(os.path.join(self.feature_save_dir, name + "_vmin.npy"), np.nanmin(df_values))
+        return
+
+    def __call__(self, df_features):
+        """Apply the saved mean/std per column group (log1p first for "*volume" groups) and fill NaN with 0."""
+        # DataFrame.droplevel has no ``inplace`` argument; it returns a new frame that must be rebound.
+        if "date" in df_features.index.names:
+            df_features = df_features.droplevel("date")
+        df_values = df_features.values
+        index = 0
+        names = {}
+        for name, dim in self.norm_groups.items():
+            names[name] = slice(index, index + dim)
+            index += dim
+        for name, name_val in names.items():
+            feature_mean = np.load(os.path.join(self.feature_save_dir, name + "_mean.npy"))
+            feature_std = np.load(os.path.join(self.feature_save_dir, name + "_std.npy"))
+
+            if name.endswith("volume"):
+                df_values[:, name_val] = np.log1p(df_values[:, name_val])
+            df_values[:, name_val] -= feature_mean
+            df_values[:, name_val] /= feature_std
+        df_features = pd.DataFrame(data=df_values, index=df_features.index, columns=df_features.columns)
+        return df_features.fillna(0)
diff --git a/qlib/contrib/data/highfreq_provider.py b/qlib/contrib/data/highfreq_provider.py
new file mode 100644
index 000000000..7e47da0bf
--- /dev/null
+++ b/qlib/contrib/data/highfreq_provider.py
@@ -0,0 +1,300 @@
+import os
+import time
+import datetime
+from typing import Optional
+
+import qlib
+from qlib.data import D
+from qlib.config import REG_CN
+from qlib.utils import init_instance_by_config
+from qlib.data.dataset.handler import DataHandlerLP
+from qlib.data.data import Cal
+from qlib.contrib.ops.high_freq import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull, IsInf, Cut
+import pickle as pkl
+from joblib import Parallel, delayed
+from utilsd.logging import print_log
+
+
+class HighFreqProvider:
+    """Build datasets from qlib configs and cache them on disk as pickle files."""
+
+    def __init__(
+        self,
+        start_time: str,
+        end_time: str,
+        train_end_time: str,
+        valid_start_time: str,
+        valid_end_time: str,
+        test_start_time: str,
+        qlib_conf: dict,
+        feature_conf: dict,
+        label_conf: Optional[dict] = None,
+        backtest_conf: dict = None,
+        **kwargs,
+    ) -> None:
+        self.start_time = start_time
+        self.end_time = end_time
+        self.test_start_time = test_start_time
+        self.train_end_time = train_end_time
+        self.valid_start_time = valid_start_time
+        self.valid_end_time = valid_end_time
+        self._init_qlib(qlib_conf)
+        self.feature_conf = feature_conf
+        self.label_conf = label_conf
+        self.backtest_conf = backtest_conf
+        self.qlib_conf = qlib_conf
+
+    def get_pre_datasets(self):
+        """Generate and pickle the train/valid/test feature and label data when the files are missing.
+
+        Returns:
+            Tuple[dict, dict]: ``(feature, label)``, each mapping "train"/"valid"/"test" to a pickle path
+        """
+
+        dict_feature_path = self.feature_conf["path"]
+        train_feature_path = dict_feature_path[:-4] + "_train.pkl"
+        valid_feature_path = dict_feature_path[:-4] + "_valid.pkl"
+        test_feature_path = dict_feature_path[:-4] + "_test.pkl"
+
+        dict_label_path = self.label_conf["path"]
+        train_label_path = dict_label_path[:-4] + "_train.pkl"
+        valid_label_path = dict_label_path[:-4] + "_valid.pkl"
+        test_label_path = dict_label_path[:-4] + "_test.pkl"
+
+        if (
+            not os.path.isfile(train_feature_path)
+            or not os.path.isfile(valid_feature_path)
+            or not os.path.isfile(test_feature_path)
+        ):
+            xtrain, xvalid, xtest = self._gen_data(self.feature_conf)
+            xtrain.to_pickle(train_feature_path)
+            xvalid.to_pickle(valid_feature_path)
+            xtest.to_pickle(test_feature_path)
+            del xtrain, xvalid, xtest
+
+        if (
+            not os.path.isfile(train_label_path)
+            or not os.path.isfile(valid_label_path)
+            or not os.path.isfile(test_label_path)
+        ):
+            ytrain, yvalid, ytest = self._gen_data(self.label_conf)
+            ytrain.to_pickle(train_label_path)
+            yvalid.to_pickle(valid_label_path)
+            ytest.to_pickle(test_label_path)
+            del ytrain, yvalid, ytest
+
+        feature = {
+            "train": train_feature_path,
+            "valid": valid_feature_path,
+            "test": test_feature_path,
+        }
+
+        label = {
+            "train": train_label_path,
+            "valid": valid_label_path,
+            "test": test_label_path,
+        }
+
+        return feature, label
+
+    def get_backtest(self, **kwargs) -> None:
+        """Generate (or load) the dataset described by ``backtest_conf``; the result is not returned."""
+        self._gen_data(self.backtest_conf)
+
+    def _init_qlib(self, qlib_conf):
+        """initialize qlib"""
+
+        qlib.init(
+            region=REG_CN,
+            auto_mount=False,
+            custom_ops=[DayLast, FFillNan, BFillNan, Date, Select, IsNull, IsInf, Cut],
+            expression_cache=None,
+            **qlib_conf,
+        )
+
+    def _prepare_calender_cache(self):
+        """preload the calendar for cache"""
+
+        # This code uses the copy-on-write feature of Linux
+        # to avoid calculating the calendar multiple times in the subprocess.
+        # It may speed things up, but may not be useful on Windows and macOS.
+        Cal.calendar(freq="1min")
+        get_calendar_day(freq="1min")
+
+    @staticmethod
+    def _split_path(config):
+        """Return ``(path, config_without_path)`` from a shallow copy, leaving ``config`` untouched."""
+        config = dict(config)
+        try:
+            path = config.pop("path")
+        except KeyError as e:
+            raise ValueError("Must specify the path to save the dataset.") from e
+        return path, config
+
+    @staticmethod
+    def _ensure_parent_dir(path):
+        """Create the directory part of ``path`` when it is non-empty and missing."""
+        parent = os.path.dirname(path)
+        if parent:
+            os.makedirs(parent, exist_ok=True)
+
+    def _gen_dataframe(self, config, datasets=["train", "valid", "test"]):
+        path, config = self._split_path(config)
+        if os.path.isfile(path):
+            start = time.time()
+            print_log("Dataset exists, load from disk.", __name__)
+
+            with open(path, "rb") as f:
+                data = pkl.load(f)
+            if isinstance(data, dict):
+                res = [data[i] for i in datasets]
+            else:
+                res = data.prepare(datasets)
+            print_log(f"Data loaded, time cost: {time.time() - start:.2f}", __name__)
+        else:
+            self._ensure_parent_dir(path)
+            print_log("Generating dataset", __name__)
+            start_time = time.time()
+            self._prepare_calender_cache()
+            dataset = init_instance_by_config(config)
+            trainset, validset, testset = dataset.prepare(["train", "valid", "test"])
+            data = {
+                "train": trainset,
+                "valid": validset,
+                "test": testset,
+            }
+            with open(path, "wb") as f:
+                pkl.dump(data, f)
+            # NOTE(review): split files here lack the "_" used by get_pre_datasets ("_train.pkl") - confirm.
+            with open(path[:-4] + "train.pkl", "wb") as f:
+                pkl.dump(trainset, f)
+            with open(path[:-4] + "valid.pkl", "wb") as f:
+                pkl.dump(validset, f)
+            with open(path[:-4] + "test.pkl", "wb") as f:
+                pkl.dump(testset, f)
+            res = [data[i] for i in datasets]
+            print_log(f"Data generated, time cost: {(time.time() - start_time):.2f}", __name__)
+        return res
+
+    def _gen_data(self, config, datasets=["train", "valid", "test"]):
+        path, config = self._split_path(config)
+        if os.path.isfile(path):
+            start = time.time()
+            print_log("Dataset exists, load from disk.", __name__)
+
+            with open(path, "rb") as f:
+                data = pkl.load(f)
+            if isinstance(data, dict):
+                res = [data[i] for i in datasets]
+            else:
+                res = data.prepare(datasets)
+            print_log(f"Data loaded, time cost: {time.time() - start:.2f}", __name__)
+        else:
+            self._ensure_parent_dir(path)
+            print_log("Generating dataset", __name__)
+            start_time = time.time()
+            self._prepare_calender_cache()
+            dataset = init_instance_by_config(config)
+            dataset.config(dump_all=True, recursive=True)
+            dataset.to_pickle(path)
+            res = dataset.prepare(datasets)
+            print_log(f"Data generated, time cost: {(time.time() - start_time):.2f}", __name__)
+        return res
+
+    def _gen_dataset(self, config):
+        path, config = self._split_path(config)
+        if os.path.isfile(path):
+            start = time.time()
+            print_log("Dataset exists, load from disk.", __name__)
+
+            with open(path, "rb") as f:
+                dataset = pkl.load(f)
+            print_log(f"Data loaded, time cost: {time.time() - start:.2f}", __name__)
+        else:
+            start = time.time()
+            self._ensure_parent_dir(path)
+            print_log("Generating dataset", __name__)
+            self._prepare_calender_cache()
+            dataset = init_instance_by_config(config)
+            print_log(f"Dataset init, time cost: {time.time() - start:.2f}", __name__)
+            dataset.prepare(["train", "valid", "test"])
+            print_log(f"Dataset prepared, time cost: {time.time() - start:.2f}", __name__)
+            dataset.config(dump_all=True, recursive=True)
+            dataset.to_pickle(path)
+        return dataset
+
+    def _gen_day_dataset(self, config, conf_type):
+        path, config = self._split_path(config)
+
+        if os.path.isfile(path + "tmp_dataset.pkl"):
+            start = time.time()
+            print_log("Dataset exists, load from disk.", __name__)
+        else:
+            start = time.time()
+            self._ensure_parent_dir(path)
+            print_log("Generating dataset", __name__)
+            self._prepare_calender_cache()
+            dataset = init_instance_by_config(config)
+            print_log(f"Dataset init, time cost: {time.time() - start:.2f}", __name__)
+            dataset.config(dump_all=False, recursive=True)
+            dataset.to_pickle(path + "tmp_dataset.pkl")
+
+        with open(path + "tmp_dataset.pkl", "rb") as f:
+            new_dataset = pkl.load(f)
+
+        time_list = D.calendar(start_time=self.start_time, end_time=self.end_time, freq="1min")[::240]
+
+        def generate_dataset(times):
+            if os.path.isfile(path + times.strftime("%Y-%m-%d") + ".pkl"):
+                print("exist " + times.strftime("%Y-%m-%d"))
+                return
+            self._init_qlib(self.qlib_conf)
+            end_times = times + datetime.timedelta(days=1)
+            new_dataset.handler.config(**{"start_time": times, "end_time": end_times})
+            if conf_type == "backtest":
+                new_dataset.handler.setup_data()
+            else:
+                new_dataset.handler.setup_data(init_type=DataHandlerLP.IT_LS)
+            new_dataset.config(dump_all=True, recursive=True)
+            new_dataset.to_pickle(path + times.strftime("%Y-%m-%d") + ".pkl")
+
+        Parallel(n_jobs=8)(delayed(generate_dataset)(times) for times in time_list)
+
+    def _gen_stock_dataset(self, config, conf_type):
+        path, config = self._split_path(config)
+
+        if os.path.isfile(path + "tmp_dataset.pkl"):
+            start = time.time()
+            print_log("Dataset exists, load from disk.", __name__)
+        else:
+            start = time.time()
+            self._ensure_parent_dir(path)
+            print_log("Generating dataset", __name__)
+            self._prepare_calender_cache()
+            dataset = init_instance_by_config(config)
+            print_log(f"Dataset init, time cost: {time.time() - start:.2f}", __name__)
+            dataset.config(dump_all=False, recursive=True)
+            dataset.to_pickle(path + "tmp_dataset.pkl")
+
+        with open(path + "tmp_dataset.pkl", "rb") as f:
+            new_dataset = pkl.load(f)
+
+        instruments = D.instruments(market="all")
+        stock_list = D.list_instruments(
+            instruments=instruments, start_time=self.start_time, end_time=self.end_time, freq="1min", as_list=True
+        )
+
+        def generate_dataset(stock):
+            if os.path.isfile(path + stock + ".pkl"):
+                print("exist " + stock)
+                return
+            self._init_qlib(self.qlib_conf)
+            new_dataset.handler.config(**{"instruments": [stock]})
+            if conf_type == "backtest":
+                new_dataset.handler.setup_data()
+            else:
+                new_dataset.handler.setup_data(init_type=DataHandlerLP.IT_LS)
+            new_dataset.config(dump_all=True, recursive=True)
+            new_dataset.to_pickle(path + stock + ".pkl")
+
+        Parallel(n_jobs=32)(delayed(generate_dataset)(stock) for stock in stock_list)
diff --git a/qlib/contrib/ops/high_freq.py b/qlib/contrib/ops/high_freq.py
index d35dbf92f..ee2825fbf 100644
--- a/qlib/contrib/ops/high_freq.py
+++ b/qlib/contrib/ops/high_freq.py
@@ -1,11 +1,12 @@
 # Copyright (c) Microsoft Corporation.
 # Licensed under the MIT License.
 import numpy as np
+import pandas as pd
 from datetime import datetime
 
 from qlib.data.cache import H
 from qlib.data.data import Cal
-from qlib.data.ops import ElemOperator
+from qlib.data.ops import ElemOperator, PairOperator
 from qlib.utils.time import time_to_day_index
 
 
@@ -35,6 +36,18 @@ def get_calendar_day(freq="1min", future=False):
     return _calendar
 
 
+def get_calendar_minute(freq="day", future=False):
+    """Load High-Freq Calendar Minute Using Memcache"""
+    # NOTE(review): keyed with "_minute" so it cannot clash with a "_day" cache entry - confirm get_calendar_day's key.
+    flag = f"{freq}_future_{future}_minute"
+    if flag in H["c"]:
+        _calendar = H["c"][flag]
+    else:
+        _calendar = np.array(list(map(lambda x: x.minute // 30, Cal.load_calendar(freq, future))))
+        H["c"][flag] = _calendar
+    return _calendar
+
+
 class DayCumsum(ElemOperator):
     """DayCumsum Operator during start time and end time.
 
@@ -83,3 +96,182 @@ class DayCumsum(ElemOperator):
         _calendar = get_calendar_day(freq=freq)
         series = self.feature.load(instrument, start_index, end_index, freq)
         return series.groupby(_calendar[series.index]).transform(self.period_cusum)
+
+
+class DayLast(ElemOperator):
+    """DayLast Operator
+
+    Parameters
+    ----------
+    feature : Expression
+        feature instance
+
+    Returns
+    ----------
+    feature:
+        a series in which each value equals the last value of its day
+    """
+
+    def _load_internal(self, instrument, start_index, end_index, freq):
+        _calendar = get_calendar_day(freq=freq)
+        series = self.feature.load(instrument, start_index, end_index, freq)
+        return series.groupby(_calendar[series.index]).transform("last")
+
+
+class FFillNan(ElemOperator):
+    """FFillNan Operator
+
+    Parameters
+    ----------
+    feature : Expression
+        feature instance
+
+    Returns
+    ----------
+    feature:
+        a forward fill nan feature
+    """
+
+    def _load_internal(self, instrument, start_index, end_index, freq):
+        series = self.feature.load(instrument, start_index, end_index, freq)
+        return series.ffill()
+
+
+class BFillNan(ElemOperator):
+    """BFillNan Operator
+
+    Parameters
+    ----------
+    feature : Expression
+        feature instance
+
+    Returns
+    ----------
+    feature:
+        a backward fill nan feature
+    """
+
+    def _load_internal(self, instrument, start_index, end_index, freq):
+        series = self.feature.load(instrument, start_index, end_index, freq)
+        return series.bfill()
+
+
+class Date(ElemOperator):
+    """Date Operator
+
+    Parameters
+    ----------
+    feature : Expression
+        feature instance
+
+    Returns
+    ----------
+    feature:
+        a series in which each value is the date corresponding to feature.index
+    """
+
+    def _load_internal(self, instrument, start_index, end_index, freq):
+        _calendar = get_calendar_day(freq=freq)
+        series = self.feature.load(instrument, start_index, end_index, freq)
+        return pd.Series(_calendar[series.index], index=series.index)
+
+
+class Select(PairOperator):
+    """Select Operator
+
+    Parameters
+    ----------
+    feature_left : Expression
+        feature instance, select condition
+    feature_right : Expression
+        feature instance, select value
+
+    Returns
+    ----------
+    feature:
+        value(feature_right) that meets the condition(feature_left)
+
+    """
+
+    def _load_internal(self, instrument, start_index, end_index, freq):
+        series_condition = self.feature_left.load(instrument, start_index, end_index, freq)
+        series_feature = self.feature_right.load(instrument, start_index, end_index, freq)
+        return series_feature.loc[series_condition]
+
+
+class IsNull(ElemOperator):
+    """IsNull Operator
+
+    Parameters
+    ----------
+    feature : Expression
+        feature instance
+
+    Returns
+    ----------
+    feature:
+        A series indicating whether the feature is nan
+    """
+
+    def _load_internal(self, instrument, start_index, end_index, freq):
+        series = self.feature.load(instrument, start_index, end_index, freq)
+        return series.isnull()
+
+
+class IsInf(ElemOperator):
+    """IsInf Operator
+
+    Parameters
+    ----------
+    feature : Expression
+        feature instance
+
+    Returns
+    ----------
+    feature:
+        A series indicating whether the feature is inf
+    """
+
+    def _load_internal(self, instrument, start_index, end_index, freq):
+        series = self.feature.load(instrument, start_index, end_index, freq)
+        return np.isinf(series)
+
+
+class Cut(ElemOperator):
+    """Cut Operator
+
+    Parameters
+    ----------
+    feature : Expression
+        feature instance
+    left : int
+        left > 0, delete the first ``left`` elements of feature (default is None, which means 0)
+    right : int
+        right < 0, delete the last ``-right`` elements of feature (default is None, which means 0)
+
+    Returns
+    ----------
+    feature:
+        A series with the first ``left`` and last ``-right`` elements deleted from the feature.
+        Note: It is deleted from the raw data, not the sliced data
+    """
+
+    def __init__(self, feature, left=None, right=None):
+        self.left = left
+        self.right = right
+        if (self.left is not None and self.left <= 0) or (self.right is not None and self.right >= 0):
+            raise ValueError("Cut operator left should be > 0 and right should be < 0")
+
+        super(Cut, self).__init__(feature)
+
+    def _load_internal(self, instrument, start_index, end_index, freq):
+        series = self.feature.load(instrument, start_index, end_index, freq)
+        return series.iloc[self.left : self.right]
+
+    def get_extended_window_size(self):
+        ll = 0 if self.left is None else self.left
+        rr = 0 if self.right is None else abs(self.right)
+        lft_etd, rght_etd = self.feature.get_extended_window_size()
+        lft_etd = lft_etd + ll
+        rght_etd = rght_etd + rr
+        return lft_etd, rght_etd