diff --git a/README.md b/README.md index 4ebb70466..fd20e7c3a 100644 --- a/README.md +++ b/README.md @@ -45,13 +45,11 @@ For more details, please refer to our paper ["Qlib: An AI-oriented Quantitative At the module level, Qlib is a platform that consists of the above components. The components are designed as loose-coupled modules and each component could be used stand-alone. -| Name | Description | -| ------ | ----- | -| `Data layer` | `DataServer` focuses on providing high-performance infrastructure for users to manage and retrieve raw data. `DataEnhancement` will preprocess the data and provide the best dataset to be fed into the models. | -| `Interday Model` | `Interday model` focuses on producing prediction scores (aka. _alpha_). Models are trained by `Model Creator` and managed by `Model Manager`. Users could choose one or multiple models for prediction. Multiple models could be combined with `Ensemble` module. | -| `Interday Strategy` | `Portfolio Generator` will take prediction scores as input and output the orders based on the current position to achieve the target portfolio. | -| `Intraday Trading` | `Order Executor` is responsible for executing orders output by `Interday Strategy` and returning the executed results. | -| `Analysis` | Users could get a detailed analysis report of forecasting signals and portfolios in this part. | +| Name | Description | +| ------ | ----- | +| `Infrastructure` layer | `Infrastructure` layer provides underlying support for Quant research. `DataServer` provides high-performance infrastructure for users to manage and retrieve raw data. `Trainer` provides flexible interface to control the training process of models which enable algorithms controlling the training process. | +| `Workflow` layer | `Workflow` layer covers the whole workflow of quantitative investment. `Information Extractor` extracts data for models. `Forecast Model` focuses on producing all kinds of forecast signals (e.g. _alpha_, risk) for other modules. With these signals `Portfolio Generator` will generate the target portfolio and produce orders to be executed by `Order Executor`. | +| `Interface` layer | `Interface` layer tries to present a user-friendly interface for the underlying system. `Analyser` module will provide users detailed analysis reports of forecasting signals, portfolios and execution results | * The modules with hand-drawn style are under development and will be released in the future. * The modules with dashed borders are highly user-customizable and extendible. diff --git a/docs/_static/img/framework.png b/docs/_static/img/framework.png index 92c6b6127..d8242f7c1 100644 Binary files a/docs/_static/img/framework.png and b/docs/_static/img/framework.png differ diff --git a/examples/benchmarks/GATs/workflow_config_gats.yaml b/examples/benchmarks/GATs/workflow_config_gats.yaml index 37bced99d..33aa0fe8d 100644 --- a/examples/benchmarks/GATs/workflow_config_gats.yaml +++ b/examples/benchmarks/GATs/workflow_config_gats.yaml @@ -36,7 +36,6 @@ task: n_epochs: 200 lr: 1e-3 early_stop: 20 - batch_size: 800 metric: loss loss: mse base_model: LSTM diff --git a/examples/benchmarks/HATS/worflow_config_hats.yaml b/examples/benchmarks/HATS/worflow_config_hats.yaml index d8fb55198..0abed6c62 100644 --- a/examples/benchmarks/HATS/worflow_config_hats.yaml +++ b/examples/benchmarks/HATS/worflow_config_hats.yaml @@ -36,7 +36,6 @@ task: n_epochs: 200 lr: 1e-3 early_stop: 20 - batch_size: 800 metric: IC loss: mse base_model: GRU diff --git a/examples/workflow_by_code_gats.py b/examples/workflow_by_code_gats.py index ac413932b..984c1755a 100644 --- a/examples/workflow_by_code_gats.py +++ b/examples/workflow_by_code_gats.py @@ -65,13 +65,12 @@ if __name__ == "__main__": "n_epochs": 200, "lr": 1e-3, "early_stop": 20, - "batch_size": 800, "metric": "loss", "loss": "mse", "base_model": "LSTM", "with_pretrain": True, "seed": 0, - "GPU": 0, + "GPU": "0", }, }, "dataset": { @@ -94,7 +93,6 @@ if __name__ == "__main__": # "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'], } - # model = train_model(task) model = init_instance_by_config(task["model"]) dataset = init_instance_by_config(task["dataset"]) model.fit(dataset) diff --git a/examples/workflow_by_code_gru.py b/examples/workflow_by_code_gru.py index fdd0d9220..dece520d1 100644 --- a/examples/workflow_by_code_gru.py +++ b/examples/workflow_by_code_gru.py @@ -70,7 +70,7 @@ if __name__ == "__main__": "lr": 1e-3, "early_stop": 20, "batch_size": 800, - "metric": "IC", + "metric": "loss", "loss": "mse", "seed": 0, "GPU": 0, diff --git a/examples/workflow_by_code_hats.py b/examples/workflow_by_code_hats.py index 15e5ae130..192d97ee3 100644 --- a/examples/workflow_by_code_hats.py +++ b/examples/workflow_by_code_hats.py @@ -62,12 +62,11 @@ if __name__ == "__main__": "n_epochs": 200, "lr": 1e-3, "early_stop": 20, - "batch_size": 800, "metric": "IC", "loss": "mse", "base_model": "LSTM", "seed": 0, - "GPU": "0", + "GPU": "2", }, }, "dataset": { diff --git a/qlib/contrib/data/handler.py b/qlib/contrib/data/handler.py index 07ef2267a..5e6616b41 100644 --- a/qlib/contrib/data/handler.py +++ b/qlib/contrib/data/handler.py @@ -126,6 +126,7 @@ class ALPHA360(DataHandlerLP): learn_processors=_DEFAULT_LEARN_PROCESSORS, fit_start_time=None, fit_end_time=None, + **kwargs, ): infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) @@ -135,7 +136,7 @@ class ALPHA360(DataHandlerLP): "kwargs": { "config": { "feature": self.get_feature_config(), - "label": self.get_label_config(), + "label": kwargs.get("label", self.get_label_config()), }, }, } @@ -206,6 +207,7 @@ class Alpha158(DataHandlerLP): learn_processors=_DEFAULT_LEARN_PROCESSORS, fit_start_time=None, fit_end_time=None, + **kwargs, ): infer_processors = check_transform_proc(infer_processors, fit_start_time, fit_end_time) learn_processors = check_transform_proc(learn_processors, fit_start_time, fit_end_time) @@ -213,7 +215,7 @@ class Alpha158(DataHandlerLP): data_loader = { "class": "QlibDataLoader", "kwargs": { - "config": {"feature": self.get_feature_config(), "label": self.get_label_config()}, + "config": {"feature": self.get_feature_config(), "label": kwargs.get("label", self.get_label_config())}, }, } super().__init__( diff --git a/qlib/contrib/model/pytorch_gats.py b/qlib/contrib/model/pytorch_gats.py index 72cd5c36f..5d2dbd9a4 100755 --- a/qlib/contrib/model/pytorch_gats.py +++ b/qlib/contrib/model/pytorch_gats.py @@ -9,10 +9,8 @@ import os import numpy as np import pandas as pd import copy -from sklearn.metrics import roc_auc_score, mean_squared_error -import logging -from ...utils import unpack_archive_with_buffer, save_multiple_parts_file, create_save_path, drop_nan_by_y_index -from ...log import get_module_logger, TimeInspector +from ...utils import create_save_path +from ...log import get_module_logger import torch import torch.nn as nn @@ -49,7 +47,6 @@ class GAT(Model): n_epochs=200, lr=0.001, metric="IC", - batch_size=2000, early_stop=20, loss="mse", base_model="GRU", @@ -71,7 +68,6 @@ class GAT(Model): self.n_epochs = n_epochs self.lr = lr self.metric = metric - self.batch_size = batch_size self.early_stop = early_stop self.optimizer = optimizer.lower() self.loss = loss @@ -90,7 +86,6 @@ class GAT(Model): "\nn_epochs : {}" "\nlr : {}" "\nmetric : {}" - "\nbatch_size : {}" "\nearly_stop : {}" "\noptimizer : {}" "\nloss_type : {}" @@ -106,7 +101,6 @@ class GAT(Model): n_epochs, lr, metric, - batch_size, early_stop, optimizer.lower(), loss, @@ -165,23 +159,31 @@ class GAT(Model): def cal_ic(self, pred, label): return torch.mean(pred * label) + def get_daily_inter(self, df, shuffle=False): + # organize the train data into daily inter as daily batches + daily_count = df.groupby(level=0).size().values + daily_index = np.roll(np.cumsum(daily_count), 1) + daily_index[0] = 0 + if shuffle: + # shuffle the daily inter data + daily_shuffle = list(zip(daily_index, daily_count)) + np.random.shuffle(daily_shuffle) + daily_index, daily_count = zip(*daily_shuffle) + return daily_index, daily_count + def train_epoch(self, x_train, y_train): x_train_values = x_train.values y_train_values = np.squeeze(y_train.values) * 100 - self.GAT_model.train() - indices = np.arange(len(x_train_values)) - np.random.shuffle(indices) + # organize the train data into daily inter as daily batches + daily_index, daily_count = self.get_daily_inter(x_train, shuffle=True) - for i in range(len(indices))[:: self.batch_size]: - - if len(indices) - i < self.batch_size: - break - - feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float() - label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float() + for idx, count in zip(daily_index, daily_count): + batch = slice(idx, idx + count) + feature = torch.from_numpy(x_train_values[batch]).float() + label = torch.from_numpy(y_train_values[batch]).float() if self.use_gpu: feature = feature.cuda() @@ -206,15 +208,13 @@ class GAT(Model): scores = [] losses = [] - indices = np.arange(len(x_values)) + # organize the test data into daily inter as daily batches + daily_index, daily_count = self.get_daily_inter(data_x, shuffle=False) - for i in range(len(indices))[:: self.batch_size]: - - if len(indices) - i < self.batch_size: - break - - feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float() - label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float() + for idx, count in zip(daily_index, daily_count): + batch = slice(idx, idx + count) + feature = torch.from_numpy(x_values[batch]).float() + label = torch.from_numpy(y_values[batch]).float() if self.use_gpu: feature = feature.cuda() @@ -247,7 +247,6 @@ class GAT(Model): if save_path == None: save_path = create_save_path(save_path) stop_steps = 0 - train_loss = 0 best_score = -np.inf best_epoch = 0 evals_result["train"] = [] @@ -314,17 +313,14 @@ class GAT(Model): index = x_test.index self.GAT_model.eval() x_values = x_test.values - sample_num = x_values.shape[0] preds = [] - for begin in range(sample_num)[:: self.batch_size]: + # organize the data into daily inter as daily batches + daily_index, daily_count = self.get_daily_inter(x_test, shuffle=False) - if sample_num - begin < self.batch_size: - end = sample_num - else: - end = begin + self.batch_size - - x_batch = torch.from_numpy(x_values[begin:end]).float() + for idx, count in zip(daily_index, daily_count): + batch = slice(idx, idx + count) + x_batch = torch.from_numpy(x_values[batch]).float() if self.use_gpu: x_batch = x_batch.cuda() diff --git a/qlib/contrib/model/pytorch_gru.py b/qlib/contrib/model/pytorch_gru.py index 2dd8464e2..02664b6ac 100755 --- a/qlib/contrib/model/pytorch_gru.py +++ b/qlib/contrib/model/pytorch_gru.py @@ -46,7 +46,7 @@ class GRU(Model): dropout=0.0, n_epochs=200, lr=0.001, - metric="IC", + metric="", batch_size=2000, early_stop=20, loss="mse", @@ -140,21 +140,16 @@ class GRU(Model): def metric_fn(self, pred, label): mask = torch.isfinite(label) - if self.metric == "IC": - return self.cal_ic(pred[mask], label[mask]) if self.metric == "" or self.metric == "loss": # use loss return -self.loss_fn(pred[mask], label[mask]) raise ValueError("unknown metric `%s`" % self.metric) - def cal_ic(self, pred, label): - return torch.mean(pred * label) - def train_epoch(self, x_train, y_train): x_train_values = x_train.values - y_train_values = np.squeeze(y_train.values) * 100 + y_train_values = np.squeeze(y_train.values) self.gru_model.train() @@ -193,7 +188,6 @@ class GRU(Model): losses = [] indices = np.arange(len(x_values)) - np.random.shuffle(indices) for i in range(len(indices))[:: self.batch_size]: diff --git a/qlib/contrib/model/pytorch_hats.py b/qlib/contrib/model/pytorch_hats.py index 05f89ced0..bdb68be28 100644 --- a/qlib/contrib/model/pytorch_hats.py +++ b/qlib/contrib/model/pytorch_hats.py @@ -54,7 +54,6 @@ class HATS(Model): n_epochs=200, lr=0.01, metric="IC", - batch_size=800, early_stop=20, loss="mse", base_model="GRU", @@ -76,7 +75,6 @@ class HATS(Model): self.n_epochs = n_epochs self.lr = lr self.metric = metric - self.batch_size = batch_size self.early_stop = early_stop self.optimizer = optimizer.lower() self.loss = loss @@ -95,7 +93,6 @@ class HATS(Model): "\nn_epochs : {}" "\nlr : {}" "\nmetric : {}" - "\nbatch_size : {}" "\nearly_stop : {}" "\noptimizer : {}" "\nloss_type : {}" @@ -111,7 +108,6 @@ class HATS(Model): n_epochs, lr, metric, - batch_size, early_stop, optimizer.lower(), loss, @@ -169,6 +165,18 @@ class HATS(Model): def cal_ic(self, pred, label): return torch.mean(pred * label) + def get_daily_inter(self, df, shuffle=False): + # organize the train data into daily inter as daily batches + daily_count = df.groupby(level=0).size().values + daily_index = np.roll(np.cumsum(daily_count), 1) + daily_index[0] = 0 + if shuffle: + # shuffle the daily inter data + daily_shuffle = list(zip(daily_index, daily_count)) + np.random.shuffle(daily_shuffle) + daily_index, daily_count = zip(*daily_shuffle) + return daily_index, daily_count + def train_epoch(self, x_train, y_train): x_train_values = x_train.values @@ -176,16 +184,13 @@ class HATS(Model): self.HATS_model.train() - indices = np.arange(len(x_train_values)) - np.random.shuffle(indices) + # organize the train data into daily inter as daily batches + daily_index, daily_count = self.get_daily_inter(x_train, shuffle=True) - for i in range(len(indices))[:: self.batch_size]: - - if len(indices) - i < self.batch_size: - break - - feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float() - label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float() + for idx, count in zip(daily_index, daily_count): + batch = slice(idx, idx + count) + feature = torch.from_numpy(x_train_values[batch]).float() + label = torch.from_numpy(y_train_values[batch]).float() if self.use_gpu: feature = feature.cuda() @@ -210,15 +215,13 @@ class HATS(Model): scores = [] losses = [] - indices = np.arange(len(x_values)) + # organize the test data into daily inter as daily batches + daily_index, daily_count = self.get_daily_inter(data_x, shuffle=False) - for i in range(len(indices))[:: self.batch_size]: - - if len(indices) - i < self.batch_size: - break - - feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float() - label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float() + for idx, count in zip(daily_index, daily_count): + batch = slice(idx, idx + count) + feature = torch.from_numpy(x_values[batch]).float() + label = torch.from_numpy(y_values[batch]).float() if self.use_gpu: feature = feature.cuda() @@ -319,14 +322,12 @@ class HATS(Model): sample_num = x_values.shape[0] preds = [] - for begin in range(sample_num)[:: self.batch_size]: + # organize the data into daily inter as daily batches + daily_index, daily_count = self.get_daily_inter(x_test, shuffle=False) - if sample_num - begin < self.batch_size: - end = sample_num - else: - end = begin + self.batch_size - - x_batch = torch.from_numpy(x_values[begin:end]).float() + for idx, count in zip(daily_index, daily_count): + batch = slice(idx, idx + count) + x_batch = torch.from_numpy(x_values[batch]).float() if self.use_gpu: x_batch = x_batch.cuda() diff --git a/qlib/model/trainer.py b/qlib/model/trainer.py new file mode 100644 index 000000000..e4fc8eef9 --- /dev/null +++ b/qlib/model/trainer.py @@ -0,0 +1,40 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +from qlib.utils import init_instance_by_config, flatten_dict +from qlib.workflow import R +from qlib.workflow.record_temp import SignalRecord + + +def task_train(config: dict, experiment_name): + """ + task based training + + Parameters + ---------- + config : dict + A dict describing the training process + """ + + # model initiaiton + model = init_instance_by_config(config.get("task")["model"]) + dataset = init_instance_by_config(config.get("task")["dataset"]) + + # start exp + with R.start(experiment_name=experiment_name): + # train model + R.log_params(**flatten_dict(config.get("task"))) + model.fit(dataset) + recorder = R.get_recorder() + + # generate records: prediction, backtest, and analysis + for record in config.get("task")["record"]: + if record["class"] == SignalRecord.__name__: + srconf = {"model": model, "dataset": dataset, "recorder": recorder} + record["kwargs"].update(srconf) + sr = init_instance_by_config(record) + sr.generate() + else: + rconf = {"recorder": recorder} + record["kwargs"].update(rconf) + ar = init_instance_by_config(record) + ar.generate() diff --git a/qlib/workflow/cli.py b/qlib/workflow/cli.py index 4c02f0592..b9c040e87 100644 --- a/qlib/workflow/cli.py +++ b/qlib/workflow/cli.py @@ -8,9 +8,7 @@ import qlib import fire import pandas as pd import ruamel.yaml as yaml -from qlib.utils import init_instance_by_config, flatten_dict -from qlib.workflow import R -from qlib.workflow.record_temp import SignalRecord +from ..model.trainer import task_train def get_path_list(path): @@ -54,29 +52,7 @@ def workflow(config_path, experiment_name="workflow"): region = config.get("region") qlib.init(provider_uri=provider_uri, region=region) - # model initiaiton - model = init_instance_by_config(config.get("task")["model"]) - dataset = init_instance_by_config(config.get("task")["dataset"]) - - # start exp - with R.start(experiment_name=experiment_name): - # train model - R.log_params(**flatten_dict(config.get("task"))) - model.fit(dataset) - recorder = R.get_recorder() - - # generate records: prediction, backtest, and analysis - for record in config.get("task")["record"]: - if record["class"] == SignalRecord.__name__: - srconf = {"model": model, "dataset": dataset, "recorder": recorder} - record["kwargs"].update(srconf) - sr = init_instance_by_config(record) - sr.generate() - else: - rconf = {"recorder": recorder} - record["kwargs"].update(rconf) - ar = init_instance_by_config(record) - ar.generate() + task_train(config, experiment_name=experiment_name) # function to run worklflow by config