1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 02:50:58 +08:00

General handler for open source data preprocessing (#1302)

* feat(data):  add a general highfreq data handler for open source

Add HighFreqOpenHandler and HighFreqOpenBacktestHandler for data pipeline without paused_num
information.

* fix: position of parameter init

* style(data): 💄 rename open to general

* style(data): 💄 lint

* style: 💄 delete useless comment & fix inheritance relation

* style: 💄 lint

* style: 💄 remove duplicated function

Co-authored-by: mingzhehan <v-zhaoxing@Microsoft.com>
This commit is contained in:
Yuchen Fang
2022-10-12 15:18:30 +08:00
committed by GitHub
parent 4fa37a96bf
commit 54928e956d

View File

@@ -1,5 +1,7 @@
from qlib.data.dataset.handler import DataHandler, DataHandlerLP
from .handler import check_transform_proc
EPSILON = 1e-4
@@ -15,20 +17,9 @@ class HighFreqHandler(DataHandlerLP):
fit_end_time=None,
drop_raw=True,
):
def check_transform_proc(proc_l):
new_l = []
for p in proc_l:
p["kwargs"].update(
{
"fit_start_time": fit_start_time,
"fit_end_time": fit_end_time,
}
)
new_l.append(p)
return new_l
infer_processors = check_transform_proc(infer_processors)
learn_processors = check_transform_proc(learn_processors)
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
data_loader = {
"class": "QlibDataLoader",
@@ -110,6 +101,103 @@ class HighFreqHandler(DataHandlerLP):
return fields, names
class HighFreqGeneralHandler(DataHandlerLP):
def __init__(
self,
instruments="csi300",
start_time=None,
end_time=None,
infer_processors=[],
learn_processors=[],
fit_start_time=None,
fit_end_time=None,
drop_raw=True,
day_length=240,
):
self.day_length = day_length
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
data_loader = {
"class": "QlibDataLoader",
"kwargs": {
"config": self.get_feature_config(),
"swap_level": False,
"freq": "1min",
},
}
super().__init__(
instruments=instruments,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
infer_processors=infer_processors,
learn_processors=learn_processors,
drop_raw=drop_raw,
)
def get_feature_config(self):
fields = []
names = []
template_if = "If(IsNull({1}), {0}, {1})"
template_paused = f"Cut({{0}}, {self.day_length * 2}, None)"
def get_normalized_price_feature(price_field, shift=0):
# norm with the close price of 237th minute of yesterday.
if shift == 0:
template_norm = f"{{0}}/DayLast(Ref({{1}}, {self.day_length * 2}))"
else:
template_norm = f"Ref({{0}}, " + str(shift) + f")/DayLast(Ref({{1}}, {self.day_length}))"
template_fillnan = "FFillNan({0})"
# calculate -> ffill -> remove paused
feature_ops = template_paused.format(
template_fillnan.format(
template_norm.format(template_if.format("$close", price_field), template_fillnan.format("$close"))
)
)
return feature_ops
fields += [get_normalized_price_feature("$open", 0)]
fields += [get_normalized_price_feature("$high", 0)]
fields += [get_normalized_price_feature("$low", 0)]
fields += [get_normalized_price_feature("$close", 0)]
fields += [get_normalized_price_feature("$vwap", 0)]
names += ["$open", "$high", "$low", "$close", "$vwap"]
fields += [get_normalized_price_feature("$open", self.day_length)]
fields += [get_normalized_price_feature("$high", self.day_length)]
fields += [get_normalized_price_feature("$low", self.day_length)]
fields += [get_normalized_price_feature("$close", self.day_length)]
fields += [get_normalized_price_feature("$vwap", self.day_length)]
names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"]
# calculate and fill nan with 0
fields += [
template_paused.format(
"If(IsNull({0}), 0, {0})".format(
f"{{0}}/Ref(DayLast(Mean({{0}}, {self.day_length * 30})), {self.day_length})".format("$volume")
)
)
]
names += ["$volume"]
fields += [
template_paused.format(
"If(IsNull({0}), 0, {0})".format(
f"Ref({{0}}, {self.day_length})/Ref(DayLast(Mean({{0}}, {self.day_length * 30})), {self.day_length})".format(
"$volume"
)
)
)
]
names += ["$volume_1"]
return fields, names
class HighFreqBacktestHandler(DataHandler):
def __init__(
self,
@@ -163,6 +251,53 @@ class HighFreqBacktestHandler(DataHandler):
return fields, names
class HighFreqGeneralBacktestHandler(DataHandler):
def __init__(
self,
instruments="csi300",
start_time=None,
end_time=None,
day_length=240,
):
self.day_length = day_length
data_loader = {
"class": "QlibDataLoader",
"kwargs": {
"config": self.get_feature_config(),
"swap_level": False,
"freq": "1min",
},
}
super().__init__(
instruments=instruments,
start_time=start_time,
end_time=end_time,
data_loader=data_loader,
)
def get_feature_config(self):
fields = []
names = []
template_paused = f"Cut({{0}}, {self.day_length * 2}, None)"
template_fillnan = "FFillNan({0})"
template_if = "If(IsNull({1}), {0}, {1})"
fields += [
template_paused.format(template_fillnan.format("$close")),
]
names += ["$close0"]
fields += [
template_paused.format(template_if.format(template_fillnan.format("$close"), "$vwap")),
]
names += ["$vwap0"]
fields += [template_paused.format("If(IsNull({0}), 0, {0})".format("$volume"))]
names += ["$volume0"]
return fields, names
class HighFreqOrderHandler(DataHandlerLP):
def __init__(
self,
@@ -175,20 +310,9 @@ class HighFreqOrderHandler(DataHandlerLP):
fit_end_time=None,
drop_raw=True,
):
def check_transform_proc(proc_l):
new_l = []
for p in proc_l:
p["kwargs"].update(
{
"fit_start_time": fit_start_time,
"fit_end_time": fit_end_time,
}
)
new_l.append(p)
return new_l
infer_processors = check_transform_proc(infer_processors)
learn_processors = check_transform_proc(learn_processors)
infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time)
learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time)
data_loader = {
"class": "QlibDataLoader",
@@ -356,7 +480,6 @@ class HighFreqBacktestOrderHandler(DataHandler):
template_if = "If(IsNull({1}), {0}, {1})"
template_paused = "Select(Gt($hx_paused_num, 1.001), {0})"
# template_paused = "{0}"
template_fillnan = "FFillNan({0})"
fields += [
template_fillnan.format(template_paused.format("$close")),