1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00
Files
qlib/examples/benchmarks/TRA/src/model.py
Linlang 39f88daaa7 download orderbook data (#1754)
* download orderbook data

* fix CI error

* fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* test fix CI error

* optimize get_data code

* optimize get_data code

* optimize get_data code

* optimize README

---------

Co-authored-by: Linlang <v-linlanglv@microsoft.com>
2024-03-07 14:41:21 +08:00

594 lines
19 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
import copy
import math
import json
import collections
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm
from qlib.utils import get_or_create_path
from qlib.log import get_module_logger
from qlib.model.base import Model
device = "cuda" if torch.cuda.is_available() else "cpu"
class TRAModel(Model):
def __init__(
self,
model_config,
tra_config,
model_type="LSTM",
lr=1e-3,
n_epochs=500,
early_stop=50,
smooth_steps=5,
max_steps_per_epoch=None,
freeze_model=False,
model_init_state=None,
lamb=0.0,
rho=0.99,
seed=None,
logdir=None,
eval_train=True,
eval_test=False,
avg_params=True,
**kwargs,
):
np.random.seed(seed)
torch.manual_seed(seed)
self.logger = get_module_logger("TRA")
self.logger.info("TRA Model...")
self.model = eval(model_type)(**model_config).to(device)
if model_init_state:
self.model.load_state_dict(torch.load(model_init_state, map_location="cpu")["model"])
if freeze_model:
for param in self.model.parameters():
param.requires_grad_(False)
else:
self.logger.info("# model params: %d" % sum([p.numel() for p in self.model.parameters()]))
self.tra = TRA(self.model.output_size, **tra_config).to(device)
self.logger.info("# tra params: %d" % sum([p.numel() for p in self.tra.parameters()]))
self.optimizer = optim.Adam(list(self.model.parameters()) + list(self.tra.parameters()), lr=lr)
self.model_config = model_config
self.tra_config = tra_config
self.lr = lr
self.n_epochs = n_epochs
self.early_stop = early_stop
self.smooth_steps = smooth_steps
self.max_steps_per_epoch = max_steps_per_epoch
self.lamb = lamb
self.rho = rho
self.seed = seed
self.logdir = logdir
self.eval_train = eval_train
self.eval_test = eval_test
self.avg_params = avg_params
if self.tra.num_states > 1 and not self.eval_train:
self.logger.warn("`eval_train` will be ignored when using TRA")
if self.logdir is not None:
if os.path.exists(self.logdir):
self.logger.warn(f"logdir {self.logdir} is not empty")
os.makedirs(self.logdir, exist_ok=True)
self.fitted = False
self.global_step = -1
def train_epoch(self, data_set):
self.model.train()
self.tra.train()
data_set.train()
max_steps = self.n_epochs
if self.max_steps_per_epoch is not None:
max_steps = min(self.max_steps_per_epoch, self.n_epochs)
count = 0
total_loss = 0
total_count = 0
for batch in tqdm(data_set, total=max_steps):
count += 1
if count > max_steps:
break
self.global_step += 1
data, label, index = batch["data"], batch["label"], batch["index"]
feature = data[:, :, : -self.tra.num_states]
hist_loss = data[:, : -data_set.horizon, -self.tra.num_states :]
hidden = self.model(feature)
pred, all_preds, prob = self.tra(hidden, hist_loss)
loss = (pred - label).pow(2).mean()
L = (all_preds.detach() - label[:, None]).pow(2)
L -= L.min(dim=-1, keepdim=True).values # normalize & ensure positive input
data_set.assign_data(index, L) # save loss to memory
if prob is not None:
P = sinkhorn(-L, epsilon=0.01) # sample assignment matrix
lamb = self.lamb * (self.rho**self.global_step)
reg = prob.log().mul(P).sum(dim=-1).mean()
loss = loss - lamb * reg
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad()
total_loss += loss.item()
total_count += len(pred)
total_loss /= total_count
return total_loss
def test_epoch(self, data_set, return_pred=False):
self.model.eval()
self.tra.eval()
data_set.eval()
preds = []
metrics = []
for batch in tqdm(data_set):
data, label, index = batch["data"], batch["label"], batch["index"]
feature = data[:, :, : -self.tra.num_states]
hist_loss = data[:, : -data_set.horizon, -self.tra.num_states :]
with torch.no_grad():
hidden = self.model(feature)
pred, all_preds, prob = self.tra(hidden, hist_loss)
L = (all_preds - label[:, None]).pow(2)
L -= L.min(dim=-1, keepdim=True).values # normalize & ensure positive input
data_set.assign_data(index, L) # save loss to memory
X = np.c_[
pred.cpu().numpy(),
label.cpu().numpy(),
]
columns = ["score", "label"]
if prob is not None:
X = np.c_[X, all_preds.cpu().numpy(), prob.cpu().numpy()]
columns += ["score_%d" % d for d in range(all_preds.shape[1])] + [
"prob_%d" % d for d in range(all_preds.shape[1])
]
pred = pd.DataFrame(X, index=index.cpu().numpy(), columns=columns)
metrics.append(evaluate(pred))
if return_pred:
preds.append(pred)
metrics = pd.DataFrame(metrics)
metrics = {
"MSE": metrics.MSE.mean(),
"MAE": metrics.MAE.mean(),
"IC": metrics.IC.mean(),
"ICIR": metrics.IC.mean() / metrics.IC.std(),
}
if return_pred:
preds = pd.concat(preds, axis=0)
preds.index = data_set.restore_index(preds.index)
preds.index = preds.index.swaplevel()
preds.sort_index(inplace=True)
return metrics, preds
def fit(self, dataset, evals_result=dict()):
train_set, valid_set, test_set = dataset.prepare(["train", "valid", "test"])
best_score = -1
best_epoch = 0
stop_rounds = 0
best_params = {
"model": copy.deepcopy(self.model.state_dict()),
"tra": copy.deepcopy(self.tra.state_dict()),
}
params_list = {
"model": collections.deque(maxlen=self.smooth_steps),
"tra": collections.deque(maxlen=self.smooth_steps),
}
evals_result["train"] = []
evals_result["valid"] = []
evals_result["test"] = []
# train
self.fitted = True
self.global_step = -1
if self.tra.num_states > 1:
self.logger.info("init memory...")
self.test_epoch(train_set)
for epoch in range(self.n_epochs):
self.logger.info("Epoch %d:", epoch)
self.logger.info("training...")
self.train_epoch(train_set)
self.logger.info("evaluating...")
# average params for inference
params_list["model"].append(copy.deepcopy(self.model.state_dict()))
params_list["tra"].append(copy.deepcopy(self.tra.state_dict()))
self.model.load_state_dict(average_params(params_list["model"]))
self.tra.load_state_dict(average_params(params_list["tra"]))
# NOTE: during evaluating, the whole memory will be refreshed
if self.tra.num_states > 1 or self.eval_train:
train_set.clear_memory() # NOTE: clear the shared memory
train_metrics = self.test_epoch(train_set)[0]
evals_result["train"].append(train_metrics)
self.logger.info("\ttrain metrics: %s" % train_metrics)
valid_metrics = self.test_epoch(valid_set)[0]
evals_result["valid"].append(valid_metrics)
self.logger.info("\tvalid metrics: %s" % valid_metrics)
if self.eval_test:
test_metrics = self.test_epoch(test_set)[0]
evals_result["test"].append(test_metrics)
self.logger.info("\ttest metrics: %s" % test_metrics)
if valid_metrics["IC"] > best_score:
best_score = valid_metrics["IC"]
stop_rounds = 0
best_epoch = epoch
best_params = {
"model": copy.deepcopy(self.model.state_dict()),
"tra": copy.deepcopy(self.tra.state_dict()),
}
else:
stop_rounds += 1
if stop_rounds >= self.early_stop:
self.logger.info("early stop @ %s" % epoch)
break
# restore parameters
self.model.load_state_dict(params_list["model"][-1])
self.tra.load_state_dict(params_list["tra"][-1])
self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
self.model.load_state_dict(best_params["model"])
self.tra.load_state_dict(best_params["tra"])
metrics, preds = self.test_epoch(test_set, return_pred=True)
self.logger.info("test metrics: %s" % metrics)
if self.logdir:
self.logger.info("save model & pred to local directory")
pd.concat({name: pd.DataFrame(evals_result[name]) for name in evals_result}, axis=1).to_csv(
self.logdir + "/logs.csv", index=False
)
torch.save(best_params, self.logdir + "/model.bin")
preds.to_pickle(self.logdir + "/pred.pkl")
info = {
"config": {
"model_config": self.model_config,
"tra_config": self.tra_config,
"lr": self.lr,
"n_epochs": self.n_epochs,
"early_stop": self.early_stop,
"smooth_steps": self.smooth_steps,
"max_steps_per_epoch": self.max_steps_per_epoch,
"lamb": self.lamb,
"rho": self.rho,
"seed": self.seed,
"logdir": self.logdir,
},
"best_eval_metric": -best_score, # NOTE: minux -1 for minimize
"metric": metrics,
}
with open(self.logdir + "/info.json", "w") as f:
json.dump(info, f)
def predict(self, dataset, segment="test"):
if not self.fitted:
raise ValueError("model is not fitted yet!")
test_set = dataset.prepare(segment)
metrics, preds = self.test_epoch(test_set, return_pred=True)
self.logger.info("test metrics: %s" % metrics)
return preds
class LSTM(nn.Module):
"""LSTM Model
Args:
input_size (int): input size (# features)
hidden_size (int): hidden size
num_layers (int): number of hidden layers
use_attn (bool): whether use attention layer.
we use concat attention as https://github.com/fulifeng/Adv-ALSTM/
dropout (float): dropout rate
input_drop (float): input dropout for data augmentation
noise_level (float): add gaussian noise to input for data augmentation
"""
def __init__(
self,
input_size=16,
hidden_size=64,
num_layers=2,
use_attn=True,
dropout=0.0,
input_drop=0.0,
noise_level=0.0,
*args,
**kwargs,
):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.use_attn = use_attn
self.noise_level = noise_level
self.input_drop = nn.Dropout(input_drop)
self.rnn = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout,
)
if self.use_attn:
self.W = nn.Linear(hidden_size, hidden_size)
self.u = nn.Linear(hidden_size, 1, bias=False)
self.output_size = hidden_size * 2
else:
self.output_size = hidden_size
def forward(self, x):
x = self.input_drop(x)
if self.training and self.noise_level > 0:
noise = torch.randn_like(x).to(x)
x = x + noise * self.noise_level
rnn_out, _ = self.rnn(x)
last_out = rnn_out[:, -1]
if self.use_attn:
laten = self.W(rnn_out).tanh()
scores = self.u(laten).softmax(dim=1)
att_out = (rnn_out * scores).sum(dim=1).squeeze()
last_out = torch.cat([last_out, att_out], dim=1)
return last_out
class PositionalEncoding(nn.Module):
# reference: https://pytorch.org/tutorials/beginner/transformer_tutorial.html
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer("pe", pe)
def forward(self, x):
x = x + self.pe[: x.size(0), :]
return self.dropout(x)
class Transformer(nn.Module):
"""Transformer Model
Args:
input_size (int): input size (# features)
hidden_size (int): hidden size
num_layers (int): number of transformer layers
num_heads (int): number of heads in transformer
dropout (float): dropout rate
input_drop (float): input dropout for data augmentation
noise_level (float): add gaussian noise to input for data augmentation
"""
def __init__(
self,
input_size=16,
hidden_size=64,
num_layers=2,
num_heads=2,
dropout=0.0,
input_drop=0.0,
noise_level=0.0,
**kwargs,
):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.num_heads = num_heads
self.noise_level = noise_level
self.input_drop = nn.Dropout(input_drop)
self.input_proj = nn.Linear(input_size, hidden_size)
self.pe = PositionalEncoding(input_size, dropout)
layer = nn.TransformerEncoderLayer(
nhead=num_heads, dropout=dropout, d_model=hidden_size, dim_feedforward=hidden_size * 4
)
self.encoder = nn.TransformerEncoder(layer, num_layers=num_layers)
self.output_size = hidden_size
def forward(self, x):
x = self.input_drop(x)
if self.training and self.noise_level > 0:
noise = torch.randn_like(x).to(x)
x = x + noise * self.noise_level
x = x.permute(1, 0, 2).contiguous() # the first dim need to be sequence
x = self.pe(x)
x = self.input_proj(x)
out = self.encoder(x)
return out[-1]
class TRA(nn.Module):
"""Temporal Routing Adaptor (TRA)
TRA takes historical prediction errors & latent representation as inputs,
then routes the input sample to a specific predictor for training & inference.
Args:
input_size (int): input size (RNN/Transformer's hidden size)
num_states (int): number of latent states (i.e., trading patterns)
If `num_states=1`, then TRA falls back to traditional methods
hidden_size (int): hidden size of the router
tau (float): gumbel softmax temperature
"""
def __init__(self, input_size, num_states=1, hidden_size=8, tau=1.0, src_info="LR_TPE"):
super().__init__()
self.num_states = num_states
self.tau = tau
self.src_info = src_info
if num_states > 1:
self.router = nn.LSTM(
input_size=num_states,
hidden_size=hidden_size,
num_layers=1,
batch_first=True,
)
self.fc = nn.Linear(hidden_size + input_size, num_states)
self.predictors = nn.Linear(input_size, num_states)
def forward(self, hidden, hist_loss):
preds = self.predictors(hidden)
if self.num_states == 1:
return preds.squeeze(-1), preds, None
# information type
router_out, _ = self.router(hist_loss)
if "LR" in self.src_info:
latent_representation = hidden
else:
latent_representation = torch.randn(hidden.shape).to(hidden)
if "TPE" in self.src_info:
temporal_pred_error = router_out[:, -1]
else:
temporal_pred_error = torch.randn(router_out[:, -1].shape).to(hidden)
out = self.fc(torch.cat([temporal_pred_error, latent_representation], dim=-1))
prob = F.gumbel_softmax(out, dim=-1, tau=self.tau, hard=False)
if self.training:
final_pred = (preds * prob).sum(dim=-1)
else:
final_pred = preds[range(len(preds)), prob.argmax(dim=-1)]
return final_pred, preds, prob
def evaluate(pred):
pred = pred.rank(pct=True) # transform into percentiles
score = pred.score
label = pred.label
diff = score - label
MSE = (diff**2).mean()
MAE = (diff.abs()).mean()
IC = score.corr(label)
return {"MSE": MSE, "MAE": MAE, "IC": IC}
def average_params(params_list):
assert isinstance(params_list, (tuple, list, collections.deque))
n = len(params_list)
if n == 1:
return params_list[0]
new_params = collections.OrderedDict()
keys = None
for i, params in enumerate(params_list):
if keys is None:
keys = params.keys()
for k, v in params.items():
if k not in keys:
raise ValueError("the %d-th model has different params" % i)
if k not in new_params:
new_params[k] = v / n
else:
new_params[k] += v / n
return new_params
def shoot_infs(inp_tensor):
"""Replaces inf by maximum of tensor"""
mask_inf = torch.isinf(inp_tensor)
ind_inf = torch.nonzero(mask_inf, as_tuple=False)
if len(ind_inf) > 0:
for ind in ind_inf:
if len(ind) == 2:
inp_tensor[ind[0], ind[1]] = 0
elif len(ind) == 1:
inp_tensor[ind[0]] = 0
m = torch.max(inp_tensor)
for ind in ind_inf:
if len(ind) == 2:
inp_tensor[ind[0], ind[1]] = m
elif len(ind) == 1:
inp_tensor[ind[0]] = m
return inp_tensor
def sinkhorn(Q, n_iters=3, epsilon=0.01):
# epsilon should be adjusted according to logits value's scale
with torch.no_grad():
Q = shoot_infs(Q)
Q = torch.exp(Q / epsilon)
for i in range(n_iters):
Q /= Q.sum(dim=0, keepdim=True)
Q /= Q.sum(dim=1, keepdim=True)
return Q