diff --git a/qlib/contrib/model/pytorch_localformer.py b/qlib/contrib/model/pytorch_localformer.py index 683a9bd4f..1b722ead2 100644 --- a/qlib/contrib/model/pytorch_localformer.py +++ b/qlib/contrib/model/pytorch_localformer.py @@ -8,6 +8,7 @@ from __future__ import print_function import os import numpy as np import pandas as pd +from typing import Text, Union import copy import math from ...utils import get_or_create_path @@ -23,6 +24,7 @@ from ...model.base import Model from ...data.dataset import DatasetH, TSDatasetH from ...data.dataset.handler import DataHandlerLP from torch.nn.modules.container import ModuleList +# qrun examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml ” class LocalformerModel(Model): @@ -30,7 +32,7 @@ class LocalformerModel(Model): self, d_feat: int = 20, d_model: int = 64, - batch_size: int = 8192, + batch_size: int = 2048, nhead: int = 2, num_layers: int = 2, dropout: float = 0, @@ -62,9 +64,7 @@ class LocalformerModel(Model): self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") self.seed = seed self.logger = get_module_logger("TransformerModel") - self.logger.info( - "Improved Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device) - ) + self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)) if self.seed is not None: np.random.seed(self.seed) @@ -106,15 +106,25 @@ class LocalformerModel(Model): raise ValueError("unknown metric `%s`" % self.metric) - def train_epoch(self, data_loader): + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values) self.model.train() - for data in data_loader: - feature = data[:, :, 0:-1].to(self.device) - label = data[:, -1, -1].to(self.device) + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) - pred = self.model(feature.float()) # .float() + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + + pred = self.model(feature) loss = self.loss_fn(pred, label) self.train_optimizer.zero_grad() @@ -122,20 +132,29 @@ class LocalformerModel(Model): torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) self.train_optimizer.step() - def test_epoch(self, data_loader): + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) self.model.eval() scores = [] losses = [] - for data in data_loader: + indices = np.arange(len(x_values)) - feature = data[:, :, 0:-1].to(self.device) - label = data[:, -1, -1].to(self.device) + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i: i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_values[indices[i: i + self.batch_size]]).float().to(self.device) with torch.no_grad(): - pred = self.model(feature.float()) # .float() + pred = self.model(feature) loss = self.loss_fn(pred, label) losses.append(loss.item()) @@ -151,21 +170,16 @@ class LocalformerModel(Model): save_path=None, ): - dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) - dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) - - dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader - dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader - - train_loader = DataLoader( - dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True - ) - valid_loader = DataLoader( - dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], + col_set=["feature", "label"], + data_key=DataHandlerLP.DK_L, ) + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + save_path = get_or_create_path(save_path) - stop_steps = 0 train_loss = 0 best_score = -np.inf @@ -180,10 +194,10 @@ class LocalformerModel(Model): for step in range(self.n_epochs): self.logger.info("Epoch%d:", step) self.logger.info("training...") - self.train_epoch(train_loader) + self.train_epoch(x_train, y_train) self.logger.info("evaluating...") - train_loss, train_score = self.test_epoch(train_loader) - val_loss, val_score = self.test_epoch(valid_loader) + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) evals_result["train"].append(train_score) evals_result["valid"].append(val_score) @@ -206,25 +220,32 @@ class LocalformerModel(Model): if self.use_gpu: torch.cuda.empty_cache() - def predict(self, dataset): + def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"): if not self.fitted: raise ValueError("model is not fitted yet!") - dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I) - dl_test.config(fillna_type="ffill+bfill") - test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs) + x_test = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I) + index = x_test.index self.model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] preds = [] - for data in test_loader: - feature = data[:, :, 0:-1].to(self.device) + for begin in range(sample_num)[:: self.batch_size]: + + if sample_num - begin < self.batch_size: + end = sample_num + else: + end = begin + self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device) with torch.no_grad(): - pred = self.model(feature.float()).detach().cpu().numpy() + pred = self.model(x_batch).detach().cpu().numpy() preds.append(pred) - return pd.Series(np.concatenate(preds), index=dl_test.get_index()) + return pd.Series(np.concatenate(preds), index=index) class PositionalEncoding(nn.Module): @@ -289,8 +310,9 @@ class Transformer(nn.Module): self.d_feat = d_feat def forward(self, src): - # src [N, T, F], [512, 60, 6] - src = self.feature_layer(src) # [512, 60, 8] + # src [N, F*T] --> [N, T, F] + src = src.reshape(len(src), self.d_feat, -1).permute(0, 2, 1) + src = self.feature_layer(src) # src [N, T, F] --> [T, N, F], [60, 512, 8] src = src.transpose(1, 0) # not batch first diff --git a/qlib/contrib/model/pytorch_localformer_ts.py b/qlib/contrib/model/pytorch_localformer_ts.py new file mode 100644 index 000000000..aa7af84df --- /dev/null +++ b/qlib/contrib/model/pytorch_localformer_ts.py @@ -0,0 +1,310 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP +from torch.nn.modules.container import ModuleList + + +class LocalformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 8192, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=2, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info( + "Improved Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device) + ) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, data_loader): + + self.model.train() + + for data in data_loader: + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_loader): + + self.model.eval() + + scores = [] + losses = [] + + for data in data_loader: + + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + import pdb + pdb.set_trace() + + dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader + dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader + + train_loader = DataLoader( + dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True + ) + valid_loader = DataLoader( + dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True + ) + + save_path = get_or_create_path(save_path) + + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(train_loader) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(train_loader) + val_loss, val_score = self.test_epoch(valid_loader) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I) + dl_test.config(fillna_type="ffill+bfill") + test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs) + self.model.eval() + preds = [] + + for data in test_loader: + feature = data[:, :, 0:-1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=dl_test.get_index()) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +def _get_clones(module, N): + return ModuleList([copy.deepcopy(module) for i in range(N)]) + + +class LocalformerEncoder(nn.Module): + __constants__ = ["norm"] + + def __init__(self, encoder_layer, num_layers, d_model): + super(LocalformerEncoder, self).__init__() + self.layers = _get_clones(encoder_layer, num_layers) + self.conv = _get_clones(nn.Conv1d(d_model, d_model, 3, 1, 1), num_layers) + self.num_layers = num_layers + + def forward(self, src, mask): + output = src + out = src + + for i, mod in enumerate(self.layers): + # [T, N, F] --> [N, T, F] --> [N, F, T] + out = output.transpose(1, 0).transpose(2, 1) + out = self.conv[i](out).transpose(2, 1).transpose(1, 0) + + output = mod(output + out, src_mask=mask) + + return output + out + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.rnn = nn.GRU( + input_size=d_model, + hidden_size=d_model, + num_layers=num_layers, + batch_first=False, + dropout=dropout, + ) + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = LocalformerEncoder(self.encoder_layer, num_layers=num_layers, d_model=d_model) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, T, F], [512, 60, 6] + src = self.feature_layer(src) # [512, 60, 8] + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + output, _ = self.rnn(output) + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/contrib/model/pytorch_transformer.py b/qlib/contrib/model/pytorch_transformer.py index c53564903..cca7a7871 100644 --- a/qlib/contrib/model/pytorch_transformer.py +++ b/qlib/contrib/model/pytorch_transformer.py @@ -8,6 +8,7 @@ from __future__ import print_function import os import numpy as np import pandas as pd +from typing import Text, Union import copy import math from ...utils import get_or_create_path @@ -22,6 +23,7 @@ from .pytorch_utils import count_parameters from ...model.base import Model from ...data.dataset import DatasetH, TSDatasetH from ...data.dataset.handler import DataHandlerLP +# qrun examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml ” class TransformerModel(Model): @@ -29,7 +31,7 @@ class TransformerModel(Model): self, d_feat: int = 20, d_model: int = 64, - batch_size: int = 8192, + batch_size: int = 2048, nhead: int = 2, num_layers: int = 2, dropout: float = 0, @@ -103,15 +105,25 @@ class TransformerModel(Model): raise ValueError("unknown metric `%s`" % self.metric) - def train_epoch(self, data_loader): + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values) self.model.train() - for data in data_loader: - feature = data[:, :, 0:-1].to(self.device) - label = data[:, -1, -1].to(self.device) + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) - pred = self.model(feature.float()) # .float() + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + + pred = self.model(feature) loss = self.loss_fn(pred, label) self.train_optimizer.zero_grad() @@ -119,20 +131,29 @@ class TransformerModel(Model): torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) self.train_optimizer.step() - def test_epoch(self, data_loader): + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) self.model.eval() scores = [] losses = [] - for data in data_loader: + indices = np.arange(len(x_values)) - feature = data[:, :, 0:-1].to(self.device) - label = data[:, -1, -1].to(self.device) + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i: i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_values[indices[i: i + self.batch_size]]).float().to(self.device) with torch.no_grad(): - pred = self.model(feature.float()) # .float() + pred = self.model(feature) loss = self.loss_fn(pred, label) losses.append(loss.item()) @@ -148,21 +169,16 @@ class TransformerModel(Model): save_path=None, ): - dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) - dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) - - dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader - dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader - - train_loader = DataLoader( - dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True - ) - valid_loader = DataLoader( - dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], + col_set=["feature", "label"], + data_key=DataHandlerLP.DK_L, ) + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + save_path = get_or_create_path(save_path) - stop_steps = 0 train_loss = 0 best_score = -np.inf @@ -177,10 +193,10 @@ class TransformerModel(Model): for step in range(self.n_epochs): self.logger.info("Epoch%d:", step) self.logger.info("training...") - self.train_epoch(train_loader) + self.train_epoch(x_train, y_train) self.logger.info("evaluating...") - train_loss, train_score = self.test_epoch(train_loader) - val_loss, val_score = self.test_epoch(valid_loader) + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) evals_result["train"].append(train_score) evals_result["valid"].append(val_score) @@ -203,25 +219,32 @@ class TransformerModel(Model): if self.use_gpu: torch.cuda.empty_cache() - def predict(self, dataset): + def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"): if not self.fitted: raise ValueError("model is not fitted yet!") - dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I) - dl_test.config(fillna_type="ffill+bfill") - test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs) + x_test = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I) + index = x_test.index self.model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] preds = [] - for data in test_loader: - feature = data[:, :, 0:-1].to(self.device) + for begin in range(sample_num)[:: self.batch_size]: + + if sample_num - begin < self.batch_size: + end = sample_num + else: + end = begin + self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device) with torch.no_grad(): - pred = self.model(feature.float()).detach().cpu().numpy() + pred = self.model(x_batch).detach().cpu().numpy() preds.append(pred) - return pd.Series(np.concatenate(preds), index=dl_test.get_index()) + return pd.Series(np.concatenate(preds), index=index) class PositionalEncoding(nn.Module): @@ -252,8 +275,9 @@ class Transformer(nn.Module): self.d_feat = d_feat def forward(self, src): - # src [N, T, F], [512, 60, 6] - src = self.feature_layer(src) # [512, 60, 8] + # src [N, F*T] --> [N, T, F] + src = src.reshape(len(src), self.d_feat, -1).permute(0, 2, 1) + src = self.feature_layer(src) # src [N, T, F] --> [T, N, F], [60, 512, 8] src = src.transpose(1, 0) # not batch first diff --git a/qlib/contrib/model/pytorch_transformer_ts.py b/qlib/contrib/model/pytorch_transformer_ts.py new file mode 100644 index 000000000..c53564903 --- /dev/null +++ b/qlib/contrib/model/pytorch_transformer_ts.py @@ -0,0 +1,269 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP + + +class TransformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 8192, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, data_loader): + + self.model.train() + + for data in data_loader: + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_loader): + + self.model.eval() + + scores = [] + losses = [] + + for data in data_loader: + + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + + dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader + dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader + + train_loader = DataLoader( + dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True + ) + valid_loader = DataLoader( + dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True + ) + + save_path = get_or_create_path(save_path) + + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(train_loader) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(train_loader) + val_loss, val_score = self.test_epoch(valid_loader) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I) + dl_test.config(fillna_type="ffill+bfill") + test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs) + self.model.eval() + preds = [] + + for data in test_loader: + feature = data[:, :, 0:-1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=dl_test.get_index()) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, T, F], [512, 60, 6] + src = self.feature_layer(src) # [512, 60, 8] + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze()