From ffedb6382f58950bd0b3e820e4f165ee6fb6ee4c Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 25 Jan 2021 17:58:45 +0000 Subject: [PATCH 01/17] add highfreq example --- examples/high_freq/__init__.py | 0 examples/high_freq/highfreq_handler.py | 220 +++++++++++++++++++++++ examples/high_freq/highfreq_ops.py | 62 +++++++ examples/high_freq/highfreq_processor.py | 70 ++++++++ examples/high_freq/workflow.py | 137 ++++++++++++++ qlib/contrib/data/handler.py | 16 +- qlib/data/data.py | 15 +- qlib/data/dataset/__init__.py | 46 ++++- qlib/data/dataset/handler.py | 23 ++- qlib/data/dataset/loader.py | 25 +-- 10 files changed, 585 insertions(+), 29 deletions(-) create mode 100644 examples/high_freq/__init__.py create mode 100644 examples/high_freq/highfreq_handler.py create mode 100644 examples/high_freq/highfreq_ops.py create mode 100644 examples/high_freq/highfreq_processor.py create mode 100644 examples/high_freq/workflow.py diff --git a/examples/high_freq/__init__.py b/examples/high_freq/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/examples/high_freq/highfreq_handler.py b/examples/high_freq/highfreq_handler.py new file mode 100644 index 000000000..32557f768 --- /dev/null +++ b/examples/high_freq/highfreq_handler.py @@ -0,0 +1,220 @@ +from qlib.data.dataset.handler import DataHandler, DataHandlerLP +from qlib.data.dataset.processor import Processor +from qlib.utils import get_cls_kwargs +from qlib.log import TimeInspector + + +class HighFreqHandler(DataHandlerLP): + def __init__( + self, + instruments="csi500", + start_time=None, + end_time=None, + freq="1min", + infer_processors=[], + learn_processors=[], + fit_start_time=None, + fit_end_time=None, + drop_raw=True, + ): + def check_transform_proc(proc_l): + new_l = [] + for p in proc_l: + p["kwargs"].update( + { + "fit_start_time": fit_start_time, + "fit_end_time": fit_end_time, + } + ) + new_l.append(p) + return new_l + + infer_processors = [] + learn_processors = [] + + data_loader = { + "class": "QlibDataLoader", + "kwargs": { + "config": self.get_feature_config(), + "swap_level": False, + }, + } + super().__init__( + instruments=instruments, + start_time=start_time, + end_time=end_time, + freq=freq, + data_loader=data_loader, + infer_processors=infer_processors, + learn_processors=learn_processors, + drop_raw=drop_raw, + ) + + + def get_feature_config(self): + fields = [] + names = [] + + template_if = "If(IsNull({1}), {0}, {1})" + template_paused = "Select(Eq($paused, 0.0), {0})" + template_fillnan = "FFillNan({0})" + fields += [ + "{0}/Ref(DayLast({1}), 240)".format( + template_if.format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$open"), + ), + template_fillnan.format(template_paused.format("$close")), + ) + ] + fields += [ + "{0}/Ref(DayLast({1}), 240)".format( + template_if.format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$high"), + ), + template_fillnan.format(template_paused.format("$close")), + ) + ] + fields += [ + "{0}/Ref(DayLast({1}), 240)".format( + template_if.format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$low"), + ), + template_fillnan.format(template_paused.format("$close")), + ) + ] + fields += ["{0}/Ref(DayLast({0}), 240)".format(template_fillnan.format(template_paused.format("$close")))] + fields += [ + "{0}/Ref(DayLast({1}), 240)".format( + "If(IsNull({1}), {0}, If(Or(Or(Or(Eq({1}, np.inf), Eq({1}, -np.inf)), Eq({1}, 0)), Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2})))), {0}, {1}))".format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$vwap"), + template_paused.format("$low"), + template_paused.format("$high"), + ), + template_fillnan.format(template_paused.format("$close")), + ) + ] + names += ["$open", "$high", "$low", "$close", "$vwap"] + + fields += [ + "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( + template_if.format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$open"), + ), + template_fillnan.format(template_paused.format("$close")), + ) + ] + fields += [ + "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( + template_if.format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$high"), + ), + template_fillnan.format(template_paused.format("$close")), + ) + ] + fields += [ + "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( + template_if.format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$low"), + ), + template_fillnan.format(template_paused.format("$close")), + ) + ] + fields += [ + "Ref({0}, 240)/Ref(DayLast({0}), 240)".format(template_fillnan.format(template_paused.format("$close"))) + ] + fields += [ + "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( + "If(IsNull({1}), {0}, If(Or(Or(Or(Eq({1}, np.inf), Eq({1}, -np.inf)), Eq({1}, 0)), Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2})))), {0}, {1}))".format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$vwap"), + template_paused.format("$low"), + template_paused.format("$high"), + ), + template_fillnan.format(template_paused.format("$close")), + ) + ] + names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"] + + fields += [ + "{0}/Ref(DayLast(Mean({0}, 7200)), 240)".format( + "If(IsNull({1}), 0, If(Or(Gt({2}, Mul(1.001, {4})), Lt({2}, Mul(0.999, {3}))), 0, {1}))".format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$volume"), + template_paused.format("$vwap"), + template_paused.format("$low"), + template_paused.format("$high"), + ) + ) + ] + names += ["$volume"] + fields += [ + "Ref({0}, 240)/Ref(DayLast(Mean({0}, 7200)), 240)".format( + "If(IsNull({1}), 0, If(Or(Gt({2}, Mul(1.001, {4})), Lt({2}, Mul(0.999, {3}))), 0, {1}))".format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$volume"), + template_paused.format("$vwap"), + template_paused.format("$low"), + template_paused.format("$high"), + ) + ) + ] + names += ["$volume_1"] + + fields += [template_paused.format("Date($close)")] + names += ["date"] + return fields, names + + +class HighFreqBacktestHandler(DataHandler): + def __init__( + self, + instruments="csi300", + start_time=None, + end_time=None, + freq="1min", + ): + infer_processors = check_transform_proc(infer_processors) + learn_processors = check_transform_proc(learn_processors) + data_loader = { + "class": "QlibDataLoader", + "kwargs": { + "config": self.get_feature_config(), + "swap_level": False, + }, + } + super().__init__( + instruments=instruments, + start_time=start_time, + end_time=end_time, + freq=freq, + data_loader=data_loader, + ) + + def get_feature_config(self): + fields = [] + names = [] + + template_if = "If(Eq({1}, np.nan), {0}, {1})" + template_paused = "Select(Eq($paused, 0.0), {0})" + template_fillnan = "FFillNan({0})" + + fields += [template_fillnan.format(template_paused.format("$close")),] + names += ["$close0"] + fields += [ + "If(Eq({1}, np.nan), 0, If(Or(Gt({2}, Mul(1.001, {4})), Lt({2}, Mul(0.999, {3}))), 0, {1}))".format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format("$volume"), + template_paused.format("$vwap"), + template_paused.format("$low"), + template_paused.format("$high"), + ) + ] + names += ["$volume0"] + return fields, names diff --git a/examples/high_freq/highfreq_ops.py b/examples/high_freq/highfreq_ops.py new file mode 100644 index 000000000..f6470d68e --- /dev/null +++ b/examples/high_freq/highfreq_ops.py @@ -0,0 +1,62 @@ +import numpy as np +import pandas as pd +import importlib +from qlib.data.ops import ElemOperator, PairOperator +from qlib.config import C +from qlib.data.data import Cal + + +class DayFirst(ElemOperator): + def __init__(self, feature): + super(DayFirst, self).__init__(feature, "day_first") + + def _load_internal(self, instrument, start_index, end_index, freq): + _calendar = Cal.get_calender_day(freq=freq)[0] + series = self.feature.load(instrument, start_index, end_index, freq) + return series.groupby(_calendar[series.index]).transform("first") + + +class DayLast(ElemOperator): + def __init__(self, feature): + super(DayLast, self).__init__(feature, "day_last") + + def _load_internal(self, instrument, start_index, end_index, freq): + _calendar = Cal.get_calender_day(freq=freq)[0] + series = self.feature.load(instrument, start_index, end_index, freq) + return series.groupby(_calendar[series.index]).transform("last") + + +class FFillNan(ElemOperator): + def __init__(self, feature): + super(FFillNan, self).__init__(feature, "fill_nan") + + def _load_internal(self, instrument, start_index, end_index, freq): + series = self.feature.load(instrument, start_index, end_index, freq) + return series.fillna(method="ffill") + + +class Date(ElemOperator): + def __init__(self, feature): + super(Date, self).__init__(feature, "date") + + def _load_internal(self, instrument, start_index, end_index, freq): + _calendar = Cal.get_calender_day(freq=freq)[0] + series = self.feature.load(instrument, start_index, end_index, freq) + return pd.Series(_calendar[series.index], index=series.index) + +class Select(PairOperator): + def __init__(self, condition, feature): + super(Select, self).__init__(condition, feature, "select") + + def _load_internal(self, instrument, start_index, end_index, freq): + series_condition = self.feature_left.load(instrument, start_index, end_index, freq) + series_feature = self.feature_right.load(instrument, start_index, end_index, freq) + return series_feature.loc[series_condition] + +class IsNull(ElemOperator): + def __init__(self, feature): + super(IsNull, self).__init__(feature, "isnull") + + def _load_internal(self, instrument, start_index, end_index, freq): + series = self.feature.load(instrument, start_index, end_index, freq) + return series.isnull() \ No newline at end of file diff --git a/examples/high_freq/highfreq_processor.py b/examples/high_freq/highfreq_processor.py new file mode 100644 index 000000000..fc86b1a70 --- /dev/null +++ b/examples/high_freq/highfreq_processor.py @@ -0,0 +1,70 @@ +import numpy as np +import pandas as pd +from qlib.data.dataset.processor import Processor +from qlib.log import TimeInspector +from qlib.data.dataset.utils import fetch_df_by_index + + +class HighFreqNorm(Processor): + def __init__(self, fit_start_time, fit_end_time): + self.fit_start_time = fit_start_time + self.fit_end_time = fit_end_time + + def fit(self, df_features): + fetch_df = fetch_df_by_index(df, slice(self.fit_start_time, self.fit_end_time), level="datetime") + del df + df_values = fetch_df.values + names = { + "price": slice(0, 10), + "volume": slice(10, 12), + } + self.feature_med = {} + self.feature_std = {} + self.feature_vmax = {} + self.feature_vmin = {} + for name, name_val in names.items(): + part_values = df_values[:, name_val] + if name == "volume": + df_features.loc(axis=1)[name_val] = np.log1p(part_values) + self.feature_med[name] = np.nanmedian(part_values) + part_values = part_values - self.feature_med # mean, copy + self.feature_std[name] = np.nanmedian(np.absolute(part_values)) * 1.4826 + 1e-12 + part_values = part_values / self.feature_std + self.feature_vmax[name] = np.nanmax(part_values) + self.feature_vmin[name] = np.nanmin(part_values) + + def __call__(self, df_features): + df_features.set_index("date", append=True, drop=True, inplace=True) + df_values = df_features.values + names = { + "price": slice(0, 10), + "volume": slice(10, 12), + } + + for name, name_val in names.items(): + part_values = df_values[:, name_val] + if name == "volume": + part_values[:] = np.log1p(part_values) + part_values -= self.feature_med[name] + part_values /= self.feature_std[name] + slice0 = part_values > 3.0 + slice1 = part_values > 3.5 + slice2 = part_values < -3.0 + slice3 = part_values < -3.5 + + part_values[slice0] = 3.0 + (part_values[slice0] - 3.0) / (self.feature_vmax[name] - 3) * 0.5 + part_values[slice1] = 3.5 + part_values[slice2] = -3.0 - (part_values[slice2] + 3.0) / (self.feature_vmin[name] + 3) * 0.5 + part_values[slice3] = -3.5 + # print("start_call_feature_reshape") + idx = df_features.index.droplevel("datetime").drop_duplicates() + feat = df_values[:, [0, 1, 2, 3, 4, 10]].reshape(-1, 6 * 240) + feat_1 = df_values[:, [5, 6, 7, 8, 9, 11]].reshape(-1, 6 * 240) + + df_new_features = pd.DataFrame( + data=np.concatenate((feat, feat_1), axis=1), + index=idx, + columns=["FEATURE_%d" % i for i in range(12 * 240)], + ).sort_index() + + return df_new_features \ No newline at end of file diff --git a/examples/high_freq/workflow.py b/examples/high_freq/workflow.py new file mode 100644 index 000000000..83a344b0f --- /dev/null +++ b/examples/high_freq/workflow.py @@ -0,0 +1,137 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import sys +from pathlib import Path + +import qlib +import pickle +import numpy as np +import pandas as pd +from qlib.config import REG_CN +from qlib.contrib.model.gbdt import LGBModel +from qlib.contrib.data.handler import Alpha158 +from qlib.contrib.strategy.strategy import TopkDropoutStrategy +from qlib.contrib.evaluate import ( + backtest as normal_backtest, + risk_analysis, +) + +from qlib.utils import init_instance_by_config +from qlib.data.dataset.handler import DataHandlerLP +from qlib.data.ops import Operators +from qlib.data.data import Cal + +from highfreq_ops import DayFirst, DayLast, FFillNan, Date, Select, IsNull + +def save_dataset(dataset, path: [Path, str]): + """ + save dataset to path + + Parameters + ---------- + path : [Path, str] + path to save + """ + dataset.to_pickle(path=path) + +def load_dataset(path: [Path, str], init_type=DataHandlerLP.IT_LS): + """ + load dataset from path + + Parameters + ---------- + path : [Path, str] + path to load + + init_type : str + - if `init_type` == DataHandlerLP.IT_FIT_SEQ: + + the input of `DataHandlerLP.fit` will be the output of the previous processor + + - if `init_type` == DataHandlerLP.IT_FIT_IND: + + the input of `DataHandlerLP.fit` will be the original df + + - if `init_type` == DataHandlerLP.IT_LS: + + The state of the object has been load by pickle + """ + fd = open(path, 'rb') + dataset = pickle.load(fd) + dataset.init(init_type=init_type) + fd.close() + return dataset + +if __name__ == "__main__": + + # use default data + provider_uri = "/mnt/v-xiabi/data/qlib/high_freq" # target_dir + qlib.init(provider_uri=provider_uri, custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], redis_port=233, region=REG_CN, auto_mount=False) + + MARKET = "csi300" + BENCHMARK = "SH000300" + + ################################### + # train model + ################################### + DATA_HANDLER_CONFIG0 = { + "start_time": "2017-01-01 00:00:00", + "end_time": "2020-11-30 15:00:00", + "freq": "1min", + "fit_start_time": "2017-01-01 00:00:00", + "fit_end_time": "2020-08-31 15:00:00", + "instruments": "all", + "infer_processors": [{"class": "HighFreqNorm", "module_path": "highfreq_processor", "kwargs": {}}], + } + DATA_HANDLER_CONFIG1 = { + "start_time": "2017-01-01 00:00:00", + "end_time": "2020-11-30 15:00:00", + "freq": "1min", + "instruments": "all", + } + + task = { + "dataset": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "HighFreqHandler", + "module_path": "highfreq_handler", + "kwargs": DATA_HANDLER_CONFIG0, + }, + "segments": { + "train": ("2017-01-01 00:00:00", "2020-08-31 15:00:00"), + "test": ( + "2020-09-01 00:00:00", + "2020-11-30 15:00:00", + ), + }, + }, + }, + # You shoud record the data in specific sequence + # "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'], + "dataset_backtest": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "HighFreqBacktestHandler", + "module_path": "highfreq_hander", + "kwargs": DATA_HANDLER_CONFIG1, + }, + "segments": { + "train": ("2017-01-01 00:00:00", "2020-08-31 15:00:00"), + "test": ( + "2020-09-01 00:00:00", + "2020-11-30 15:00:00", + ), + }, + }, + }, + } + Cal.get_calender_day(freq="1min") # TO FIX: load the calendar day for cache + dataset = init_instance_by_config(task["dataset"]) + dataset_backtest = init_instance_by_config(task["dataset_backtest"]) + diff --git a/qlib/contrib/data/handler.py b/qlib/contrib/data/handler.py index 88a1f0680..23e37a5e4 100644 --- a/qlib/contrib/data/handler.py +++ b/qlib/contrib/data/handler.py @@ -49,6 +49,7 @@ class Alpha360(DataHandlerLP): instruments="csi500", start_time=None, end_time=None, + freq="day", infer_processors=_DEFAULT_INFER_PROCESSORS, learn_processors=_DEFAULT_LEARN_PROCESSORS, fit_start_time=None, @@ -69,9 +70,10 @@ class Alpha360(DataHandlerLP): } super().__init__( - instruments, - start_time, - end_time, + instruments=instruments, + start_time=start_time, + end_time=end_time, + freq="day", data_loader=data_loader, learn_processors=learn_processors, infer_processors=infer_processors, @@ -130,6 +132,7 @@ class Alpha158(DataHandlerLP): instruments="csi500", start_time=None, end_time=None, + freq="day", infer_processors=[], learn_processors=_DEFAULT_LEARN_PROCESSORS, fit_start_time=None, @@ -147,9 +150,10 @@ class Alpha158(DataHandlerLP): }, } super().__init__( - instruments, - start_time, - end_time, + instruments=instruments, + start_time=start_time, + end_time=end_time, + freq=freq, data_loader=data_loader, infer_processors=infer_processors, learn_processors=learn_processors, diff --git a/qlib/data/data.py b/qlib/data/data.py index d95728199..3021ebe82 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -123,6 +123,16 @@ class CalendarProvider(abc.ABC): H["c"][flag] = _calendar, _calendar_index return _calendar, _calendar_index + def get_calender_day(self, freq="day", future=False): + flag = f"{freq}_future_{future}_day" + if flag in H["c"]: + _calendar, _calendar_index = H["c"][flag] + else: + _calendar = np.array(list(map(lambda x: x.date(), self._load_calendar(freq, future)))) + _calendar_index = {x: i for i, x in enumerate(_calendar)} # for fast search + H["c"][flag] = _calendar, _calendar_index + return _calendar, _calendar_index + def _uri(self, start_time, end_time, freq, future=False): """Get the uri of calendar generation task.""" return hash_args(start_time, end_time, freq, future) @@ -686,7 +696,10 @@ class LocalExpressionProvider(ExpressionProvider): # 1) The stock data is currently float. If there is other types of data, this part needs to be re-implemented. # 2) The the precision should be configurable try: - series = series.astype(np.float32) + if series.dtype == np.float64: + series = series.astype(np.float32) + elif series.dtype == np.bool: + series = series.astype(np.int8) except ValueError: pass if not series.empty: diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 96e4a6e41..df7af3f5e 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -87,6 +87,36 @@ class DatasetH(Dataset): """ super().__init__(handler, segments) + + def init(self, init_type: str = DataHandlerLP.IT_FIT_SEQ, enable_cache: bool = False): + """ + Initialize the data of Qlib + + Parameters + ---------- + init_type : str + - if `init_type` == DataHandlerLP.IT_FIT_SEQ: + + the input of `DataHandlerLP.fit` will be the output of the previous processor + + - if `init_type` == DataHandlerLP.IT_FIT_IND: + + the input of `DataHandlerLP.fit` will be the original df + + - if `init_type` == DataHandlerLP.IT_LS: + + The state of the object has been load by pickle + + enable_cache : bool + default value is false: + + - if `enable_cache` == True: + + the processed data will be saved on disk, and handler will load the cached data from the disk directly + when we call `init` next time + """ + self.handler.init(init_type=init_type, enable_cache=enable_cache) + def setup_data(self, handler: Union[dict, DataHandler], segments: list): """ Setup the underlying data. @@ -116,8 +146,8 @@ class DatasetH(Dataset): 'outsample': ("2017-01-01", "2020-08-01",), } """ - self._handler = init_instance_by_config(handler, accept_types=DataHandler) - self._segments = segments.copy() + self.handler = init_instance_by_config(handler, accept_types=DataHandler) + self.segments = segments.copy() def _prepare_seg(self, slc: slice, **kwargs): """ @@ -127,7 +157,7 @@ class DatasetH(Dataset): ---------- slc : slice """ - return self._handler.fetch(slc, **kwargs) + return self.handler.fetch(slc, **kwargs) def prepare( self, @@ -150,7 +180,7 @@ class DatasetH(Dataset): - ['train', 'valid'] col_set : str - The col_set will be passed to self._handler when fetching data. + The col_set will be passed to self.handler when fetching data. data_key : str The data to fetch: DK_* Default is DK_I, which indicate fetching data for **inference**. @@ -166,16 +196,16 @@ class DatasetH(Dataset): logger = get_module_logger("DatasetH") fetch_kwargs = {"col_set": col_set} fetch_kwargs.update(kwargs) - if "data_key" in getfullargspec(self._handler.fetch).args: + if "data_key" in getfullargspec(self.handler.fetch).args: fetch_kwargs["data_key"] = data_key else: logger.info(f"data_key[{data_key}] is ignored.") # Handle all kinds of segments format if isinstance(segments, (list, tuple)): - return [self._prepare_seg(slice(*self._segments[seg]), **fetch_kwargs) for seg in segments] + return [self._prepare_seg(slice(*self.segments[seg]), **fetch_kwargs) for seg in segments] elif isinstance(segments, str): - return self._prepare_seg(slice(*self._segments[segments]), **fetch_kwargs) + return self._prepare_seg(slice(*self.segments[segments]), **fetch_kwargs) elif isinstance(segments, slice): return self._prepare_seg(segments, **fetch_kwargs) else: @@ -409,7 +439,7 @@ class TSDatasetH(DatasetH): def setup_data(self, *args, **kwargs): super().setup_data(*args, **kwargs) - cal = self._handler.fetch(col_set=self._handler.CS_RAW).index.get_level_values("datetime").unique() + cal = self.handler.fetch(col_set=self.handler.CS_RAW).index.get_level_values("datetime").unique() cal = sorted(cal) # Get the datatime index for building timestamp self.cal = cal diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 18f838300..9dfc4746a 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -57,6 +57,7 @@ class DataHandler(Serializable): instruments=None, start_time=None, end_time=None, + freq="day", data_loader: Tuple[dict, str, DataLoader] = None, init_data=True, fetch_orig=True, @@ -70,6 +71,8 @@ class DataHandler(Serializable): start_time of the original data. end_time : end_time of the original data. + freq : + frequency of data data_loader : Tuple[dict, str, DataLoader] data loader to load the data. init_data : @@ -92,6 +95,7 @@ class DataHandler(Serializable): self.instruments = instruments self.start_time = start_time self.end_time = end_time + self.freq = freq self.fetch_orig = fetch_orig if init_data: with TimeInspector.logt("Init data"): @@ -119,7 +123,7 @@ class DataHandler(Serializable): # Setup data. # _data may be with multiple column index level. The outer level indicates the feature set name with TimeInspector.logt("Loading data"): - self._data = self.data_loader.load(self.instruments, self.start_time, self.end_time) + self._data = self.data_loader.load(self.instruments, self.start_time, self.end_time, self.freq) # TODO: cache CS_ALL = "__all" # return all columns with single-level index column @@ -258,10 +262,12 @@ class DataHandlerLP(DataHandler): instruments=None, start_time=None, end_time=None, + freq="day", data_loader: Tuple[dict, str, DataLoader] = None, infer_processors=[], learn_processors=[], process_type=PTYPE_A, + drop_raw=False, **kwargs, ): """ @@ -303,6 +309,8 @@ class DataHandlerLP(DataHandler): - self._learn will be processed by infer_processors + learn_processors - (e.g. self._infer processed by learn_processors ) + drop_raw: bool + Whether to drop the raw data """ # Setup preprocessor @@ -319,7 +327,8 @@ class DataHandlerLP(DataHandler): ) self.process_type = process_type - super().__init__(instruments, start_time, end_time, data_loader, **kwargs) + self.drop_raw = drop_raw + super().__init__(instruments, start_time, end_time, freq, data_loader, **kwargs) def get_all_processors(self): return self.infer_processors + self.learn_processors @@ -348,7 +357,7 @@ class DataHandlerLP(DataHandler): """ # data for inference _infer_df = self._data - if len(self.infer_processors) > 0: # avoid modifying the original data + if len(self.infer_processors) > 0 and not self.drop_raw: # avoid modifying the original data _infer_df = _infer_df.copy() for proc in self.infer_processors: @@ -378,6 +387,8 @@ class DataHandlerLP(DataHandler): _learn_df = proc(_learn_df) self._learn = _learn_df + if self.drop_raw: + del self._data # init type IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df @@ -416,7 +427,11 @@ class DataHandlerLP(DataHandler): # TODO: Be able to cache handler data. Save the memory for data processing def _get_df_by_key(self, data_key: str = DK_I) -> pd.DataFrame: - df = getattr(self, {self.DK_R: "_data", self.DK_I: "_infer", self.DK_L: "_learn"}[data_key]) + try: + df = getattr(self, {self.DK_R: "_data", self.DK_I: "_infer", self.DK_L: "_learn"}[data_key]) + except AttributeError: + print("please set drop_raw = False if you want to use raw data") + raise return df def fetch( diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index a51ea119a..c6d06b57f 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -19,7 +19,7 @@ class DataLoader(abc.ABC): """ @abc.abstractmethod - def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame: + def load(self, instruments, start_time=None, end_time=None, freq="day") -> pd.DataFrame: """ load the data as pd.DataFrame. @@ -94,7 +94,7 @@ class DLWParser(DataLoader): return exprs, names @abc.abstractmethod - def load_group_df(self, instruments, exprs: list, names: list, start_time=None, end_time=None) -> pd.DataFrame: + def load_group_df(self, instruments, exprs: list, names: list, start_time=None, end_time=None, freq="day") -> pd.DataFrame: """ load the dataframe for specific group @@ -114,25 +114,25 @@ class DLWParser(DataLoader): """ pass - def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: + def load(self, instruments=None, start_time=None, end_time=None, freq="day") -> pd.DataFrame: if self.is_group: df = pd.concat( { - grp: self.load_group_df(instruments, exprs, names, start_time, end_time) + grp: self.load_group_df(instruments, exprs, names, start_time, end_time, freq) for grp, (exprs, names) in self.fields.items() }, axis=1, ) else: exprs, names = self.fields - df = self.load_group_df(instruments, exprs, names, start_time, end_time) + df = self.load_group_df(instruments, exprs, names, start_time, end_time, freq) return df class QlibDataLoader(DLWParser): """Same as QlibDataLoader. The fields can be define by config""" - def __init__(self, config: Tuple[list, tuple, dict], filter_pipe=None): + def __init__(self, config: Tuple[list, tuple, dict], filter_pipe=None, swap_level=True): """ Parameters ---------- @@ -140,11 +140,15 @@ class QlibDataLoader(DLWParser): Please refer to the doc of DLWParser filter_pipe : Filter pipe for the instruments + swap_level : + Whether to swap level of MultiIndex """ self.filter_pipe = filter_pipe + self.swap_level = swap_level + print("swap level", swap_level) super().__init__(config) - def load_group_df(self, instruments, exprs: list, names: list, start_time=None, end_time=None) -> pd.DataFrame: + def load_group_df(self, instruments, exprs: list, names: list, start_time=None, end_time=None, freq="day") -> pd.DataFrame: if instruments is None: warnings.warn("`instruments` is not set, will load all stocks") instruments = "all" @@ -153,9 +157,10 @@ class QlibDataLoader(DLWParser): elif self.filter_pipe is not None: warnings.warn("`filter_pipe` is not None, but it will not be used with `instruments` as list") - df = D.features(instruments, exprs, start_time, end_time) + df = D.features(instruments, exprs, start_time, end_time, freq) df.columns = names - df = df.swaplevel().sort_index() # NOTE: always return + if self.swap_level: + df = df.swaplevel().sort_index() # NOTE: if swaplevel, return return df @@ -177,7 +182,7 @@ class StaticDataLoader(DataLoader): self.join = join self._data = None - def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: + def load(self, instruments=None, start_time=None, end_time=None, freq="day") -> pd.DataFrame: self._maybe_load_raw_data() if instruments is None: df = self._data From 06dbd02b997e6b239276cbd4b00f7141d872d266 Mon Sep 17 00:00:00 2001 From: bxdd Date: Mon, 25 Jan 2021 17:59:48 +0000 Subject: [PATCH 02/17] black format --- examples/high_freq/highfreq_handler.py | 7 ++++--- examples/high_freq/highfreq_ops.py | 6 ++++-- examples/high_freq/highfreq_processor.py | 8 ++++---- examples/high_freq/workflow.py | 16 ++++++++++++---- qlib/data/data.py | 2 +- qlib/data/dataset/__init__.py | 3 +-- qlib/data/dataset/handler.py | 3 ++- qlib/data/dataset/loader.py | 8 ++++++-- 8 files changed, 34 insertions(+), 19 deletions(-) diff --git a/examples/high_freq/highfreq_handler.py b/examples/high_freq/highfreq_handler.py index 32557f768..13d0e8298 100644 --- a/examples/high_freq/highfreq_handler.py +++ b/examples/high_freq/highfreq_handler.py @@ -50,7 +50,6 @@ class HighFreqHandler(DataHandlerLP): drop_raw=drop_raw, ) - def get_feature_config(self): fields = [] names = [] @@ -98,7 +97,7 @@ class HighFreqHandler(DataHandlerLP): ) ] names += ["$open", "$high", "$low", "$close", "$vwap"] - + fields += [ "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( template_if.format( @@ -205,7 +204,9 @@ class HighFreqBacktestHandler(DataHandler): template_paused = "Select(Eq($paused, 0.0), {0})" template_fillnan = "FFillNan({0})" - fields += [template_fillnan.format(template_paused.format("$close")),] + fields += [ + template_fillnan.format(template_paused.format("$close")), + ] names += ["$close0"] fields += [ "If(Eq({1}, np.nan), 0, If(Or(Gt({2}, Mul(1.001, {4})), Lt({2}, Mul(0.999, {3}))), 0, {1}))".format( diff --git a/examples/high_freq/highfreq_ops.py b/examples/high_freq/highfreq_ops.py index f6470d68e..4d35da9c4 100644 --- a/examples/high_freq/highfreq_ops.py +++ b/examples/high_freq/highfreq_ops.py @@ -9,7 +9,7 @@ from qlib.data.data import Cal class DayFirst(ElemOperator): def __init__(self, feature): super(DayFirst, self).__init__(feature, "day_first") - + def _load_internal(self, instrument, start_index, end_index, freq): _calendar = Cal.get_calender_day(freq=freq)[0] series = self.feature.load(instrument, start_index, end_index, freq) @@ -44,6 +44,7 @@ class Date(ElemOperator): series = self.feature.load(instrument, start_index, end_index, freq) return pd.Series(_calendar[series.index], index=series.index) + class Select(PairOperator): def __init__(self, condition, feature): super(Select, self).__init__(condition, feature, "select") @@ -53,10 +54,11 @@ class Select(PairOperator): series_feature = self.feature_right.load(instrument, start_index, end_index, freq) return series_feature.loc[series_condition] + class IsNull(ElemOperator): def __init__(self, feature): super(IsNull, self).__init__(feature, "isnull") def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) - return series.isnull() \ No newline at end of file + return series.isnull() diff --git a/examples/high_freq/highfreq_processor.py b/examples/high_freq/highfreq_processor.py index fc86b1a70..dc8792a57 100644 --- a/examples/high_freq/highfreq_processor.py +++ b/examples/high_freq/highfreq_processor.py @@ -62,9 +62,9 @@ class HighFreqNorm(Processor): feat_1 = df_values[:, [5, 6, 7, 8, 9, 11]].reshape(-1, 6 * 240) df_new_features = pd.DataFrame( - data=np.concatenate((feat, feat_1), axis=1), - index=idx, - columns=["FEATURE_%d" % i for i in range(12 * 240)], + data=np.concatenate((feat, feat_1), axis=1), + index=idx, + columns=["FEATURE_%d" % i for i in range(12 * 240)], ).sort_index() - return df_new_features \ No newline at end of file + return df_new_features diff --git a/examples/high_freq/workflow.py b/examples/high_freq/workflow.py index 83a344b0f..dc13bd245 100644 --- a/examples/high_freq/workflow.py +++ b/examples/high_freq/workflow.py @@ -24,6 +24,7 @@ from qlib.data.data import Cal from highfreq_ops import DayFirst, DayLast, FFillNan, Date, Select, IsNull + def save_dataset(dataset, path: [Path, str]): """ save dataset to path @@ -35,6 +36,7 @@ def save_dataset(dataset, path: [Path, str]): """ dataset.to_pickle(path=path) + def load_dataset(path: [Path, str], init_type=DataHandlerLP.IT_LS): """ load dataset from path @@ -48,7 +50,7 @@ def load_dataset(path: [Path, str], init_type=DataHandlerLP.IT_LS): - if `init_type` == DataHandlerLP.IT_FIT_SEQ: the input of `DataHandlerLP.fit` will be the output of the previous processor - + - if `init_type` == DataHandlerLP.IT_FIT_IND: the input of `DataHandlerLP.fit` will be the original df @@ -57,17 +59,24 @@ def load_dataset(path: [Path, str], init_type=DataHandlerLP.IT_LS): The state of the object has been load by pickle """ - fd = open(path, 'rb') + fd = open(path, "rb") dataset = pickle.load(fd) dataset.init(init_type=init_type) fd.close() return dataset + if __name__ == "__main__": # use default data provider_uri = "/mnt/v-xiabi/data/qlib/high_freq" # target_dir - qlib.init(provider_uri=provider_uri, custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], redis_port=233, region=REG_CN, auto_mount=False) + qlib.init( + provider_uri=provider_uri, + custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], + redis_port=233, + region=REG_CN, + auto_mount=False, + ) MARKET = "csi300" BENCHMARK = "SH000300" @@ -134,4 +143,3 @@ if __name__ == "__main__": Cal.get_calender_day(freq="1min") # TO FIX: load the calendar day for cache dataset = init_instance_by_config(task["dataset"]) dataset_backtest = init_instance_by_config(task["dataset_backtest"]) - diff --git a/qlib/data/data.py b/qlib/data/data.py index 3021ebe82..030f56494 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -132,7 +132,7 @@ class CalendarProvider(abc.ABC): _calendar_index = {x: i for i, x in enumerate(_calendar)} # for fast search H["c"][flag] = _calendar, _calendar_index return _calendar, _calendar_index - + def _uri(self, start_time, end_time, freq, future=False): """Get the uri of calendar generation task.""" return hash_args(start_time, end_time, freq, future) diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index df7af3f5e..adad6cfb8 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -87,7 +87,6 @@ class DatasetH(Dataset): """ super().__init__(handler, segments) - def init(self, init_type: str = DataHandlerLP.IT_FIT_SEQ, enable_cache: bool = False): """ Initialize the data of Qlib @@ -98,7 +97,7 @@ class DatasetH(Dataset): - if `init_type` == DataHandlerLP.IT_FIT_SEQ: the input of `DataHandlerLP.fit` will be the output of the previous processor - + - if `init_type` == DataHandlerLP.IT_FIT_IND: the input of `DataHandlerLP.fit` will be the original df diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 9dfc4746a..0e6093bd9 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -389,6 +389,7 @@ class DataHandlerLP(DataHandler): if self.drop_raw: del self._data + # init type IT_FIT_SEQ = "fit_seq" # the input of `fit` will be the output of the previous processor IT_FIT_IND = "fit_ind" # the input of `fit` will be the original df @@ -431,7 +432,7 @@ class DataHandlerLP(DataHandler): df = getattr(self, {self.DK_R: "_data", self.DK_I: "_infer", self.DK_L: "_learn"}[data_key]) except AttributeError: print("please set drop_raw = False if you want to use raw data") - raise + raise return df def fetch( diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index c6d06b57f..324ff9a4f 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -94,7 +94,9 @@ class DLWParser(DataLoader): return exprs, names @abc.abstractmethod - def load_group_df(self, instruments, exprs: list, names: list, start_time=None, end_time=None, freq="day") -> pd.DataFrame: + def load_group_df( + self, instruments, exprs: list, names: list, start_time=None, end_time=None, freq="day" + ) -> pd.DataFrame: """ load the dataframe for specific group @@ -148,7 +150,9 @@ class QlibDataLoader(DLWParser): print("swap level", swap_level) super().__init__(config) - def load_group_df(self, instruments, exprs: list, names: list, start_time=None, end_time=None, freq="day") -> pd.DataFrame: + def load_group_df( + self, instruments, exprs: list, names: list, start_time=None, end_time=None, freq="day" + ) -> pd.DataFrame: if instruments is None: warnings.warn("`instruments` is not set, will load all stocks") instruments = "all" From 6a145df87c2c15de2e4ad36920170be0d29f3c17 Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 26 Jan 2021 07:32:06 +0000 Subject: [PATCH 03/17] fix bug --- examples/high_freq/highfreq_handler.py | 8 +- examples/high_freq/highfreq_ops.py | 6 +- examples/high_freq/highfreq_processor.py | 18 ++--- examples/high_freq/workflow.py | 94 +++++++++++++++++++----- examples/workflow_by_code.py | 2 +- qlib/config.py | 4 +- qlib/data/data.py | 2 +- qlib/data/dataset/__init__.py | 36 +++------ qlib/data/dataset/handler.py | 2 + qlib/data/dataset/loader.py | 1 - qlib/data/ops.py | 16 +++- 11 files changed, 118 insertions(+), 71 deletions(-) diff --git a/examples/high_freq/highfreq_handler.py b/examples/high_freq/highfreq_handler.py index 13d0e8298..d50b95ec7 100644 --- a/examples/high_freq/highfreq_handler.py +++ b/examples/high_freq/highfreq_handler.py @@ -29,8 +29,8 @@ class HighFreqHandler(DataHandlerLP): new_l.append(p) return new_l - infer_processors = [] - learn_processors = [] + infer_processors = check_transform_proc(infer_processors) + learn_processors = check_transform_proc(learn_processors) data_loader = { "class": "QlibDataLoader", @@ -179,8 +179,6 @@ class HighFreqBacktestHandler(DataHandler): end_time=None, freq="1min", ): - infer_processors = check_transform_proc(infer_processors) - learn_processors = check_transform_proc(learn_processors) data_loader = { "class": "QlibDataLoader", "kwargs": { @@ -207,7 +205,7 @@ class HighFreqBacktestHandler(DataHandler): fields += [ template_fillnan.format(template_paused.format("$close")), ] - names += ["$close0"] + names += ["$vwap0"] fields += [ "If(Eq({1}, np.nan), 0, If(Or(Gt({2}, Mul(1.001, {4})), Lt({2}, Mul(0.999, {3}))), 0, {1}))".format( template_fillnan.format(template_paused.format("$close")), diff --git a/examples/high_freq/highfreq_ops.py b/examples/high_freq/highfreq_ops.py index 4d35da9c4..a3fa7ac4a 100644 --- a/examples/high_freq/highfreq_ops.py +++ b/examples/high_freq/highfreq_ops.py @@ -11,7 +11,7 @@ class DayFirst(ElemOperator): super(DayFirst, self).__init__(feature, "day_first") def _load_internal(self, instrument, start_index, end_index, freq): - _calendar = Cal.get_calender_day(freq=freq)[0] + _calendar = Cal.get_calendar_day(freq=freq)[0] series = self.feature.load(instrument, start_index, end_index, freq) return series.groupby(_calendar[series.index]).transform("first") @@ -21,7 +21,7 @@ class DayLast(ElemOperator): super(DayLast, self).__init__(feature, "day_last") def _load_internal(self, instrument, start_index, end_index, freq): - _calendar = Cal.get_calender_day(freq=freq)[0] + _calendar = Cal.get_calendar_day(freq=freq)[0] series = self.feature.load(instrument, start_index, end_index, freq) return series.groupby(_calendar[series.index]).transform("last") @@ -40,7 +40,7 @@ class Date(ElemOperator): super(Date, self).__init__(feature, "date") def _load_internal(self, instrument, start_index, end_index, freq): - _calendar = Cal.get_calender_day(freq=freq)[0] + _calendar = Cal.get_calendar_day(freq=freq)[0] series = self.feature.load(instrument, start_index, end_index, freq) return pd.Series(_calendar[series.index], index=series.index) diff --git a/examples/high_freq/highfreq_processor.py b/examples/high_freq/highfreq_processor.py index dc8792a57..c6b69acae 100644 --- a/examples/high_freq/highfreq_processor.py +++ b/examples/high_freq/highfreq_processor.py @@ -1,7 +1,6 @@ import numpy as np import pandas as pd from qlib.data.dataset.processor import Processor -from qlib.log import TimeInspector from qlib.data.dataset.utils import fetch_df_by_index @@ -11,8 +10,9 @@ class HighFreqNorm(Processor): self.fit_end_time = fit_end_time def fit(self, df_features): - fetch_df = fetch_df_by_index(df, slice(self.fit_start_time, self.fit_end_time), level="datetime") - del df + print("==============fit==============") + fetch_df = fetch_df_by_index(df_features, slice(self.fit_start_time, self.fit_end_time), level="datetime") + del df_features df_values = fetch_df.values names = { "price": slice(0, 10), @@ -23,17 +23,18 @@ class HighFreqNorm(Processor): self.feature_vmax = {} self.feature_vmin = {} for name, name_val in names.items(): - part_values = df_values[:, name_val] + part_values = df_values[:, name_val].astype(np.float32) if name == "volume": - df_features.loc(axis=1)[name_val] = np.log1p(part_values) + part_values = np.log1p(part_values) self.feature_med[name] = np.nanmedian(part_values) - part_values = part_values - self.feature_med # mean, copy + part_values = part_values - self.feature_med[name] # mean, copy self.feature_std[name] = np.nanmedian(np.absolute(part_values)) * 1.4826 + 1e-12 - part_values = part_values / self.feature_std + part_values = part_values / self.feature_std[name] self.feature_vmax[name] = np.nanmax(part_values) self.feature_vmin[name] = np.nanmin(part_values) def __call__(self, df_features): + print("==============call==============") df_features.set_index("date", append=True, drop=True, inplace=True) df_values = df_features.values names = { @@ -58,13 +59,12 @@ class HighFreqNorm(Processor): part_values[slice3] = -3.5 # print("start_call_feature_reshape") idx = df_features.index.droplevel("datetime").drop_duplicates() + idx.set_names(['instrument', 'datetime'], inplace=True) feat = df_values[:, [0, 1, 2, 3, 4, 10]].reshape(-1, 6 * 240) feat_1 = df_values[:, [5, 6, 7, 8, 9, 11]].reshape(-1, 6 * 240) - df_new_features = pd.DataFrame( data=np.concatenate((feat, feat_1), axis=1), index=idx, columns=["FEATURE_%d" % i for i in range(12 * 240)], ).sort_index() - return df_new_features diff --git a/examples/high_freq/workflow.py b/examples/high_freq/workflow.py index dc13bd245..112e99070 100644 --- a/examples/high_freq/workflow.py +++ b/examples/high_freq/workflow.py @@ -73,31 +73,36 @@ if __name__ == "__main__": qlib.init( provider_uri=provider_uri, custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], - redis_port=233, + redis_port=-1, region=REG_CN, auto_mount=False, ) - MARKET = "csi300" + MARKET = "test_10" BENCHMARK = "SH000300" + start_time = "2019-01-01 00:00:00" + end_time = "2019-12-31 15:00:00" + train_end_time = "2019-05-31 15:00:00" + test_start_time = "2019-06-01 00:00:00" + ################################### # train model ################################### DATA_HANDLER_CONFIG0 = { - "start_time": "2017-01-01 00:00:00", - "end_time": "2020-11-30 15:00:00", + "start_time": start_time, + "end_time": end_time, "freq": "1min", - "fit_start_time": "2017-01-01 00:00:00", - "fit_end_time": "2020-08-31 15:00:00", - "instruments": "all", + "fit_start_time": start_time, + "fit_end_time": train_end_time, + "instruments": MARKET, "infer_processors": [{"class": "HighFreqNorm", "module_path": "highfreq_processor", "kwargs": {}}], } DATA_HANDLER_CONFIG1 = { - "start_time": "2017-01-01 00:00:00", - "end_time": "2020-11-30 15:00:00", + "start_time": start_time, + "end_time": end_time, "freq": "1min", - "instruments": "all", + "instruments": MARKET, } task = { @@ -111,10 +116,10 @@ if __name__ == "__main__": "kwargs": DATA_HANDLER_CONFIG0, }, "segments": { - "train": ("2017-01-01 00:00:00", "2020-08-31 15:00:00"), + "train": (start_time, train_end_time), "test": ( - "2020-09-01 00:00:00", - "2020-11-30 15:00:00", + test_start_time, + end_time, ), }, }, @@ -127,19 +132,72 @@ if __name__ == "__main__": "kwargs": { "handler": { "class": "HighFreqBacktestHandler", - "module_path": "highfreq_hander", + "module_path": "highfreq_handler", "kwargs": DATA_HANDLER_CONFIG1, }, "segments": { - "train": ("2017-01-01 00:00:00", "2020-08-31 15:00:00"), + "train": (start_time, train_end_time), "test": ( - "2020-09-01 00:00:00", - "2020-11-30 15:00:00", + test_start_time, + end_time, ), }, }, }, } - Cal.get_calender_day(freq="1min") # TO FIX: load the calendar day for cache + ##=============load the calendar for cache============= + Cal.calendar(freq="1min") + Cal.get_calendar_day(freq="1min") + + + ##=============get data============= dataset = init_instance_by_config(task["dataset"]) dataset_backtest = init_instance_by_config(task["dataset_backtest"]) + xtrain, xtest = dataset.prepare(['train', 'test']) + backtest_train, backtest_test = dataset_backtest.prepare(['train', 'test']) + print(xtrain, xtest) + print(backtest_train, backtest_test) + del xtrain, xtest + del backtest_train, backtest_test + + ##=============dump dataset============= + dataset.to_pickle(path="dataset.pkl") + dataset_backtest.to_pickle(path="dataset_backtest.pkl") + + del dataset, dataset_backtest + ##=============reload dataset============= + file_dataset = open("dataset.pkl", "rb") + dataset = pickle.load(file_dataset) + file_dataset.close() + + file_dataset_backtest = open("dataset_backtest.pkl", "rb") + dataset_backtest = pickle.load(file_dataset_backtest) + + file_dataset_backtest.close() + + ##=============reload_dataset============= + dataset.init(init_type=DataHandlerLP.IT_LS) + dataset_backtest.init(init_type=DataHandlerLP.IT_LS) + + + + ##=============reinit qlib============= + qlib.init( + provider_uri=provider_uri, + custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], + redis_port=-1, + region=REG_CN, + auto_mount=False, + ) + + Cal.calendar(freq="1min") #load the calendar for cache + Cal.get_calendar_day(freq="1min") #load the calendar for cache + + ##=============test dataset + xtrain, xtest = dataset.prepare(['train', 'test']) + backtest_train, backtest_test = dataset_backtest.prepare(['train', 'test']) + + print(xtrain, xtest) + print(backtest_train, backtest_test) + del xtrain, xtest + del backtest_train, backtest_test diff --git a/examples/workflow_by_code.py b/examples/workflow_by_code.py index ea9c70083..6253f3ee4 100644 --- a/examples/workflow_by_code.py +++ b/examples/workflow_by_code.py @@ -30,7 +30,7 @@ if __name__ == "__main__": GetData().qlib_data(target_dir=provider_uri, region=REG_CN) - qlib.init(provider_uri=provider_uri, region=REG_CN) + qlib.init(provider_uri=provider_uri, region=REG_CN, redis_port=233) market = "csi300" benchmark = "SH000300" diff --git a/qlib/config.py b/qlib/config.py index a65d41041..e94752953 100644 --- a/qlib/config.py +++ b/qlib/config.py @@ -291,12 +291,12 @@ class QlibConfig(Config): def register(self): from .utils import init_instance_by_config - from .data.ops import register_custom_ops + from .data.ops import register_all_ops from .data.data import register_all_wrappers from .workflow import R, QlibRecorder from .workflow.utils import experiment_exit_handler - register_custom_ops(self) + register_all_ops(self) register_all_wrappers(self) # set up QlibRecorder exp_manager = init_instance_by_config(self["exp_manager"]) diff --git a/qlib/data/data.py b/qlib/data/data.py index 030f56494..dddf1901e 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -123,7 +123,7 @@ class CalendarProvider(abc.ABC): H["c"][flag] = _calendar, _calendar_index return _calendar, _calendar_index - def get_calender_day(self, freq="day", future=False): + def get_calendar_day(self, freq="day", future=False): flag = f"{freq}_future_{future}_day" if flag in H["c"]: _calendar, _calendar_index = H["c"][flag] diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index adad6cfb8..65dcf7ccb 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -87,34 +87,16 @@ class DatasetH(Dataset): """ super().__init__(handler, segments) - def init(self, init_type: str = DataHandlerLP.IT_FIT_SEQ, enable_cache: bool = False): - """ - Initialize the data of Qlib + def init(self, **kwargs): - Parameters - ---------- - init_type : str - - if `init_type` == DataHandlerLP.IT_FIT_SEQ: - - the input of `DataHandlerLP.fit` will be the output of the previous processor - - - if `init_type` == DataHandlerLP.IT_FIT_IND: - - the input of `DataHandlerLP.fit` will be the original df - - - if `init_type` == DataHandlerLP.IT_LS: - - The state of the object has been load by pickle - - enable_cache : bool - default value is false: - - - if `enable_cache` == True: - - the processed data will be saved on disk, and handler will load the cached data from the disk directly - when we call `init` next time - """ - self.handler.init(init_type=init_type, enable_cache=enable_cache) + logger = get_module_logger("DatasetH") + handler_init_kwargs = {} + for arg_key, arg_value in kwargs.items(): + if arg_key in getfullargspec(self.handler.init).args: + handler_init_kwargs[arg_key] = arg_value + else: + logger.info(f"init arguments[{arg_key}] is ignored.") + self.handler.init(**handler_init_kwargs) def setup_data(self, handler: Union[dict, DataHandler], segments: list): """ diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 0e6093bd9..627624022 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -433,6 +433,8 @@ class DataHandlerLP(DataHandler): except AttributeError: print("please set drop_raw = False if you want to use raw data") raise + except: + raise return df def fetch( diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 324ff9a4f..3b33ff749 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -147,7 +147,6 @@ class QlibDataLoader(DLWParser): """ self.filter_pipe = filter_pipe self.swap_level = swap_level - print("swap level", swap_level) super().__init__(config) def load_group_df( diff --git a/qlib/data/ops.py b/qlib/data/ops.py index 91f7349d2..62df98647 100644 --- a/qlib/data/ops.py +++ b/qlib/data/ops.py @@ -17,11 +17,13 @@ from ..log import get_module_logger try: from ._libs.rolling import rolling_slope, rolling_rsquare, rolling_resi from ._libs.expanding import expanding_slope, expanding_rsquare, expanding_resi -except ImportError as err: +except ImportError: print( "#### Do not import qlib package in the repository directory in case of importing qlib from . without compiling #####" ) raise +except: + raise np.seterr(invalid="ignore") @@ -1451,6 +1453,9 @@ class OpsWrapper(object): def __init__(self): self._ops = {} + def reset(self): + self._ops = {} + def register(self, ops_list): for operator in ops_list: if not issubclass(operator, ExpressionOps): @@ -1469,12 +1474,15 @@ class OpsWrapper(object): Operators = OpsWrapper() -Operators.register(OpsList) -def register_custom_ops(C): - """register custom operator""" +def register_all_ops(C): + """register all operator""" logger = get_module_logger("ops") + + Operators.reset() + Operators.register(OpsList) + if getattr(C, "custom_ops", None) is not None: Operators.register(C.custom_ops) logger.debug("register custom operator {}".format(C.custom_ops)) From 8e9ca22b070e6958f81b547c5ba24c8e11801193 Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 26 Jan 2021 07:33:26 +0000 Subject: [PATCH 04/17] del some print --- examples/high_freq/highfreq_processor.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/high_freq/highfreq_processor.py b/examples/high_freq/highfreq_processor.py index c6b69acae..d2ffe41ce 100644 --- a/examples/high_freq/highfreq_processor.py +++ b/examples/high_freq/highfreq_processor.py @@ -10,7 +10,6 @@ class HighFreqNorm(Processor): self.fit_end_time = fit_end_time def fit(self, df_features): - print("==============fit==============") fetch_df = fetch_df_by_index(df_features, slice(self.fit_start_time, self.fit_end_time), level="datetime") del df_features df_values = fetch_df.values @@ -34,7 +33,6 @@ class HighFreqNorm(Processor): self.feature_vmin[name] = np.nanmin(part_values) def __call__(self, df_features): - print("==============call==============") df_features.set_index("date", append=True, drop=True, inplace=True) df_values = df_features.values names = { From 58616fced9140bb3e54547c897ca257d68e503fa Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 26 Jan 2021 07:33:50 +0000 Subject: [PATCH 05/17] black format --- examples/high_freq/highfreq_processor.py | 2 +- examples/high_freq/workflow.py | 19 ++++++++----------- qlib/data/ops.py | 2 +- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/examples/high_freq/highfreq_processor.py b/examples/high_freq/highfreq_processor.py index d2ffe41ce..d71cd2e85 100644 --- a/examples/high_freq/highfreq_processor.py +++ b/examples/high_freq/highfreq_processor.py @@ -57,7 +57,7 @@ class HighFreqNorm(Processor): part_values[slice3] = -3.5 # print("start_call_feature_reshape") idx = df_features.index.droplevel("datetime").drop_duplicates() - idx.set_names(['instrument', 'datetime'], inplace=True) + idx.set_names(["instrument", "datetime"], inplace=True) feat = df_values[:, [0, 1, 2, 3, 4, 10]].reshape(-1, 6 * 240) feat_1 = df_values[:, [5, 6, 7, 8, 9, 11]].reshape(-1, 6 * 240) df_new_features = pd.DataFrame( diff --git a/examples/high_freq/workflow.py b/examples/high_freq/workflow.py index 112e99070..8304cf26a 100644 --- a/examples/high_freq/workflow.py +++ b/examples/high_freq/workflow.py @@ -147,14 +147,13 @@ if __name__ == "__main__": } ##=============load the calendar for cache============= Cal.calendar(freq="1min") - Cal.get_calendar_day(freq="1min") - + Cal.get_calendar_day(freq="1min") ##=============get data============= dataset = init_instance_by_config(task["dataset"]) dataset_backtest = init_instance_by_config(task["dataset_backtest"]) - xtrain, xtest = dataset.prepare(['train', 'test']) - backtest_train, backtest_test = dataset_backtest.prepare(['train', 'test']) + xtrain, xtest = dataset.prepare(["train", "test"]) + backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) print(xtrain, xtest) print(backtest_train, backtest_test) del xtrain, xtest @@ -172,15 +171,13 @@ if __name__ == "__main__": file_dataset_backtest = open("dataset_backtest.pkl", "rb") dataset_backtest = pickle.load(file_dataset_backtest) - + file_dataset_backtest.close() ##=============reload_dataset============= dataset.init(init_type=DataHandlerLP.IT_LS) dataset_backtest.init(init_type=DataHandlerLP.IT_LS) - - ##=============reinit qlib============= qlib.init( provider_uri=provider_uri, @@ -190,12 +187,12 @@ if __name__ == "__main__": auto_mount=False, ) - Cal.calendar(freq="1min") #load the calendar for cache - Cal.get_calendar_day(freq="1min") #load the calendar for cache + Cal.calendar(freq="1min") # load the calendar for cache + Cal.get_calendar_day(freq="1min") # load the calendar for cache ##=============test dataset - xtrain, xtest = dataset.prepare(['train', 'test']) - backtest_train, backtest_test = dataset_backtest.prepare(['train', 'test']) + xtrain, xtest = dataset.prepare(["train", "test"]) + backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) print(xtrain, xtest) print(backtest_train, backtest_test) diff --git a/qlib/data/ops.py b/qlib/data/ops.py index 62df98647..66e588be1 100644 --- a/qlib/data/ops.py +++ b/qlib/data/ops.py @@ -1479,7 +1479,7 @@ Operators = OpsWrapper() def register_all_ops(C): """register all operator""" logger = get_module_logger("ops") - + Operators.reset() Operators.register(OpsList) From e4ecea55e4d88ed767d84eda9a9e5d30edeca8de Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 26 Jan 2021 07:41:22 +0000 Subject: [PATCH 06/17] fix --- examples/high_freq/workflow.py | 44 +--------------------------------- 1 file changed, 1 insertion(+), 43 deletions(-) diff --git a/examples/high_freq/workflow.py b/examples/high_freq/workflow.py index 8304cf26a..eb30fb1b8 100644 --- a/examples/high_freq/workflow.py +++ b/examples/high_freq/workflow.py @@ -24,48 +24,6 @@ from qlib.data.data import Cal from highfreq_ops import DayFirst, DayLast, FFillNan, Date, Select, IsNull - -def save_dataset(dataset, path: [Path, str]): - """ - save dataset to path - - Parameters - ---------- - path : [Path, str] - path to save - """ - dataset.to_pickle(path=path) - - -def load_dataset(path: [Path, str], init_type=DataHandlerLP.IT_LS): - """ - load dataset from path - - Parameters - ---------- - path : [Path, str] - path to load - - init_type : str - - if `init_type` == DataHandlerLP.IT_FIT_SEQ: - - the input of `DataHandlerLP.fit` will be the output of the previous processor - - - if `init_type` == DataHandlerLP.IT_FIT_IND: - - the input of `DataHandlerLP.fit` will be the original df - - - if `init_type` == DataHandlerLP.IT_LS: - - The state of the object has been load by pickle - """ - fd = open(path, "rb") - dataset = pickle.load(fd) - dataset.init(init_type=init_type) - fd.close() - return dataset - - if __name__ == "__main__": # use default data @@ -78,7 +36,7 @@ if __name__ == "__main__": auto_mount=False, ) - MARKET = "test_10" + MARKET = "all" BENCHMARK = "SH000300" start_time = "2019-01-01 00:00:00" From 1b569d371d24440d80e20c7cb01563cc1a33d0f5 Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 26 Jan 2021 14:32:08 +0000 Subject: [PATCH 07/17] simpson vwap --- examples/high_freq/highfreq_handler.py | 54 ++++++++------- examples/high_freq/highfreq_processor.py | 1 + examples/high_freq/workflow.py | 88 +++++++++++++----------- 3 files changed, 78 insertions(+), 65 deletions(-) diff --git a/examples/high_freq/highfreq_handler.py b/examples/high_freq/highfreq_handler.py index d50b95ec7..09d954364 100644 --- a/examples/high_freq/highfreq_handler.py +++ b/examples/high_freq/highfreq_handler.py @@ -7,7 +7,7 @@ from qlib.log import TimeInspector class HighFreqHandler(DataHandlerLP): def __init__( self, - instruments="csi500", + instruments="csi300", start_time=None, end_time=None, freq="1min", @@ -55,8 +55,10 @@ class HighFreqHandler(DataHandlerLP): names = [] template_if = "If(IsNull({1}), {0}, {1})" - template_paused = "Select(Eq($paused, 0.0), {0})" + #template_paused = "Select(Eq($paused, 0.0), {0})" + template_paused="{0}" template_fillnan = "FFillNan({0})" + simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" fields += [ "{0}/Ref(DayLast({1}), 240)".format( template_if.format( @@ -87,11 +89,9 @@ class HighFreqHandler(DataHandlerLP): fields += ["{0}/Ref(DayLast({0}), 240)".format(template_fillnan.format(template_paused.format("$close")))] fields += [ "{0}/Ref(DayLast({1}), 240)".format( - "If(IsNull({1}), {0}, If(Or(Or(Or(Eq({1}, np.inf), Eq({1}, -np.inf)), Eq({1}, 0)), Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2})))), {0}, {1}))".format( + template_if.format( template_fillnan.format(template_paused.format("$close")), - template_paused.format("$vwap"), - template_paused.format("$low"), - template_paused.format("$high"), + template_paused.format(simpson_vwap), ), template_fillnan.format(template_paused.format("$close")), ) @@ -128,13 +128,12 @@ class HighFreqHandler(DataHandlerLP): fields += [ "Ref({0}, 240)/Ref(DayLast({0}), 240)".format(template_fillnan.format(template_paused.format("$close"))) ] + fields += [ "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( - "If(IsNull({1}), {0}, If(Or(Or(Or(Eq({1}, np.inf), Eq({1}, -np.inf)), Eq({1}, 0)), Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2})))), {0}, {1}))".format( + template_if.format( template_fillnan.format(template_paused.format("$close")), - template_paused.format("$vwap"), - template_paused.format("$low"), - template_paused.format("$high"), + template_paused.format(simpson_vwap), ), template_fillnan.format(template_paused.format("$close")), ) @@ -143,10 +142,9 @@ class HighFreqHandler(DataHandlerLP): fields += [ "{0}/Ref(DayLast(Mean({0}, 7200)), 240)".format( - "If(IsNull({1}), 0, If(Or(Gt({2}, Mul(1.001, {4})), Lt({2}, Mul(0.999, {3}))), 0, {1}))".format( - template_fillnan.format(template_paused.format("$close")), + "If(IsNull({0}), 0, If(Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2}))), 0, {0}))".format( template_paused.format("$volume"), - template_paused.format("$vwap"), + template_paused.format(simpson_vwap), template_paused.format("$low"), template_paused.format("$high"), ) @@ -155,10 +153,9 @@ class HighFreqHandler(DataHandlerLP): names += ["$volume"] fields += [ "Ref({0}, 240)/Ref(DayLast(Mean({0}, 7200)), 240)".format( - "If(IsNull({1}), 0, If(Or(Gt({2}, Mul(1.001, {4})), Lt({2}, Mul(0.999, {3}))), 0, {1}))".format( - template_fillnan.format(template_paused.format("$close")), + "If(IsNull({0}), 0, If(Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2}))), 0, {0}))".format( template_paused.format("$volume"), - template_paused.format("$vwap"), + template_paused.format(simpson_vwap), template_paused.format("$low"), template_paused.format("$high"), ) @@ -199,21 +196,26 @@ class HighFreqBacktestHandler(DataHandler): names = [] template_if = "If(Eq({1}, np.nan), {0}, {1})" - template_paused = "Select(Eq($paused, 0.0), {0})" + #template_paused = "Select(Eq($paused, 0.0), {0})" + template_paused="{0}" template_fillnan = "FFillNan({0})" - + simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" + #fields += [ + # template_fillnan.format(template_paused.format("$close")), + #] + fields += [template_if.format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format(simpson_vwap), + )] + names += ["$vwap_0"] fields += [ - template_fillnan.format(template_paused.format("$close")), - ] - names += ["$vwap0"] - fields += [ - "If(Eq({1}, np.nan), 0, If(Or(Gt({2}, Mul(1.001, {4})), Lt({2}, Mul(0.999, {3}))), 0, {1}))".format( - template_fillnan.format(template_paused.format("$close")), + "If(IsNull({0}), 0, If(Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2}))), 0, {0}))".format( template_paused.format("$volume"), - template_paused.format("$vwap"), + template_paused.format(simpson_vwap), template_paused.format("$low"), template_paused.format("$high"), ) ] - names += ["$volume0"] + names += ["$volume_0"] + return fields, names diff --git a/examples/high_freq/highfreq_processor.py b/examples/high_freq/highfreq_processor.py index d71cd2e85..bf4a30a5b 100644 --- a/examples/high_freq/highfreq_processor.py +++ b/examples/high_freq/highfreq_processor.py @@ -58,6 +58,7 @@ class HighFreqNorm(Processor): # print("start_call_feature_reshape") idx = df_features.index.droplevel("datetime").drop_duplicates() idx.set_names(["instrument", "datetime"], inplace=True) + print(df_values.shape) feat = df_values[:, [0, 1, 2, 3, 4, 10]].reshape(-1, 6 * 240) feat_1 = df_values[:, [5, 6, 7, 8, 9, 11]].reshape(-1, 6 * 240) df_new_features = pd.DataFrame( diff --git a/examples/high_freq/workflow.py b/examples/high_freq/workflow.py index eb30fb1b8..e1736394e 100644 --- a/examples/high_freq/workflow.py +++ b/examples/high_freq/workflow.py @@ -27,7 +27,7 @@ from highfreq_ops import DayFirst, DayLast, FFillNan, Date, Select, IsNull if __name__ == "__main__": # use default data - provider_uri = "/mnt/v-xiabi/data/qlib/high_freq" # target_dir + provider_uri = "/nfs_data/qlib_data/yahoo_high_qlib" # target_dir qlib.init( provider_uri=provider_uri, custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], @@ -38,12 +38,16 @@ if __name__ == "__main__": MARKET = "all" BENCHMARK = "SH000300" + DROP_LOAD_DATASET = False # flag wether to test [drop and load dataset] - start_time = "2019-01-01 00:00:00" - end_time = "2019-12-31 15:00:00" - train_end_time = "2019-05-31 15:00:00" - test_start_time = "2019-06-01 00:00:00" - + #start_time = "2019-01-01 00:00:00" + #end_time = "2019-12-31 15:00:00" + #train_end_time = "2019-05-31 15:00:00" + #test_start_time = "2019-06-01 00:00:00" + start_time = "2020-09-14 00:00:00" + end_time = "2021-01-18 16:00:00" + train_end_time = "2020-11-30 16:00:00" + test_start_time = "2020-12-01 00:00:00" ################################### # train model ################################### @@ -108,51 +112,57 @@ if __name__ == "__main__": Cal.get_calendar_day(freq="1min") ##=============get data============= + dataset = init_instance_by_config(task["dataset"]) + xtrain, xtest = dataset.prepare(["train", "test"]) + print(xtrain, xtest) + dataset_backtest = init_instance_by_config(task["dataset_backtest"]) - xtrain, xtest = dataset.prepare(["train", "test"]) backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) - print(xtrain, xtest) print(backtest_train, backtest_test) + del xtrain, xtest del backtest_train, backtest_test - ##=============dump dataset============= - dataset.to_pickle(path="dataset.pkl") - dataset_backtest.to_pickle(path="dataset_backtest.pkl") - del dataset, dataset_backtest - ##=============reload dataset============= - file_dataset = open("dataset.pkl", "rb") - dataset = pickle.load(file_dataset) - file_dataset.close() + if DROP_LOAD_DATASET: - file_dataset_backtest = open("dataset_backtest.pkl", "rb") - dataset_backtest = pickle.load(file_dataset_backtest) + ##=============dump dataset============= + dataset.to_pickle(path="dataset.pkl") + dataset_backtest.to_pickle(path="dataset_backtest.pkl") - file_dataset_backtest.close() + del dataset, dataset_backtest + ##=============reload dataset============= + file_dataset = open("dataset.pkl", "rb") + dataset = pickle.load(file_dataset) + file_dataset.close() - ##=============reload_dataset============= - dataset.init(init_type=DataHandlerLP.IT_LS) - dataset_backtest.init(init_type=DataHandlerLP.IT_LS) + file_dataset_backtest = open("dataset_backtest.pkl", "rb") + dataset_backtest = pickle.load(file_dataset_backtest) - ##=============reinit qlib============= - qlib.init( - provider_uri=provider_uri, - custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], - redis_port=-1, - region=REG_CN, - auto_mount=False, - ) + file_dataset_backtest.close() - Cal.calendar(freq="1min") # load the calendar for cache - Cal.get_calendar_day(freq="1min") # load the calendar for cache + ##=============reload_dataset============= + dataset.init(init_type=DataHandlerLP.IT_LS) + dataset_backtest.init(init_type=DataHandlerLP.IT_LS) - ##=============test dataset - xtrain, xtest = dataset.prepare(["train", "test"]) - backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) + ##=============reinit qlib============= + qlib.init( + provider_uri=provider_uri, + custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], + redis_port=-1, + region=REG_CN, + auto_mount=False, + ) - print(xtrain, xtest) - print(backtest_train, backtest_test) - del xtrain, xtest - del backtest_train, backtest_test + Cal.calendar(freq="1min") # load the calendar for cache + Cal.get_calendar_day(freq="1min") # load the calendar for cache + + ##=============test dataset + xtrain, xtest = dataset.prepare(["train", "test"]) + backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) + + print(xtrain, xtest) + print(backtest_train, backtest_test) + del xtrain, xtest + del backtest_train, backtest_test From 3bdd54308bdf97a8dcbe21d51ce7c0eea4947be8 Mon Sep 17 00:00:00 2001 From: bxdd Date: Tue, 26 Jan 2021 17:02:30 +0000 Subject: [PATCH 08/17] update some little code --- examples/high_freq/highfreq_handler.py | 24 +++++++++++++----------- examples/high_freq/highfreq_processor.py | 1 - examples/high_freq/workflow.py | 22 ++++++++++++---------- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/examples/high_freq/highfreq_handler.py b/examples/high_freq/highfreq_handler.py index 09d954364..dc69396da 100644 --- a/examples/high_freq/highfreq_handler.py +++ b/examples/high_freq/highfreq_handler.py @@ -55,8 +55,8 @@ class HighFreqHandler(DataHandlerLP): names = [] template_if = "If(IsNull({1}), {0}, {1})" - #template_paused = "Select(Eq($paused, 0.0), {0})" - template_paused="{0}" + template_paused = "Select(Eq($paused, 0.0), {0})" + # template_paused="{0}" template_fillnan = "FFillNan({0})" simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" fields += [ @@ -128,7 +128,7 @@ class HighFreqHandler(DataHandlerLP): fields += [ "Ref({0}, 240)/Ref(DayLast({0}), 240)".format(template_fillnan.format(template_paused.format("$close"))) ] - + fields += [ "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( template_if.format( @@ -196,17 +196,19 @@ class HighFreqBacktestHandler(DataHandler): names = [] template_if = "If(Eq({1}, np.nan), {0}, {1})" - #template_paused = "Select(Eq($paused, 0.0), {0})" - template_paused="{0}" + template_paused = "Select(Eq($paused, 0.0), {0})" + # template_paused="{0}" template_fillnan = "FFillNan({0})" simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" - #fields += [ + # fields += [ # template_fillnan.format(template_paused.format("$close")), - #] - fields += [template_if.format( - template_fillnan.format(template_paused.format("$close")), - template_paused.format(simpson_vwap), - )] + # ] + fields += [ + template_if.format( + template_fillnan.format(template_paused.format("$close")), + template_paused.format(simpson_vwap), + ) + ] names += ["$vwap_0"] fields += [ "If(IsNull({0}), 0, If(Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2}))), 0, {0}))".format( diff --git a/examples/high_freq/highfreq_processor.py b/examples/high_freq/highfreq_processor.py index bf4a30a5b..d71cd2e85 100644 --- a/examples/high_freq/highfreq_processor.py +++ b/examples/high_freq/highfreq_processor.py @@ -58,7 +58,6 @@ class HighFreqNorm(Processor): # print("start_call_feature_reshape") idx = df_features.index.droplevel("datetime").drop_duplicates() idx.set_names(["instrument", "datetime"], inplace=True) - print(df_values.shape) feat = df_values[:, [0, 1, 2, 3, 4, 10]].reshape(-1, 6 * 240) feat_1 = df_values[:, [5, 6, 7, 8, 9, 11]].reshape(-1, 6 * 240) df_new_features = pd.DataFrame( diff --git a/examples/high_freq/workflow.py b/examples/high_freq/workflow.py index e1736394e..c021ff099 100644 --- a/examples/high_freq/workflow.py +++ b/examples/high_freq/workflow.py @@ -38,12 +38,12 @@ if __name__ == "__main__": MARKET = "all" BENCHMARK = "SH000300" - DROP_LOAD_DATASET = False # flag wether to test [drop and load dataset] + DROP_LOAD_DATASET = False # flag wether to test [drop and load dataset] - #start_time = "2019-01-01 00:00:00" - #end_time = "2019-12-31 15:00:00" - #train_end_time = "2019-05-31 15:00:00" - #test_start_time = "2019-06-01 00:00:00" + # start_time = "2019-01-01 00:00:00" + # end_time = "2019-12-31 15:00:00" + # train_end_time = "2019-05-31 15:00:00" + # test_start_time = "2019-06-01 00:00:00" start_time = "2020-09-14 00:00:00" end_time = "2021-01-18 16:00:00" train_end_time = "2020-11-30 16:00:00" @@ -108,11 +108,12 @@ if __name__ == "__main__": }, } ##=============load the calendar for cache============= - Cal.calendar(freq="1min") - Cal.get_calendar_day(freq="1min") + # unnecessary, but may accelerate + Cal.calendar(freq="1min") # load the calendar for cache + Cal.get_calendar_day(freq="1min") # load the calendar for cache ##=============get data============= - + dataset = init_instance_by_config(task["dataset"]) xtrain, xtest = dataset.prepare(["train", "test"]) print(xtrain, xtest) @@ -124,7 +125,7 @@ if __name__ == "__main__": del xtrain, xtest del backtest_train, backtest_test - + ## example to show how to save the dataset and reload it, and how to use different data if DROP_LOAD_DATASET: ##=============dump dataset============= @@ -147,6 +148,7 @@ if __name__ == "__main__": dataset_backtest.init(init_type=DataHandlerLP.IT_LS) ##=============reinit qlib============= + ## Unless you want to modify the provider_uri and other configurations, reinit is unnecessary qlib.init( provider_uri=provider_uri, custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], @@ -158,7 +160,7 @@ if __name__ == "__main__": Cal.calendar(freq="1min") # load the calendar for cache Cal.get_calendar_day(freq="1min") # load the calendar for cache - ##=============test dataset + ##=============test dataset============= xtrain, xtest = dataset.prepare(["train", "test"]) backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) From 2a5f06ee9ebaeb45eba58a5c723124c515631790 Mon Sep 17 00:00:00 2001 From: bxdd Date: Wed, 27 Jan 2021 06:25:40 +0000 Subject: [PATCH 09/17] update dataset test --- tests/test_dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_dataset.py b/tests/test_dataset.py index 10cd588e6..ed2f14d2f 100755 --- a/tests/test_dataset.py +++ b/tests/test_dataset.py @@ -66,7 +66,7 @@ class TestDataset(TestAutoData): # Check the data # Get data from DataFrame Directly data_from_df = ( - tsdh._handler.fetch(data_key=DataHandlerLP.DK_L) + tsdh.handler.fetch(data_key=DataHandlerLP.DK_L) .loc(axis=0)["2015-01-01":"2016-12-31", "SZ300315"] .iloc[-30:] .values From 6fc4f2b249fa3594b7baf6b3dd5846dd06e3078e Mon Sep 17 00:00:00 2001 From: bxdd Date: Wed, 27 Jan 2021 07:02:59 +0000 Subject: [PATCH 10/17] fix a bug --- examples/high_freq/highfreq_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/high_freq/highfreq_handler.py b/examples/high_freq/highfreq_handler.py index dc69396da..264851031 100644 --- a/examples/high_freq/highfreq_handler.py +++ b/examples/high_freq/highfreq_handler.py @@ -195,7 +195,7 @@ class HighFreqBacktestHandler(DataHandler): fields = [] names = [] - template_if = "If(Eq({1}, np.nan), {0}, {1})" + template_if = "If(IsNull({1}), {0}, {1})" template_paused = "Select(Eq($paused, 0.0), {0})" # template_paused="{0}" template_fillnan = "FFillNan({0})" From 02dea2aeb619cb2cb91a52d8f95522107d07a48c Mon Sep 17 00:00:00 2001 From: bxdd Date: Wed, 27 Jan 2021 07:42:00 +0000 Subject: [PATCH 11/17] update paused --- examples/high_freq/highfreq_handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/high_freq/highfreq_handler.py b/examples/high_freq/highfreq_handler.py index 264851031..298ffb5c0 100644 --- a/examples/high_freq/highfreq_handler.py +++ b/examples/high_freq/highfreq_handler.py @@ -55,7 +55,7 @@ class HighFreqHandler(DataHandlerLP): names = [] template_if = "If(IsNull({1}), {0}, {1})" - template_paused = "Select(Eq($paused, 0.0), {0})" + template_paused = "Select(Or(IsNull($paused), Eq($paused, 0.0)), {0})" # template_paused="{0}" template_fillnan = "FFillNan({0})" simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" @@ -196,7 +196,7 @@ class HighFreqBacktestHandler(DataHandler): names = [] template_if = "If(IsNull({1}), {0}, {1})" - template_paused = "Select(Eq($paused, 0.0), {0})" + template_paused = "Select(Or(IsNull($paused), Eq($paused, 0.0)), {0})" # template_paused="{0}" template_fillnan = "FFillNan({0})" simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" From 948b829ff4481fcab862f1e3ce7682720df79947 Mon Sep 17 00:00:00 2001 From: bxdd Date: Wed, 27 Jan 2021 10:34:31 +0000 Subject: [PATCH 12/17] add get_data in highfreq --- examples/high_freq/workflow.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/examples/high_freq/workflow.py b/examples/high_freq/workflow.py index c021ff099..a2ec67365 100644 --- a/examples/high_freq/workflow.py +++ b/examples/high_freq/workflow.py @@ -21,13 +21,21 @@ from qlib.utils import init_instance_by_config from qlib.data.dataset.handler import DataHandlerLP from qlib.data.ops import Operators from qlib.data.data import Cal +from qlib.utils import exists_qlib_data from highfreq_ops import DayFirst, DayLast, FFillNan, Date, Select, IsNull if __name__ == "__main__": - # use default data - provider_uri = "/nfs_data/qlib_data/yahoo_high_qlib" # target_dir + # use yahoo_cn_1min data + provider_uri = "~/.qlib/qlib_data/yahoo_cn_1min" + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + sys.path.append(str(Path(__file__).resolve().parent.parent.parent.joinpath("scripts"))) + from get_data import GetData + + GetData().qlib_data(target_dir=provider_uri, interval="1min", region=REG_CN) + qlib.init( provider_uri=provider_uri, custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], From f6dd006c35139c6528c5507e2f60d7c3c7eaab72 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 28 Jan 2021 11:31:15 +0000 Subject: [PATCH 13/17] update --- examples/{high_freq => highfreq}/__init__.py | 0 .../highfreq_handler.py | 107 ++++---------- .../{high_freq => highfreq}/highfreq_ops.py | 42 ++++-- .../highfreq_processor.py | 32 ++-- examples/{high_freq => highfreq}/workflow.py | 120 +++++++-------- examples/workflow_by_code.py | 2 +- qlib/config.py | 6 + qlib/data/base.py | 2 +- qlib/data/data.py | 21 +-- qlib/data/dataset/__init__.py | 11 +- qlib/data/dataset/handler.py | 12 +- qlib/data/ops.py | 138 ++++++++++++------ 12 files changed, 242 insertions(+), 251 deletions(-) rename examples/{high_freq => highfreq}/__init__.py (100%) rename examples/{high_freq => highfreq}/highfreq_handler.py (62%) rename examples/{high_freq => highfreq}/highfreq_ops.py (69%) rename examples/{high_freq => highfreq}/highfreq_processor.py (69%) rename examples/{high_freq => highfreq}/workflow.py (54%) diff --git a/examples/high_freq/__init__.py b/examples/highfreq/__init__.py similarity index 100% rename from examples/high_freq/__init__.py rename to examples/highfreq/__init__.py diff --git a/examples/high_freq/highfreq_handler.py b/examples/highfreq/highfreq_handler.py similarity index 62% rename from examples/high_freq/highfreq_handler.py rename to examples/highfreq/highfreq_handler.py index 298ffb5c0..cb23f48bb 100644 --- a/examples/high_freq/highfreq_handler.py +++ b/examples/highfreq/highfreq_handler.py @@ -56,88 +56,44 @@ class HighFreqHandler(DataHandlerLP): template_if = "If(IsNull({1}), {0}, {1})" template_paused = "Select(Or(IsNull($paused), Eq($paused, 0.0)), {0})" - # template_paused="{0}" - template_fillnan = "FFillNan({0})" + template_fillnan = "BFillNan(FFillNan({0}))" + # Because there is no vwap field in the yahoo data, a method similar to Simpson integration is used to approximate vwap simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" - fields += [ - "{0}/Ref(DayLast({1}), 240)".format( + + def get_04_price_feature(price_field): + """Get 0~4 column price feature ops""" + feature_ops = "{0}/Ref(DayLast({1}), 240)".format( template_if.format( template_fillnan.format(template_paused.format("$close")), - template_paused.format("$open"), + template_paused.format(price_field), ), template_fillnan.format(template_paused.format("$close")), ) - ] - fields += [ - "{0}/Ref(DayLast({1}), 240)".format( - template_if.format( - template_fillnan.format(template_paused.format("$close")), - template_paused.format("$high"), - ), - template_fillnan.format(template_paused.format("$close")), - ) - ] - fields += [ - "{0}/Ref(DayLast({1}), 240)".format( - template_if.format( - template_fillnan.format(template_paused.format("$close")), - template_paused.format("$low"), - ), - template_fillnan.format(template_paused.format("$close")), - ) - ] - fields += ["{0}/Ref(DayLast({0}), 240)".format(template_fillnan.format(template_paused.format("$close")))] - fields += [ - "{0}/Ref(DayLast({1}), 240)".format( - template_if.format( - template_fillnan.format(template_paused.format("$close")), - template_paused.format(simpson_vwap), - ), - template_fillnan.format(template_paused.format("$close")), - ) - ] + return feature_ops + + fields += [get_04_price_feature("$open")] + fields += [get_04_price_feature("$high")] + fields += [get_04_price_feature("$low")] + fields += [get_04_price_feature("$close")] + fields += [get_04_price_feature(simpson_vwap)] names += ["$open", "$high", "$low", "$close", "$vwap"] - fields += [ - "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( + def get_59_price_feature(price_field): + """Get 5~9 column price feature ops""" + feature_ops = "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( template_if.format( template_fillnan.format(template_paused.format("$close")), - template_paused.format("$open"), + template_paused.format(price_field), ), template_fillnan.format(template_paused.format("$close")), ) - ] - fields += [ - "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( - template_if.format( - template_fillnan.format(template_paused.format("$close")), - template_paused.format("$high"), - ), - template_fillnan.format(template_paused.format("$close")), - ) - ] - fields += [ - "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( - template_if.format( - template_fillnan.format(template_paused.format("$close")), - template_paused.format("$low"), - ), - template_fillnan.format(template_paused.format("$close")), - ) - ] - fields += [ - "Ref({0}, 240)/Ref(DayLast({0}), 240)".format(template_fillnan.format(template_paused.format("$close"))) - ] + return feature_ops - fields += [ - "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( - template_if.format( - template_fillnan.format(template_paused.format("$close")), - template_paused.format(simpson_vwap), - ), - template_fillnan.format(template_paused.format("$close")), - ) - ] + fields += [get_59_price_feature("$open")] + fields += [get_59_price_feature("$high")] + fields += [get_59_price_feature("$low")] + fields += [get_59_price_feature("$close")] + fields += [get_59_price_feature(simpson_vwap)] names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"] fields += [ @@ -197,19 +153,20 @@ class HighFreqBacktestHandler(DataHandler): template_if = "If(IsNull({1}), {0}, {1})" template_paused = "Select(Or(IsNull($paused), Eq($paused, 0.0)), {0})" - # template_paused="{0}" - template_fillnan = "FFillNan({0})" + template_fillnan = "BFillNan(FFillNan({0}))" + # Because there is no vwap field in the yahoo data, a method similar to Simpson integration is used to approximate vwap simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" - # fields += [ - # template_fillnan.format(template_paused.format("$close")), - # ] + fields += [ + template_fillnan.format(template_paused.format("$close")), + ] + names += ["$close0"] fields += [ template_if.format( template_fillnan.format(template_paused.format("$close")), template_paused.format(simpson_vwap), ) ] - names += ["$vwap_0"] + names += ["$vwap0"] fields += [ "If(IsNull({0}), 0, If(Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2}))), 0, {0}))".format( template_paused.format("$volume"), @@ -218,6 +175,6 @@ class HighFreqBacktestHandler(DataHandler): template_paused.format("$high"), ) ] - names += ["$volume_0"] + names += ["$volume0"] return fields, names diff --git a/examples/high_freq/highfreq_ops.py b/examples/highfreq/highfreq_ops.py similarity index 69% rename from examples/high_freq/highfreq_ops.py rename to examples/highfreq/highfreq_ops.py index a3fa7ac4a..cee6914a2 100644 --- a/examples/high_freq/highfreq_ops.py +++ b/examples/highfreq/highfreq_ops.py @@ -3,51 +3,61 @@ import pandas as pd import importlib from qlib.data.ops import ElemOperator, PairOperator from qlib.config import C +from qlib.data.cache import H from qlib.data.data import Cal -class DayFirst(ElemOperator): - def __init__(self, feature): - super(DayFirst, self).__init__(feature, "day_first") - - def _load_internal(self, instrument, start_index, end_index, freq): - _calendar = Cal.get_calendar_day(freq=freq)[0] - series = self.feature.load(instrument, start_index, end_index, freq) - return series.groupby(_calendar[series.index]).transform("first") +def get_calendar_day(freq="day", future=False): + flag = f"{freq}_future_{future}_day" + if flag in H["c"]: + _calendar = H["c"][flag] + else: + _calendar = np.array(list(map(lambda x: x.date(), Cal.load_calendar(freq, future)))) + H["c"][flag] = _calendar + return _calendar class DayLast(ElemOperator): def __init__(self, feature): - super(DayLast, self).__init__(feature, "day_last") + super(DayLast, self).__init__(feature) def _load_internal(self, instrument, start_index, end_index, freq): - _calendar = Cal.get_calendar_day(freq=freq)[0] + _calendar = get_calendar_day(freq=freq) series = self.feature.load(instrument, start_index, end_index, freq) return series.groupby(_calendar[series.index]).transform("last") class FFillNan(ElemOperator): def __init__(self, feature): - super(FFillNan, self).__init__(feature, "fill_nan") + super(FFillNan, self).__init__(feature) def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) return series.fillna(method="ffill") -class Date(ElemOperator): +class BFillNan(ElemOperator): def __init__(self, feature): - super(Date, self).__init__(feature, "date") + super(BFillNan, self).__init__(feature) def _load_internal(self, instrument, start_index, end_index, freq): - _calendar = Cal.get_calendar_day(freq=freq)[0] + series = self.feature.load(instrument, start_index, end_index, freq) + return series.fillna(method="bfill") + + +class Date(ElemOperator): + def __init__(self, feature): + super(Date, self).__init__(feature) + + def _load_internal(self, instrument, start_index, end_index, freq): + _calendar = get_calendar_day(freq=freq) series = self.feature.load(instrument, start_index, end_index, freq) return pd.Series(_calendar[series.index], index=series.index) class Select(PairOperator): def __init__(self, condition, feature): - super(Select, self).__init__(condition, feature, "select") + super(Select, self).__init__(condition, feature) def _load_internal(self, instrument, start_index, end_index, freq): series_condition = self.feature_left.load(instrument, start_index, end_index, freq) @@ -57,7 +67,7 @@ class Select(PairOperator): class IsNull(ElemOperator): def __init__(self, feature): - super(IsNull, self).__init__(feature, "isnull") + super(IsNull, self).__init__(feature) def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) diff --git a/examples/high_freq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py similarity index 69% rename from examples/high_freq/highfreq_processor.py rename to examples/highfreq/highfreq_processor.py index d71cd2e85..f0ab0dec2 100644 --- a/examples/high_freq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -26,7 +26,7 @@ class HighFreqNorm(Processor): if name == "volume": part_values = np.log1p(part_values) self.feature_med[name] = np.nanmedian(part_values) - part_values = part_values - self.feature_med[name] # mean, copy + part_values = part_values - self.feature_med[name] self.feature_std[name] = np.nanmedian(np.absolute(part_values)) * 1.4826 + 1e-12 part_values = part_values / self.feature_std[name] self.feature_vmax[name] = np.nanmax(part_values) @@ -41,23 +41,27 @@ class HighFreqNorm(Processor): } for name, name_val in names.items(): - part_values = df_values[:, name_val] if name == "volume": - part_values[:] = np.log1p(part_values) - part_values -= self.feature_med[name] - part_values /= self.feature_std[name] - slice0 = part_values > 3.0 - slice1 = part_values > 3.5 - slice2 = part_values < -3.0 - slice3 = part_values < -3.5 + df_values[:, name_val] = np.log1p(df_values[:, name_val]) + df_values[:, name_val] -= self.feature_med[name] + df_values[:, name_val] /= self.feature_std[name] + slice0 = df_values[:, name_val] > 3.0 + slice1 = df_values[:, name_val] > 3.5 + slice2 = df_values[:, name_val] < -3.0 + slice3 = df_values[:, name_val] < -3.5 - part_values[slice0] = 3.0 + (part_values[slice0] - 3.0) / (self.feature_vmax[name] - 3) * 0.5 - part_values[slice1] = 3.5 - part_values[slice2] = -3.0 - (part_values[slice2] + 3.0) / (self.feature_vmin[name] + 3) * 0.5 - part_values[slice3] = -3.5 - # print("start_call_feature_reshape") + df_values[:, name_val][slice0] = ( + 3.0 + (df_values[:, name_val][slice0] - 3.0) / (self.feature_vmax[name] - 3) * 0.5 + ) + df_values[:, name_val][slice1] = 3.5 + df_values[:, name_val][slice2] = ( + -3.0 - (df_values[:, name_val][slice2] + 3.0) / (self.feature_vmin[name] + 3) * 0.5 + ) + df_values[:, name_val][slice3] = -3.5 idx = df_features.index.droplevel("datetime").drop_duplicates() idx.set_names(["instrument", "datetime"], inplace=True) + + # Reshape is specifically for adapting to RL high-freq executor feat = df_values[:, [0, 1, 2, 3, 4, 10]].reshape(-1, 6 * 240) feat_1 = df_values[:, [5, 6, 7, 8, 9, 11]].reshape(-1, 6 * 240) df_new_features = pd.DataFrame( diff --git a/examples/high_freq/workflow.py b/examples/highfreq/workflow.py similarity index 54% rename from examples/high_freq/workflow.py rename to examples/highfreq/workflow.py index a2ec67365..7bbb03df4 100644 --- a/examples/high_freq/workflow.py +++ b/examples/highfreq/workflow.py @@ -2,13 +2,14 @@ # Licensed under the MIT License. import sys +import fire from pathlib import Path import qlib import pickle import numpy as np import pandas as pd -from qlib.config import REG_CN +from qlib.config import HIGH_FREQ_CONFIG from qlib.contrib.model.gbdt import LGBModel from qlib.contrib.data.handler import Alpha158 from qlib.contrib.strategy.strategy import TopkDropoutStrategy @@ -23,42 +24,22 @@ from qlib.data.ops import Operators from qlib.data.data import Cal from qlib.utils import exists_qlib_data -from highfreq_ops import DayFirst, DayLast, FFillNan, Date, Select, IsNull +from highfreq_ops import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull -if __name__ == "__main__": - # use yahoo_cn_1min data - provider_uri = "~/.qlib/qlib_data/yahoo_cn_1min" - if not exists_qlib_data(provider_uri): - print(f"Qlib data is not found in {provider_uri}") - sys.path.append(str(Path(__file__).resolve().parent.parent.parent.joinpath("scripts"))) - from get_data import GetData +class HighfreqWorkflow(object): - GetData().qlib_data(target_dir=provider_uri, interval="1min", region=REG_CN) - - qlib.init( - provider_uri=provider_uri, - custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], - redis_port=-1, - region=REG_CN, - auto_mount=False, - ) + SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull], "expression_cache": None} MARKET = "all" BENCHMARK = "SH000300" DROP_LOAD_DATASET = False # flag wether to test [drop and load dataset] - # start_time = "2019-01-01 00:00:00" - # end_time = "2019-12-31 15:00:00" - # train_end_time = "2019-05-31 15:00:00" - # test_start_time = "2019-06-01 00:00:00" start_time = "2020-09-14 00:00:00" end_time = "2021-01-18 16:00:00" train_end_time = "2020-11-30 16:00:00" test_start_time = "2020-12-01 00:00:00" - ################################### - # train model - ################################### + DATA_HANDLER_CONFIG0 = { "start_time": start_time, "end_time": end_time, @@ -94,8 +75,6 @@ if __name__ == "__main__": }, }, }, - # You shoud record the data in specific sequence - # "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'], "dataset_backtest": { "class": "DatasetH", "module_path": "qlib.data.dataset", @@ -115,26 +94,50 @@ if __name__ == "__main__": }, }, } - ##=============load the calendar for cache============= - # unnecessary, but may accelerate - Cal.calendar(freq="1min") # load the calendar for cache - Cal.get_calendar_day(freq="1min") # load the calendar for cache - ##=============get data============= + def _init_qlib(self): + """initialize qlib""" + # use yahoo_cn_1min data + QLIB_INIT_CONFIG = {**HIGH_FREQ_CONFIG, **self.SPEC_CONF} + provider_uri = QLIB_INIT_CONFIG.get("provider_uri") + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + sys.path.append(str(Path(__file__).resolve().parent.parent.parent.joinpath("scripts"))) + from get_data import GetData - dataset = init_instance_by_config(task["dataset"]) - xtrain, xtest = dataset.prepare(["train", "test"]) - print(xtrain, xtest) + GetData().qlib_data(target_dir=provider_uri, interval="1min", region=REG_CN) + qlib.init(**QLIB_INIT_CONFIG) - dataset_backtest = init_instance_by_config(task["dataset_backtest"]) - backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) - print(backtest_train, backtest_test) + def _prepare_calender_cache(self): + """preload the calendar for cache""" - del xtrain, xtest - del backtest_train, backtest_test + # This code used the copy-on-write feature of Linux to avoid calculating the calendar multiple times in the subprocess + # This code may accelerate, but may be not useful on Windows and Mac Os + Cal.calendar(freq="1min") + get_calendar_day(freq="1min") - ## example to show how to save the dataset and reload it, and how to use different data - if DROP_LOAD_DATASET: + def get_data(self): + """use dataset to get highreq data""" + self._init_qlib() + self._prepare_calender_cache() + + dataset = init_instance_by_config(self.task["dataset"]) + xtrain, xtest = dataset.prepare(["train", "test"]) + print(xtrain, xtest) + + dataset_backtest = init_instance_by_config(self.task["dataset_backtest"]) + backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) + print(backtest_train, backtest_test) + + del xtrain, xtest + del backtest_train, backtest_test + + def dump_and_load_dataset(self): + """dump and load dataset state on disk""" + self._init_qlib() + self._prepare_calender_cache() + dataset = init_instance_by_config(self.task["dataset"]) + dataset_backtest = init_instance_by_config(self.task["dataset_backtest"]) ##=============dump dataset============= dataset.to_pickle(path="dataset.pkl") @@ -142,33 +145,18 @@ if __name__ == "__main__": del dataset, dataset_backtest ##=============reload dataset============= - file_dataset = open("dataset.pkl", "rb") - dataset = pickle.load(file_dataset) - file_dataset.close() + with open("dataset.pkl", "rb") as file_dataset: + dataset = pickle.load(file_dataset) - file_dataset_backtest = open("dataset_backtest.pkl", "rb") - dataset_backtest = pickle.load(file_dataset_backtest) - - file_dataset_backtest.close() + with open("dataset_backtest.pkl", "rb") as file_dataset_backtest: + dataset_backtest = pickle.load(file_dataset_backtest) + self._prepare_calender_cache() ##=============reload_dataset============= dataset.init(init_type=DataHandlerLP.IT_LS) - dataset_backtest.init(init_type=DataHandlerLP.IT_LS) + dataset_backtest.init() - ##=============reinit qlib============= - ## Unless you want to modify the provider_uri and other configurations, reinit is unnecessary - qlib.init( - provider_uri=provider_uri, - custom_ops=[DayFirst, DayLast, FFillNan, Date, Select, IsNull], - redis_port=-1, - region=REG_CN, - auto_mount=False, - ) - - Cal.calendar(freq="1min") # load the calendar for cache - Cal.get_calendar_day(freq="1min") # load the calendar for cache - - ##=============test dataset============= + ##=============get data============= xtrain, xtest = dataset.prepare(["train", "test"]) backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"]) @@ -176,3 +164,7 @@ if __name__ == "__main__": print(backtest_train, backtest_test) del xtrain, xtest del backtest_train, backtest_test + + +if __name__ == "__main__": + fire.Fire(HighfreqWorkflow) diff --git a/examples/workflow_by_code.py b/examples/workflow_by_code.py index 6253f3ee4..ea9c70083 100644 --- a/examples/workflow_by_code.py +++ b/examples/workflow_by_code.py @@ -30,7 +30,7 @@ if __name__ == "__main__": GetData().qlib_data(target_dir=provider_uri, region=REG_CN) - qlib.init(provider_uri=provider_uri, region=REG_CN, redis_port=233) + qlib.init(provider_uri=provider_uri, region=REG_CN) market = "csi300" benchmark = "SH000300" diff --git a/qlib/config.py b/qlib/config.py index e94752953..e7120c23a 100644 --- a/qlib/config.py +++ b/qlib/config.py @@ -193,6 +193,12 @@ MODE_CONF = { }, } +HIGH_FREQ_CONFIG = { + "provider_uri": "~/.qlib/qlib_data/yahoo_cn_1min", + "dataset_cache": None, + "expression_cache": "DiskExpressionCache", + "region": REG_CN, +} _default_region_config = { REG_CN: { diff --git a/qlib/data/base.py b/qlib/data/base.py index 92fc57ffe..e318843c4 100644 --- a/qlib/data/base.py +++ b/qlib/data/base.py @@ -157,7 +157,7 @@ class Expression(abc.ABC): @abc.abstractmethod def _load_internal(self, instrument, start_index, end_index, freq): - pass + raise NotImplementedError("This function must be implemented in your newly defined feature") @abc.abstractmethod def get_longest_back_rolling(self): diff --git a/qlib/data/data.py b/qlib/data/data.py index d7f50e0b0..2a0e569ab 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -117,17 +117,7 @@ class CalendarProvider(abc.ABC): if flag in H["c"]: _calendar, _calendar_index = H["c"][flag] else: - _calendar = np.array(self._load_calendar(freq, future)) - _calendar_index = {x: i for i, x in enumerate(_calendar)} # for fast search - H["c"][flag] = _calendar, _calendar_index - return _calendar, _calendar_index - - def get_calendar_day(self, freq="day", future=False): - flag = f"{freq}_future_{future}_day" - if flag in H["c"]: - _calendar, _calendar_index = H["c"][flag] - else: - _calendar = np.array(list(map(lambda x: x.date(), self._load_calendar(freq, future)))) + _calendar = np.array(self.load_calendar(freq, future)) _calendar_index = {x: i for i, x in enumerate(_calendar)} # for fast search H["c"][flag] = _calendar, _calendar_index return _calendar, _calendar_index @@ -514,7 +504,7 @@ class LocalCalendarProvider(CalendarProvider): """Calendar file uri.""" return os.path.join(C.get_data_path(), "calendars", "{}.txt") - def _load_calendar(self, freq, future): + def load_calendar(self, freq, future): """Load original calendar timestamp from file. Parameters @@ -679,12 +669,11 @@ class LocalExpressionProvider(ExpressionProvider): # 1) The stock data is currently float. If there is other types of data, this part needs to be re-implemented. # 2) The the precision should be configurable try: - if series.dtype == np.float64: - series = series.astype(np.float32) - elif series.dtype == np.bool: - series = series.astype(np.int8) + series = series.astype(np.float32) except ValueError: pass + except TypeError: + pass if not series.empty: series = series.loc[start_index:end_index] return series diff --git a/qlib/data/dataset/__init__.py b/qlib/data/dataset/__init__.py index 65dcf7ccb..117da764f 100644 --- a/qlib/data/dataset/__init__.py +++ b/qlib/data/dataset/__init__.py @@ -88,15 +88,8 @@ class DatasetH(Dataset): super().__init__(handler, segments) def init(self, **kwargs): - - logger = get_module_logger("DatasetH") - handler_init_kwargs = {} - for arg_key, arg_value in kwargs.items(): - if arg_key in getfullargspec(self.handler.init).args: - handler_init_kwargs[arg_key] = arg_value - else: - logger.info(f"init arguments[{arg_key}] is ignored.") - self.handler.init(**handler_init_kwargs) + """Initialize the DatasetH, Only parameters belonging to handler.init will be passed in""" + self.handler.init(**kwargs) def setup_data(self, handler: Union[dict, DataHandler], segments: list): """ diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 627624022..abcd5a60c 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -428,13 +428,11 @@ class DataHandlerLP(DataHandler): # TODO: Be able to cache handler data. Save the memory for data processing def _get_df_by_key(self, data_key: str = DK_I) -> pd.DataFrame: - try: - df = getattr(self, {self.DK_R: "_data", self.DK_I: "_infer", self.DK_L: "_learn"}[data_key]) - except AttributeError: - print("please set drop_raw = False if you want to use raw data") - raise - except: - raise + if data_key == self.DK_R and self.drop_raw: + raise AttributeError( + "DataHandlerLP has not attribute _data, please set drop_raw = False if you want to use raw data" + ) + df = getattr(self, {self.DK_R: "_data", self.DK_I: "_infer", self.DK_L: "_learn"}[data_key]) return df def fetch( diff --git a/qlib/data/ops.py b/qlib/data/ops.py index 66e588be1..940c24002 100644 --- a/qlib/data/ops.py +++ b/qlib/data/ops.py @@ -6,6 +6,7 @@ from __future__ import division from __future__ import print_function import sys +import abc import numpy as np import pandas as pd @@ -22,8 +23,6 @@ except ImportError: "#### Do not import qlib package in the repository directory in case of importing qlib from . without compiling #####" ) raise -except: - raise np.seterr(invalid="ignore") @@ -34,12 +33,39 @@ np.seterr(invalid="ignore") class ElemOperator(ExpressionOps): """Element-wise Operator + Parameters + ---------- + feature : Expression + feature instance + + Returns + ---------- + Expression + feature operation output + """ + + def __init__(self, feature): + self.feature = feature + + def __str__(self): + return "{}({})".format(type(self).__name__, self.feature) + + def get_longest_back_rolling(self): + return self.feature.get_longest_back_rolling() + + def get_extended_window_size(self): + return self.feature.get_extended_window_size() + + +class NpElemOperator(ElemOperator): + """Numpy Element-wise Operator + Parameters ---------- feature : Expression feature instance func : str - feature operation method + numpy feature operation method Returns ---------- @@ -50,22 +76,14 @@ class ElemOperator(ExpressionOps): def __init__(self, feature, func): self.feature = feature self.func = func - - def __str__(self): - return "{}({})".format(type(self).__name__, self.feature) + super(NpElemOperator, self).__init__(feature) def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) return getattr(np, self.func)(series) - def get_longest_back_rolling(self): - return self.feature.get_longest_back_rolling() - def get_extended_window_size(self): - return self.feature.get_extended_window_size() - - -class Abs(ElemOperator): +class Abs(NpElemOperator): """Feature Absolute Value Parameters @@ -83,7 +101,7 @@ class Abs(ElemOperator): super(Abs, self).__init__(feature, "abs") -class Sign(ElemOperator): +class Sign(NpElemOperator): """Feature Sign Parameters @@ -110,7 +128,7 @@ class Sign(ElemOperator): return getattr(np, self.func)(series) -class Log(ElemOperator): +class Log(NpElemOperator): """Feature Log Parameters @@ -128,7 +146,7 @@ class Log(ElemOperator): super(Log, self).__init__(feature, "log") -class Power(ElemOperator): +class Power(NpElemOperator): """Feature Power Parameters @@ -154,7 +172,7 @@ class Power(ElemOperator): return getattr(np, self.func)(series, self.exponent) -class Mask(ElemOperator): +class Mask(NpElemOperator): """Feature Mask Parameters @@ -181,7 +199,7 @@ class Mask(ElemOperator): return self.feature.load(self.instrument, start_index, end_index, freq) -class Not(ElemOperator): +class Not(NpElemOperator): """Not Operator Parameters @@ -220,28 +238,13 @@ class PairOperator(ExpressionOps): two features' operation output """ - def __init__(self, feature_left, feature_right, func): + def __init__(self, feature_left, feature_right): self.feature_left = feature_left self.feature_right = feature_right - self.func = func def __str__(self): return "{}({},{})".format(type(self).__name__, self.feature_left, self.feature_right) - def _load_internal(self, instrument, start_index, end_index, freq): - assert any( - [isinstance(self.feature_left, Expression), self.feature_right, Expression] - ), "at least one of two inputs is Expression instance" - if isinstance(self.feature_left, Expression): - series_left = self.feature_left.load(instrument, start_index, end_index, freq) - else: - series_left = self.feature_left # numeric value - if isinstance(self.feature_right, Expression): - series_right = self.feature_right.load(instrument, start_index, end_index, freq) - else: - series_right = self.feature_right - return getattr(np, self.func)(series_left, series_right) - def get_longest_back_rolling(self): if isinstance(self.feature_left, Expression): left_br = self.feature_left.get_longest_back_rolling() @@ -267,7 +270,46 @@ class PairOperator(ExpressionOps): return max(ll, rl), max(lr, rr) -class Add(PairOperator): +class NpPairOperator(PairOperator): + """Numpy Pair-wise operator + + Parameters + ---------- + feature_left : Expression + feature instance or numeric value + feature_right : Expression + feature instance or numeric value + func : str + operator function + + Returns + ---------- + Feature: + two features' operation output + """ + + def __init__(self, feature_left, feature_right, func): + self.feature_left = feature_left + self.feature_right = feature_right + self.func = func + super(NpPairOperator, self).__init__(feature_left, feature_right) + + def _load_internal(self, instrument, start_index, end_index, freq): + assert any( + [isinstance(self.feature_left, Expression), self.feature_right, Expression] + ), "at least one of two inputs is Expression instance" + if isinstance(self.feature_left, Expression): + series_left = self.feature_left.load(instrument, start_index, end_index, freq) + else: + series_left = self.feature_left # numeric value + if isinstance(self.feature_right, Expression): + series_right = self.feature_right.load(instrument, start_index, end_index, freq) + else: + series_right = self.feature_right + return getattr(np, self.func)(series_left, series_right) + + +class Add(NpPairOperator): """Add Operator Parameters @@ -287,7 +329,7 @@ class Add(PairOperator): super(Add, self).__init__(feature_left, feature_right, "add") -class Sub(PairOperator): +class Sub(NpPairOperator): """Subtract Operator Parameters @@ -307,7 +349,7 @@ class Sub(PairOperator): super(Sub, self).__init__(feature_left, feature_right, "subtract") -class Mul(PairOperator): +class Mul(NpPairOperator): """Multiply Operator Parameters @@ -327,7 +369,7 @@ class Mul(PairOperator): super(Mul, self).__init__(feature_left, feature_right, "multiply") -class Div(PairOperator): +class Div(NpPairOperator): """Division Operator Parameters @@ -347,7 +389,7 @@ class Div(PairOperator): super(Div, self).__init__(feature_left, feature_right, "divide") -class Greater(PairOperator): +class Greater(NpPairOperator): """Greater Operator Parameters @@ -367,7 +409,7 @@ class Greater(PairOperator): super(Greater, self).__init__(feature_left, feature_right, "maximum") -class Less(PairOperator): +class Less(NpPairOperator): """Less Operator Parameters @@ -387,7 +429,7 @@ class Less(PairOperator): super(Less, self).__init__(feature_left, feature_right, "minimum") -class Gt(PairOperator): +class Gt(NpPairOperator): """Greater Than Operator Parameters @@ -407,7 +449,7 @@ class Gt(PairOperator): super(Gt, self).__init__(feature_left, feature_right, "greater") -class Ge(PairOperator): +class Ge(NpPairOperator): """Greater Equal Than Operator Parameters @@ -427,7 +469,7 @@ class Ge(PairOperator): super(Ge, self).__init__(feature_left, feature_right, "greater_equal") -class Lt(PairOperator): +class Lt(NpPairOperator): """Less Than Operator Parameters @@ -447,7 +489,7 @@ class Lt(PairOperator): super(Lt, self).__init__(feature_left, feature_right, "less") -class Le(PairOperator): +class Le(NpPairOperator): """Less Equal Than Operator Parameters @@ -467,7 +509,7 @@ class Le(PairOperator): super(Le, self).__init__(feature_left, feature_right, "less_equal") -class Eq(PairOperator): +class Eq(NpPairOperator): """Equal Operator Parameters @@ -487,7 +529,7 @@ class Eq(PairOperator): super(Eq, self).__init__(feature_left, feature_right, "equal") -class Ne(PairOperator): +class Ne(NpPairOperator): """Not Equal Operator Parameters @@ -507,7 +549,7 @@ class Ne(PairOperator): super(Ne, self).__init__(feature_left, feature_right, "not_equal") -class And(PairOperator): +class And(NpPairOperator): """And Operator Parameters @@ -527,7 +569,7 @@ class And(PairOperator): super(And, self).__init__(feature_left, feature_right, "bitwise_and") -class Or(PairOperator): +class Or(NpPairOperator): """Or Operator Parameters From ffa68fd010a2bdedcf8859cb63dea96df804fd35 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 28 Jan 2021 14:25:55 +0000 Subject: [PATCH 14/17] update --- examples/highfreq/highfreq_handler.py | 40 ++++++++++++--------------- examples/highfreq/highfreq_ops.py | 18 ------------ examples/highfreq/workflow.py | 7 ++--- examples/workflow_by_code.py | 7 ++--- tests/test_register_ops.py | 6 ---- 5 files changed, 21 insertions(+), 57 deletions(-) diff --git a/examples/highfreq/highfreq_handler.py b/examples/highfreq/highfreq_handler.py index cb23f48bb..627a32f89 100644 --- a/examples/highfreq/highfreq_handler.py +++ b/examples/highfreq/highfreq_handler.py @@ -60,9 +60,14 @@ class HighFreqHandler(DataHandlerLP): # Because there is no vwap field in the yahoo data, a method similar to Simpson integration is used to approximate vwap simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" - def get_04_price_feature(price_field): + def get_normalized_price_feature(price_field, shift=0): """Get 0~4 column price feature ops""" - feature_ops = "{0}/Ref(DayLast({1}), 240)".format( + if shift == 0: + template_norm = "{0}/Ref(DayLast({1}), 240)" + else: + template_norm = "Ref({0}, " + str(shift) + ")/Ref(DayLast({1}), 240)" + + feature_ops = template_norm.format( template_if.format( template_fillnan.format(template_paused.format("$close")), template_paused.format(price_field), @@ -71,29 +76,18 @@ class HighFreqHandler(DataHandlerLP): ) return feature_ops - fields += [get_04_price_feature("$open")] - fields += [get_04_price_feature("$high")] - fields += [get_04_price_feature("$low")] - fields += [get_04_price_feature("$close")] - fields += [get_04_price_feature(simpson_vwap)] + fields += [get_normalized_price_feature("$open", 0)] + fields += [get_normalized_price_feature("$high", 0)] + fields += [get_normalized_price_feature("$low", 0)] + fields += [get_normalized_price_feature("$close", 0)] + fields += [get_normalized_price_feature(simpson_vwap, 0)] names += ["$open", "$high", "$low", "$close", "$vwap"] - def get_59_price_feature(price_field): - """Get 5~9 column price feature ops""" - feature_ops = "Ref({0}, 240)/Ref(DayLast({1}), 240)".format( - template_if.format( - template_fillnan.format(template_paused.format("$close")), - template_paused.format(price_field), - ), - template_fillnan.format(template_paused.format("$close")), - ) - return feature_ops - - fields += [get_59_price_feature("$open")] - fields += [get_59_price_feature("$high")] - fields += [get_59_price_feature("$low")] - fields += [get_59_price_feature("$close")] - fields += [get_59_price_feature(simpson_vwap)] + fields += [get_normalized_price_feature("$open", 240)] + fields += [get_normalized_price_feature("$high", 240)] + fields += [get_normalized_price_feature("$low", 240)] + fields += [get_normalized_price_feature("$close", 240)] + fields += [get_normalized_price_feature(simpson_vwap, 240)] names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"] fields += [ diff --git a/examples/highfreq/highfreq_ops.py b/examples/highfreq/highfreq_ops.py index cee6914a2..85ed63285 100644 --- a/examples/highfreq/highfreq_ops.py +++ b/examples/highfreq/highfreq_ops.py @@ -18,9 +18,6 @@ def get_calendar_day(freq="day", future=False): class DayLast(ElemOperator): - def __init__(self, feature): - super(DayLast, self).__init__(feature) - def _load_internal(self, instrument, start_index, end_index, freq): _calendar = get_calendar_day(freq=freq) series = self.feature.load(instrument, start_index, end_index, freq) @@ -28,27 +25,18 @@ class DayLast(ElemOperator): class FFillNan(ElemOperator): - def __init__(self, feature): - super(FFillNan, self).__init__(feature) - def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) return series.fillna(method="ffill") class BFillNan(ElemOperator): - def __init__(self, feature): - super(BFillNan, self).__init__(feature) - def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) return series.fillna(method="bfill") class Date(ElemOperator): - def __init__(self, feature): - super(Date, self).__init__(feature) - def _load_internal(self, instrument, start_index, end_index, freq): _calendar = get_calendar_day(freq=freq) series = self.feature.load(instrument, start_index, end_index, freq) @@ -56,9 +44,6 @@ class Date(ElemOperator): class Select(PairOperator): - def __init__(self, condition, feature): - super(Select, self).__init__(condition, feature) - def _load_internal(self, instrument, start_index, end_index, freq): series_condition = self.feature_left.load(instrument, start_index, end_index, freq) series_feature = self.feature_right.load(instrument, start_index, end_index, freq) @@ -66,9 +51,6 @@ class Select(PairOperator): class IsNull(ElemOperator): - def __init__(self, feature): - super(IsNull, self).__init__(feature) - def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) return series.isnull() diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index 7bbb03df4..4482bbc6d 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -18,11 +18,11 @@ from qlib.contrib.evaluate import ( risk_analysis, ) -from qlib.utils import init_instance_by_config +from qlib.utils import init_instance_by_config, exists_qlib_data from qlib.data.dataset.handler import DataHandlerLP from qlib.data.ops import Operators from qlib.data.data import Cal -from qlib.utils import exists_qlib_data +from qlib.tests.data import GetData from highfreq_ops import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull @@ -102,9 +102,6 @@ class HighfreqWorkflow(object): provider_uri = QLIB_INIT_CONFIG.get("provider_uri") if not exists_qlib_data(provider_uri): print(f"Qlib data is not found in {provider_uri}") - sys.path.append(str(Path(__file__).resolve().parent.parent.parent.joinpath("scripts"))) - from get_data import GetData - GetData().qlib_data(target_dir=provider_uri, interval="1min", region=REG_CN) qlib.init(**QLIB_INIT_CONFIG) diff --git a/examples/workflow_by_code.py b/examples/workflow_by_code.py index ea9c70083..4b254ee60 100644 --- a/examples/workflow_by_code.py +++ b/examples/workflow_by_code.py @@ -17,7 +17,7 @@ from qlib.contrib.evaluate import ( from qlib.utils import exists_qlib_data, init_instance_by_config, flatten_dict from qlib.workflow import R from qlib.workflow.record_temp import SignalRecord, PortAnaRecord - +from qlib.tests.data import GetData if __name__ == "__main__": @@ -25,12 +25,9 @@ if __name__ == "__main__": provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir if not exists_qlib_data(provider_uri): print(f"Qlib data is not found in {provider_uri}") - sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts"))) - from get_data import GetData - GetData().qlib_data(target_dir=provider_uri, region=REG_CN) - qlib.init(provider_uri=provider_uri, region=REG_CN) + qlib.init(provider_uri=provider_uri, region=REG_CN, redis_port=-1) market = "csi300" benchmark = "SH000300" diff --git a/tests/test_register_ops.py b/tests/test_register_ops.py index cb172b2bb..7d3322ddc 100644 --- a/tests/test_register_ops.py +++ b/tests/test_register_ops.py @@ -26,9 +26,6 @@ class Diff(ElemOperator): a feature instance with first difference """ - def __init__(self, feature): - super(Diff, self).__init__(feature, "diff") - def _load_internal(self, instrument, start_index, end_index, freq): series = self.feature.load(instrument, start_index, end_index, freq) return series.diff() @@ -50,9 +47,6 @@ class Distance(PairOperator): a feature instance with distance """ - def __init__(self, feature_left, feature_right): - super(Distance, self).__init__(feature_left, feature_right, "distance") - def _load_internal(self, instrument, start_index, end_index, freq): series_left = self.feature_left.load(instrument, start_index, end_index, freq) series_right = self.feature_right.load(instrument, start_index, end_index, freq) From f3eb02a0bd364a4f9170de09389f5e326bd83d55 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 28 Jan 2021 14:26:30 +0000 Subject: [PATCH 15/17] update docstring --- examples/highfreq/highfreq_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/highfreq/highfreq_handler.py b/examples/highfreq/highfreq_handler.py index 627a32f89..be2084626 100644 --- a/examples/highfreq/highfreq_handler.py +++ b/examples/highfreq/highfreq_handler.py @@ -61,7 +61,7 @@ class HighFreqHandler(DataHandlerLP): simpson_vwap = "($open + 2*$high + 2*$low + $close)/6" def get_normalized_price_feature(price_field, shift=0): - """Get 0~4 column price feature ops""" + """Get normalized price feature ops""" if shift == 0: template_norm = "{0}/Ref(DayLast({1}), 240)" else: From 76cf9dad99ba0e79b42d2a060d7fcbb0a7697cbf Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 28 Jan 2021 14:30:20 +0000 Subject: [PATCH 16/17] update --- examples/highfreq/workflow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/highfreq/workflow.py b/examples/highfreq/workflow.py index 4482bbc6d..e5fdcdb59 100644 --- a/examples/highfreq/workflow.py +++ b/examples/highfreq/workflow.py @@ -33,7 +33,6 @@ class HighfreqWorkflow(object): MARKET = "all" BENCHMARK = "SH000300" - DROP_LOAD_DATASET = False # flag wether to test [drop and load dataset] start_time = "2020-09-14 00:00:00" end_time = "2021-01-18 16:00:00" From 8ef89b4fa84041145715a57b7f5dab3ad9ae6e27 Mon Sep 17 00:00:00 2001 From: bxdd Date: Thu, 28 Jan 2021 15:01:07 +0000 Subject: [PATCH 17/17] update --- examples/workflow_by_code.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/workflow_by_code.py b/examples/workflow_by_code.py index 4b254ee60..6d166646c 100644 --- a/examples/workflow_by_code.py +++ b/examples/workflow_by_code.py @@ -27,7 +27,7 @@ if __name__ == "__main__": print(f"Qlib data is not found in {provider_uri}") GetData().qlib_data(target_dir=provider_uri, region=REG_CN) - qlib.init(provider_uri=provider_uri, region=REG_CN, redis_port=-1) + qlib.init(provider_uri=provider_uri, region=REG_CN) market = "csi300" benchmark = "SH000300"