diff --git a/examples/workflow_by_code_gru.py b/examples/workflow_by_code_gru.py new file mode 100755 index 000000000..2bcbe2aa6 --- /dev/null +++ b/examples/workflow_by_code_gru.py @@ -0,0 +1,146 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import sys +from pathlib import Path + +import qlib +import pandas as pd +from qlib.config import REG_CN +from qlib.contrib.model.pytorch_gru import GRU +from qlib.contrib.data.handler import ALPHA360 +from qlib.contrib.strategy.strategy import TopkDropoutStrategy +from qlib.contrib.evaluate import ( + backtest as normal_backtest, + risk_analysis, +) +from qlib.utils import exists_qlib_data + +# from qlib.model.learner import train_model +from qlib.utils import init_instance_by_config + + +if __name__ == "__main__": + + # use default data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts"))) + from get_data import GetData + + GetData().qlib_data_cn(target_dir=provider_uri) + + qlib.init(provider_uri=provider_uri, region=REG_CN) + + MARKET = "csi300" + BENCHMARK = "SH000300" + + + ################################### + # train model + ################################### + DATA_HANDLER_CONFIG = { + "start_time": "2008-01-01", + "end_time": "2020-08-01", + "fit_start_time":"2008-01-01", + "fit_end_time":"2014-12-31", + "instruments": MARKET, + } + + TRAINER_CONFIG = { + "train_start_time": "2008-01-01", + "train_end_time": "2014-12-31", + "validate_start_time": "2015-01-01", + "validate_end_time": "2016-12-31", + "test_start_time": "2017-01-01", + "test_end_time": "2020-08-01", + } + + task = { + "model": { + "class": "GRU", + "module_path": "qlib.contrib.model.pytorch_gru", + "kwargs": { + "d_feat": 6, + "hidden_size": 64, + "num_layers": 3, + "dropout": 0.0, + "n_epochs": 2000, + "lr": 1e-1, + "early_stop": 200, + 
"batch_size":800, + "smooth_steps": 5, + "metric": "mse", + "loss": "mse", + "seed": 0, + "GPU": 0, + } + }, + "dataset": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + 'handler': { + "class": "ALPHA360", + "module_path": "qlib.contrib.data.handler", + "kwargs": DATA_HANDLER_CONFIG + }, + 'segments': { + 'train': ("2008-01-01", "2014-12-31"), + 'valid': ("2015-01-01", "2016-12-31",), + 'test': ("2017-01-01", "2020-08-01",), + } + } + } + # You should record the data in a specific sequence + # "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'], + } + + # model = train_model(task) + model = init_instance_by_config(task['model']) + dataset = init_instance_by_config(task['dataset']) + + model.fit(dataset) + + pred_score = model.predict(dataset) + + # save pred_score to file + pred_score_path = Path("~/tmp/qlib/pred_score.pkl").expanduser() + pred_score_path.parent.mkdir(exist_ok=True, parents=True) + pred_score.to_pickle(pred_score_path) + + ################################### + # backtest + ################################### + STRATEGY_CONFIG = { + "topk": 50, + "n_drop": 5, + } + BACKTEST_CONFIG = { + "verbose": False, + "limit_threshold": 0.095, + "account": 100000000, + "benchmark": BENCHMARK, + "deal_price": "close", + "open_cost": 0.0005, + "close_cost": 0.0015, + "min_cost": 5, + } + + # use default strategy + # custom Strategy, refer to: TODO: Strategy API url + strategy = TopkDropoutStrategy(**STRATEGY_CONFIG) + report_normal, positions_normal = normal_backtest(pred_score, strategy=strategy, **BACKTEST_CONFIG) + + ################################### + # analyze + # If you need a more detailed analysis, refer to: examples/train_and_bakctest.ipynb + ################################### + analysis = dict() + analysis["excess_return_without_cost"] = risk_analysis(report_normal["return"] - report_normal["bench"]) + analysis["excess_return_with_cost"] = risk_analysis( + report_normal["return"] - report_normal["bench"] - 
report_normal["cost"] + ) + analysis_df = pd.concat(analysis) # type: pd.DataFrame + print(analysis_df) diff --git a/qlib/contrib/data/handler.py b/qlib/contrib/data/handler.py index b2fd0515d..61f8652be 100644 --- a/qlib/contrib/data/handler.py +++ b/qlib/contrib/data/handler.py @@ -8,29 +8,81 @@ from ...data.dataset import processor as processor_module from ...log import TimeInspector import copy - class ALPHA360(DataHandlerLP): - def __init__(self, instruments="csi500", start_time=None, end_time=None): + def __init__( + self, + instruments="csi500", + start_time=None, + end_time=None, + fit_start_time=None, + fit_end_time=None + ): data_loader = { "class": "QlibDataLoader", "kwargs": { "config": { - "feature": { - "price": {"windows": range(60)}, - "volume": {"windows": range(60)}, - }, + "feature": self.get_feature_config(), "label": self.get_label_config(), }, }, } + + learn_processors = [ + {"class": "DropnaLabel", "kwargs": {'group': 'label'}}, + {"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}}, + ] infer_processors = [ - {"class": "ConfigSectionProcessor", "module_path": "qlib.contrib.data.processor"} - ] # ConfigSectionProcessor will normalize LABEL0 - super().__init__(instruments, start_time, end_time, data_loader=data_loader, infer_processors=infer_processors) + {"class": "ProcessInf", "kwargs": {}}, + {"class": "ZscoreNorm", "kwargs": {"fit_start_time": fit_start_time, "fit_end_time": fit_end_time}}, + {"class": "Fillna", "kwargs": {}}, + ] + + super().__init__( + instruments, + start_time, + end_time, + data_loader=data_loader, + learn_processors=learn_processors, + infer_processors=infer_processors + ) def get_label_config(self): return (["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]) + def get_feature_config(self): + + fields = [] + names = [] + + for i in range(59,0,-1): + fields += ["Ref($close, %d)/$close"%(i)] + names += ["CLOSE%d"%(i)] + fields += ["Ref($open, %d)/$close"%(i)] + names += ["OPEN%d"%(i)] + fields += ["Ref($high, 
%d)/$close"%(i)] + names += ["HIGH%d"%(i)] + fields += ["Ref($low, %d)/$close"%(i)] + names += ["LOW%d"%(i)] + fields += ["Ref($vwap, %d)/$close"%(i)] + names += ["VWAP%d"%(i)] + fields += ["Ref($volume, %d)/$volume"%(i)] + names += ["VOLUME%d"%(i)] + + fields += ["$close/$close"] + fields += ["$open/$close"] + fields += ["$high/$close"] + fields += ["$low/$close"] + fields += ["$vwap/$close"] + fields += ["$volume/$volume"] + names += ["CLOSE0"] + names += ["OPEN0"] + names += ["HIGH0"] + names += ["LOW0"] + names += ["VWAP0"] + names += ["VOLUME0"] + + return fields, names + class ALPHA360vwap(ALPHA360): def get_label_config(self): @@ -90,7 +142,7 @@ class Alpha158(DataHandlerLP): "kbar": {}, "price": { "windows": [0], - "feature": ["OPEN", "HIGH", "LOW"], + "feature": ["OPEN", "HIGH", "LOW", "VWAP"], }, "rolling": {}, } @@ -281,16 +333,5 @@ class Alpha158(DataHandlerLP): class Alpha158vwap(Alpha158): - def get_feature_config(self): - conf = { - "kbar": {}, - "price": { - "windows": [0], - "feature": ["OPEN", "HIGH", "LOW", "VWAP"], - }, - "rolling": {}, - } - return self.parse_config_to_fields(conf) - def get_label_config(self): return (["Ref($vwap, -2)/Ref($vwap, -1) - 1"], ["LABEL0"]) diff --git a/qlib/contrib/model/pytorch_gru.py b/qlib/contrib/model/pytorch_gru.py new file mode 100755 index 000000000..2a97d038e --- /dev/null +++ b/qlib/contrib/model/pytorch_gru.py @@ -0,0 +1,362 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +from sklearn.metrics import roc_auc_score, mean_squared_error +import logging +from ...utils import unpack_archive_with_buffer, save_multiple_parts_file, create_save_path, drop_nan_by_y_index +from ...log import get_module_logger, TimeInspector + +import torch +import torch.nn as nn +import torch.optim as optim + +from ...model.base import Model +from ...data.dataset import DatasetH +from ...data.dataset.handler import DataHandlerLP + +class GRU(Model): + """GRU Model + + Parameters + ---------- + d_feat : int + number of features per time step (GRU input size) + hidden_size : int + number of hidden units in each GRU layer + num_layers : int + number of stacked GRU layers + lr : float + learning rate + lr_decay : float + learning rate decay + lr_decay_steps : int + learning rate decay steps + optimizer : str + optimizer name ("adam" or "gd") + GPU : str + the GPU ID(s) used for training + """ + + def __init__( + self, + d_feat=6, + hidden_size=64, + num_layers=2, + dropout=0.0, + n_epochs=200, + lr=0.001, + batch_size=2000, + early_stop=20, + eval_steps=5, + loss="mse", + lr_decay=0.96, + lr_decay_steps=100, + optimizer="gd", + GPU="0", + seed=0, + **kwargs + ): + # Set logger. + self.logger = get_module_logger("GRU") + self.logger.info("GRU pytorch version...") + + # set hyper-parameters. 
+ self.d_feat = d_feat + self.hidden_size = hidden_size + self.num_layers = num_layers + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.batch_size = batch_size + self.early_stop = early_stop + self.eval_steps = eval_steps + self.lr_decay = lr_decay + self.lr_decay_steps = lr_decay_steps + self.optimizer = optimizer.lower() + self.loss_type = loss + self.visible_GPU = GPU + self.use_gpu = torch.cuda.is_available() + self.seed = seed + + self.logger.info( + "GRU parameters setting:" + "\nd_feat : {}" + "\nhidden_size : {}" + "\nnum_layers : {}" + "\ndropout : {}" + "\nn_epochs : {}" + "\nlr : {}" + "\nbatch_size : {}" + "\nearly_stop : {}" + "\neval_steps : {}" + "\nlr_decay : {}" + "\nlr_decay_steps : {}" + "\noptimizer : {}" + "\nloss_type : {}" + "\nvisible_GPU : {}" + "\nuse_GPU : {}" + "\nseed : {}".format( + d_feat, + hidden_size, + num_layers, + dropout, + n_epochs, + lr, + batch_size, + early_stop, + eval_steps, + lr_decay, + lr_decay_steps, + optimizer.lower(), + loss, + GPU, + self.use_gpu, + seed, + ) + ) + + if loss not in {"mse", "binary"}: + raise NotImplementedError("loss {} is not supported!".format(loss)) + self._scorer = mean_squared_error if loss == "mse" else roc_auc_score + + self.gru_model = GRUModel(d_feat=self.d_feat, hidden_size=self.hidden_size, num_layers=self.num_layers, dropout=self.dropout) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.gru_model.parameters(), lr=self.lr) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.gru_model.parameters(), lr=self.lr) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + # Reduce learning rate when loss has stopped decrease + self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( + self.train_optimizer, + mode="min", + factor=0.5, + patience=10, + verbose=True, + threshold=0.0001, + threshold_mode="rel", + cooldown=0, + min_lr=0.00001, + eps=1e-08, + ) + + self._fitted = False + 
if self.use_gpu: + self.gru_model.cuda() + # set the visible GPU + if self.visible_GPU: + os.environ["CUDA_VISIBLE_DEVICES"] = self.visible_GPU + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + verbose=True, + save_path=None, + ): + + df_train, df_valid = dataset.prepare( + ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L + ) + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + + x_train.to_pickle('~/x_train_init.pkl') + y_train.to_pickle('~/y_train_init.pkl') + + x_train = x_train.fillna(0) + y_train = y_train.fillna(0) + x_valid = x_valid.fillna(0) + y_valid = y_valid.fillna(0) + x_train.to_pickle('~/x_train.pkl') + y_train.to_pickle('~/y_train.pkl') + + # checkpoint path (used by torch.save below) and early-stopping state + save_path = create_save_path(save_path) + stop_steps = 0 + train_loss = 0 + best_loss = np.inf + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self._fitted = True + # return + # prepare training data + x_train_values = torch.from_numpy(x_train.values).float() + y_train_values = torch.from_numpy(np.squeeze(y_train.values)).float() + train_num = y_train_values.shape[0] + + # prepare validation data + x_val_auto = torch.from_numpy(x_valid.values).float() + y_val_auto = torch.from_numpy(np.squeeze(y_valid.values)).float() + + if self.use_gpu: + x_val_auto = x_val_auto.cuda() + y_val_auto = y_val_auto.cuda() + + for step in range(self.n_epochs): + if stop_steps >= self.early_stop: + if verbose: + self.logger.info("\tearly stop") + break + loss = AverageMeter() + self.gru_model.train() + self.train_optimizer.zero_grad() + + choice = np.random.choice(train_num, self.batch_size) + x_batch_auto = x_train_values[choice] + y_batch_auto = y_train_values[choice] + + if self.use_gpu: + x_batch_auto = x_batch_auto.float().cuda() + y_batch_auto = y_batch_auto.float().cuda() + + # forward + preds = 
self.gru_model(x_batch_auto) + cur_loss = self.get_loss(preds, y_batch_auto, self.loss_type) + cur_loss.backward() + self.train_optimizer.step() + loss.update(cur_loss.item()) + + # validation + train_loss += loss.val + # print(loss.val) + if step and step % self.eval_steps == 0: + stop_steps += 1 + train_loss /= self.eval_steps + + with torch.no_grad(): + self.gru_model.eval() + loss_val = AverageMeter() + + # forward + preds = self.gru_model(x_val_auto) + cur_loss_val = self.get_loss(preds, y_val_auto, self.loss_type) + loss_val.update(cur_loss_val.item()) + + if verbose: + self.logger.info( + "[Epoch {}]: train_loss {:.6f}, valid_loss {:.6f}".format(step, train_loss, loss_val.val) + ) + evals_result["train"].append(train_loss) + evals_result["valid"].append(loss_val.val) + if loss_val.val < best_loss: + if verbose: + self.logger.info( + "\tvalid loss update from {:.6f} to {:.6f}, save checkpoint.".format( + best_loss, loss_val.val + ) + ) + best_loss = loss_val.val + stop_steps = 0 + torch.save(self.gru_model.state_dict(), save_path) + train_loss = 0 + # update learning rate + self.scheduler.step(cur_loss_val) + + # restore the optimal parameters after training ?? 
+ # self.gru_model.load_state_dict(torch.load(save_path)) + if self.use_gpu: + torch.cuda.empty_cache() + + def get_loss(self, pred, target, loss_type): + if loss_type == "mse": + sqr_loss = (pred - target)**2 + loss = sqr_loss.mean() + return loss + elif loss_type == "binary": + loss = nn.BCELoss() + return loss(pred, target) + else: + raise NotImplementedError("loss {} is not supported!".format(loss_type)) + + def predict(self, dataset): + if not self._fitted: + raise ValueError("model is not fitted yet!") + + x_test = dataset.prepare("test", col_set="feature") + x_test = x_test.fillna(0) + index = x_test.index + x_test = torch.from_numpy(x_test.values).float() + + if self.use_gpu: + x_test = x_test.cuda() + self.gru_model.eval() + + with torch.no_grad(): + if self.use_gpu: + preds = self.gru_model(x_test).detach().cpu().numpy() + else: + preds = self.gru_model(x_test).detach().numpy() + return pd.Series(preds, index=index) + + def save(self, filename, **kwargs): + with save_multiple_parts_file(filename) as model_dir: + model_path = os.path.join(model_dir, os.path.split(model_dir)[-1]) + # Save model + torch.save(self.gru_model.state_dict(), model_path) + + def load(self, buffer, **kwargs): + with unpack_archive_with_buffer(buffer) as model_dir: + # Get model name + _model_name = os.path.splitext(list(filter(lambda x: x.startswith("model.bin"), os.listdir(model_dir)))[0])[ + 0 + ] + _model_path = os.path.join(model_dir, _model_name) + # Load model + self.gru_model.load_state_dict(torch.load(_model_path)) + self._fitted = True + +class AverageMeter(object): + """Computes and stores the average and current value""" + + def __init__(self): + self.reset() + + def reset(self): + self.val = 0 + self.avg = 0 + self.sum = 0 + self.count = 0 + + def update(self, val, n=1): + self.val = val + self.sum += val * n + self.count += n + self.avg = self.sum / self.count + + +class GRUModel(nn.Module): + + def __init__(self, d_feat=6, hidden_size=64, num_layers=2, dropout=0.0): + 
super().__init__() + + self.rnn = nn.GRU( + input_size=d_feat, + hidden_size=hidden_size, + num_layers=num_layers, + batch_first=True, + dropout=dropout, + ) + self.fc_out = nn.Linear(hidden_size, 1) + + self.d_feat = d_feat + + def forward(self, x): + # x: [N, F*T] + x = x.reshape(len(x), self.d_feat, -1) # [N, F, T] + x = x.permute(0, 2, 1) # [N, T, F] + out, _ = self.rnn(x) + return self.fc_out(out[:, -1, :]).squeeze() + diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index ead8707db..1f6754312 100644 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -106,6 +106,22 @@ class ProcessInf(Processor): return replace_inf(df) +class Fillna(Processor): + """Replace NaN values with 0""" + + def __call__(self, df): + def fill_na(data): + def process_na(df): + for col in df.columns: + # FIXME: Such behavior is very weird + df[col] = df[col].fillna(0) + return df + + data = datetime_groupby_apply(data, process_na) + data.sort_index(inplace=True) + return data + + return fill_na(df) class MinMaxNorm(Processor): def __init__(self, fit_start_time, fit_end_time, fields_group=None):