From 58bd2339c050152503be1fc21b3bcb5954fcb0ee Mon Sep 17 00:00:00 2001 From: Jactus Date: Wed, 18 Nov 2020 17:55:45 +0800 Subject: [PATCH] Update expm and exp --- qlib/utils/__init__.py | 25 +++++ qlib/workflow/__init__.py | 1 - qlib/workflow/exp.py | 140 ++++++++++++---------------- qlib/workflow/expm.py | 171 +++++++++++++++++------------------ qlib/workflow/record_temp.py | 33 +------ qlib/workflow/recorder.py | 8 +- qlib/workflow/utils.py | 5 +- 7 files changed, 176 insertions(+), 207 deletions(-) diff --git a/qlib/utils/__init__.py b/qlib/utils/__init__.py index 30483af2e..575ed24aa 100644 --- a/qlib/utils/__init__.py +++ b/qlib/utils/__init__.py @@ -20,6 +20,7 @@ import requests import tempfile import importlib import contextlib +import collections import numpy as np import pandas as pd from pathlib import Path @@ -641,6 +642,30 @@ def lexsort_index(df: pd.DataFrame) -> pd.DataFrame: return df.sort_index() +def flatten_dict(d, parent_key="", sep="."): + """flatten_dict. + >>> flatten_dict({'a': 1, 'c': {'a': 2, 'b': {'x': 5, 'y' : 10}}, 'd': [1, 2, 3]}) + >>> {'a': 1, 'c.a': 2, 'c.b.x': 5, 'd': [1, 2, 3], 'c.b.y': 10} + + Parameters + ---------- + d : + d + parent_key : + parent_key + sep : + sep + """ + items = [] + for k, v in d.items(): + new_key = parent_key + sep + k if parent_key else k + if isinstance(v, collections.MutableMapping): + items.extend(flatten_dict(v, new_key, sep=sep).items()) + else: + items.append((new_key, v)) + return dict(items) + + #################### Wrapper ##################### class Wrapper(object): """Wrapper class for anything that needs to set up during qlib.init""" diff --git a/qlib/workflow/__init__.py b/qlib/workflow/__init__.py index 6a8b857fc..9da65480f 100644 --- a/qlib/workflow/__init__.py +++ b/qlib/workflow/__init__.py @@ -323,7 +323,6 @@ class QlibRecorder: experiment_name : str name of the experiment. - Returns ------- A recorder instance. diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py index b7ef160df..5a74ab28a 100644 --- a/qlib/workflow/exp.py +++ b/qlib/workflow/exp.py @@ -165,6 +165,7 @@ class MLflowExperiment(Experiment): super(MLflowExperiment, self).__init__(id, name) self._uri = uri self._default_name = None + self._default_rec_name = "mlflow_recorder" self.client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) def start(self, recorder_name=None): @@ -175,7 +176,7 @@ class MLflowExperiment(Experiment): recorder = self.create_recorder(recorder_name) self.active_recorder = recorder # start the recorder - run = self.active_recorder.start_run() + self.active_recorder.start_run() return self.active_recorder @@ -186,13 +187,66 @@ class MLflowExperiment(Experiment): def create_recorder(self, recorder_name=None): if recorder_name is None: - recorders = self.list_recorders() - num = len(recorders) - recorder_name = "Recorder_{}".format(num + 1) - recorder = MLflowRecorder(recorder_name, self.id, self._uri) + recorder_name = self._default_rec_name + recorder = MLflowRecorder(self.id, self._uri, recorder_name) return recorder + def get_recorder(self, recorder_id=None, recorder_name=None, create=True): + # special case of getting the recorder + if recorder_id is None and recorder_name is None: + if self.active_recorder is not None: + return self.active_recorder + recorder_name = self._default_rec_name + if create: + recorder, is_new = self._get_or_create_rec(recorder_id=recorder_id, recorder_name=recorder_name) + else: + recorder, is_new = self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False + if is_new: + mlflow.set_experiment(self.name) + self.active_recorder = recorder + # start the recorder + self.active_recorder.start_run() + return recorder + + def _get_or_create_rec(self, recorder_id=None, recorder_name=None) -> (object, bool): + """ + Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will + automatically create a new recorder based on the given id and name. + """ + try: + return self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False + except ValueError: + if recorder_name is None: + recorder_name = self._default_rec_name + logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.") + return self.create(recorder_name), True + + def _get_recorder(self, recorder_id=None, recorder_name=None): + """ + Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will + raise errors. + """ + assert ( + recorder_id is not None or recorder_name is not None + ), "Please input at least one of recorder id or name before retrieving recorder." + if recorder_id is not None: + try: + run = self.client.get_run(recorder_id) + recorder = MLflowRecorder(self.id, self._uri, mlflow_run=run) + return recorder + except MlflowException as e: + raise ValueError("No valid recorder has been found, please make sure the input recorder id is correct.") + elif recorder_name is not None: + logger.warning( + f"Please make sure the recorder name {recorder_name} is unique, we will only return the first recorder if there exist several matched the given name." + ) + recorders = self.list_recorders() + for rid in recorders: + if recorders[rid].name == recorder_name: + return recorders[rid] + raise ValueError("No valid recorder has been found, please make sure the input recorder name is correct.") + def search_records(self, **kwargs): filter_string = "" if kwargs.get("filter_string") is None else kwargs.get("filter_string") run_view_type = 1 if kwargs.get("run_view_type") is None else kwargs.get("run_view_type") @@ -209,7 +263,6 @@ class MLflowExperiment(Experiment): if recorder_id is not None: self.client.delete_run(recorder_id) else: - recorders = self.list_recorders() recorder = self._get_recorder_by_name(recorder_name) self.client.delete_run(recorder.id) except MlflowException as e: @@ -217,84 +270,11 @@ class MLflowExperiment(Experiment): f"Error: {e}. Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct." ) - def _get_recorder_by_id(self, recorder_id=None, create=False): - """ - Get a recorder by its id. If the `create` is set to True, this method will also start to run the recorder. - - Parameters - ---------- - recorder_id : str - the id of the recorder to be returned. - create : boolean - create the recorder if it hasn't been created before. - - Returns - ------- - The specific recorder with given id. - """ - recorders = self.list_recorders() - if recorder_id in recorders: - return recorders[recorder_id] - else: - if create: - logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.") - self.start(recorder_name) - return self.active_recorder - else: - raise Exception( - "Something went wrong when retrieving recorders. Please check if id of the recorder is correct." - ) - - def _get_recorder_by_name(self, recorder_name=None, create=False): - """ - Get a recorder by its name. If the `create` is set to True, this method will also start to run the recorder. - - Parameters - ---------- - recorder_name : str - the name of the recorder to be returned. - create : boolean - create the recorder if it hasn't been created before. - - Returns - ------- - The specific recorder with given name. - """ - recorders = self.list_recorders() - for rid in recorders: - if recorders[rid].name == recorder_name: - return recorders[rid] - if create: - logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.") - self.start(recorder_name) - return self.active_recorder - else: - raise Exception( - "Something went wrong when retrieving recorders. Please check if the name of the experiment is correct." - ) - - def get_recorder(self, recorder_id=None, recorder_name=None, create=True): - """ - MLflow doesn't support create recorder with a specific id. Thus, when user only provides recorder id and `create` - is set to True, this method will not automatically create an active recorder. - """ - # retrive all the recorders under this experiment - if recorder_id is None and recorder_name is None: - if self.active_recorder: - return self.active_recorder - else: - return self._get_recorder_by_name(create=create) - else: - if recorder_id is not None: - return self._get_recorder_by_id(recorder_id, create=create) - else: - return self._get_recorder_by_name(recorder_name, create=create) - def list_recorders(self): runs = self.client.search_runs(self.id, run_view_type=1)[::-1] recorders = dict() for i in range(len(runs)): - recorder = MLflowRecorder(f"Recorder_{i+1}", self.id, self._uri, runs[i]) + recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i]) recorders[runs[i].info.run_id] = recorder return recorders diff --git a/qlib/workflow/expm.py b/qlib/workflow/expm.py index 00637e29d..8fb7962e9 100644 --- a/qlib/workflow/expm.py +++ b/qlib/workflow/expm.py @@ -57,6 +57,21 @@ class ExpManager: """ raise NotImplementedError(f"Please implement the `end_exp` method.") + def create_exp(self, experiment_name=None): + """ + Create an experiment. + + Parameters + ---------- + experiment_name : str + the experiment name, which must be unique. + + Returns + ------- + An experiment object. + """ + raise NotImplementedError(f"Please implement the `create_exp` method.") + def search_records(self, experiment_ids=None, **kwargs): """ Get a pandas DataFrame of records that fit the search criteria of the experiment. @@ -71,7 +86,7 @@ class ExpManager: """ raise NotImplementedError(f"Please implement the `search_records` method.") - def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True, run: bool = False): + def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True): """ Retrieve an experiment. This method includes getting an active experiment, and get_or_create a specific experiment. The returned experiment will be running. @@ -108,8 +123,6 @@ class ExpManager: name of the experiment to return. create : boolean create the experiment it if hasn't been created before. - run : boolean - run the experiment when it is created for the first time. Returns ------- @@ -162,7 +175,7 @@ class MLflowExpManager(ExpManager): def start_exp(self, experiment_name=None, recorder_name=None, uri=None): # create experiment - experiment = self.get_exp(experiment_name=experiment_name, run=False) + experiment, _ = self._get_or_create_exp(experiment_name=experiment_name) # set up active experiment self.active_experiment = experiment # start the experiment @@ -183,94 +196,72 @@ class MLflowExpManager(ExpManager): self.active_experiment.end(recorder_status) self.active_experiment = None - def __get_exp_by_id(self, experiment_id=None, create=False, run=False): - """ - Method for retrieving an experiment by its id. If the `create` is set to True, this method will also start to run the experiment. + def create_exp(self, experiment_name=None): + # init experiment + experiment_id = self.client.create_experiment(experiment_name) + experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) + experiment._default_name = self.default_exp_name - Parameters - ---------- - experiment_id : str - the id of the experiment to be returned. - create : boolean - create the experiment if it hasn't been created before. + return experiment - Returns - ------- - The specific experiment with given id. - """ - # retrive all created experiments - experiments = self.list_experiments() - for name in experiments: - if experiments[name].id == experiment_id: - return experiments[name] - if create: - logger.warning(f"No valid experiment found. Use the Default experiment for further process.") - return self.__get_exp_by_name(create=create, run=True) - else: - raise Exception( - "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." - ) - - def __get_exp_by_name(self, experiment_name=None, create=False, run=False): - """ - Method for retrieving an experiment by its name. If the `create` is set to True, this method will also start to run the experiment. - - Parameters - ---------- - experiment_name : str - the name of the experiment to be returned. - create : boolean - create the experiment if it hasn't been created before. - - Returns - ------- - The specific experiment with given name. - """ - # retrive all created experiments - experiments = self.list_experiments() - if experiment_name in experiments: - return experiments[experiment_name] - if create: - if experiment_name is None: - logger.info( - f"No experiment name provided. Create experiment with name {self.default_exp_name} for further process." - ) - experiment_name = self.default_exp_name - if self.client.get_experiment_by_name(experiment_name) is not None: - logger.info( - "The experiment has already been created before and deleted. Try to restore the experiment with a new recorder..." - ) - experiment_id = self.client.get_experiment_by_name(experiment_name).experiment_id - self.client.restore_experiment(experiment_id) - else: - experiment_id = self.client.create_experiment(experiment_name) - - # init experiment - experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) - experiment._default_name = self.default_exp_name - if run: - self.active_experiment = experiment - self.active_experiment.start() - - return experiment - else: - if experiment_name is None and self.default_exp_name in experiments: - return experiments[self.default_exp_name] - raise Exception( - "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." - ) - - def get_exp(self, experiment_id=None, experiment_name=None, create=True, run=True): + def get_exp(self, experiment_id=None, experiment_name=None, create=True): + # special case of getting experiment if experiment_id is None and experiment_name is None: - if self.active_experiment: + if self.active_experiment is not None: return self.active_experiment - else: - return self.__get_exp_by_name(create=create, run=run) + if create: + exp, is_new = self._get_or_create_exp(experiment_id=experiment_id, experiment_name=experiment_name) else: - if experiment_name is not None: - return self.__get_exp_by_name(experiment_name, create=create, run=run) - else: - return self.__get_exp_by_id(experiment_id, create=create, run=run) + exp, is_new = self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False + if is_new: + self.active_experiment = exp + # start the recorder + self.active_experiment.start() + return exp + + def _get_or_create_exp(self, experiment_id=None, experiment_name=None) -> (object, bool): + """ + Method for getting or creating an experiment. It will try to first get a valid experiment, if exception occurs, it will + automatically create a new experiment based on the given id and name. + """ + try: + return self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False + except ValueError: + if experiment_name is None: + experiment = self.default_exp_name + logger.info(f"No valid experiment found. Create a new experiment with name {experiment_name}.") + return self.create_exp(experiment_name), True + + def _get_exp(self, experiment_id=None, experiment_name=None): + """ + Method for getting or creating an experiment. It will try to first get a valid experiment, if exception occurs, it will + raise errors. + """ + assert ( + experiment_id is not None or experiment_name is not None + ), "Please input at least one of experiment/recorder id or name before retrieving experiment/recorder." + if experiment_id is not None: + try: + exp = self.client.get_experiment(experiment_id) + if exp.lifecycle_stage.upper() == "DELETED": + raise MlflowException("No valid experiment has been found.") + experiment = MLflowExperiment(exp.experiment_id, exp.name, self.uri) + return experiment + except MlflowException as e: + raise ValueError( + "No valid experiment has been found, please make sure the input experiment id is correct." + ) + elif experiment_name is not None: + try: + exp = self.client.get_experiment_by_name(experiment_name) + if exp is None or exp.lifecycle_stage.upper() == "DELETED": + raise MlflowException("No valid experiment has been found.") + experiment = MLflowExperiment(exp.experiment_id, experiment_name, self.uri) + return experiment + except MlflowException as e: + raise ValueError( + "No valid experiment has been found, please make sure the input experiment name is correct." + ) def search_records(self, experiment_ids, **kwargs): filter_string = "" if kwargs.get("filter_string") is None else kwargs.get("filter_string") @@ -288,6 +279,8 @@ class MLflowExpManager(ExpManager): self.client.delete_experiment(experiment_id) else: experiment = self.client.get_experiment_by_name(experiment_name) + if experiment is None: + raise MlflowException("No valid experiment has been found.") self.client.delete_experiment(experiment.experiment_id) except MlflowException as e: raise Exception( @@ -299,9 +292,7 @@ class MLflowExpManager(ExpManager): exps = self.client.list_experiments(view_type=1) experiments = dict() for exp in exps: - eid = exp.experiment_id - ename = exp.name - experiment = MLflowExperiment(eid, ename, self.uri) + experiment = MLflowExperiment(exp.experiment_id, exp.name, self.uri) experiments[ename] = experiment return experiments diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index e3e19bd10..d6b4d608e 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -10,6 +10,7 @@ from ..contrib.evaluate import ( ) from ..utils import init_instance_by_config, get_module_by_module_path from ..log import get_module_logger +from ..utils import flatten_dict logger = get_module_logger("workflow", "INFO") @@ -149,37 +150,11 @@ class PortAnaRecord(SignalRecord): analysis["excess_return_with_cost"] = risk_analysis( report_normal["return"] - report_normal["bench"] - report_normal["cost"] ) - # log metrics - self.recorder.log_metrics( - excess_return_without_cost_mean=analysis["excess_return_without_cost"]["risk"]["mean"] - ) - self.recorder.log_metrics(excess_return_without_cost_std=analysis["excess_return_without_cost"]["risk"]["std"]) - self.recorder.log_metrics( - excess_return_without_cost_annualized_return=analysis["excess_return_without_cost"]["risk"][ - "annualized_return" - ] - ) - self.recorder.log_metrics( - excess_return_without_cost_information_ratio=analysis["excess_return_without_cost"]["risk"][ - "information_ratio" - ] - ) - self.recorder.log_metrics( - excess_return_without_cost_max_drawdown=analysis["excess_return_without_cost"]["risk"]["max_drawdown"] - ) - self.recorder.log_metrics(excess_return_with_cost_mean=analysis["excess_return_with_cost"]["risk"]["mean"]) - self.recorder.log_metrics(excess_return_with_cost_std=analysis["excess_return_with_cost"]["risk"]["std"]) - self.recorder.log_metrics( - excess_return_with_cost_annualized_return=analysis["excess_return_with_cost"]["risk"]["annualized_return"] - ) - self.recorder.log_metrics( - excess_return_with_cost_information_ratio=analysis["excess_return_with_cost"]["risk"]["information_ratio"] - ) - self.recorder.log_metrics( - excess_return_with_cost_max_drawdown=analysis["excess_return_with_cost"]["risk"]["max_drawdown"] - ) # save portfolio analysis results analysis_df = pd.concat(analysis) # type: pd.DataFrame + # log metrics + self.recorder.log_metrics(**flatten_dict(analysis_df["risk"].unstack().T.to_dict())) + # save results self.recorder.save_objects(**{"port_analysis.pkl": analysis_df}, artifact_path=self.artifact_path) logger.info( f"Portfolio analysis record 'port_analysis.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}" diff --git a/qlib/workflow/recorder.py b/qlib/workflow/recorder.py index a2cddadcb..71f13381f 100644 --- a/qlib/workflow/recorder.py +++ b/qlib/workflow/recorder.py @@ -25,7 +25,7 @@ class Recorder: STATUS_FI = "FINISHED" STATUS_FA = "FAILED" - def __init__(self, name, experiment_id): + def __init__(self, experiment_id, name): self.id = None self.name = name self.experiment_id = experiment_id @@ -168,8 +168,8 @@ class MLflowRecorder(Recorder): use file manager to help maintain the objects in the project. """ - def __init__(self, name, experiment_id, uri, mlflow_run=None): - super(MLflowRecorder, self).__init__(name, experiment_id) + def __init__(self, experiment_id, uri, name=None, mlflow_run=None): + super(MLflowRecorder, self).__init__(experiment_id, name) self._uri = uri self.artifact_uri = None # set up file manager for saving objects @@ -179,7 +179,7 @@ class MLflowRecorder(Recorder): # construct from mlflow run if mlflow_run is not None: assert isinstance(mlflow_run, mlflow.entities.run.Run), "Please input with a MLflow Run object." - self.name = mlflow_run.data.tags["mlflow.runName"] if mlflow_run.data.tags["mlflow.runName"] != "" else name + self.name = mlflow_run.data.tags["mlflow.runName"] self.id = mlflow_run.info.run_id self.status = mlflow_run.info.status self.start_time = ( diff --git a/qlib/workflow/utils.py b/qlib/workflow/utils.py index d4594d28e..b57879d0e 100644 --- a/qlib/workflow/utils.py +++ b/qlib/workflow/utils.py @@ -31,10 +31,9 @@ def experiment_exception_hook(type, value, tb): value: Exception's value tb: Exception's traceback """ - error_msg = "An exception has been raised.\n" f"Type: {type}\n" - logger.error(error_msg) + logger.error("An exception has been raised.") traceback.print_tb(tb) - logger.error(f"Value: {value}") + print(f"{type}: {value}") R.end_exp(recorder_status=Recorder.STATUS_FA)