From 54928e956d8c3c8303e309ba024f29eaf76de600 Mon Sep 17 00:00:00 2001
From: Yuchen Fang
Date: Wed, 12 Oct 2022 15:18:30 +0800
Subject: [PATCH] General handler for open source data preprocessing (#1302)

* feat(data): :sparkles: add a general highfreq data handler for open source

Add HighFreqOpenHandler and HighFreqOpenBacktestHandler for data pipeline without paused_num information.

* fix: position of parameter init

* style(data): :lipstick: rename open to general

* style(data): :lipstick: lint

* style: :lipstick: delete useless comment & fix inheritance relation

* style: :lipstick: lint

* style: :lipstick: remove duplicated function

Co-authored-by: mingzhehan
---
 qlib/contrib/data/highfreq_handler.py | 177 ++++++++++++++++++++++----
 1 file changed, 150 insertions(+), 27 deletions(-)

diff --git a/qlib/contrib/data/highfreq_handler.py b/qlib/contrib/data/highfreq_handler.py
index 4898725da..373b8e669 100644
--- a/qlib/contrib/data/highfreq_handler.py
+++ b/qlib/contrib/data/highfreq_handler.py
@@ -1,5 +1,7 @@
 from qlib.data.dataset.handler import DataHandler, DataHandlerLP
 
+from .handler import check_transform_proc
+
 EPSILON = 1e-4
 
 
@@ -15,20 +17,9 @@ class HighFreqHandler(DataHandlerLP):
         fit_end_time=None,
         drop_raw=True,
     ):
-        def check_transform_proc(proc_l):
-            new_l = []
-            for p in proc_l:
-                p["kwargs"].update(
-                    {
-                        "fit_start_time": fit_start_time,
-                        "fit_end_time": fit_end_time,
-                    }
-                )
-                new_l.append(p)
-            return new_l
 
