1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-02 10:31:00 +08:00

add time series model GRU

This commit is contained in:
lwwang1995
2020-11-11 10:26:28 +08:00
parent b839733ec7
commit 9c2dbaa94e
4 changed files with 586 additions and 21 deletions

146
examples/workflow_by_code_gru.py Executable file
View File

@@ -0,0 +1,146 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import sys
from pathlib import Path
import qlib
import pandas as pd
from qlib.config import REG_CN
from qlib.contrib.model.pytorch_gru import GRU
from qlib.contrib.data.handler import ALPHA360
from qlib.contrib.strategy.strategy import TopkDropoutStrategy
from qlib.contrib.evaluate import (
backtest as normal_backtest,
risk_analysis,
)
from qlib.utils import exists_qlib_data
# from qlib.model.learner import train_model
from qlib.utils import init_instance_by_config
if __name__ == "__main__":
# use default data
provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir
if not exists_qlib_data(provider_uri):
print(f"Qlib data is not found in {provider_uri}")
sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts")))
from get_data import GetData
GetData().qlib_data_cn(target_dir=provider_uri)
qlib.init(provider_uri=provider_uri, region=REG_CN)
MARKET = "csi300"
BENCHMARK = "SH000300"
###################################
# train model
###################################
DATA_HANDLER_CONFIG = {
"start_time": "2008-01-01",
"end_time": "2020-08-01",
"fit_start_time":"2008-01-01",
"fit_end_time":"2014-12-31",
"instruments": MARKET,
}
TRAINER_CONFIG = {
"train_start_time": "2008-01-01",
"train_end_time": "2014-12-31",
"validate_start_time": "2015-01-01",
"validate_end_time": "2016-12-31",
"test_start_time": "2017-01-01",
"test_end_time": "2020-08-01",
}
task = {
"model": {
"class": "GRU",
"module_path": "qlib.contrib.model.pytorch_gru",
"kwargs": {
"d_feat": 6,
"hidden_size": 64,
"num_layers": 3,
"dropout": 0.0,
"n_epochs": 2000,
"lr": 1e-1,
"early_stop": 200,
"batch_size":800,
"smooth_steps": 5,
"metric": "mse",
"loss": "mse",
"seed": 0,
"GPU": 0,
}
},
"dataset": {
"class": "DatasetH",
"module_path": "qlib.data.dataset",
"kwargs": {
'handler': {
"class": "ALPHA360",
"module_path": "qlib.contrib.data.handler",
"kwargs": DATA_HANDLER_CONFIG
},
'segments': {
'train': ("2008-01-01", "2014-12-31"),
'valid': ("2015-01-01", "2016-12-31",),
'test': ("2017-01-01", "2020-08-01",),
}
}
}
# You shoud record the data in specific sequence
# "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'],
}
# model = train_model(task)
model = init_instance_by_config(task['model'])
dataset = init_instance_by_config(task['dataset'])
model.fit(dataset)
pred_score = model.predict(dataset)
# save pred_score to file
pred_score_path = Path("~/tmp/qlib/pred_score.pkl").expanduser()
pred_score_path.parent.mkdir(exist_ok=True, parents=True)
pred_score.to_pickle(pred_score_path)
###################################
# backtest
###################################
STRATEGY_CONFIG = {
"topk": 50,
"n_drop": 5,
}
BACKTEST_CONFIG = {
"verbose": False,
"limit_threshold": 0.095,
"account": 100000000,
"benchmark": BENCHMARK,
"deal_price": "close",
"open_cost": 0.0005,
"close_cost": 0.0015,
"min_cost": 5,
}
# use default strategy
# custom Strategy, refer to: TODO: Strategy API url
strategy = TopkDropoutStrategy(**STRATEGY_CONFIG)
report_normal, positions_normal = normal_backtest(pred_score, strategy=strategy, **BACKTEST_CONFIG)
###################################
# analyze
# If need a more detailed analysis, refer to: examples/train_and_bakctest.ipynb
###################################
analysis = dict()
analysis["excess_return_without_cost"] = risk_analysis(report_normal["return"] - report_normal["bench"])
analysis["excess_return_with_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"] - report_normal["cost"]
)
analysis_df = pd.concat(analysis) # type: pd.DataFrame
print(analysis_df)

View File

