From 58bd2339c050152503be1fc21b3bcb5954fcb0ee Mon Sep 17 00:00:00 2001 From: Jactus Date: Wed, 18 Nov 2020 17:55:45 +0800 Subject: [PATCH 1/4] Update expm and exp --- qlib/utils/__init__.py | 25 +++++ qlib/workflow/__init__.py | 1 - qlib/workflow/exp.py | 140 ++++++++++++---------------- qlib/workflow/expm.py | 171 +++++++++++++++++------------------ qlib/workflow/record_temp.py | 33 +------ qlib/workflow/recorder.py | 8 +- qlib/workflow/utils.py | 5 +- 7 files changed, 176 insertions(+), 207 deletions(-) diff --git a/qlib/utils/__init__.py b/qlib/utils/__init__.py index 30483af2e..575ed24aa 100644 --- a/qlib/utils/__init__.py +++ b/qlib/utils/__init__.py @@ -20,6 +20,7 @@ import requests import tempfile import importlib import contextlib +import collections import numpy as np import pandas as pd from pathlib import Path @@ -641,6 +642,30 @@ def lexsort_index(df: pd.DataFrame) -> pd.DataFrame: return df.sort_index() +def flatten_dict(d, parent_key="", sep="."): + """flatten_dict. + >>> flatten_dict({'a': 1, 'c': {'a': 2, 'b': {'x': 5, 'y' : 10}}, 'd': [1, 2, 3]}) + >>> {'a': 1, 'c.a': 2, 'c.b.x': 5, 'd': [1, 2, 3], 'c.b.y': 10} + + Parameters + ---------- + d : + d + parent_key : + parent_key + sep : + sep + """ + items = [] + for k, v in d.items(): + new_key = parent_key + sep + k if parent_key else k + if isinstance(v, collections.MutableMapping): + items.extend(flatten_dict(v, new_key, sep=sep).items()) + else: + items.append((new_key, v)) + return dict(items) + + #################### Wrapper ##################### class Wrapper(object): """Wrapper class for anything that needs to set up during qlib.init""" diff --git a/qlib/workflow/__init__.py b/qlib/workflow/__init__.py index 6a8b857fc..9da65480f 100644 --- a/qlib/workflow/__init__.py +++ b/qlib/workflow/__init__.py @@ -323,7 +323,6 @@ class QlibRecorder: experiment_name : str name of the experiment. - Returns ------- A recorder instance. diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py index b7ef160df..5a74ab28a 100644 --- a/qlib/workflow/exp.py +++ b/qlib/workflow/exp.py @@ -165,6 +165,7 @@ class MLflowExperiment(Experiment): super(MLflowExperiment, self).__init__(id, name) self._uri = uri self._default_name = None + self._default_rec_name = "mlflow_recorder" self.client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) def start(self, recorder_name=None): @@ -175,7 +176,7 @@ class MLflowExperiment(Experiment): recorder = self.create_recorder(recorder_name) self.active_recorder = recorder # start the recorder - run = self.active_recorder.start_run() + self.active_recorder.start_run() return self.active_recorder @@ -186,13 +187,66 @@ class MLflowExperiment(Experiment): def create_recorder(self, recorder_name=None): if recorder_name is None: - recorders = self.list_recorders() - num = len(recorders) - recorder_name = "Recorder_{}".format(num + 1) - recorder = MLflowRecorder(recorder_name, self.id, self._uri) + recorder_name = self._default_rec_name + recorder = MLflowRecorder(self.id, self._uri, recorder_name) return recorder + def get_recorder(self, recorder_id=None, recorder_name=None, create=True): + # special case of getting the recorder + if recorder_id is None and recorder_name is None: + if self.active_recorder is not None: + return self.active_recorder + recorder_name = self._default_rec_name + if create: + recorder, is_new = self._get_or_create_rec(recorder_id=recorder_id, recorder_name=recorder_name) + else: + recorder, is_new = self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False + if is_new: + mlflow.set_experiment(self.name) + self.active_recorder = recorder + # start the recorder + self.active_recorder.start_run() + return recorder + + def _get_or_create_rec(self, recorder_id=None, recorder_name=None) -> (object, bool): + """ + Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will + automatically create a new recorder based on the given id and name. + """ + try: + return self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False + except ValueError: + if recorder_name is None: + recorder_name = self._default_rec_name + logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.") + return self.create(recorder_name), True + + def _get_recorder(self, recorder_id=None, recorder_name=None): + """ + Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will + raise errors. + """ + assert ( + recorder_id is not None or recorder_name is not None + ), "Please input at least one of recorder id or name before retrieving recorder." + if recorder_id is not None: + try: + run = self.client.get_run(recorder_id) + recorder = MLflowRecorder(self.id, self._uri, mlflow_run=run) + return recorder + except MlflowException as e: + raise ValueError("No valid recorder has been found, please make sure the input recorder id is correct.") + elif recorder_name is not None: + logger.warning( + f"Please make sure the recorder name {recorder_name} is unique, we will only return the first recorder if there exist several matched the given name." + ) + recorders = self.list_recorders() + for rid in recorders: + if recorders[rid].name == recorder_name: + return recorders[rid] + raise ValueError("No valid recorder has been found, please make sure the input recorder name is correct.") + def search_records(self, **kwargs): filter_string = "" if kwargs.get("filter_string") is None else kwargs.get("filter_string") run_view_type = 1 if kwargs.get("run_view_type") is None else kwargs.get("run_view_type") @@ -209,7 +263,6 @@ class MLflowExperiment(Experiment): if recorder_id is not None: self.client.delete_run(recorder_id) else: - recorders = self.list_recorders() recorder = self._get_recorder_by_name(recorder_name) self.client.delete_run(recorder.id) except MlflowException as e: @@ -217,84 +270,11 @@ class MLflowExperiment(Experiment): f"Error: {e}. Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct." ) - def _get_recorder_by_id(self, recorder_id=None, create=False): - """ - Get a recorder by its id. If the `create` is set to True, this method will also start to run the recorder. - - Parameters - ---------- - recorder_id : str - the id of the recorder to be returned. - create : boolean - create the recorder if it hasn't been created before. - - Returns - ------- - The specific recorder with given id. - """ - recorders = self.list_recorders() - if recorder_id in recorders: - return recorders[recorder_id] - else: - if create: - logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.") - self.start(recorder_name) - return self.active_recorder - else: - raise Exception( - "Something went wrong when retrieving recorders. Please check if id of the recorder is correct." - ) - - def _get_recorder_by_name(self, recorder_name=None, create=False): - """ - Get a recorder by its name. If the `create` is set to True, this method will also start to run the recorder. - - Parameters - ---------- - recorder_name : str - the name of the recorder to be returned. - create : boolean - create the recorder if it hasn't been created before. - - Returns - ------- - The specific recorder with given name. - """ - recorders = self.list_recorders() - for rid in recorders: - if recorders[rid].name == recorder_name: - return recorders[rid] - if create: - logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.") - self.start(recorder_name) - return self.active_recorder - else: - raise Exception( - "Something went wrong when retrieving recorders. Please check if the name of the experiment is correct." - ) - - def get_recorder(self, recorder_id=None, recorder_name=None, create=True): - """ - MLflow doesn't support create recorder with a specific id. Thus, when user only provides recorder id and `create` - is set to True, this method will not automatically create an active recorder. - """ - # retrive all the recorders under this experiment - if recorder_id is None and recorder_name is None: - if self.active_recorder: - return self.active_recorder - else: - return self._get_recorder_by_name(create=create) - else: - if recorder_id is not None: - return self._get_recorder_by_id(recorder_id, create=create) - else: - return self._get_recorder_by_name(recorder_name, create=create) - def list_recorders(self): runs = self.client.search_runs(self.id, run_view_type=1)[::-1] recorders = dict() for i in range(len(runs)): - recorder = MLflowRecorder(f"Recorder_{i+1}", self.id, self._uri, runs[i]) + recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i]) recorders[runs[i].info.run_id] = recorder return recorders diff --git a/qlib/workflow/expm.py b/qlib/workflow/expm.py index 00637e29d..8fb7962e9 100644 --- a/qlib/workflow/expm.py +++ b/qlib/workflow/expm.py @@ -57,6 +57,21 @@ class ExpManager: """ raise NotImplementedError(f"Please implement the `end_exp` method.") + def create_exp(self, experiment_name=None): + """ + Create an experiment. + + Parameters + ---------- + experiment_name : str + the experiment name, which must be unique. + + Returns + ------- + An experiment object. + """ + raise NotImplementedError(f"Please implement the `create_exp` method.") + def search_records(self, experiment_ids=None, **kwargs): """ Get a pandas DataFrame of records that fit the search criteria of the experiment. @@ -71,7 +86,7 @@ class ExpManager: """ raise NotImplementedError(f"Please implement the `search_records` method.") - def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True, run: bool = False): + def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True): """ Retrieve an experiment. This method includes getting an active experiment, and get_or_create a specific experiment. The returned experiment will be running. @@ -108,8 +123,6 @@ class ExpManager: name of the experiment to return. create : boolean create the experiment it if hasn't been created before. - run : boolean - run the experiment when it is created for the first time. Returns ------- @@ -162,7 +175,7 @@ class MLflowExpManager(ExpManager): def start_exp(self, experiment_name=None, recorder_name=None, uri=None): # create experiment - experiment = self.get_exp(experiment_name=experiment_name, run=False) + experiment, _ = self._get_or_create_exp(experiment_name=experiment_name) # set up active experiment self.active_experiment = experiment # start the experiment @@ -183,94 +196,72 @@ class MLflowExpManager(ExpManager): self.active_experiment.end(recorder_status) self.active_experiment = None - def __get_exp_by_id(self, experiment_id=None, create=False, run=False): - """ - Method for retrieving an experiment by its id. If the `create` is set to True, this method will also start to run the experiment. + def create_exp(self, experiment_name=None): + # init experiment + experiment_id = self.client.create_experiment(experiment_name) + experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) + experiment._default_name = self.default_exp_name - Parameters - ---------- - experiment_id : str - the id of the experiment to be returned. - create : boolean - create the experiment if it hasn't been created before. + return experiment - Returns - ------- - The specific experiment with given id. - """ - # retrive all created experiments - experiments = self.list_experiments() - for name in experiments: - if experiments[name].id == experiment_id: - return experiments[name] - if create: - logger.warning(f"No valid experiment found. Use the Default experiment for further process.") - return self.__get_exp_by_name(create=create, run=True) - else: - raise Exception( - "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." - ) - - def __get_exp_by_name(self, experiment_name=None, create=False, run=False): - """ - Method for retrieving an experiment by its name. If the `create` is set to True, this method will also start to run the experiment. - - Parameters - ---------- - experiment_name : str - the name of the experiment to be returned. - create : boolean - create the experiment if it hasn't been created before. - - Returns - ------- - The specific experiment with given name. - """ - # retrive all created experiments - experiments = self.list_experiments() - if experiment_name in experiments: - return experiments[experiment_name] - if create: - if experiment_name is None: - logger.info( - f"No experiment name provided. Create experiment with name {self.default_exp_name} for further process." - ) - experiment_name = self.default_exp_name - if self.client.get_experiment_by_name(experiment_name) is not None: - logger.info( - "The experiment has already been created before and deleted. Try to restore the experiment with a new recorder..." - ) - experiment_id = self.client.get_experiment_by_name(experiment_name).experiment_id - self.client.restore_experiment(experiment_id) - else: - experiment_id = self.client.create_experiment(experiment_name) - - # init experiment - experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) - experiment._default_name = self.default_exp_name - if run: - self.active_experiment = experiment - self.active_experiment.start() - - return experiment - else: - if experiment_name is None and self.default_exp_name in experiments: - return experiments[self.default_exp_name] - raise Exception( - "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." - ) - - def get_exp(self, experiment_id=None, experiment_name=None, create=True, run=True): + def get_exp(self, experiment_id=None, experiment_name=None, create=True): + # special case of getting experiment if experiment_id is None and experiment_name is None: - if self.active_experiment: + if self.active_experiment is not None: return self.active_experiment - else: - return self.__get_exp_by_name(create=create, run=run) + if create: + exp, is_new = self._get_or_create_exp(experiment_id=experiment_id, experiment_name=experiment_name) else: - if experiment_name is not None: - return self.__get_exp_by_name(experiment_name, create=create, run=run) - else: - return self.__get_exp_by_id(experiment_id, create=create, run=run) + exp, is_new = self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False + if is_new: + self.active_experiment = exp + # start the recorder + self.active_experiment.start() + return exp + + def _get_or_create_exp(self, experiment_id=None, experiment_name=None) -> (object, bool): + """ + Method for getting or creating an experiment. It will try to first get a valid experiment, if exception occurs, it will + automatically create a new experiment based on the given id and name. + """ + try: + return self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False + except ValueError: + if experiment_name is None: + experiment = self.default_exp_name + logger.info(f"No valid experiment found. Create a new experiment with name {experiment_name}.") + return self.create_exp(experiment_name), True + + def _get_exp(self, experiment_id=None, experiment_name=None): + """ + Method for getting or creating an experiment. It will try to first get a valid experiment, if exception occurs, it will + raise errors. + """ + assert ( + experiment_id is not None or experiment_name is not None + ), "Please input at least one of experiment/recorder id or name before retrieving experiment/recorder." + if experiment_id is not None: + try: + exp = self.client.get_experiment(experiment_id) + if exp.lifecycle_stage.upper() == "DELETED": + raise MlflowException("No valid experiment has been found.") + experiment = MLflowExperiment(exp.experiment_id, exp.name, self.uri) + return experiment + except MlflowException as e: + raise ValueError( + "No valid experiment has been found, please make sure the input experiment id is correct." + ) + elif experiment_name is not None: + try: + exp = self.client.get_experiment_by_name(experiment_name) + if exp is None or exp.lifecycle_stage.upper() == "DELETED": + raise MlflowException("No valid experiment has been found.") + experiment = MLflowExperiment(exp.experiment_id, experiment_name, self.uri) + return experiment + except MlflowException as e: + raise ValueError( + "No valid experiment has been found, please make sure the input experiment name is correct." + ) def search_records(self, experiment_ids, **kwargs): filter_string = "" if kwargs.get("filter_string") is None else kwargs.get("filter_string") @@ -288,6 +279,8 @@ class MLflowExpManager(ExpManager): self.client.delete_experiment(experiment_id) else: experiment = self.client.get_experiment_by_name(experiment_name) + if experiment is None: + raise MlflowException("No valid experiment has been found.") self.client.delete_experiment(experiment.experiment_id) except MlflowException as e: raise Exception( @@ -299,9 +292,7 @@ class MLflowExpManager(ExpManager): exps = self.client.list_experiments(view_type=1) experiments = dict() for exp in exps: - eid = exp.experiment_id - ename = exp.name - experiment = MLflowExperiment(eid, ename, self.uri) + experiment = MLflowExperiment(exp.experiment_id, exp.name, self.uri) experiments[ename] = experiment return experiments diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index e3e19bd10..d6b4d608e 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -10,6 +10,7 @@ from ..contrib.evaluate import ( ) from ..utils import init_instance_by_config, get_module_by_module_path from ..log import get_module_logger +from ..utils import flatten_dict logger = get_module_logger("workflow", "INFO") @@ -149,37 +150,11 @@ class PortAnaRecord(SignalRecord): analysis["excess_return_with_cost"] = risk_analysis( report_normal["return"] - report_normal["bench"] - report_normal["cost"] ) - # log metrics - self.recorder.log_metrics( - excess_return_without_cost_mean=analysis["excess_return_without_cost"]["risk"]["mean"] - ) - self.recorder.log_metrics(excess_return_without_cost_std=analysis["excess_return_without_cost"]["risk"]["std"]) - self.recorder.log_metrics( - excess_return_without_cost_annualized_return=analysis["excess_return_without_cost"]["risk"][ - "annualized_return" - ] - ) - self.recorder.log_metrics( - excess_return_without_cost_information_ratio=analysis["excess_return_without_cost"]["risk"][ - "information_ratio" - ] - ) - self.recorder.log_metrics( - excess_return_without_cost_max_drawdown=analysis["excess_return_without_cost"]["risk"]["max_drawdown"] - ) - self.recorder.log_metrics(excess_return_with_cost_mean=analysis["excess_return_with_cost"]["risk"]["mean"]) - self.recorder.log_metrics(excess_return_with_cost_std=analysis["excess_return_with_cost"]["risk"]["std"]) - self.recorder.log_metrics( - excess_return_with_cost_annualized_return=analysis["excess_return_with_cost"]["risk"]["annualized_return"] - ) - self.recorder.log_metrics( - excess_return_with_cost_information_ratio=analysis["excess_return_with_cost"]["risk"]["information_ratio"] - ) - self.recorder.log_metrics( - excess_return_with_cost_max_drawdown=analysis["excess_return_with_cost"]["risk"]["max_drawdown"] - ) # save portfolio analysis results analysis_df = pd.concat(analysis) # type: pd.DataFrame + # log metrics + self.recorder.log_metrics(**flatten_dict(analysis_df["risk"].unstack().T.to_dict())) + # save results self.recorder.save_objects(**{"port_analysis.pkl": analysis_df}, artifact_path=self.artifact_path) logger.info( f"Portfolio analysis record 'port_analysis.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}" diff --git a/qlib/workflow/recorder.py b/qlib/workflow/recorder.py index a2cddadcb..71f13381f 100644 --- a/qlib/workflow/recorder.py +++ b/qlib/workflow/recorder.py @@ -25,7 +25,7 @@ class Recorder: STATUS_FI = "FINISHED" STATUS_FA = "FAILED" - def __init__(self, name, experiment_id): + def __init__(self, experiment_id, name): self.id = None self.name = name self.experiment_id = experiment_id @@ -168,8 +168,8 @@ class MLflowRecorder(Recorder): use file manager to help maintain the objects in the project. """ - def __init__(self, name, experiment_id, uri, mlflow_run=None): - super(MLflowRecorder, self).__init__(name, experiment_id) + def __init__(self, experiment_id, uri, name=None, mlflow_run=None): + super(MLflowRecorder, self).__init__(experiment_id, name) self._uri = uri self.artifact_uri = None # set up file manager for saving objects @@ -179,7 +179,7 @@ class MLflowRecorder(Recorder): # construct from mlflow run if mlflow_run is not None: assert isinstance(mlflow_run, mlflow.entities.run.Run), "Please input with a MLflow Run object." - self.name = mlflow_run.data.tags["mlflow.runName"] if mlflow_run.data.tags["mlflow.runName"] != "" else name + self.name = mlflow_run.data.tags["mlflow.runName"] self.id = mlflow_run.info.run_id self.status = mlflow_run.info.status self.start_time = ( diff --git a/qlib/workflow/utils.py b/qlib/workflow/utils.py index d4594d28e..b57879d0e 100644 --- a/qlib/workflow/utils.py +++ b/qlib/workflow/utils.py @@ -31,10 +31,9 @@ def experiment_exception_hook(type, value, tb): value: Exception's value tb: Exception's traceback """ - error_msg = "An exception has been raised.\n" f"Type: {type}\n" - logger.error(error_msg) + logger.error("An exception has been raised.") traceback.print_tb(tb) - logger.error(f"Value: {value}") + print(f"{type}: {value}") R.end_exp(recorder_status=Recorder.STATUS_FA) From d8414b949ae7aba700365543645b513082c8517d Mon Sep 17 00:00:00 2001 From: Jactus Date: Wed, 18 Nov 2020 19:17:19 +0800 Subject: [PATCH 2/4] Update pytorch_nn --- qlib/contrib/model/pytorch_nn.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/qlib/contrib/model/pytorch_nn.py b/qlib/contrib/model/pytorch_nn.py index 82b7d0950..9bad755b6 100644 --- a/qlib/contrib/model/pytorch_nn.py +++ b/qlib/contrib/model/pytorch_nn.py @@ -193,7 +193,6 @@ class DNNModelPytorch(Model): w_val_auto = w_val_auto.cuda() for step in range(self.max_steps): - self.logger.info(step) if stop_steps >= self.early_stop_rounds: if verbose: self.logger.info("\tearly stop") @@ -201,7 +200,6 @@ class DNNModelPytorch(Model): loss = AverageMeter() self.dnn_model.train() self.train_optimizer.zero_grad() - self.logger.info("INIT") choice = np.random.choice(train_num, self.batch_size) x_batch_auto = x_train_values[choice] From e6a902c659f511186c571940d6cdbf5935010d2b Mon Sep 17 00:00:00 2001 From: lwwang1995 Date: Wed, 18 Nov 2020 21:53:28 +0800 Subject: [PATCH 3/4] Add LSTM and Gats --- examples/workflow_by_code_gats.py | 145 +++++++++++ examples/workflow_by_code_lstm.py | 144 +++++++++++ qlib/contrib/model/pytorch_gats.py | 383 +++++++++++++++++++++++++++++ qlib/contrib/model/pytorch_lstm.py | 340 +++++++++++++++++++++++++ 4 files changed, 1012 insertions(+) create mode 100755 examples/workflow_by_code_gats.py create mode 100755 examples/workflow_by_code_lstm.py create mode 100755 qlib/contrib/model/pytorch_gats.py create mode 100755 qlib/contrib/model/pytorch_lstm.py diff --git a/examples/workflow_by_code_gats.py b/examples/workflow_by_code_gats.py new file mode 100755 index 000000000..06845d448 --- /dev/null +++ b/examples/workflow_by_code_gats.py @@ -0,0 +1,145 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import sys +from pathlib import Path + +import qlib +import pandas as pd +from qlib.config import REG_CN +from qlib.contrib.model.pytorch_gats import GAT +from qlib.contrib.data.handler import ALPHA360_Denoise +from qlib.contrib.strategy.strategy import TopkDropoutStrategy +from qlib.contrib.evaluate import ( + backtest as normal_backtest, + risk_analysis, +) +from qlib.utils import exists_qlib_data + +# from qlib.model.learner import train_model +from qlib.utils import init_instance_by_config + +import pickle + +if __name__ == "__main__": + + # use default data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts"))) + from get_data import GetData + + GetData().qlib_data_cn(target_dir=provider_uri) + + qlib.init(provider_uri=provider_uri, region=REG_CN) + + MARKET = "csi300" + BENCHMARK = "SH000300" + + ################################### + # train model + ################################### + DATA_HANDLER_CONFIG = { + "start_time": "2008-01-01", + "end_time": "2020-08-01", + "fit_start_time": "2008-01-01", + "fit_end_time": "2014-12-31", + "instruments": MARKET, + } + + TRAINER_CONFIG = { + "train_start_time": "2008-01-01", + "train_end_time": "2014-12-31", + "validate_start_time": "2015-01-01", + "validate_end_time": "2016-12-31", + "test_start_time": "2017-01-01", + "test_end_time": "2020-08-01", + } + + task = { + "model": { + "class": "GAT", + "module_path": "qlib.contrib.model.pytorch_gats", + "kwargs": { + "d_feat": 6, + "hidden_size": 64, + "num_layers": 2, + "dropout": 0.0, + "n_epochs": 200, + "lr": 1e-3, + "early_stop": 20, + "batch_size": 800, + "metric": "IC", + "loss": "mse", + "base_model":"GRU", + "seed": 0, + "GPU": 0, + }, + }, + "dataset": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "ALPHA360_Denoise", + "module_path": "qlib.contrib.data.handler", + "kwargs": DATA_HANDLER_CONFIG, + }, + "segments": { + "train": ("2008-01-01", "2014-12-31"), + "valid": ("2015-01-01", "2016-12-31"), + "test": ("2017-01-01", "2020-08-01"), + }, + }, + } + # You shoud record the data in specific sequence + # "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'], + } + + # model = train_model(task) + model = init_instance_by_config(task["model"]) + dataset = init_instance_by_config(task["dataset"]) + model.fit(dataset) + + pred_score = model.predict(dataset) + + # save pred_score to file + pred_score_path = Path("~/tmp/qlib/pred_score.pkl").expanduser() + pred_score_path.parent.mkdir(exist_ok=True, parents=True) + pred_score.to_pickle(pred_score_path) + + ################################### + # backtest + ################################### + STRATEGY_CONFIG = { + "topk": 50, + "n_drop": 5, + } + BACKTEST_CONFIG = { + "verbose": False, + "limit_threshold": 0.095, + "account": 100000000, + "benchmark": BENCHMARK, + "deal_price": "close", + "open_cost": 0.0005, + "close_cost": 0.0015, + "min_cost": 5, + } + + # use default strategy + # custom Strategy, refer to: TODO: Strategy API url + strategy = TopkDropoutStrategy(**STRATEGY_CONFIG) + report_normal, positions_normal = normal_backtest(pred_score, strategy=strategy, **BACKTEST_CONFIG) + + ################################### + # analyze + # If need a more detailed analysis, refer to: examples/train_and_bakctest.ipynb + ################################### + analysis = dict() + analysis["excess_return_without_cost"] = risk_analysis(report_normal["return"] - report_normal["bench"]) + analysis["excess_return_with_cost"] = risk_analysis( + report_normal["return"] - report_normal["bench"] - report_normal["cost"] + ) + analysis_df = pd.concat(analysis) # type: pd.DataFrame + print(analysis_df) diff --git a/examples/workflow_by_code_lstm.py b/examples/workflow_by_code_lstm.py new file mode 100755 index 000000000..1815d2fec --- /dev/null +++ b/examples/workflow_by_code_lstm.py @@ -0,0 +1,144 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import sys +from pathlib import Path + +import qlib +import pandas as pd +from qlib.config import REG_CN +from qlib.contrib.model.pytorch_lstm import LSTM +from qlib.contrib.data.handler import ALPHA360_Denoise +from qlib.contrib.strategy.strategy import TopkDropoutStrategy +from qlib.contrib.evaluate import ( + backtest as normal_backtest, + risk_analysis, +) +from qlib.utils import exists_qlib_data + +# from qlib.model.learner import train_model +from qlib.utils import init_instance_by_config + +import pickle + +if __name__ == "__main__": + + # use default data + provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir + if not exists_qlib_data(provider_uri): + print(f"Qlib data is not found in {provider_uri}") + sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts"))) + from get_data import GetData + + GetData().qlib_data_cn(target_dir=provider_uri) + + qlib.init(provider_uri=provider_uri, region=REG_CN) + + MARKET = "csi300" + BENCHMARK = "SH000300" + + ################################### + # train model + ################################### + DATA_HANDLER_CONFIG = { + "start_time": "2008-01-01", + "end_time": "2020-08-01", + "fit_start_time": "2008-01-01", + "fit_end_time": "2014-12-31", + "instruments": MARKET, + } + + TRAINER_CONFIG = { + "train_start_time": "2008-01-01", + "train_end_time": "2014-12-31", + "validate_start_time": "2015-01-01", + "validate_end_time": "2016-12-31", + "test_start_time": "2017-01-01", + "test_end_time": "2020-08-01", + } + + task = { + "model": { + "class": "LSTM", + "module_path": "qlib.contrib.model.pytorch_lstm", + "kwargs": { + "d_feat": 6, + "hidden_size": 64, + "num_layers": 2, + "dropout": 0.0, + "n_epochs": 200, + "lr": 1e-3, + "early_stop": 20, + "batch_size": 800, + "metric": "IC", + "loss": "mse", + "seed": 0, + "GPU": 0, + }, + }, + "dataset": { + "class": "DatasetH", + "module_path": "qlib.data.dataset", + "kwargs": { + "handler": { + "class": "ALPHA360_Denoise", + "module_path": "qlib.contrib.data.handler", + "kwargs": DATA_HANDLER_CONFIG, + }, + "segments": { + "train": ("2008-01-01", "2014-12-31"), + "valid": ("2015-01-01", "2016-12-31"), + "test": ("2017-01-01", "2020-08-01"), + }, + }, + } + # You shoud record the data in specific sequence + # "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'], + } + + # model = train_model(task) + model = init_instance_by_config(task["model"]) + dataset = init_instance_by_config(task["dataset"]) + model.fit(dataset) + + pred_score = model.predict(dataset) + + # save pred_score to file + pred_score_path = Path("~/tmp/qlib/pred_score.pkl").expanduser() + pred_score_path.parent.mkdir(exist_ok=True, parents=True) + pred_score.to_pickle(pred_score_path) + + ################################### + # backtest + ################################### + STRATEGY_CONFIG = { + "topk": 50, + "n_drop": 5, + } + BACKTEST_CONFIG = { + "verbose": False, + "limit_threshold": 0.095, + "account": 100000000, + "benchmark": BENCHMARK, + "deal_price": "close", + "open_cost": 0.0005, + "close_cost": 0.0015, + "min_cost": 5, + } + + # use default strategy + # custom Strategy, refer to: TODO: Strategy API url + strategy = TopkDropoutStrategy(**STRATEGY_CONFIG) + report_normal, positions_normal = normal_backtest(pred_score, strategy=strategy, **BACKTEST_CONFIG) + + ################################### + # analyze + # If need a more detailed analysis, refer to: examples/train_and_bakctest.ipynb + ################################### + analysis = dict() + analysis["excess_return_without_cost"] = risk_analysis(report_normal["return"] - report_normal["bench"]) + analysis["excess_return_with_cost"] = risk_analysis( + report_normal["return"] - report_normal["bench"] - report_normal["cost"] + ) + analysis_df = pd.concat(analysis) # type: pd.DataFrame + print(analysis_df) diff --git a/qlib/contrib/model/pytorch_gats.py b/qlib/contrib/model/pytorch_gats.py new file mode 100755 index 000000000..edfb26d72 --- /dev/null +++ b/qlib/contrib/model/pytorch_gats.py @@ -0,0 +1,383 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +from sklearn.metrics import roc_auc_score, mean_squared_error +import logging +from ...utils import unpack_archive_with_buffer, save_multiple_parts_file, create_save_path, drop_nan_by_y_index +from ...log import get_module_logger, TimeInspector + +import torch +import torch.nn as nn +import torch.optim as optim + +from ...model.base import Model +from ...data.dataset import DatasetH +from ...data.dataset.handler import DataHandlerLP + + +class GAT(Model): + """GAT Model + + Parameters + ---------- + input_dim : int + input dimension + output_dim : int + output dimension + layers : tuple + layer sizes + lr : float + learning rate + optimizer : str + optimizer name + GPU : str + the GPU ID(s) used for training + """ + + def __init__( + self, + d_feat=6, + hidden_size=64, + num_layers=2, + dropout=0.0, + n_epochs=200, + lr=0.001, + metric="IC", + batch_size=2000, + early_stop=20, + loss="mse", + base_model="GRU", + optimizer="adam", + GPU="0", + seed=0, + **kwargs + ): + # Set logger. + self.logger = get_module_logger("GAT") + self.logger.info("GAT pytorch version...") + + # set hyper-parameters. + self.d_feat = d_feat + self.hidden_size = hidden_size + self.num_layers = num_layers + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.base_model = base_model + self.visible_GPU = GPU + self.use_gpu = torch.cuda.is_available() + self.seed = seed + + self.logger.info( + "GAT parameters setting:" + "\nd_feat : {}" + "\nhidden_size : {}" + "\nnum_layers : {}" + "\ndropout : {}" + "\nn_epochs : {}" + "\nlr : {}" + "\nmetric : {}" + "\nbatch_size : {}" + "\nearly_stop : {}" + "\noptimizer : {}" + "\nloss_type : {}" + "\nbase_model : {}" + "\nvisible_GPU : {}" + "\nuse_GPU : {}" + "\nseed : {}".format( + d_feat, + hidden_size, + num_layers, + dropout, + n_epochs, + lr, + metric, + batch_size, + early_stop, + optimizer.lower(), + loss, + base_model, + GPU, + self.use_gpu, + seed, + ) + ) + + if loss not in {"mse", "binary"}: + raise NotImplementedError("loss {} is not supported!".format(loss)) + self._scorer = mean_squared_error if loss == "mse" else roc_auc_score + + self.GAT_model = GATModel( + d_feat=self.d_feat, hidden_size=self.hidden_size, num_layers=self.num_layers, dropout=self.dropout, base_model=self.base_model + ) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.GAT_model.parameters(), lr=self.lr) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.GAT_model.parameters(), lr=self.lr) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self._fitted = False + if self.use_gpu: + self.GAT_model.cuda() + # set the visible GPU + if self.visible_GPU: + os.environ["CUDA_VISIBLE_DEVICES"] = self.visible_GPU + + def mse(self, pred, label): + loss = (pred - label) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + if self.metric == "IC": + return self.cal_ic(pred[mask], label[mask]) + + if self.metric == "" or self.metric == "loss": # use loss + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def cal_ic(self, pred, label): + return torch.mean(pred * label) + + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values) * 100 + + self.GAT_model.train() + + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float() + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float() + + if self.use_gpu: + feature = feature.cuda() + label = label.cuda() + + pred = self.GAT_model(feature) + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.GAT_model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) + + self.GAT_model.eval() + + scores = [] + losses = [] + + indices = np.arange(len(x_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float() + label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float() + + if self.use_gpu: + feature = feature.cuda() + label = label.cuda() + + pred = self.GAT_model(feature) + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + verbose=True, + save_path=None, + ): + + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L + ) + + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + + if save_path == None: + save_path = create_save_path(save_path) + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self._fitted = True + # return + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(x_train, y_train) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.GAT_model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.GAT_model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset): + if not self._fitted: + raise ValueError("model is not fitted yet!") + + x_test = dataset.prepare("test", col_set="feature") + index = x_test.index + self.GAT_model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] + preds = [] + + for begin in range(sample_num)[:: self.batch_size]: + + if sample_num - begin < self.batch_size: + end = sample_num + else: + end = begin + self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float() + + if self.use_gpu: + x_batch = x_batch.cuda() + + with torch.no_grad(): + if self.use_gpu: + pred = self.GAT_model(x_batch).detach().cpu().numpy() + else: + pred = self.GAT_model(x_batch).detach().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=index) + + +class GATModel(nn.Module): + + def __init__(self, d_feat=6, hidden_size=64, num_layers=2, dropout=0.0, base_model='GRU'): + super().__init__() + + if base_model == 'GRU': + self.rnn = nn.GRU( + input_size=d_feat, + hidden_size=hidden_size, + num_layers=num_layers, + batch_first=True, + dropout=dropout, + ) + elif base_model == 'LSTM': + self.rnn = nn.LSTM( + input_size=d_feat, + hidden_size=hidden_size, + num_layers=num_layers, + batch_first=True, + dropout=dropout, + ) + else: + raise ValueError('unknown base model name `%s`'%base_model) + + self.hidden_size = hidden_size + self.bn1 = nn.BatchNorm1d(num_features=hidden_size, track_running_stats=False) + self.fc = nn.Linear(hidden_size, hidden_size) + self.bn2 = nn.BatchNorm1d(num_features=hidden_size, track_running_stats=False) + self.fc_out = nn.Linear(hidden_size, 1) + self.leaky_relu = nn.LeakyReLU() + self.softmax = nn.Softmax(dim=1) + + self.d_feat = d_feat + + def cal_convariance(self, x, y): # the 2nd dimension of x and y are the same + e_x = torch.mean(x, dim = 1).reshape(-1, 1) + e_y = torch.mean(y, dim = 1).reshape(-1, 1) + e_x_e_y = e_x.mm(torch.t(e_y)) + x_extend = x.reshape(x.shape[0], 1, x.shape[1]).repeat(1, y.shape[0], 1) + y_extend = y.reshape(1, y.shape[0], y.shape[1]).repeat(x.shape[0], 1, 1) + e_xy = torch.mean(x_extend*y_extend, dim = 2) + return e_xy - e_x_e_y + + def forward(self, x): + # x: [N, F*T] + x = x.reshape(len(x), self.d_feat, -1) # [N, F, T] + x = x.permute(0, 2, 1) # [N, T, F] + out, _ = self.rnn(x) + hidden = out[:, -1, :] + hidden = self.bn1(hidden) + + gamma = self.cal_convariance(hidden, hidden) + # gamma = hidden.mm(torch.t(hidden)) + # gamma = self.leaky_relu(gamma) + # gamma = self.softmax(gamma) + # gamma = gamma * (torch.ones(x.shape[0], x.shape[0]).to(device) - torch.diag(torch.ones(x.shape[0])).to(device)) + output = gamma.mm(hidden) + output = self.fc(output) + output = self.bn2(output) + output = self.leaky_relu(output) + return self.fc_out(output).squeeze() \ No newline at end of file diff --git a/qlib/contrib/model/pytorch_lstm.py b/qlib/contrib/model/pytorch_lstm.py new file mode 100755 index 000000000..4eb41c250 --- /dev/null +++ b/qlib/contrib/model/pytorch_lstm.py @@ -0,0 +1,340 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +from sklearn.metrics import roc_auc_score, mean_squared_error +import logging +from ...utils import unpack_archive_with_buffer, save_multiple_parts_file, create_save_path, drop_nan_by_y_index +from ...log import get_module_logger, TimeInspector + +import torch +import torch.nn as nn +import torch.optim as optim + +from ...model.base import Model +from ...data.dataset import DatasetH +from ...data.dataset.handler import DataHandlerLP + + +class LSTM(Model): + """LSTM Model + + Parameters + ---------- + input_dim : int + input dimension + output_dim : int + output dimension + layers : tuple + layer sizes + lr : float + learning rate + optimizer : str + optimizer name + GPU : str + the GPU ID(s) used for training + """ + + def __init__( + self, + d_feat=6, + hidden_size=64, + num_layers=2, + dropout=0.0, + n_epochs=200, + lr=0.001, + metric="IC", + batch_size=2000, + early_stop=20, + loss="mse", + optimizer="adam", + GPU="0", + seed=0, + **kwargs + ): + # Set logger. + self.logger = get_module_logger("LSTM") + self.logger.info("LSTM pytorch version...") + + # set hyper-parameters. + self.d_feat = d_feat + self.hidden_size = hidden_size + self.num_layers = num_layers + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.visible_GPU = GPU + self.use_gpu = torch.cuda.is_available() + self.seed = seed + + self.logger.info( + "LSTM parameters setting:" + "\nd_feat : {}" + "\nhidden_size : {}" + "\nnum_layers : {}" + "\ndropout : {}" + "\nn_epochs : {}" + "\nlr : {}" + "\nmetric : {}" + "\nbatch_size : {}" + "\nearly_stop : {}" + "\noptimizer : {}" + "\nloss_type : {}" + "\nvisible_GPU : {}" + "\nuse_GPU : {}" + "\nseed : {}".format( + d_feat, + hidden_size, + num_layers, + dropout, + n_epochs, + lr, + metric, + batch_size, + early_stop, + optimizer.lower(), + loss, + GPU, + self.use_gpu, + seed, + ) + ) + + if loss not in {"mse", "binary"}: + raise NotImplementedError("loss {} is not supported!".format(loss)) + self._scorer = mean_squared_error if loss == "mse" else roc_auc_score + + self.lstm_model = LSTMModel( + d_feat=self.d_feat, hidden_size=self.hidden_size, num_layers=self.num_layers, dropout=self.dropout + ) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.lstm_model.parameters(), lr=self.lr) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.lstm_model.parameters(), lr=self.lr) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self._fitted = False + if self.use_gpu: + self.lstm_model.cuda() + # set the visible GPU + if self.visible_GPU: + os.environ["CUDA_VISIBLE_DEVICES"] = self.visible_GPU + + def mse(self, pred, label): + loss = (pred - label) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + if self.metric == "IC": + return self.cal_ic(pred[mask], label[mask]) + + if self.metric == "" or self.metric == "loss": # use loss + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def cal_ic(self, pred, label): + return torch.mean(pred * label) + + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values) * 100 + + self.lstm_model.train() + + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float() + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float() + + if self.use_gpu: + feature = feature.cuda() + label = label.cuda() + + pred = self.lstm_model(feature) + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.lstm_model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) + + self.lstm_model.eval() + + scores = [] + losses = [] + + indices = np.arange(len(x_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float() + label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float() + + if self.use_gpu: + feature = feature.cuda() + label = label.cuda() + + pred = self.lstm_model(feature) + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + verbose=True, + save_path=None, + ): + + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L + ) + + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + + if save_path == None: + save_path = create_save_path(save_path) + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self._fitted = True + # return + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(x_train, y_train) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.lstm_model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.lstm_model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset): + if not self._fitted: + raise ValueError("model is not fitted yet!") + + x_test = dataset.prepare("test", col_set="feature") + index = x_test.index + self.lstm_model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] + preds = [] + + for begin in range(sample_num)[:: self.batch_size]: + + if sample_num - begin < self.batch_size: + end = sample_num + else: + end = begin + self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float() + + if self.use_gpu: + x_batch = x_batch.cuda() + + with torch.no_grad(): + if self.use_gpu: + pred = self.lstm_model(x_batch).detach().cpu().numpy() + else: + pred = self.lstm_model(x_batch).detach().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=index) + + +class LSTMModel(nn.Module): + + def __init__(self, d_feat=6, hidden_size=64, num_layers=2, dropout=0.0): + super().__init__() + + self.rnn = nn.LSTM( + input_size=d_feat, + hidden_size=hidden_size, + num_layers=num_layers, + batch_first=True, + dropout=dropout, + ) + self.fc_out = nn.Linear(hidden_size, 1) + + self.d_feat = d_feat + + def forward(self, x): + # x: [N, F*T] + x = x.reshape(len(x), self.d_feat, -1) # [N, F, T] + x = x.permute(0, 2, 1) # [N, T, F] + out, _ = self.rnn(x) + return self.fc_out(out[:, -1, :]).squeeze() \ No newline at end of file From dfc93510963fd5b7bc6e7ef8860fc081c6c586e7 Mon Sep 17 00:00:00 2001 From: Dong Zhou Date: Thu, 19 Nov 2020 09:09:56 +0800 Subject: [PATCH 4/4] fix mlflow uri --- qlib/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qlib/config.py b/qlib/config.py index 002134f9d..90369c79f 100644 --- a/qlib/config.py +++ b/qlib/config.py @@ -130,7 +130,7 @@ _default_config = { "class": "MLflowExpManager", "module_path": "qlib.workflow.expm", "kwargs": { - "uri": str(Path(os.getcwd()).resolve() / "mlruns"), + "uri": 'file:' + str(Path(os.getcwd()).resolve() / "mlruns"), "default_exp_name": "Experiment", }, },