1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

Add TRA Model

This commit is contained in:
linhx25
2021-06-25 16:12:50 +08:00
parent 4837ba8db3
commit 107e40f3ee
13 changed files with 2180 additions and 0 deletions

View File

@@ -0,0 +1,81 @@
# Learning Multiple Stock Trading Patterns with Temporal Routing Adaptor and Optimal Transport
This code provides a PyTorch implementation for TRA (Temporal Routing Adaptor), as described in the paper [Learning Multiple Stock Trading Patterns with Temporal Routing Adaptor and Optimal Transport](http://arxiv.org/abs/2106.12950).
* TRA (Temporal Routing Adaptor) is a lightweight module that consists of a set of independent predictors for learning multiple patterns as well as a router to dispatch samples to different predictors.
* We also design a learning algorithm based on Optimal Transport (OT) to obtain the optimal sample to predictor assignment and effectively optimize the router with such assignment through an auxiliary loss term.
# Running TRA
## Requirements
- Install `Qlib` main branch
## Running
We attach our running scripts for the paper in `run.sh`.
And here are two ways to run the model:
* Running from scripts with default parameters
You can directly run from Qlib command `qrun`:
```
qrun configs/config_alstm.yaml
```
* Running from code with self-defined parameters
Setting different parameters is also allowed. See codes in `example.py`:
```
python example.py --config_file configs/config_alstm.yaml
```
Here we trained TRA on a pretrained backbone model. Therefore we run `*_init.yaml` before TRA's scipts.
# Results
## Outputs
After running the scripts, you can find result files in path `./output`:
`info.json` - config settings and result metrics.
`log.csv` - running logs.
`model.bin` - the model parameter dictionary.
`pred.pkl` - the prediction scores and output for inference.
## Our Results
| Methods | MSE| MAE| IC | ICIR | AR | AV | SR | MDD |
|-------------------|-------------------|---------------------|--------------------|--------------------|--------------------|--------------------|--------------------|--------------------|
|Linear|0.163|0.327|0.020|0.132|-3.2%|16.8%|-0.191|32.1%|
|LightGBM|0.160(0.000)|0.323(0.000)|0.041|0.292|7.8%|15.5%|0.503|25.7%|
|MLP|0.160(0.002)|0.323(0.003)|0.037|0.273|3.7%|15.3%|0.264|26.2%|
|SFM|0.159(0.001) |0.321(0.001) |0.047 |0.381 |7.1% |14.3% |0.497 |22.9%|
|ALSTM|0.158(0.001) |0.320(0.001) |0.053 |0.419 |12.3% |13.7% |0.897 |20.2%|
|Trans.|0.158(0.001) |0.322(0.001) |0.051 |0.400 |14.5% |14.2% |1.028 |22.5%|
|ALSTM+TS|0.160(0.002) |0.321(0.002) |0.039 |0.291 |6.7% |14.6% |0.480|22.3%|
|Trans.+TS|0.160(0.004) |0.324(0.005) |0.037 |0.278 |10.4% |14.7% |0.722 |23.7%|
|ALSTM+TRA(Ours)|0.157(0.000) |0.318(0.000) |0.059 |0.460 |12.4% |14.0% |0.885 |20.4%|
|Trans.+TRA(Ours)|0.157(0.000) |0.320(0.000) |0.056 |0.442 |16.1% |14.2% |1.133 |23.1%|
A more detailed demo for our experiment results in the paper can be found in `Report.ipynb`.
# Common Issues
For help or issues using TRA, please submit a GitHub issue.
Sometimes we might encounter situation where the loss is `NaN`, please check the `epsilon` parameter in the sinkhorn algorithm, adjusting the `epsilon` according to input's scale is important.
# Citation
If you find this repository useful in your research, please cite:
```
@inproceedings{HengxuKDD2021,
author = {Hengxu Lin and Dong Zhou and Weiqing Liu and Jiang Bian},
title = {Learning Multiple Stock Trading Patterns with Temporal Routing Adaptor and Optimal Transport},
booktitle = {Proceedings of the 27th ACM SIGKDD Conference on Knowledge Discovery \& Data Mining},
series = {KDD '21},
year = {2021},
publisher = {ACM},
}
```

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,63 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
data_loader_config: &data_loader_config
class: StaticDataLoader
module_path: qlib.data.dataset.loader
kwargs:
config:
feature: data/feature.pkl
label: data/label.pkl
model_config: &model_config
input_size: 16
hidden_size: 256
num_layers: 2
num_heads: 2
use_attn: True
dropout: 0.1
num_states: &num_states 1
tra_config: &tra_config
num_states: *num_states
hidden_size: 16
tau: 1.0
src_info: LR_TPE
task:
model:
class: TRAModel
module_path: src/model.py
kwargs:
lr: 0.0002
n_epochs: 500
max_steps_per_epoch: 100
early_stop: 20
seed: 1000
logdir: output/test/alstm
model_type: LSTM
model_config: *model_config
tra_config: *tra_config
lamb: 1.0
rho: 0.99
freeze_model: False
model_init_state:
dataset:
class: MTSDatasetH
module_path: src/dataset.py
kwargs:
handler:
class: DataHandler
module_path: qlib.data.dataset.handler
kwargs:
data_loader: *data_loader_config
segments:
train: [2007-10-30, 2016-05-27]
valid: [2016-09-26, 2018-05-29]
test: [2018-09-21, 2020-06-30]
seq_len: 60
horizon: 21
num_states: *num_states
batch_size: 1024

View File

@@ -0,0 +1,63 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
data_loader_config: &data_loader_config
class: StaticDataLoader
module_path: qlib.data.dataset.loader
kwargs:
config:
feature: data/feature.pkl
label: data/label.pkl
model_config: &model_config
input_size: 16
hidden_size: 256
num_layers: 2
num_heads: 2
use_attn: True
dropout: 0.1
num_states: &num_states 10
tra_config: &tra_config
num_states: *num_states
hidden_size: 16
tau: 1.0
src_info: LR_TPE
task:
model:
class: TRAModel
module_path: src/model.py
kwargs:
lr: 0.0001
n_epochs: 500
max_steps_per_epoch: 100
early_stop: 20
seed: 1000
logdir: output/test/alstm_tra
model_type: LSTM
model_config: *model_config
tra_config: *tra_config
lamb: 2.0
rho: 0.99
freeze_model: True
model_init_state: output/test/alstm_tra_init/model.bin
dataset:
class: MTSDatasetH
module_path: src/dataset.py
kwargs:
handler:
class: DataHandler
module_path: qlib.data.dataset.handler
kwargs:
data_loader: *data_loader_config
segments:
train: [2007-10-30, 2016-05-27]
valid: [2016-09-26, 2018-05-29]
test: [2018-09-21, 2020-06-30]
seq_len: 60
horizon: 21
num_states: *num_states
batch_size: 1024

View File

@@ -0,0 +1,63 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
data_loader_config: &data_loader_config
class: StaticDataLoader
module_path: qlib.data.dataset.loader
kwargs:
config:
feature: data/feature.pkl
label: data/label.pkl
model_config: &model_config
input_size: 16
hidden_size: 256
num_layers: 2
num_heads: 2
use_attn: True
dropout: 0.1
num_states: &num_states 3
tra_config: &tra_config
num_states: *num_states
hidden_size: 16
tau: 1.0
src_info: LR_TPE
task:
model:
class: TRAModel
module_path: src/model.py
kwargs:
lr: 0.0002
n_epochs: 500
max_steps_per_epoch: 100
early_stop: 20
seed: 1000
logdir: output/test/alstm_tra_init
model_type: LSTM
model_config: *model_config
tra_config: *tra_config
lamb: 1.0
rho: 0.99
freeze_model: False
model_init_state:
dataset:
class: MTSDatasetH
module_path: src/dataset.py
kwargs:
handler:
class: DataHandler
module_path: qlib.data.dataset.handler
kwargs:
data_loader: *data_loader_config
segments:
train: [2007-10-30, 2016-05-27]
valid: [2016-09-26, 2018-05-29]
test: [2018-09-21, 2020-06-30]
seq_len: 60
horizon: 21
num_states: *num_states
batch_size: 512

View File

@@ -0,0 +1,63 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
data_loader_config: &data_loader_config
class: StaticDataLoader
module_path: qlib.data.dataset.loader
kwargs:
config:
feature: data/feature.pkl
label: data/label.pkl
model_config: &model_config
input_size: 16
hidden_size: 64
num_layers: 2
num_heads: 4
use_attn: False
dropout: 0.1
num_states: &num_states 1
tra_config: &tra_config
num_states: *num_states
hidden_size: 16
tau: 1.0
src_info: LR_TPE
task:
model:
class: TRAModel
module_path: src/model.py
kwargs:
lr: 0.0002
n_epochs: 500
max_steps_per_epoch: 100
early_stop: 20
seed: 1000
logdir: output/test/transformer
model_type: Transformer
model_config: *model_config
tra_config: *tra_config
lamb: 1.0
rho: 0.99
freeze_model: False
model_init_state:
dataset:
class: MTSDatasetH
module_path: src/dataset.py
kwargs:
handler:
class: DataHandler
module_path: qlib.data.dataset.handler
kwargs:
data_loader: *data_loader_config
segments:
train: [2007-10-30, 2016-05-27]
valid: [2016-09-26, 2018-05-29]
test: [2018-09-21, 2020-06-30]
seq_len: 60
horizon: 21
num_states: *num_states
batch_size: 1024

View File

@@ -0,0 +1,63 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
data_loader_config: &data_loader_config
class: StaticDataLoader
module_path: qlib.data.dataset.loader
kwargs:
config:
feature: data/feature.pkl
label: data/label.pkl
model_config: &model_config
input_size: 16
hidden_size: 64
num_layers: 2
num_heads: 4
use_attn: False
dropout: 0.1
num_states: &num_states 3
tra_config: &tra_config
num_states: *num_states
hidden_size: 16
tau: 1.0
src_info: LR_TPE
task:
model:
class: TRAModel
module_path: src/model.py
kwargs:
lr: 0.0005
n_epochs: 500
max_steps_per_epoch: 100
early_stop: 20
seed: 1000
logdir: output/test/transformer_tra
model_type: Transformer
model_config: *model_config
tra_config: *tra_config
lamb: 1.0
rho: 0.99
freeze_model: True
model_init_state: output/test/transformer_tra_init/model.bin
dataset:
class: MTSDatasetH
module_path: src/dataset.py
kwargs:
handler:
class: DataHandler
module_path: qlib.data.dataset.handler
kwargs:
data_loader: *data_loader_config
segments:
train: [2007-10-30, 2016-05-27]
valid: [2016-09-26, 2018-05-29]
test: [2018-09-21, 2020-06-30]
seq_len: 60
horizon: 21
num_states: *num_states
batch_size: 512

View File

@@ -0,0 +1,63 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
data_loader_config: &data_loader_config
class: StaticDataLoader
module_path: qlib.data.dataset.loader
kwargs:
config:
feature: data/feature.pkl
label: data/label.pkl
model_config: &model_config
input_size: 16
hidden_size: 64
num_layers: 2
num_heads: 4
use_attn: False
dropout: 0.1
num_states: &num_states 3
tra_config: &tra_config
num_states: *num_states
hidden_size: 16
tau: 1.0
src_info: LR_TPE
task:
model:
class: TRAModel
module_path: src/model.py
kwargs:
lr: 0.0002
n_epochs: 500
max_steps_per_epoch: 100
early_stop: 20
seed: 1000
logdir: output/test/transformer_tra_init
model_type: Transformer
model_config: *model_config
tra_config: *tra_config
lamb: 1.0
rho: 0.99
freeze_model: False
model_init_state:
dataset:
class: MTSDatasetH
module_path: src/dataset.py
kwargs:
handler:
class: DataHandler
module_path: qlib.data.dataset.handler
kwargs:
data_loader: *data_loader_config
segments:
train: [2007-10-30, 2016-05-27]
valid: [2016-09-26, 2018-05-29]
test: [2018-09-21, 2020-06-30]
seq_len: 60
horizon: 21
num_states: *num_states
batch_size: 512

View File

@@ -0,0 +1 @@
Data Link: https://drive.google.com/drive/folders/1fMqZYSeLyrHiWmVzygeI4sw3vp5Gt8cY?usp=sharing

View File

@@ -0,0 +1,39 @@
import argparse
import qlib
import ruamel.yaml as yaml
from qlib.utils import init_instance_by_config
def main(seed, config_file="configs/config_alstm.yaml"):
# set random seed
with open(config_file) as f:
config = yaml.safe_load(f)
# seed_suffix = "/seed1000" if "init" in config_file else f"/seed{seed}"
seed_suffix = ""
config["task"]["model"]["kwargs"].update(
{"seed": seed, "logdir": config["task"]["model"]["kwargs"]["logdir"] + seed_suffix}
)
# initialize workflow
qlib.init(
provider_uri=config["qlib_init"]["provider_uri"],
region=config["qlib_init"]["region"],
)
dataset = init_instance_by_config(config["task"]["dataset"])
model = init_instance_by_config(config["task"]["model"])
# train model
model.fit(dataset)
if __name__ == "__main__":
# set params from cmd
parser = argparse.ArgumentParser(allow_abbrev=False)
parser.add_argument("--seed", type=int, default=1000, help="random seed")
parser.add_argument("--config_file", type=str, default="configs/config_alstm.yaml", help="config file")
args = parser.parse_args()
main(**vars(args))

View File

@@ -0,0 +1,29 @@
#!/bin/bash
# we used random seed(1 1000 2000 3000 4000 5000) in our experiments
# Directly run from Qlib command `qrun`
qrun configs/config_alstm.yaml
qrun configs/config_transformer.yaml
qrun configs/config_transformer_tra_init.yaml
qrun configs/config_transformer_tra.yaml
qrun configs/config_alstm_tra_init.yaml
qrun configs/config_alstm_tra.yaml
# Or setting different parameters with example.py
python example.py --config_file configs/config_alstm.yaml
python example.py --config_file configs/config_transformer.yaml
python example.py --config_file configs/config_transformer_tra_init.yaml
python example.py --config_file configs/config_transformer_tra.yaml
python example.py --config_file configs/config_alstm_tra_init.yaml
python example.py --config_file configs/config_alstm_tra.yaml

View File

@@ -0,0 +1,253 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import copy
import torch
import numpy as np
import pandas as pd
from qlib.utils import init_instance_by_config
from qlib.data.dataset import DatasetH, DataHandler
device = "cuda" if torch.cuda.is_available() else "cpu"
def _to_tensor(x):
if not isinstance(x, torch.Tensor):
return torch.tensor(x, dtype=torch.float, device=device)
return x
def _create_ts_slices(index, seq_len):
"""
create time series slices from pandas index
Args:
index (pd.MultiIndex): pandas multiindex with <instrument, datetime> order
seq_len (int): sequence length
"""
assert index.is_lexsorted(), "index should be sorted"
# number of dates for each code
sample_count_by_codes = pd.Series(0, index=index).groupby(level=0).size().values
# start_index for each code
start_index_of_codes = np.roll(np.cumsum(sample_count_by_codes), 1)
start_index_of_codes[0] = 0
# all the [start, stop) indices of features
# features btw [start, stop) are used to predict the `stop - 1` label
slices = []
for cur_loc, cur_cnt in zip(start_index_of_codes, sample_count_by_codes):
for stop in range(1, cur_cnt + 1):
end = cur_loc + stop
start = max(end - seq_len, 0)
slices.append(slice(start, end))
slices = np.array(slices)
return slices
def _get_date_parse_fn(target):
"""get date parse function
This method is used to parse date arguments as target type.
Example:
get_date_parse_fn('20120101')('2017-01-01') => '20170101'
get_date_parse_fn(20120101)('2017-01-01') => 20170101
"""
if isinstance(target, pd.Timestamp):
_fn = lambda x: pd.Timestamp(x) # Timestamp('2020-01-01')
elif isinstance(target, str) and len(target) == 8:
_fn = lambda x: str(x).replace("-", "")[:8] # '20200201'
elif isinstance(target, int):
_fn = lambda x: int(str(x).replace("-", "")[:8]) # 20200201
else:
_fn = lambda x: x
return _fn
class MTSDatasetH(DatasetH):
"""Memory Augmented Time Series Dataset
Args:
handler (DataHandler): data handler
segments (dict): data split segments
seq_len (int): time series sequence length
horizon (int): label horizon (to mask historical loss for TRA)
num_states (int): how many memory states to be added (for TRA)
batch_size (int): batch size (<0 means daily batch)
shuffle (bool): whether shuffle data
pin_memory (bool): whether pin data to gpu memory
drop_last (bool): whether drop last batch < batch_size
"""
def __init__(
self,
handler,
segments,
seq_len=60,
horizon=0,
num_states=1,
batch_size=-1,
shuffle=True,
pin_memory=False,
drop_last=False,
**kwargs
):
assert horizon > 0, "please specify `horizon` to avoid data leakage"
self.seq_len = seq_len
self.horizon = horizon
self.num_states = num_states
self.batch_size = batch_size
self.shuffle = shuffle
self.drop_last = drop_last
self.pin_memory = pin_memory
self.params = (batch_size, drop_last, shuffle) # for train/eval switch
super().__init__(handler, segments, **kwargs)
def setup_data(self, handler_kwargs: dict = None, **kwargs):
super().setup_data()
# change index to <code, date>
# NOTE: we will use inplace sort to reduce memory use
df = self.handler._data
df.index = df.index.swaplevel()
df.sort_index(inplace=True)
self._data = df["feature"].values.astype("float32")
self._label = df["label"].squeeze().astype("float32")
self._index = df.index
# add memory to feature
self._data = np.c_[self._data, np.zeros((len(self._data), self.num_states), dtype=np.float32)]
# padding tensor
self.zeros = np.zeros((self.seq_len, self._data.shape[1]), dtype=np.float32)
# pin memory
if self.pin_memory:
self._data = _to_tensor(self._data)
self._label = _to_tensor(self._label)
self.zeros = _to_tensor(self.zeros)
# create batch slices
self.batch_slices = _create_ts_slices(self._index, self.seq_len)
# create daily slices
index = [slc.stop - 1 for slc in self.batch_slices]
act_index = self.restore_index(index)
daily_slices = {date: [] for date in sorted(act_index.unique(level=1))}
for i, (code, date) in enumerate(act_index):
daily_slices[date].append(self.batch_slices[i])
self.daily_slices = list(daily_slices.values())
def _prepare_seg(self, slc, **kwargs):
fn = _get_date_parse_fn(self._index[0][1])
start_date = fn(slc.start)
end_date = fn(slc.stop)
obj = copy.copy(self) # shallow copy
# NOTE: Seriable will disable copy `self._data` so we manually assign them here
obj._data = self._data
obj._label = self._label
obj._index = self._index
new_batch_slices = []
for batch_slc in self.batch_slices:
date = self._index[batch_slc.stop - 1][1]
if start_date <= date <= end_date:
new_batch_slices.append(batch_slc)
obj.batch_slices = np.array(new_batch_slices)
new_daily_slices = []
for daily_slc in self.daily_slices:
date = self._index[daily_slc[0].stop - 1][1]
if start_date <= date <= end_date:
new_daily_slices.append(daily_slc)
obj.daily_slices = new_daily_slices
return obj
def restore_index(self, index):
if isinstance(index, torch.Tensor):
index = index.cpu().numpy()
return self._index[index]
def assign_data(self, index, vals):
if isinstance(self._data, torch.Tensor):
vals = _to_tensor(vals)
elif isinstance(vals, torch.Tensor):
vals = vals.detach().cpu().numpy()
index = index.detach().cpu().numpy()
self._data[index, -self.num_states :] = vals
def clear_memory(self):
self._data[:, -self.num_states :] = 0
# TODO: better train/eval mode design
def train(self):
"""enable traning mode"""
self.batch_size, self.drop_last, self.shuffle = self.params
def eval(self):
"""enable evaluation mode"""
self.batch_size = -1
self.drop_last = False
self.shuffle = False
def _get_slices(self):
if self.batch_size < 0:
slices = self.daily_slices.copy()
batch_size = -1 * self.batch_size
else:
slices = self.batch_slices.copy()
batch_size = self.batch_size
return slices, batch_size
def __len__(self):
slices, batch_size = self._get_slices()
if self.drop_last:
return len(slices) // batch_size
return (len(slices) + batch_size - 1) // batch_size
def __iter__(self):
slices, batch_size = self._get_slices()
if self.shuffle:
np.random.shuffle(slices)
for i in range(len(slices))[::batch_size]:
if self.drop_last and i + batch_size > len(slices):
break
# get slices for this batch
slices_subset = slices[i : i + batch_size]
if self.batch_size < 0:
slices_subset = np.concatenate(slices_subset)
# collect data
data = []
label = []
index = []
for slc in slices_subset:
_data = self._data[slc].clone() if self.pin_memory else self._data[slc].copy()
if len(_data) != self.seq_len:
if self.pin_memory:
_data = torch.cat([self.zeros[: self.seq_len - len(_data)], _data], axis=0)
else:
_data = np.concatenate([self.zeros[: self.seq_len - len(_data)], _data], axis=0)
if self.num_states > 0:
_data[-self.horizon :, -self.num_states :] = 0
data.append(_data)
label.append(self._label[slc.stop - 1])
index.append(slc.stop - 1)
# concate
index = torch.tensor(index, device=device)
if isinstance(data[0], torch.Tensor):
data = torch.stack(data)
label = torch.stack(label)
else:
data = _to_tensor(np.stack(data))
label = _to_tensor(np.stack(label))
# yield -> generator
yield {"data": data, "label": label, "index": index}

View File

@@ -0,0 +1,603 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
import copy
import math
import json
import collections
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm
from qlib.utils import get_or_create_path
from qlib.log import get_module_logger
from qlib.model.base import Model
device = "cuda" if torch.cuda.is_available() else "cpu"
class TRAModel(Model):
def __init__(
self,
model_config,
tra_config,
model_type="LSTM",
lr=1e-3,
n_epochs=500,
early_stop=50,
smooth_steps=5,
max_steps_per_epoch=None,
freeze_model=False,
model_init_state=None,
lamb=0.0,
rho=0.99,
seed=0,
logdir=None,
eval_train=True,
eval_test=False,
avg_params=True,
**kwargs,
):
np.random.seed(seed)
torch.manual_seed(seed)
self.logger = get_module_logger("TRA")
self.logger.info("TRA Model...")
self.model = eval(model_type)(**model_config).to(device)
if model_init_state:
self.model.load_state_dict(torch.load(model_init_state, map_location="cpu")["model"])
if freeze_model:
for param in self.model.parameters():
param.requires_grad_(False)
else:
self.logger.info("# model params: %d" % sum([p.numel() for p in self.model.parameters()]))
self.tra = TRA(self.model.output_size, **tra_config).to(device)
self.logger.info("# tra params: %d" % sum([p.numel() for p in self.tra.parameters()]))
self.optimizer = optim.Adam(list(self.model.parameters()) + list(self.tra.parameters()), lr=lr)
self.model_config = model_config
self.tra_config = tra_config
self.lr = lr
self.n_epochs = n_epochs
self.early_stop = early_stop
self.smooth_steps = smooth_steps
self.max_steps_per_epoch = max_steps_per_epoch
self.lamb = lamb
self.rho = rho
self.seed = seed
self.logdir = logdir
self.eval_train = eval_train
self.eval_test = eval_test
self.avg_params = avg_params
if self.tra.num_states > 1 and not self.eval_train:
self.logger.warn("`eval_train` will be ignored when using TRA")
if self.logdir is not None:
if os.path.exists(self.logdir):
self.logger.warn(f"logdir {self.logdir} is not empty")
os.makedirs(self.logdir, exist_ok=True)
self.fitted = False
self.global_step = -1
def train_epoch(self, data_set):
self.model.train()
self.tra.train()
data_set.train()
max_steps = self.n_epochs
if self.max_steps_per_epoch is not None:
max_steps = min(self.max_steps_per_epoch, self.n_epochs)
count = 0
total_loss = 0
total_count = 0
for batch in tqdm(data_set, total=max_steps):
count += 1
if count > max_steps:
break
self.global_step += 1
data, label, index = batch["data"], batch["label"], batch["index"]
feature = data[:, :, : -self.tra.num_states]
hist_loss = data[:, : -data_set.horizon, -self.tra.num_states :]
hidden = self.model(feature)
pred, all_preds, prob = self.tra(hidden, hist_loss)
loss = (pred - label).pow(2).mean()
L = (all_preds.detach() - label[:, None]).pow(2)
L -= L.min(dim=-1, keepdim=True).values # normalize & ensure postive input
data_set.assign_data(index, L) # save loss to memory
if prob is not None:
P = sinkhorn(-L, epsilon=0.01) # sample assignment matrix
lamb = self.lamb * (self.rho ** self.global_step)
reg = prob.log().mul(P).sum(dim=-1).mean()
loss = loss - lamb * reg
loss.backward()
self.optimizer.step()
self.optimizer.zero_grad()
total_loss += loss.item()
total_count += len(pred)
total_loss /= total_count
return total_loss
def test_epoch(self, data_set, return_pred=False):
self.model.eval()
self.tra.eval()
data_set.eval()
preds = []
metrics = []
for batch in tqdm(data_set):
data, label, index = batch["data"], batch["label"], batch["index"]
feature = data[:, :, : -self.tra.num_states]
hist_loss = data[:, : -data_set.horizon, -self.tra.num_states :]
with torch.no_grad():
hidden = self.model(feature)
pred, all_preds, prob = self.tra(hidden, hist_loss)
L = (all_preds - label[:, None]).pow(2)
L -= L.min(dim=-1, keepdim=True).values # normalize & ensure postive input
data_set.assign_data(index, L) # save loss to memory
X = np.c_[
pred.cpu().numpy(),
label.cpu().numpy(),
]
columns = ["score", "label"]
if prob is not None:
X = np.c_[X, all_preds.cpu().numpy(), prob.cpu().numpy()]
columns += ["score_%d" % d for d in range(all_preds.shape[1])] + [
"prob_%d" % d for d in range(all_preds.shape[1])
]
pred = pd.DataFrame(X, index=index.cpu().numpy(), columns=columns)
metrics.append(evaluate(pred))
if return_pred:
preds.append(pred)
metrics = pd.DataFrame(metrics)
metrics = {
"MSE": metrics.MSE.mean(),
"MAE": metrics.MAE.mean(),
"IC": metrics.IC.mean(),
"ICIR": metrics.IC.mean() / metrics.IC.std(),
}
if return_pred:
preds = pd.concat(preds, axis=0)
preds.index = data_set.restore_index(preds.index)
preds.index = preds.index.swaplevel()
preds.sort_index(inplace=True)
return metrics, preds
def fit(self, dataset, evals_result=dict()):
train_set, valid_set, test_set = dataset.prepare(["train", "valid", "test"])
best_score = -1
best_epoch = 0
stop_rounds = 0
best_params = {
"model": copy.deepcopy(self.model.state_dict()),
"tra": copy.deepcopy(self.tra.state_dict()),
}
params_list = {
"model": collections.deque(maxlen=self.smooth_steps),
"tra": collections.deque(maxlen=self.smooth_steps),
}
evals_result["train"] = []
evals_result["valid"] = []
evals_result["test"] = []
# train
self.fitted = True
self.global_step = -1
if self.tra.num_states > 1:
self.logger.info("init memory...")
self.test_epoch(train_set)
for epoch in range(self.n_epochs):
self.logger.info("Epoch %d:", epoch)
self.logger.info("training...")
self.train_epoch(train_set)
self.logger.info("evaluating...")
# average params for inference
params_list["model"].append(copy.deepcopy(self.model.state_dict()))
params_list["tra"].append(copy.deepcopy(self.tra.state_dict()))
self.model.load_state_dict(average_params(params_list["model"]))
self.tra.load_state_dict(average_params(params_list["tra"]))
# NOTE: during evaluating, the whole memory will be refreshed
if self.tra.num_states > 1 or self.eval_train:
train_set.clear_memory() # NOTE: clear the shared memory
train_metrics = self.test_epoch(train_set)[0]
evals_result["train"].append(train_metrics)
self.logger.info("\ttrain metrics: %s" % train_metrics)
valid_metrics = self.test_epoch(valid_set)[0]
evals_result["valid"].append(valid_metrics)
self.logger.info("\tvalid metrics: %s" % valid_metrics)
if self.eval_test:
test_metrics = self.test_epoch(test_set)[0]
evals_result["test"].append(test_metrics)
self.logger.info("\ttest metrics: %s" % test_metrics)
if valid_metrics["IC"] > best_score:
best_score = valid_metrics["IC"]
stop_rounds = 0
best_epoch = epoch
best_params = {
"model": copy.deepcopy(self.model.state_dict()),
"tra": copy.deepcopy(self.tra.state_dict()),
}
else:
stop_rounds += 1
if stop_rounds >= self.early_stop:
self.logger.info("early stop @ %s" % epoch)
break
# restore parameters
self.model.load_state_dict(params_list["model"][-1])
self.tra.load_state_dict(params_list["tra"][-1])
self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
self.model.load_state_dict(best_params["model"])
self.tra.load_state_dict(best_params["tra"])
metrics, preds = self.test_epoch(test_set, return_pred=True)
self.logger.info("test metrics: %s" % metrics)
if self.logdir:
self.logger.info("save model & pred to local directory")
pd.concat({name: pd.DataFrame(evals_result[name]) for name in evals_result}, axis=1).to_csv(
self.logdir + "/logs.csv", index=False
)
torch.save(best_params, self.logdir + "/model.bin")
preds.to_pickle(self.logdir + "/pred.pkl")
info = {
"config": {
"model_config": self.model_config,
"tra_config": self.tra_config,
"lr": self.lr,
"n_epochs": self.n_epochs,
"early_stop": self.early_stop,
"smooth_steps": self.smooth_steps,
"max_steps_per_epoch": self.max_steps_per_epoch,
"lamb": self.lamb,
"rho": self.rho,
"seed": self.seed,
"logdir": self.logdir,
},
"best_eval_metric": -best_score, # NOTE: minux -1 for minimize
"metric": metrics,
}
with open(self.logdir + "/info.json", "w") as f:
json.dump(info, f)
def predict(self, dataset, segment="test"):
if not self.fitted:
raise ValueError("model is not fitted yet!")
test_set = dataset.prepare(segment)
metrics, preds = self.test_epoch(test_set, return_pred=True)
self.logger.info("test metrics: %s" % metrics)
return preds
class LSTM(nn.Module):
"""LSTM Model
Args:
input_size (int): input size (# features)
hidden_size (int): hidden size
num_layers (int): number of hidden layers
use_attn (bool): whether use attention layer.
we use concat attention as https://github.com/fulifeng/Adv-ALSTM/
dropout (float): dropout rate
input_drop (float): input dropout for data augmentation
noise_level (float): add gaussian noise to input for data augmentation
"""
def __init__(
self,
input_size=16,
hidden_size=64,
num_layers=2,
use_attn=True,
dropout=0.0,
input_drop=0.0,
noise_level=0.0,
*args,
**kwargs,
):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.use_attn = use_attn
self.noise_level = noise_level
self.input_drop = nn.Dropout(input_drop)
self.rnn = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
batch_first=True,
dropout=dropout,
)
if self.use_attn:
self.W = nn.Linear(hidden_size, hidden_size)
self.u = nn.Linear(hidden_size, 1, bias=False)
self.output_size = hidden_size * 2
else:
self.output_size = hidden_size
def forward(self, x):
x = self.input_drop(x)
if self.training and self.noise_level > 0:
noise = torch.randn_like(x).to(x)
x = x + noise * self.noise_level
rnn_out, _ = self.rnn(x)
last_out = rnn_out[:, -1]
if self.use_attn:
laten = self.W(rnn_out).tanh()
scores = self.u(laten).softmax(dim=1)
att_out = (rnn_out * scores).sum(dim=1).squeeze()
last_out = torch.cat([last_out, att_out], dim=1)
return last_out
class PositionalEncoding(nn.Module):
# reference: https://pytorch.org/tutorials/beginner/transformer_tutorial.html
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer("pe", pe)
def forward(self, x):
x = x + self.pe[: x.size(0), :]
return self.dropout(x)
class Transformer(nn.Module):
"""Transformer Model
Args:
input_size (int): input size (# features)
hidden_size (int): hidden size
num_layers (int): number of transformer layers
num_heads (int): number of heads in transformer
dropout (float): dropout rate
input_drop (float): input dropout for data augmentation
noise_level (float): add gaussian noise to input for data augmentation
"""
def __init__(
self,
input_size=16,
hidden_size=64,
num_layers=2,
num_heads=2,
dropout=0.0,
input_drop=0.0,
noise_level=0.0,
**kwargs,
):
super().__init__()
self.input_size = input_size
self.hidden_size = hidden_size
self.num_layers = num_layers
self.num_heads = num_heads
self.noise_level = noise_level
self.input_drop = nn.Dropout(input_drop)
self.input_proj = nn.Linear(input_size, hidden_size)
self.pe = PositionalEncoding(input_size, dropout)
layer = nn.TransformerEncoderLayer(
nhead=num_heads, dropout=dropout, d_model=hidden_size, dim_feedforward=hidden_size * 4
)
self.encoder = nn.TransformerEncoder(layer, num_layers=num_layers)
self.output_size = hidden_size
def forward(self, x):
x = self.input_drop(x)
if self.training and self.noise_level > 0:
noise = torch.randn_like(x).to(x)
x = x + noise * self.noise_level
x = x.permute(1, 0, 2).contiguous() # the first dim need to be sequence
x = self.pe(x)
x = self.input_proj(x)
out = self.encoder(x)
return out[-1]
class TRA(nn.Module):
"""Temporal Routing Adaptor (TRA)
TRA takes historical prediction erros & latent representation as inputs,
then routes the input sample to a specific predictor for training & inference.
Args:
input_size (int): input size (RNN/Transformer's hidden size)
num_states (int): number of latent states (i.e., trading patterns)
If `num_states=1`, then TRA falls back to traditional methods
hidden_size (int): hidden size of the router
tau (float): gumbel softmax temperature
"""
def __init__(self, input_size, num_states=1, hidden_size=8, tau=1.0, src_info="LR_TPE"):
super().__init__()
self.num_states = num_states
self.tau = tau
self.src_info = src_info
if num_states > 1:
self.router = nn.LSTM(
input_size=num_states,
hidden_size=hidden_size,
num_layers=1,
batch_first=True,
)
self.fc = nn.Linear(hidden_size + input_size, num_states)
self.predictors = nn.Linear(input_size, num_states)
def forward(self, hidden, hist_loss):
preds = self.predictors(hidden)
if self.num_states == 1:
return preds.squeeze(-1), preds, None
# information type
router_out, _ = self.router(hist_loss)
if "LR" in self.src_info:
latent_representation = hidden
else:
latent_representation = torch.randn(hidden.shape).to(hidden)
if "TPE" in self.src_info:
temporal_pred_error = router_out[:, -1]
else:
temporal_pred_error = torch.randn(router_out[:, -1].shape).to(hidden)
out = self.fc(torch.cat([temporal_pred_error, latent_representation], dim=-1))
prob = F.gumbel_softmax(out, dim=-1, tau=self.tau, hard=False)
if self.training:
final_pred = (preds * prob).sum(dim=-1)
else:
final_pred = preds[range(len(preds)), prob.argmax(dim=-1)]
return final_pred, preds, prob
def evaluate(pred):
pred = pred.rank(pct=True) # transform into percentiles
score = pred.score
label = pred.label
diff = score - label
MSE = (diff ** 2).mean()
MAE = (diff.abs()).mean()
IC = score.corr(label)
return {"MSE": MSE, "MAE": MAE, "IC": IC}
def average_params(params_list):
assert isinstance(params_list, (tuple, list, collections.deque))
n = len(params_list)
if n == 1:
return params_list[0]
new_params = collections.OrderedDict()
keys = None
for i, params in enumerate(params_list):
if keys is None:
keys = params.keys()
for k, v in params.items():
if k not in keys:
raise ValueError("the %d-th model has different params" % i)
if k not in new_params:
new_params[k] = v / n
else:
new_params[k] += v / n
return new_params
def shoot_infs(inp_tensor):
"""Replaces inf by maximum of tensor"""
mask_inf = torch.isinf(inp_tensor)
ind_inf = torch.nonzero(mask_inf, as_tuple=False)
if len(ind_inf) > 0:
for ind in ind_inf:
if len(ind) == 2:
inp_tensor[ind[0], ind[1]] = 0
elif len(ind) == 1:
inp_tensor[ind[0]] = 0
m = torch.max(inp_tensor)
for ind in ind_inf:
if len(ind) == 2:
inp_tensor[ind[0], ind[1]] = m
elif len(ind) == 1:
inp_tensor[ind[0]] = m
return inp_tensor
def sinkhorn(Q, n_iters=3, epsilon=0.01):
# epsilon should be adjusted according to logits value's scale
with torch.no_grad():
Q = shoot_infs(Q)
Q = torch.exp(Q / epsilon)
for i in range(n_iters):
Q /= Q.sum(dim=0, keepdim=True)
Q /= Q.sum(dim=1, keepdim=True)
return Q