@@ -8,29 +8,81 @@ from ...data.dataset import processor as processor_module
from ...log import TimeInspector
import copy
class ALPHA360(DataHandlerLP):
def __init__(self, instruments="csi500", start_time=None, end_time=None):
def __init__(
self,
instruments="csi500",
start_time=None,
end_time=None,
fit_start_time=None,
fit_end_time=None
):
data_loader = {
"class": "QlibDataLoader",
"kwargs": {
"config": {
"feature": {
"price": {"windows": range(60)},
"volume": {"windows": range(60)},
},
"feature": self.get_feature_config(),
"label": self.get_label_config(),
},
},
}
learn_processors = [
{"class": "DropnaLabel", "kwargs": {'group': 'label'}},
{"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}},
]
infer_processors = [
{"class": "ConfigSectionProcessor", "module_path": "qlib.contrib.data.processor"}
] # ConfigSectionProcessor will normalize LABEL0
super().__init__(instruments, start_time, end_time, data_loader=data_loader, infer_processors=infer_processors)
{"class": "ProcessInf", "kwargs": {}},
{"class": "ZscoreNorm", "kwargs": {"fit_start_time": fit_start_time, "fit_end_time": fit_end_time}},
{"class": "Fillna", "kwargs": {}},
]
super().__init__(
instruments,
start_time,
end_time,
data_loader=data_loader,
learn_processors=learn_processors,
infer_processors=infer_processors
)
def get_label_config(self):
return (["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])
def get_feature_config(self):
fields = []
names = []
for i in range(59,0,-1):
fields += ["Ref($close, %d)/$close"%(i)]
names += ["CLOSE%d"%(i)]
fields += ["Ref($open, %d)/$close"%(i)]
names += ["OPEN%d"%(i)]
fields += ["Ref($high, %d)/$close"%(i)]
names += ["HIGH%d"%(i)]
fields += ["Ref($low, %d)/$close"%(i)]
names += ["LOW%d"%(i)]
fields += ["Ref($vwap, %d)/$close"%(i)]
names += ["VWAP%d"%(i)]
fields += ["Ref($volume, %d)/$volume"%(i)]
names += ["VOLUME%d"%(i)]
fields += ["$close/$close"]
fields += ["$open/$close"]
fields += ["$high/$close"]
fields += ["$low/$close"]
fields += ["$vwap/$close"]
fields += ["$volume/$volume"]
names += ["CLOSE0"]
names += ["OPEN0"]
names += ["HIGH0"]
names += ["LOW0"]
names += ["VWAP0"]
names += ["VOLUME0"]
return fields, names
class ALPHA360vwap(ALPHA360):
def get_label_config(self):
@@ -90,7 +142,7 @@ class Alpha158(DataHandlerLP):
"kbar": {},
"price": {
"windows": [0],
"feature": ["OPEN", "HIGH", "LOW"],
"feature": ["OPEN", "HIGH", "LOW", "VWAP"],
},
"rolling": {},
}
@@ -281,16 +333,5 @@ class Alpha158(DataHandlerLP):
class Alpha158vwap(Alpha158):
def get_feature_config(self):
conf = {
"kbar": {},
"price": {
"windows": [0],
"feature": ["OPEN", "HIGH", "LOW", "VWAP"],
},
"rolling": {},
}
return self.parse_config_to_fields(conf)
def get_label_config(self):
return (["Ref($vwap, -2)/Ref($vwap, -1) - 1"], ["LABEL0"])

362
qlib/contrib/model/pytorch_gru.py Executable file
View File

@@ -0,0 +1,362 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import os
import numpy as np
import pandas as pd
import copy
from sklearn.metrics import roc_auc_score, mean_squared_error
import logging
from ...utils import unpack_archive_with_buffer, save_multiple_parts_file, create_save_path, drop_nan_by_y_index
from ...log import get_module_logger, TimeInspector
import torch
import torch.nn as nn
import torch.optim as optim
from ...model.base import Model
from ...data.dataset import DatasetH
from ...data.dataset.handler import DataHandlerLP
class GRU(Model):
"""GRU Model
Parameters
----------
input_dim : int
input dimension
output_dim : int
output dimension
layers : tuple
layer sizes
lr : float
learning rate
lr_decay : float
learning rate decay
lr_decay_steps : int
learning rate decay steps
optimizer : str
optimizer name
GPU : str
the GPU ID(s) used for training
"""
def __init__(
self,
d_feat=6,
hidden_size=64,
num_layers=2,
dropout=0.0,
n_epochs=200,
lr=0.001,
batch_size=2000,
early_stop=20,
eval_steps=5,
loss="mse",
lr_decay=0.96,
lr_decay_steps=100,
optimizer="gd",
GPU="0",
seed=0,
**kwargs
):
# Set logger.
self.logger = get_module_logger("GRU")
self.logger.info("GRU pytorch version...")
# set hyper-parameters.
self.d_feat = d_feat
self.hidden_size = hidden_size
self.num_layers = num_layers
self.dropout = dropout
self.n_epochs = n_epochs
self.lr = lr
self.batch_size = batch_size
self.early_stop = early_stop
self.eval_steps = eval_steps
self.lr_decay = lr_decay
self.lr_decay_steps = lr_decay_steps
self.optimizer = optimizer.lower()
self.loss_type = loss
self.visible_GPU = GPU
self.use_gpu = torch.cuda.is_available()
self.seed = seed
self.logger.info(
"GRU parameters setting:"
"\nd_feat : {}"
"\nhidden_size : {}"
"\nnum_layers : {}"
"\ndropout : {}"
"\nn_epochs : {}"
"\nlr : {}"
"\nbatch_size : {}"
"\nearly_stop : {}"
"\neval_steps : {}"
"\nlr_decay : {}"
"\nlr_decay_steps : {}"
"\noptimizer : {}"
"\nloss_type : {}"
"\nvisible_GPU : {}"
"\nuse_GPU : {}"
"\nseed : {}".format(
d_feat,
hidden_size,
num_layers,
dropout,
n_epochs,
lr,
batch_size,
early_stop,
eval_steps,
lr_decay,
lr_decay_steps,
optimizer.lower(),
loss,
GPU,
self.use_gpu,
seed,
)
)
if loss not in {"mse", "binary"}:
raise NotImplementedError("loss {} is not supported!".format(loss))
self._scorer = mean_squared_error if loss == "mse" else roc_auc_score
self.gru_model = GRUModel(d_feat=self.d_feat, hidden_size=self.hidden_size, num_layers=self.num_layers, dropout=self.dropout)
if optimizer.lower() == "adam":
self.train_optimizer = optim.Adam(self.gru_model.parameters(), lr=self.lr)
elif optimizer.lower() == "gd":
self.train_optimizer = optim.SGD(self.gru_model.parameters(), lr=self.lr)
else:
raise NotImplementedError("optimizer {} is not supported!".format(optimizer))
# Reduce learning rate when loss has stopped decrease
self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.train_optimizer,
mode="min",
factor=0.5,
patience=10,
verbose=True,
threshold=0.0001,
threshold_mode="rel",
cooldown=0,
min_lr=0.00001,
eps=1e-08,
)
self._fitted = False
if self.use_gpu:
self.gru_model.cuda()
# set the visible GPU
if self.visible_GPU:
os.environ["CUDA_VISIBLE_DEVICES"] = self.visible_GPU
def fit(
self,
dataset: DatasetH,
evals_result=dict(),
verbose=True,
save_path=None,
):
df_train, df_valid = dataset.prepare(
["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L
)
x_train, y_train = df_train["feature"], df_train["label"]
x_valid, y_valid = df_valid["feature"], df_valid["label"]
x_train.to_pickle('~/x_train_init.pkl')
y_train.to_pickle('~/y_train_init.pkl')
x_train = x_train.fillna(0)
y_train = y_train.fillna(0)
x_valid = x_valid.fillna(0)
y_valid = y_valid.fillna(0)
x_train.to_pickle('~/x_train.pkl')
y_train.to_pickle('~/y_train.pkl')
# Lightgbm need 1D array as its label
save_path = create_save_path(save_path)
stop_steps = 0
train_loss = 0
best_loss = np.inf
evals_result["train"] = []
evals_result["valid"] = []
# train
self.logger.info("training...")
self._fitted = True
# return
# prepare training data
x_train_values = torch.from_numpy(x_train.values).float()
y_train_values = torch.from_numpy(np.squeeze(y_train.values)).float()
train_num = y_train_values.shape[0]
# prepare validation data
x_val_auto = torch.from_numpy(x_valid.values).float()
y_val_auto = torch.from_numpy(np.squeeze(y_valid.values)).float()
if self.use_gpu:
x_val_auto = x_val_auto.cuda()
y_val_auto = y_val_auto.cuda()
for step in range(self.n_epochs):
if stop_steps >= self.early_stop:
if verbose:
self.logger.info("\tearly stop")
break
loss = AverageMeter()
self.gru_model.train()
self.train_optimizer.zero_grad()
choice = np.random.choice(train_num, self.batch_size)
x_batch_auto = x_train_values[choice]
y_batch_auto = y_train_values[choice]
if self.use_gpu:
x_batch_auto = x_batch_auto.float().cuda()
y_batch_auto = y_batch_auto.float().cuda()
# forward
preds = self.gru_model(x_batch_auto)
cur_loss = self.get_loss(preds, y_batch_auto, self.loss_type)
cur_loss.backward()
self.train_optimizer.step()
loss.update(cur_loss.item())
# validation
train_loss += loss.val
# print(loss.val)
if step and step % self.eval_steps == 0:
stop_steps += 1
train_loss /= self.eval_steps
with torch.no_grad():
self.gru_model.eval()
loss_val = AverageMeter()
# forward
preds = self.gru_model(x_val_auto)
cur_loss_val = self.get_loss(preds, y_val_auto, self.loss_type)
loss_val.update(cur_loss_val.item())
if verbose:
self.logger.info(
"[Epoch {}]: train_loss {:.6f}, valid_loss {:.6f}".format(step, train_loss, loss_val.val)
)
evals_result["train"].append(train_loss)
evals_result["valid"].append(loss_val.val)
if loss_val.val < best_loss:
if verbose:
self.logger.info(
"\tvalid loss update from {:.6f} to {:.6f}, save checkpoint.".format(
best_loss, loss_val.val
)
)
best_loss = loss_val.val
stop_steps = 0
torch.save(self.gru_model.state_dict(), save_path)
train_loss = 0
# update learning rate
self.scheduler.step(cur_loss_val)
# restore the optimal parameters after training ??
# self.gru_model.load_state_dict(torch.load(save_path))
if self.use_gpu:
torch.cuda.empty_cache()
def get_loss(self, pred, target, loss_type):
if loss_type == "mse":
sqr_loss = (pred - target)**2
loss = sqr_loss.mean()
return loss
elif loss_type == "binary":
loss = nn.BCELoss()
return loss(pred, target)
else:
raise NotImplementedError("loss {} is not supported!".format(loss_type))
def predict(self, dataset):
if not self._fitted:
raise ValueError("model is not fitted yet!")
x_test = dataset.prepare("test", col_set="feature")
x_test = x_test.fillna(0)
index = x_test.index
x_test = torch.from_numpy(x_test.values).float()
if self.use_gpu:
x_test = x_test.cuda()
self.gru_model.eval()
with torch.no_grad():
if self.use_gpu:
preds = self.gru_model(x_test).detach().cpu().numpy()
else:
preds = self.gru_model(x_test).detach().numpy()
return pd.Series(preds, index=index)
def save(self, filename, **kwargs):
with save_multiple_parts_file(filename) as model_dir:
model_path = os.path.join(model_dir, os.path.split(model_dir)[-1])
# Save model
torch.save(self.gru_model.state_dict(), model_path)
def load(self, buffer, **kwargs):
with unpack_archive_with_buffer(buffer) as model_dir:
# Get model name
_model_name = os.path.splitext(list(filter(lambda x: x.startswith("model.bin"), os.listdir(model_dir)))[0])[
0
]
_model_path = os.path.join(model_dir, _model_name)
# Load model
self.gru_model.load_state_dict(torch.load(_model_path))
self._fitted = True
class AverageMeter(object):
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
class GRUModel(nn.Module):
def __init__(self, d_feat=6, hidden_size=64, num_layers=2, dropout=0.0):
super().__init__()
self.rnn = nn.GRU(
input_size=d_feat,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout,
)
self.fc_out = nn.Linear(hidden_size, 1)
self.d_feat = d_feat
def forward(self, x):
# x: [N, F*T]
x = x.reshape(len(x), self.d_feat, -1) # [N, F, T]
x = x.permute(0, 2, 1) # [N, T, F]
out, _ = self.rnn(x)
return self.fc_out(out[:, -1, :]).squeeze()

View File

@@ -106,6 +106,22 @@ class ProcessInf(Processor):
return replace_inf(df)
class Fillna(Processor):
"""Process infinity """
def __call__(self, df):
def fill_na(data):
def process_na(df):
for col in df.columns:
# FIXME: Such behavior is very weird
df[col] = df[col].fillna(0)
return df
data = datetime_groupby_apply(data, process_na)
data.sort_index(inplace=True)
return data
return fill_na(df)
class MinMaxNorm(Processor):
def __init__(self, fit_start_time, fit_end_time, fields_group=None):