# Copyright (c) Microsoft Corporation. # Licensed under the MIT License. import pandas as pd from qlib.data.dataset.loader import QlibDataLoader from qlib.contrib.data.handler import DataHandlerLP, _DEFAULT_LEARN_PROCESSORS, check_transform_proc class Avg15minLoader(QlibDataLoader): def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame: df = super(Avg15minLoader, self).load(instruments, start_time, end_time) if self.is_group: # feature_day(day freq) and feature_15min(1min freq, Average every 15 minutes) renamed feature df.columns = df.columns.map(lambda x: ("feature", x[1]) if x[0].startswith("feature") else x) return df class Avg15minHandler(DataHandlerLP): def __init__( self, instruments="csi500", start_time=None, end_time=None, freq="day", infer_processors=[], learn_processors=_DEFAULT_LEARN_PROCESSORS, fit_start_time=None, fit_end_time=None, process_type=DataHandlerLP.PTYPE_A, filter_pipe=None, inst_processors=None, **kwargs, ): infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) data_loader = Avg15minLoader( config=self.loader_config(), filter_pipe=filter_pipe, freq=freq, inst_processors=inst_processors ) super().__init__( instruments=instruments, start_time=start_time, end_time=end_time, data_loader=data_loader, infer_processors=infer_processors, learn_processors=learn_processors, process_type=process_type, ) def loader_config(self): # Results for dataset: df: pd.DataFrame # len(df.columns) == 6 + 6 * 16, len(df.index.get_level_values(level="datetime").unique()) == T # df.columns: close0, close1, ..., close16, open0, ..., open16, ..., vwap16 # freq == day: # close0, open0, low0, high0, volume0, vwap0 # freq == 1min: # close1, ..., close16, ..., vwap1, ..., vwap16 # df.index.name == ["datetime", "instrument"]: pd.MultiIndex # Example: # feature ... label # close0 open0 low0 ... vwap1 vwap16 LABEL0 # datetime instrument ... # 2020-10-09 SH600000 11.794546 11.819587 11.769505 ... NaN NaN -0.005214 # 2020-10-15 SH600000 12.044961 11.944795 11.932274 ... NaN NaN -0.007202 # ... ... ... ... ... ... ... ... # 2021-05-28 SZ300676 6.369684 6.495406 6.306568 ... NaN NaN -0.001321 # 2021-05-31 SZ300676 6.601626 6.465643 6.465130 ... NaN NaN -0.023428 # features day: len(columns) == 6, freq = day # $close is the closing price of the current trading day: # if the user needs to get the `close` before the last T days, use Ref($close, T-1), for example: # $close Ref($close, 1) Ref($close, 2) Ref($close, 3) Ref($close, 4) # instrument datetime # SH600519 2021-06-01 244.271530 # 2021-06-02 242.205917 244.271530 # 2021-06-03 242.229889 242.205917 244.271530 # 2021-06-04 245.421524 242.229889 242.205917 244.271530 # 2021-06-07 247.547089 245.421524 242.229889 242.205917 244.271530 # WARNING: Ref($close, N), if N == 0, Ref($close, N) ==> $close fields = ["$close", "$open", "$low", "$high", "$volume", "$vwap"] # names: close0, open0, ..., vwap0 names = list(map(lambda x: x.strip("$") + "0", fields)) config = {"feature_day": (fields, names)} # features 15min: len(columns) == 6 * 16, freq = 1min # $close is the closing price of the current trading day: # if the user gets 'close' for the i-th 15min of the last T days, use `Ref(Mean($close, 15), (T-1) * 240 + i * 15)`, for example: # Ref(Mean($close, 15), 225) Ref(Mean($close, 15), 465) Ref(Mean($close, 15), 705) # instrument datetime # SH600519 2021-05-31 241.769897 243.077942 244.712997 # 2021-06-01 244.271530 241.769897 243.077942 # 2021-06-02 242.205917 244.271530 241.769897 # WARNING: Ref(Mean($close, 15), N), if N == 0, Ref(Mean($close, 15), N) ==> Mean($close, 15) # Results of the current script: # time: 09:00 --> 09:14, ..., 14:45 --> 14:59 # fields: Ref(Mean($close, 15), 225), ..., Mean($close, 15) # name: close1, ..., close16 # # Expression description: take close as an example # Mean($close, 15) ==> df["$close"].rolling(15, min_periods=1).mean() # Ref(Mean($close, 15), 15) ==> df["$close"].rolling(15, min_periods=1).mean().shift(15) # NOTE: The last data of each trading day, which is the average of the i-th 15 minutes # Average: # Average of the i-th 15-minute period of each trading day: 1 <= i <= 250 // 16 # Avg(15minutes): Ref(Mean($close, 15), 240 - i * 15) # # Average of the first 15 minutes of each trading day; i = 1 # Avg(09:00 --> 09:14), df.index.loc["09:14"]: Ref(Mean($close, 15), 240- 1 * 15) ==> Ref(Mean($close, 15), 225) # Average of the last 15 minutes of each trading day; i = 16 # Avg(14:45 --> 14:59), df.index.loc["14:59"]: Ref(Mean($close, 15), 240 - 16 * 15) ==> Ref(Mean($close, 15), 0) ==> Mean($close, 15) # 15min resample to day # df.resample("1d").last() tmp_fields = [] tmp_names = [] for i, _f in enumerate(fields): _fields = [f"Ref(Mean({_f}, 15), {j * 15})" for j in range(1, 240 // 15)] _names = [f"{names[i][:-1]}{int(names[i][-1])+j}" for j in range(240 // 15 - 1, 0, -1)] _fields.append(f"Mean({_f}, 15)") _names.append(f"{names[i][:-1]}{int(names[i][-1])+240 // 15}") tmp_fields += _fields tmp_names += _names config["feature_15min"] = (tmp_fields, tmp_names) # label config["label"] = (["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]) return config