-        infer_processors = check_transform_proc(infer_processors)
-        learn_processors = check_transform_proc(learn_processors)
+        infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
+        learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
 
         data_loader = {
             "class": "QlibDataLoader",
@@ -110,6 +101,103 @@ class HighFreqHandler(DataHandlerLP):
         return fields, names
 
 
+class HighFreqGeneralHandler(DataHandlerLP):
+    def __init__(
+        self,
+        instruments="csi300",
+        start_time=None,
+        end_time=None,
+        infer_processors=[],
+        learn_processors=[],
+        fit_start_time=None,
+        fit_end_time=None,
+        drop_raw=True,
+        day_length=240,
+    ):
+        self.day_length = day_length
+
+        infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
+        learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
+
+        data_loader = {
+            "class": "QlibDataLoader",
+            "kwargs": {
+                "config": self.get_feature_config(),
+                "swap_level": False,
+                "freq": "1min",
+            },
+        }
+        super().__init__(
+            instruments=instruments,
+            start_time=start_time,
+            end_time=end_time,
+            data_loader=data_loader,
+            infer_processors=infer_processors,
+            learn_processors=learn_processors,
+            drop_raw=drop_raw,
+        )
+
+    def get_feature_config(self):
+        fields = []
+        names = []
+
+        template_if = "If(IsNull({1}), {0}, {1})"
+        template_paused = f"Cut({{0}}, {self.day_length * 2}, None)"
+
+        def get_normalized_price_feature(price_field, shift=0):
+            # norm with the close price of 237th minute of yesterday.
+            if shift == 0:
+                template_norm = f"{{0}}/DayLast(Ref({{1}}, {self.day_length * 2}))"
+            else:
+                template_norm = f"Ref({{0}}, " + str(shift) + f")/DayLast(Ref({{1}}, {self.day_length}))"
+
+            template_fillnan = "FFillNan({0})"
+            # calculate -> ffill -> remove paused
+            feature_ops = template_paused.format(
+                template_fillnan.format(
+                    template_norm.format(template_if.format("$close", price_field), template_fillnan.format("$close"))
+                )
+            )
+            return feature_ops
+
+        fields += [get_normalized_price_feature("$open", 0)]
+        fields += [get_normalized_price_feature("$high", 0)]
+        fields += [get_normalized_price_feature("$low", 0)]
+        fields += [get_normalized_price_feature("$close", 0)]
+        fields += [get_normalized_price_feature("$vwap", 0)]
+        names += ["$open", "$high", "$low", "$close", "$vwap"]
+
+        fields += [get_normalized_price_feature("$open", self.day_length)]
+        fields += [get_normalized_price_feature("$high", self.day_length)]
+        fields += [get_normalized_price_feature("$low", self.day_length)]
+        fields += [get_normalized_price_feature("$close", self.day_length)]
+        fields += [get_normalized_price_feature("$vwap", self.day_length)]
+        names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"]
+
+        # calculate and fill nan with 0
+        fields += [
+            template_paused.format(
+                "If(IsNull({0}), 0, {0})".format(
+                    f"{{0}}/Ref(DayLast(Mean({{0}}, {self.day_length * 30})), {self.day_length})".format("$volume")
+                )
+            )
+        ]
+        names += ["$volume"]
+
+        fields += [
+            template_paused.format(
+                "If(IsNull({0}), 0, {0})".format(
+                    f"Ref({{0}}, {self.day_length})/Ref(DayLast(Mean({{0}}, {self.day_length * 30})), {self.day_length})".format(
+                        "$volume"
+                    )
+                )
+            )
+        ]
+        names += ["$volume_1"]
+
+        return fields, names
+
+
 class HighFreqBacktestHandler(DataHandler):
     def __init__(
         self,
@@ -163,6 +251,53 @@ class HighFreqBacktestHandler(DataHandler):
         return fields, names
 
 
+class HighFreqGeneralBacktestHandler(DataHandler):
+    def __init__(
+        self,
+        instruments="csi300",
+        start_time=None,
+        end_time=None,
+        day_length=240,
+    ):
+        self.day_length = day_length
+        data_loader = {
+            "class": "QlibDataLoader",
+            "kwargs": {
+                "config": self.get_feature_config(),
+                "swap_level": False,
+                "freq": "1min",
+            },
+        }
+        super().__init__(
+            instruments=instruments,
+            start_time=start_time,
+            end_time=end_time,
+            data_loader=data_loader,
+        )
+
+    def get_feature_config(self):
+        fields = []
+        names = []
+
+        template_paused = f"Cut({{0}}, {self.day_length * 2}, None)"
+        template_fillnan = "FFillNan({0})"
+        template_if = "If(IsNull({1}), {0}, {1})"
+        fields += [
+            template_paused.format(template_fillnan.format("$close")),
+        ]
+        names += ["$close0"]
+
+        fields += [
+            template_paused.format(template_if.format(template_fillnan.format("$close"), "$vwap")),
+        ]
+        names += ["$vwap0"]
+
+        fields += [template_paused.format("If(IsNull({0}), 0, {0})".format("$volume"))]
+        names += ["$volume0"]
+
+        return fields, names
+
+
 class HighFreqOrderHandler(DataHandlerLP):
     def __init__(
         self,
@@ -175,20 +310,9 @@ class HighFreqOrderHandler(DataHandlerLP):
         fit_end_time=None,
         drop_raw=True,
     ):
-        def check_transform_proc(proc_l):
-            new_l = []
-            for p in proc_l:
-                p["kwargs"].update(
-                    {
-                        "fit_start_time": fit_start_time,
-                        "fit_end_time": fit_end_time,
-                    }
-                )
-                new_l.append(p)
-            return new_l
 
-        infer_processors = check_transform_proc(infer_processors)
-        learn_processors = check_transform_proc(learn_processors)
+        infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
+        learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
 
         data_loader = {
             "class": "QlibDataLoader",
@@ -356,7 +480,6 @@ class HighFreqBacktestOrderHandler(DataHandler):
 
         template_if = "If(IsNull({1}), {0}, {1})"
         template_paused = "Select(Gt($hx_paused_num, 1.001), {0})"
-        # template_paused = "{0}"
         template_fillnan = "FFillNan({0})"
         fields += [
             template_fillnan.format(template_paused.format("$close")),