From 1bbd0261950696a355aac96a373ec9ece565f181 Mon Sep 17 00:00:00 2001 From: Wendi Li Date: Wed, 9 Dec 2020 12:07:01 +0800 Subject: [PATCH] Add files via upload --- examples/benchmarks/TFT/tft.py | 540 ++++++++++++++++++--------------- 1 file changed, 291 insertions(+), 249 deletions(-) diff --git a/examples/benchmarks/TFT/tft.py b/examples/benchmarks/TFT/tft.py index 388ec7f14..e1205b0e0 100644 --- a/examples/benchmarks/TFT/tft.py +++ b/examples/benchmarks/TFT/tft.py @@ -1,249 +1,291 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. - -import numpy as np -import pandas as pd -import tensorflow.compat.v1 as tf -import data_formatters.base -import expt_settings.configs -import libs.hyperparam_opt -import libs.tft_model -import libs.utils as utils -import os -import datetime as dte - - -from qlib.model.base import ModelFT -from qlib.data.dataset import DatasetH -from qlib.data.dataset.handler import DataHandlerLP - - -# To register new datasets, please add them here. -ALLOW_DATASET = ["Alpha158"] -DATASET_SETTING = { - "Alpha158": { - "feature_col": ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10", "ROC60", "RESI10"], - "label_col": ["LABEL0"], - }, -} -# To register new datasets, please add their configurations here. - - -def get_shifted_label(data_df, shifts=5, col_shift="LABEL0"): - return data_df[[col_shift]].groupby("instrument").apply(lambda df: df.shift(shifts)) - - -def fill_test_na(test_df): - test_df_res = test_df.copy() - feature_cols = ~test_df_res.columns.str.contains("label", case=False) - test_feature_fna = test_df_res.loc[:, feature_cols].groupby("datetime").apply(lambda df: df.fillna(df.mean())) - test_df_res.loc[:, feature_cols] = test_feature_fna - return test_df_res - - -def process_qlib_data(df, dataset, fillna=False): - """Prepare data to fit the TFT model. - - Args: - df: Original DataFrame. - fillna: Whether to fill the data with the mean values. - - Returns: - Transformed DataFrame. - - """ - # Several features selected manually - feature_col = DATASET_SETTING[dataset]["feature_col"] - label_col = DATASET_SETTING[dataset]["label_col"] - temp_df = df.loc[:, feature_col + label_col] - if fillna: - temp_df = fill_test_na(temp_df) - temp_df = temp_df.swaplevel() - temp_df = temp_df.sort_index() - temp_df = temp_df.reset_index(level=0) - dates = pd.to_datetime(temp_df.index) - temp_df["date"] = dates - temp_df["day_of_week"] = dates.dayofweek - temp_df["month"] = dates.month - temp_df["year"] = dates.year - temp_df["const"] = 1.0 - return temp_df - - -def process_predicted(df, col_name): - """Transform the TFT predicted data into Qlib format. - - Args: - df: Original DataFrame. - fillna: New column name. - - Returns: - Transformed DataFrame. - - """ - df_res = df.copy() - df_res = df_res.rename(columns={"forecast_time": "datetime", "identifier": "instrument", "t+4": col_name}) - df_res = df_res.set_index(["datetime", "instrument"]).sort_index() - df_res = df_res[[col_name]] - return df_res - - -def format_score(forecast_df, col_name="pred", label_shift=5): - pred = process_predicted(forecast_df, col_name=col_name) - pred = get_shifted_label(pred, shifts=-label_shift, col_shift=col_name) - pred = pred.dropna()[col_name] - return pred - - -def transform_df(df, col_name="LABEL0"): - df_res = df["feature"] - df_res[col_name] = df["label"] - return df_res - - -class TFTModel(ModelFT): - """TFT Model""" - - def __init__(self, **kwargs): - self.model = None - - def _prepare_data(self, dataset: DatasetH): - df_train, df_valid = dataset.prepare( - ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L - ) - return transform_df(df_train), transform_df(df_valid) - - def fit( - self, - dataset: DatasetH, - DATASET="Alpha158", - MODEL_FOLDER="qlib_alpha158_model", - LABEL_COL="LABEL0", - LABEL_SHIFT=5, - USE_GPU_ID=0, - **kwargs - ): - - if DATASET not in ALLOW_DATASET: - raise AssertionError("The dataset is not supported, please make a new formatter to fit this dataset") - - dtrain, dvalid = self._prepare_data(dataset) - dtrain.loc[:, LABEL_COL] = get_shifted_label(dtrain, shifts=LABEL_SHIFT, col_shift=LABEL_COL) - dvalid.loc[:, LABEL_COL] = get_shifted_label(dvalid, shifts=LABEL_SHIFT, col_shift=LABEL_COL) - - train = process_qlib_data(dtrain, DATASET, fillna=True).dropna() - valid = process_qlib_data(dvalid, DATASET, fillna=True).dropna() - - ExperimentConfig = expt_settings.configs.ExperimentConfig - config = ExperimentConfig(DATASET) - self.data_formatter = config.make_data_formatter() - self.model_folder = MODEL_FOLDER - self.gpu_id = USE_GPU_ID - self.label_shift = LABEL_SHIFT - self.expt_name = DATASET - self.label_col = LABEL_COL - - use_gpu = (True, self.gpu_id) - # ===========================Training Process=========================== - ModelClass = libs.tft_model.TemporalFusionTransformer - if not isinstance(self.data_formatter, data_formatters.base.GenericDataFormatter): - raise ValueError( - "Data formatters should inherit from" - + "AbstractDataFormatter! Type={}".format(type(self.data_formatter)) - ) - - default_keras_session = tf.keras.backend.get_session() - - if use_gpu[0]: - self.tf_config = utils.get_default_tensorflow_config(tf_device="gpu", gpu_id=use_gpu[1]) - else: - self.tf_config = utils.get_default_tensorflow_config(tf_device="cpu") - - self.data_formatter.set_scalers(train) - - # Sets up default params - fixed_params = self.data_formatter.get_experiment_params() - params = self.data_formatter.get_default_model_params() - - # Wendi: 合并调优的参数和非调优的参数 - params = {**params, **fixed_params} - - if not os.path.exists(self.model_folder): - os.makedirs(self.model_folder) - params["model_folder"] = self.model_folder - - print("*** Begin training ***") - best_loss = np.Inf - - tf.reset_default_graph() - - self.tf_graph = tf.Graph() - with self.tf_graph.as_default(): - self.sess = tf.Session(config=self.tf_config) - tf.keras.backend.set_session(self.sess) - self.model = ModelClass(params, use_cudnn=use_gpu[0]) - self.sess.run(tf.global_variables_initializer()) - self.model.fit(train_df=train, valid_df=valid) - print("*** Finished training ***") - saved_model_dir = self.model_folder + "/" + "saved_model" - if not os.path.exists(saved_model_dir): - os.makedirs(saved_model_dir) - self.model.save(saved_model_dir) - - def extract_numerical_data(data): - """Strips out forecast time and identifier columns.""" - return data[[col for col in data.columns if col not in {"forecast_time", "identifier"}]] - - # p50_loss = utils.numpy_normalised_quantile_loss( - # extract_numerical_data(targets), extract_numerical_data(p50_forecast), - # 0.5) - # p90_loss = utils.numpy_normalised_quantile_loss( - # extract_numerical_data(targets), extract_numerical_data(p90_forecast), - # 0.9) - tf.keras.backend.set_session(default_keras_session) - print("Training completed.".format(dte.datetime.now())) - # ===========================Training Process=========================== - - def predict(self, dataset): - if self.model is None: - raise ValueError("model is not fitted yet!") - d_test = dataset.prepare("test", col_set=["feature", "label"]) - d_test = transform_df(d_test) - d_test.loc[:, self.label_col] = get_shifted_label(d_test, shifts=self.label_shift, col_shift=self.label_col) - test = process_qlib_data(d_test, self.expt_name, fillna=True).dropna() - - use_gpu = (True, self.gpu_id) - # ===========================Predicting Process=========================== - default_keras_session = tf.keras.backend.get_session() - - # Sets up default params - fixed_params = self.data_formatter.get_experiment_params() - params = self.data_formatter.get_default_model_params() - params = {**params, **fixed_params} - - print("*** Begin predicting ***") - tf.reset_default_graph() - - with self.tf_graph.as_default(): - tf.keras.backend.set_session(self.sess) - output_map = self.model.predict(test, return_targets=True) - targets = self.data_formatter.format_predictions(output_map["targets"]) - p50_forecast = self.data_formatter.format_predictions(output_map["p50"]) - p90_forecast = self.data_formatter.format_predictions(output_map["p90"]) - tf.keras.backend.set_session(default_keras_session) - - predict50 = format_score(p50_forecast, "pred", 1) - predict90 = format_score(p90_forecast, "pred", 1) - predict = (predict50 + predict90) / 2 # self.label_shift - # ===========================Predicting Process=========================== - return predict - - def finetune(self, dataset: DatasetH): - """ - finetune model - Parameters - ---------- - dataset : DatasetH - dataset for finetuning - """ - pass +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import numpy as np +import pandas as pd +import tensorflow.compat.v1 as tf +import data_formatters.base +import expt_settings.configs +import libs.hyperparam_opt +import libs.tft_model +import libs.utils as utils +import os +import datetime as dte + + +from qlib.model.base import ModelFT +from qlib.data.dataset import DatasetH +from qlib.data.dataset.handler import DataHandlerLP + + +# To register new datasets, please add them here. +ALLOW_DATASET = ["Alpha158", "Alpha360"] +# To register new datasets, please add their configurations here. +DATASET_SETTING = { + "Alpha158": { + "feature_col": [ + "RESI5", + "WVMA5", + "RSQR5", + "KLEN", + "RSQR10", + "CORR5", + "CORD5", + "CORR10", + "ROC60", + "RESI10", + "VSTD5", + "RSQR60", + "CORR60", + "WVMA60", + "STD5", + "RSQR20", + "CORD60", + "CORD10", + "CORR20", + "KLOW", + ], + "label_col": "LABEL0", + }, + "Alpha360": { + "feature_col": [ + "HIGH0", + "LOW0", + "OPEN0", + "CLOSE1", + "HIGH1", + "VOLUME1", + "LOW1", + "VOLUME3", + "OPEN1", + "VOLUME4", + "CLOSE2", + "CLOSE4", + "VOLUME5", + "LOW2", + "CLOSE3", + "VOLUME2", + "HIGH2", + "LOW4", + "VOLUME8", + "VOLUME11", + ], + "label_col": "LABEL0", + }, +} + + +def get_shifted_label(data_df, shifts=5, col_shift="LABEL0"): + return data_df[[col_shift]].groupby("instrument").apply(lambda df: df.shift(shifts)) + + +def fill_test_na(test_df): + test_df_res = test_df.copy() + feature_cols = ~test_df_res.columns.str.contains("label", case=False) + test_feature_fna = test_df_res.loc[:, feature_cols].groupby("datetime").apply(lambda df: df.fillna(df.mean())) + test_df_res.loc[:, feature_cols] = test_feature_fna + return test_df_res + + +def process_qlib_data(df, dataset, fillna=False): + """Prepare data to fit the TFT model. + + Args: + df: Original DataFrame. + fillna: Whether to fill the data with the mean values. + + Returns: + Transformed DataFrame. + + """ + # Several features selected manually + feature_col = DATASET_SETTING[dataset]["feature_col"] + label_col = [DATASET_SETTING[dataset]["label_col"]] + temp_df = df.loc[:, feature_col + label_col] + if fillna: + temp_df = fill_test_na(temp_df) + temp_df = temp_df.swaplevel() + temp_df = temp_df.sort_index() + temp_df = temp_df.reset_index(level=0) + dates = pd.to_datetime(temp_df.index) + temp_df["date"] = dates + temp_df["day_of_week"] = dates.dayofweek + temp_df["month"] = dates.month + temp_df["year"] = dates.year + temp_df["const"] = 1.0 + return temp_df + + +def process_predicted(df, col_name): + """Transform the TFT predicted data into Qlib format. + + Args: + df: Original DataFrame. + fillna: New column name. + + Returns: + Transformed DataFrame. + + """ + df_res = df.copy() + df_res = df_res.rename(columns={"forecast_time": "datetime", "identifier": "instrument", "t+4": col_name}) + df_res = df_res.set_index(["datetime", "instrument"]).sort_index() + df_res = df_res[[col_name]] + return df_res + + +def format_score(forecast_df, col_name="pred", label_shift=5): + pred = process_predicted(forecast_df, col_name=col_name) + pred = get_shifted_label(pred, shifts=-label_shift, col_shift=col_name) + pred = pred.dropna()[col_name] + return pred + + +def transform_df(df, col_name="LABEL0"): + df_res = df["feature"] + df_res[col_name] = df["label"] + return df_res + + +class TFTModel(ModelFT): + """TFT Model""" + + def __init__(self, **kwargs): + self.model = None + self.params = {"DATASET": "Alpha158", "label_shift": 5} + self.params.update(kwargs) + + def _prepare_data(self, dataset: DatasetH): + df_train, df_valid = dataset.prepare( + ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L + ) + return transform_df(df_train), transform_df(df_valid) + + def fit(self, dataset: DatasetH, MODEL_FOLDER="qlib_tft_model", USE_GPU_ID=0, **kwargs): + DATASET = self.params["DATASET"] + LABEL_SHIFT = self.params["label_shift"] + LABEL_COL = DATASET_SETTING[DATASET]["label_col"] + + if DATASET not in ALLOW_DATASET: + raise AssertionError("The dataset is not supported, please make a new formatter to fit this dataset") + + dtrain, dvalid = self._prepare_data(dataset) + dtrain.loc[:, LABEL_COL] = get_shifted_label(dtrain, shifts=LABEL_SHIFT, col_shift=LABEL_COL) + dvalid.loc[:, LABEL_COL] = get_shifted_label(dvalid, shifts=LABEL_SHIFT, col_shift=LABEL_COL) + + train = process_qlib_data(dtrain, DATASET, fillna=True).dropna() + valid = process_qlib_data(dvalid, DATASET, fillna=True).dropna() + + ExperimentConfig = expt_settings.configs.ExperimentConfig + config = ExperimentConfig(DATASET) + self.data_formatter = config.make_data_formatter() + self.model_folder = MODEL_FOLDER + self.gpu_id = USE_GPU_ID + self.label_shift = LABEL_SHIFT + self.expt_name = DATASET + self.label_col = LABEL_COL + + use_gpu = (True, self.gpu_id) + # ===========================Training Process=========================== + ModelClass = libs.tft_model.TemporalFusionTransformer + if not isinstance(self.data_formatter, data_formatters.base.GenericDataFormatter): + raise ValueError( + "Data formatters should inherit from" + + "AbstractDataFormatter! Type={}".format(type(self.data_formatter)) + ) + + default_keras_session = tf.keras.backend.get_session() + + if use_gpu[0]: + self.tf_config = utils.get_default_tensorflow_config(tf_device="gpu", gpu_id=use_gpu[1]) + else: + self.tf_config = utils.get_default_tensorflow_config(tf_device="cpu") + + self.data_formatter.set_scalers(train) + + # Sets up default params + fixed_params = self.data_formatter.get_experiment_params() + params = self.data_formatter.get_default_model_params() + + # Wendi: 合并调优的参数和非调优的参数 + params = {**params, **fixed_params} + + if not os.path.exists(self.model_folder): + os.makedirs(self.model_folder) + params["model_folder"] = self.model_folder + + print("*** Begin training ***") + best_loss = np.Inf + + tf.reset_default_graph() + + self.tf_graph = tf.Graph() + with self.tf_graph.as_default(): + self.sess = tf.Session(config=self.tf_config) + tf.keras.backend.set_session(self.sess) + self.model = ModelClass(params, use_cudnn=use_gpu[0]) + self.sess.run(tf.global_variables_initializer()) + self.model.fit(train_df=train, valid_df=valid) + print("*** Finished training ***") + saved_model_dir = self.model_folder + "/" + "saved_model" + if not os.path.exists(saved_model_dir): + os.makedirs(saved_model_dir) + self.model.save(saved_model_dir) + + def extract_numerical_data(data): + """Strips out forecast time and identifier columns.""" + return data[[col for col in data.columns if col not in {"forecast_time", "identifier"}]] + + # p50_loss = utils.numpy_normalised_quantile_loss( + # extract_numerical_data(targets), extract_numerical_data(p50_forecast), + # 0.5) + # p90_loss = utils.numpy_normalised_quantile_loss( + # extract_numerical_data(targets), extract_numerical_data(p90_forecast), + # 0.9) + tf.keras.backend.set_session(default_keras_session) + print("Training completed.".format(dte.datetime.now())) + # ===========================Training Process=========================== + + def predict(self, dataset): + if self.model is None: + raise ValueError("model is not fitted yet!") + d_test = dataset.prepare("test", col_set=["feature", "label"]) + d_test = transform_df(d_test) + d_test.loc[:, self.label_col] = get_shifted_label(d_test, shifts=self.label_shift, col_shift=self.label_col) + test = process_qlib_data(d_test, self.expt_name, fillna=True).dropna() + + use_gpu = (True, self.gpu_id) + # ===========================Predicting Process=========================== + default_keras_session = tf.keras.backend.get_session() + + # Sets up default params + fixed_params = self.data_formatter.get_experiment_params() + params = self.data_formatter.get_default_model_params() + params = {**params, **fixed_params} + + print("*** Begin predicting ***") + tf.reset_default_graph() + + with self.tf_graph.as_default(): + tf.keras.backend.set_session(self.sess) + output_map = self.model.predict(test, return_targets=True) + targets = self.data_formatter.format_predictions(output_map["targets"]) + p50_forecast = self.data_formatter.format_predictions(output_map["p50"]) + p90_forecast = self.data_formatter.format_predictions(output_map["p90"]) + tf.keras.backend.set_session(default_keras_session) + + predict50 = format_score(p50_forecast, "pred", 1) + predict90 = format_score(p90_forecast, "pred", 1) + predict = (predict50 + predict90) / 2 # self.label_shift + # ===========================Predicting Process=========================== + return predict + + def finetune(self, dataset: DatasetH): + """ + finetune model + Parameters + ---------- + dataset : DatasetH + dataset for finetuning + """ + pass