From 0afe57f2fe2cac385951ad7cce1fc0066b7720f5 Mon Sep 17 00:00:00 2001 From: lwwang1995 Date: Mon, 16 Nov 2020 23:28:11 +0800 Subject: [PATCH] Update GRU model. --- examples/workflow_by_code_gru.py | 17 +- qlib/contrib/data/handler.py | 98 ++++++++-- qlib/contrib/model/pytorch_gru.py | 290 +++++++++++++++--------------- qlib/data/dataset/processor.py | 13 ++ 4 files changed, 258 insertions(+), 160 deletions(-) diff --git a/examples/workflow_by_code_gru.py b/examples/workflow_by_code_gru.py index 06fea1511..e55f0ae45 100755 --- a/examples/workflow_by_code_gru.py +++ b/examples/workflow_by_code_gru.py @@ -8,7 +8,7 @@ import qlib import pandas as pd from qlib.config import REG_CN from qlib.contrib.model.pytorch_gru import GRU -from qlib.contrib.data.handler import ALPHA360 +from qlib.contrib.data.handler import ALPHA360_Denoise from qlib.contrib.strategy.strategy import TopkDropoutStrategy from qlib.contrib.evaluate import ( backtest as normal_backtest, @@ -19,6 +19,7 @@ from qlib.utils import exists_qlib_data # from qlib.model.learner import train_model from qlib.utils import init_instance_by_config +import pickle if __name__ == "__main__": @@ -63,14 +64,13 @@ if __name__ == "__main__": "kwargs": { "d_feat": 6, "hidden_size": 64, - "num_layers": 3, + "num_layers": 2, "dropout": 0.0, - "n_epochs": 2000, - "lr": 1e-1, - "early_stop": 200, + "n_epochs": 200, + "lr": 1e-3, + "early_stop": 20, "batch_size": 800, - "smooth_steps": 5, - "metric": "mse", + "metric": "IC", "loss": "mse", "seed": 0, "GPU": 0, @@ -81,7 +81,7 @@ if __name__ == "__main__": "module_path": "qlib.data.dataset", "kwargs": { "handler": { - "class": "ALPHA360", + "class": "ALPHA360_Denoise", "module_path": "qlib.contrib.data.handler", "kwargs": DATA_HANDLER_CONFIG, }, @@ -99,7 +99,6 @@ if __name__ == "__main__": # model = train_model(task) model = init_instance_by_config(task["model"]) dataset = init_instance_by_config(task["dataset"]) - model.fit(dataset) pred_score = model.predict(dataset) diff --git a/qlib/contrib/data/handler.py b/qlib/contrib/data/handler.py index 54e9de27f..c69345173 100644 --- a/qlib/contrib/data/handler.py +++ b/qlib/contrib/data/handler.py @@ -9,6 +9,78 @@ from ...log import TimeInspector from inspect import getfullargspec import copy +class ALPHA360_Denoise(DataHandlerLP): + def __init__(self, instruments="csi500", start_time=None, end_time=None, fit_start_time=None, fit_end_time=None): + data_loader = { + "class": "QlibDataLoader", + "kwargs": { + "config": { + "feature": self.get_feature_config(), + "label": self.get_label_config(), + }, + }, + } + + learn_processors = [ + {"class": "DropnaLabel", "kwargs": {"group": "label"}}, + {"class": "CSZScoreNorm", "kwargs": {"fields_group": "label"}}, + ] + infer_processors = [ + {"class": "ProcessInf", "kwargs": {}}, + {"class": "TanhProcess", "kwargs": {}}, + {"class": "Fillna", "kwargs": {}}, + ] + + super().__init__( + instruments, + start_time, + end_time, + data_loader=data_loader, + learn_processors=learn_processors, + infer_processors=infer_processors, + ) + + def get_label_config(self): + return (["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]) + + def get_feature_config(self): + + fields = [] + names = [] + + for i in range(59, 0, -1): + fields += ["Ref($close, %d)/$close" % (i)] + names += ["CLOSE%d" % (i)] + fields += ["$close/$close"] + names += ["CLOSE0"] + for i in range(59, 0, -1): + fields += ["Ref($open, %d)/$close" % (i)] + names += ["OPEN%d" % (i)] + fields += ["$open/$close"] + names += ["OPEN0"] + for i in range(59, 0, -1): + fields += ["Ref($high, %d)/$close" % (i)] + names += ["HIGH%d" % (i)] + fields += ["$high/$close"] + names += ["HIGH0"] + for i in range(59, 0, -1): + fields += ["Ref($low, %d)/$close" % (i)] + names += ["LOW%d" % (i)] + fields += ["$low/$close"] + names += ["LOW0"] + for i in range(59, 0, -1): + fields += ["Ref($vwap, %d)/$close" % (i)] + names += ["VWAP%d" % (i)] + fields += ["$vwap/$close"] + names += ["VWAP0"] + for i in range(59, 0, -1): + fields += ["Ref($volume, %d)/$volume" % (i)] + names += ["VOLUME%d" % (i)] + fields += ["$volume/$volume"] + names += ["VOLUME0"] + + return fields, names + class ALPHA360(DataHandlerLP): def __init__(self, instruments="csi500", start_time=None, end_time=None, fit_start_time=None, fit_end_time=None): @@ -52,28 +124,32 @@ class ALPHA360(DataHandlerLP): for i in range(59, 0, -1): fields += ["Ref($close, %d)/$close" % (i)] names += ["CLOSE%d" % (i)] + fields += ["$close/$close"] + names += ["CLOSE0"] + for i in range(59, 0, -1): fields += ["Ref($open, %d)/$close" % (i)] names += ["OPEN%d" % (i)] + fields += ["$open/$close"] + names += ["OPEN0"] + for i in range(59, 0, -1): fields += ["Ref($high, %d)/$close" % (i)] names += ["HIGH%d" % (i)] + fields += ["$high/$close"] + names += ["HIGH0"] + for i in range(59, 0, -1): fields += ["Ref($low, %d)/$close" % (i)] names += ["LOW%d" % (i)] + fields += ["$low/$close"] + names += ["LOW0"] + for i in range(59, 0, -1): fields += ["Ref($vwap, %d)/$close" % (i)] names += ["VWAP%d" % (i)] + fields += ["$vwap/$close"] + names += ["VWAP0"] + for i in range(59, 0, -1): fields += ["Ref($volume, %d)/$volume" % (i)] names += ["VOLUME%d" % (i)] - - fields += ["$close/$close"] - fields += ["$open/$close"] - fields += ["$high/$close"] - fields += ["$low/$close"] - fields += ["$vwap/$close"] fields += ["$volume/$volume"] - names += ["CLOSE0"] - names += ["OPEN0"] - names += ["HIGH0"] - names += ["LOW0"] - names += ["VWAP0"] names += ["VOLUME0"] return fields, names diff --git a/qlib/contrib/model/pytorch_gru.py b/qlib/contrib/model/pytorch_gru.py index 464cd9ba0..f118542d6 100755 --- a/qlib/contrib/model/pytorch_gru.py +++ b/qlib/contrib/model/pytorch_gru.py @@ -36,10 +36,6 @@ class GRU(Model): layer sizes lr : float learning rate - lr_decay : float - learning rate decay - lr_decay_steps : int - learning rate decay steps optimizer : str optimizer name GPU : str @@ -54,13 +50,11 @@ class GRU(Model): dropout=0.0, n_epochs=200, lr=0.001, + metric='IC', batch_size=2000, early_stop=20, - eval_steps=5, loss="mse", - lr_decay=0.96, - lr_decay_steps=100, - optimizer="gd", + optimizer="adam", GPU="0", seed=0, **kwargs @@ -76,13 +70,11 @@ class GRU(Model): self.dropout = dropout self.n_epochs = n_epochs self.lr = lr + self.metric = metric self.batch_size = batch_size self.early_stop = early_stop - self.eval_steps = eval_steps - self.lr_decay = lr_decay - self.lr_decay_steps = lr_decay_steps self.optimizer = optimizer.lower() - self.loss_type = loss + self.loss = loss self.visible_GPU = GPU self.use_gpu = torch.cuda.is_available() self.seed = seed @@ -95,11 +87,9 @@ class GRU(Model): "\ndropout : {}" "\nn_epochs : {}" "\nlr : {}" + "\nmetric : {}" "\nbatch_size : {}" "\nearly_stop : {}" - "\neval_steps : {}" - "\nlr_decay : {}" - "\nlr_decay_steps : {}" "\noptimizer : {}" "\nloss_type : {}" "\nvisible_GPU : {}" @@ -111,11 +101,9 @@ class GRU(Model): dropout, n_epochs, lr, + metric, batch_size, early_stop, - eval_steps, - lr_decay, - lr_decay_steps, optimizer.lower(), loss, GPU, @@ -138,20 +126,6 @@ class GRU(Model): else: raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) - # Reduce learning rate when loss has stopped decrease - self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( - self.train_optimizer, - mode="min", - factor=0.5, - patience=10, - verbose=True, - threshold=0.0001, - threshold_mode="rel", - cooldown=0, - min_lr=0.00001, - eps=1e-08, - ) - self._fitted = False if self.use_gpu: self.gru_model.cuda() @@ -159,6 +133,98 @@ class GRU(Model): if self.visible_GPU: os.environ["CUDA_VISIBLE_DEVICES"] = self.visible_GPU + def mse(self, pred, label): + loss = (pred - label)**2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == 'mse': + return self.mse(pred[mask], label[mask]) + + raise ValueError('unknown loss `%s`'%self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + if self.metric == 'IC': + return self.cal_ic(pred[mask], label[mask]) + + if self.metric == '' or self.metric == 'loss': # use loss + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError('unknown metric `%s`'%self.metric) + + def cal_ic(self, pred, label): + return torch.mean(pred * label) + + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values)*100 + + self.gru_model.train() + + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[::self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i:i+self.batch_size]]).float() + label = torch.from_numpy(y_train_values[indices[i:i+self.batch_size]]).float() + + if self.use_gpu: + feature = feature.cuda() + label = label.cuda() + + pred = self.gru_model(feature) + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.gru_model.parameters(), 3.) + self.train_optimizer.step() + + + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) + + self.gru_model.eval() + + scores = [] + losses = [] + + indices = np.arange(len(x_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[::self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i:i+self.batch_size]]).float() + label = torch.from_numpy(y_values[indices[i:i+self.batch_size]]).float() + + if self.use_gpu: + feature = feature.cuda() + label = label.cuda() + + pred = self.gru_model(feature) + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + def fit( self, dataset: DatasetH, @@ -167,17 +233,23 @@ class GRU(Model): save_path=None, ): - df_train, df_valid = dataset.prepare( - ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L ) + print(df_test) + df_train.to_pickle('~/df_train_2.pkl') + df_valid.to_pickle('~/df_valid_2.pkl') + df_test.to_pickle('~/df_test_2.pkl') + x_train, y_train = df_train["feature"], df_train["label"] x_valid, y_valid = df_valid["feature"], df_valid["label"] - # Lightgbm need 1D array as its label - save_path = create_save_path(save_path) + if save_path == None: + save_path = create_save_path(save_path) stop_steps = 0 train_loss = 0 - best_loss = np.inf + best_score = -np.inf + best_epoch = 0 evals_result["train"] = [] evals_result["valid"] = [] @@ -185,94 +257,36 @@ class GRU(Model): self.logger.info("training...") self._fitted = True # return - # prepare training data - x_train_values = torch.from_numpy(x_train.values).float() - y_train_values = torch.from_numpy(np.squeeze(y_train.values)).float() - train_num = y_train_values.shape[0] - - # prepare validation data - x_val_auto = torch.from_numpy(x_valid.values).float() - y_val_auto = torch.from_numpy(np.squeeze(y_valid.values)).float() - - if self.use_gpu: - x_val_auto = x_val_auto.cuda() - y_val_auto = y_val_auto.cuda() for step in range(self.n_epochs): - if stop_steps >= self.early_stop: - if verbose: - self.logger.info("\tearly stop") - break - loss = AverageMeter() - self.gru_model.train() - self.train_optimizer.zero_grad() + self.logger.info('Epoch%d:', step) + self.logger.info('training...') + self.train_epoch(x_train, y_train) + self.logger.info('evaluating...') + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) + self.logger.info('train %.6f, valid %.6f'%(train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) - choice = np.random.choice(train_num, self.batch_size) - x_batch_auto = x_train_values[choice] - y_batch_auto = y_train_values[choice] - - if self.use_gpu: - x_batch_auto = x_batch_auto.float().cuda() - y_batch_auto = y_batch_auto.float().cuda() - - # forward - preds = self.gru_model(x_batch_auto) - cur_loss = self.get_loss(preds, y_batch_auto, self.loss_type) - cur_loss.backward() - self.train_optimizer.step() - loss.update(cur_loss.item()) - - # validation - train_loss += loss.val - # print(loss.val) - if step and step % self.eval_steps == 0: + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.gru_model.state_dict()) + else: stop_steps += 1 - train_loss /= self.eval_steps + if stop_steps >= self.early_stop: + self.logger.info('early stop') + break - with torch.no_grad(): - self.gru_model.eval() - loss_val = AverageMeter() - - # forward - preds = self.gru_model(x_val_auto) - cur_loss_val = self.get_loss(preds, y_val_auto, self.loss_type) - loss_val.update(cur_loss_val.item()) - - if verbose: - self.logger.info( - "[Epoch {}]: train_loss {:.6f}, valid_loss {:.6f}".format(step, train_loss, loss_val.val) - ) - evals_result["train"].append(train_loss) - evals_result["valid"].append(loss_val.val) - if loss_val.val < best_loss: - if verbose: - self.logger.info( - "\tvalid loss update from {:.6f} to {:.6f}, save checkpoint.".format( - best_loss, loss_val.val - ) - ) - best_loss = loss_val.val - stop_steps = 0 - torch.save(self.gru_model.state_dict(), save_path) - train_loss = 0 - # update learning rate - self.scheduler.step(cur_loss_val) - - # restore the optimal parameters after training ?? - # self.gru_model.load_state_dict(torch.load(save_path)) + self.logger.info('best score: %.6lf @ %d'%(best_score, best_epoch)) + self.gru_model.load_state_dict(best_param) + torch.save(best_param, save_path) + if self.use_gpu: torch.cuda.empty_cache() - def get_loss(self, pred, target, loss_type): - if loss_type == "mse": - sqr_loss = (pred - target) ** 2 - loss = sqr_loss.mean() - return loss - elif loss_type == "binary": - loss = nn.BCELoss() - return loss(pred, target) - else: - raise NotImplementedError("loss {} is not supported!".format(loss_type)) def predict(self, dataset): if not self._fitted: @@ -280,37 +294,33 @@ class GRU(Model): x_test = dataset.prepare("test", col_set="feature") index = x_test.index - x_test = torch.from_numpy(x_test.values).float() - - if self.use_gpu: - x_test = x_test.cuda() self.gru_model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] + preds = [] - with torch.no_grad(): - if self.use_gpu: - preds = self.gru_model(x_test).detach().cpu().numpy() + for begin in range(sample_num)[::self.batch_size]: + + if sample_num-begin < self.batch_size: + end = sample_num else: - preds = self.gru_model(x_test).detach().numpy() - return pd.Series(preds, index=index) + end = begin+self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float() -class AverageMeter(object): - """Computes and stores the average and current value""" + if self.use_gpu: + x_batch = x_batch.cuda() - def __init__(self): - self.reset() + with torch.no_grad(): + if self.use_gpu: + pred = self.gru_model(x_batch).detach().cpu().numpy() + else: + pred = self.gru_model(x_batch).detach().numpy() - def reset(self): - self.val = 0 - self.avg = 0 - self.sum = 0 - self.count = 0 + preds.append(pred) - def update(self, val, n=1): - self.val = val - self.sum += val * n - self.count += n - self.avg = self.sum / self.count + return pd.Series(np.concatenate(preds), index=index) class GRUModel(nn.Module): diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index 308c531b9..400574320 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -89,6 +89,19 @@ class DropnaLabel(DropnaProcessor): """The samples are dropped according to label. So it is not usable for inference""" return False +class TanhProcess(Processor): + """ Use tanh to process noise data""" + def __call__(self, df): + def tanh_denoise(data): + mask = data.columns.get_level_values(1).str.contains('LABEL') + col = df.columns[~mask] + data[col] = data[col] - 1 + data[col] = np.tanh(data[col]) + + return data + + return tanh_denoise(df) + class ProcessInf(Processor): """Process infinity """