diff --git a/examples/workflow_by_code_gru.py b/examples/workflow_by_code_gru.py index 52d3c451a..06fea1511 100755 --- a/examples/workflow_by_code_gru.py +++ b/examples/workflow_by_code_gru.py @@ -87,14 +87,8 @@ if __name__ == "__main__": }, "segments": { "train": ("2008-01-01", "2014-12-31"), - "valid": ( - "2015-01-01", - "2016-12-31", - ), - "test": ( - "2017-01-01", - "2020-08-01", - ), + "valid": ("2015-01-01", "2016-12-31"), + "test": ("2017-01-01", "2020-08-01"), }, }, } diff --git a/examples/workflow_by_code_xgboost.py b/examples/workflow_by_code_xgboost.py index 8883bacee..94b43f449 100755 --- a/examples/workflow_by_code_xgboost.py +++ b/examples/workflow_by_code_xgboost.py @@ -85,14 +85,8 @@ if __name__ == "__main__": }, "segments": { "train": ("2008-01-01", "2014-12-31"), - "valid": ( - "2015-01-01", - "2016-12-31", - ), - "test": ( - "2017-01-01", - "2020-08-01", - ), + "valid": ("2015-01-01", "2016-12-31"), + "test": ("2017-01-01", "2020-08-01"), }, }, } diff --git a/qlib/__init__.py b/qlib/__init__.py index b26ac986d..7fe7e79dc 100644 --- a/qlib/__init__.py +++ b/qlib/__init__.py @@ -11,6 +11,7 @@ import re import subprocess import platform import yaml +import atexit from pathlib import Path from .utils import can_use_cache, init_instance_by_config, get_module_by_module_path @@ -63,12 +64,10 @@ def init(default_conf="client", **kwargs): if not os.path.exists(C["provider_uri"]): if C["auto_mount"]: LOG.error( - "Invalid provider uri: {}, please check if a valid provider uri has been set. This path does not exist.".format( - C["provider_uri"] - ) + f"Invalid provider uri: {C['provider_uri']}, please check if a valid provider uri has been set. This path does not exist." ) else: - LOG.warning("auto_path is False, please make sure {} is mounted".format(C["mount_path"])) + LOG.warning(f"auto_path is False, please make sure {C['mount_path']} is mounted") elif C.get_uri_type() == QlibConfig.NFS_URI: _mount_nfs_uri(C) else: @@ -83,10 +82,11 @@ def init(default_conf="client", **kwargs): LOG.info(f"flask_server={C['flask_server']}, flask_port={C['flask_port']}") # set up QlibRecorder - module = get_module_by_module_path("qlib.workflow.expm") - exp_manager = init_instance_by_config(C["exp_manager"], module) + exp_manager = init_instance_by_config(C["exp_manager"]) qr = QlibRecorder(exp_manager) R.register(qr) + # clean up experiment when python program ends + atexit.register(R.end_exp, status="FAILED") # will not take effect if experiment ends def _mount_nfs_uri(C): @@ -102,9 +102,7 @@ def _mount_nfs_uri(C): if not C["auto_mount"]: if not os.path.exists(C["mount_path"]): raise FileNotFoundError( - "Invalid mount path: {}! Please mount manually: {} or Set init parameter `auto_mount=True`".format( - C["mount_path"], mount_command - ) + f"Invalid mount path: {C['mount_path']}! Please mount manually: {mount_command} or Set init parameter `auto_mount=True`" ) else: # Judging system type @@ -161,9 +159,7 @@ def _mount_nfs_uri(C): os.makedirs(C["mount_path"], exist_ok=True) except Exception: raise OSError( - "Failed to create directory {}, please create {} manually!".format( - C["mount_path"], C["mount_path"] - ) + f"Failed to create directory {C['mount_path']}, please create {C['mount_path']} manually!" ) # check nfs-common @@ -175,17 +171,15 @@ def _mount_nfs_uri(C): command_status = os.system(mount_command) if command_status == 256: raise OSError( - "mount {} on {} error! Needs SUDO! Please mount manually: {}".format( - C["provider_uri"], C["mount_path"], mount_command - ) + f"mount {C['provider_uri']} on {C['mount_path']} error! Needs SUDO! Please mount manually: {mount_command}" ) elif command_status == 32512: # LOG.error("Command error") - raise OSError("mount {} on {} error! Command error".format(C["provider_uri"], C["mount_path"])) + raise OSError(f"mount {C['provider_uri']} on {C['mount_path']} error! Command error") elif command_status == 0: LOG.info("Mount finished") else: - LOG.warning("{} on {} is already mounted".format(_remote_uri, _mount_path)) + LOG.warning(f"{_remote_uri} on {_mount_path} is already mounted") def init_from_yaml_conf(conf_path): diff --git a/qlib/config.py b/qlib/config.py index 31acfc535..6c744a9f0 100644 --- a/qlib/config.py +++ b/qlib/config.py @@ -126,8 +126,14 @@ _default_config = { "loggers": {"qlib": {"level": "DEBUG", "handlers": ["console"]}}, }, # Defatult config for experiment manager - "exp_manager": {"class": "MLflowExpManager", "kwargs": {}}, - "exp_uri": str(Path(os.getcwd()).resolve() / "mlruns"), + "exp_manager": { + "class": "MLflowExpManager", + "module_path": "qlib.workflow.expm", + "kwargs": { + "uri": str(Path(os.getcwd()).resolve() / "mlruns"), + "default_exp_name": "Experiment", + }, + }, } MODE_CONF = { diff --git a/qlib/contrib/model/pytorch_gru.py b/qlib/contrib/model/pytorch_gru.py index fb422f491..464cd9ba0 100755 --- a/qlib/contrib/model/pytorch_gru.py +++ b/qlib/contrib/model/pytorch_gru.py @@ -294,7 +294,6 @@ class GRU(Model): return pd.Series(preds, index=index) - class AverageMeter(object): """Computes and stores the average and current value""" diff --git a/qlib/workflow/__init__.py b/qlib/workflow/__init__.py index b801da880..978e45c27 100644 --- a/qlib/workflow/__init__.py +++ b/qlib/workflow/__init__.py @@ -4,20 +4,51 @@ from contextlib import contextmanager from .expm import MLflowExpManager from ..utils import Wrapper -from ..config import C class QlibRecorder: """ A global system that helps to manage the experiments. + + The components of the system: + 1) ExperimentManager: a class managing experiments. + 2) Experiment: a class of experiment, and each instance of it is responsible for a single experiment. + 3) Recorder: a class of recorder, and each instance of it is responsible for a single run. + + The general structure of the system: + ExperimentManager + - Experiment 1 + - Recorder 1 + - Recorder 2 + - ... + - Experiment 2 + - ... + - ... + """ def __init__(self, exp_manager): self.exp_manager = exp_manager - self.uri = C["exp_uri"] @contextmanager def start(self, experiment_name): + """ + Method to start an experiment. This method can only be called within a Python's `with` statement. + + Use case: + --------- + ``` + with R.start('test'): + model.fit(dataset) + R.log... + ... # further operations + ``` + + Parameters + ---------- + experiment_name : str + name of the experiment one wants to start. + """ run = self.start_exp(experiment_name) try: yield run @@ -26,44 +57,425 @@ class QlibRecorder: raise e self.end_exp("FINISHED") - def start_exp(self, experiment_name=None): - return self.exp_manager.start_exp(experiment_name, self.uri) + def start_exp(self, experiment_name=None, uri=None): + """ + Lower leverl method for starting an experiment. When use this method, one should end the experiment manually + and the status of the recorder may not be handled properly. + + Use case: + --------- + ``` + R.start_exp(experiment_name='test') + ... # further operations + R.end_exp('FINISHED') + ``` + + Parameters + ---------- + experiment_name : str + the name of the experiment to be started + uri : str + the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored. + + Returns + ------- + An experiment instance being started. + """ + return self.exp_manager.start_exp(experiment_name, uri) def end_exp(self, status): + """ + Method for ending an experiment manually. It will end the current active experiment, as well as its + active recorder with the specified `status` type. + + Use case: + --------- + ``` + R.start_exp(experiment_name='test') + ... # further operations + R.end_exp('FINISHED') + ``` + + Parameters + ---------- + status : str + The status of a recorder, which can be SCHEDULED, RUNNING, FINISHED, FAILED. + """ self.exp_manager.end_exp(status) def search_records(self, experiment_ids, **kwargs): + """ + Get a pandas DataFrame of records that fit the search criteria. + + Use case: + --------- + ``` + R.log_metrics(m=2.50, step=0) + records = R.search_runs([experiment_id], order_by=["metrics.m DESC"]) + ``` + + Parameters + ---------- + experiment_ids : list + list of experiment IDs. + filter_string : str + filter query string, defaults to searching all runs. + run_view_type : int + one of enum values ACTIVE_ONLY, DELETED_ONLY, or ALL (e.g. in mlflow.entities.ViewType). + max_results : int + the maximum number of runs to put in the dataframe. + order_by : list + list of columns to order by (e.g., “metrics.rmse”). + + Returns + ------- + A pandas.DataFrame of records, where each metric, parameter, and tag + are expanded into their own columns named metrics.*, params.*, and tags.* + respectively. For records that don't have a particular metric, parameter, or tag, their + value will be (NumPy) Nan, None, or None respectively. + """ return self.exp_manager.search_records(experiment_ids, **kwargs) - def get_exp(self, experiment_id=None, experiment_name=None): - return self.exp_manager.get_exp(experiment_id, experiment_name) + def list_experiments(self): + """ + Method for listing all the existing experiments (except for those being deleted.) - def delete_exp(self, experiment_id): - self.exp_manager.delete_exp(experiment_id) + Use case: + --------- + ``` + exps = R.list_experiments() + ``` + + Returns + ------- + A dictionary (name -> experiment) of experiments information that being stored. + """ + return self.exp_manager.list_experiments() + + def list_recorders(self, experiment_id=None, experiment_name=None): + """ + Method for listing all the recorders of experiment with given id or name. + + Use case: + --------- + ``` + recorders = R.list_recorders(experiment_name='test') + ``` + + Parameters + ---------- + experiment_id : str + id of the experiment. + experiment_name : str + name of the experiment. + + Returns + ------- + A dictionary (id -> recorder) of recorder information that being stored. + """ + return self.get_exp(experiment_id, experiment_name).list_recorders() + + def get_exp(self, experiment_id=None, experiment_name=None, create=True): + """ + Method for retrieving an experiment with given id or name. Once the `create` argument is set to + True, if no valid experiment is found, this method will create one for you. Otherwise, it will + only retrieve a specific experiment or raise an Error. + + If `create` is True: + If R's running: + 1) no id or name specified, return the active experiment. + 2) if id or name is specified, return the specified experiment. If no such exp found, + create a new experiment with given id or name. + If R's not running: + 1) no id or name specified, create a default experiment. + 2) if id or name is specified, return the specified experiment. If no such exp found, + create a new experiment with given id or name. + Else If `create` is False: + If R's running: + 1) no id or name specified, return the active experiment. + 2) if id or name is specified, return the specified experiment. If no such exp found, + raise Error. + If R's not running: + 1) no id or name specified, raise Error. + 2) if id or name is specified, return the specified experiment. If no such exp found, + raise Error. + + Use case: + --------- + ``` + # Case 1 + with R.start('test'): + exp = R.get_exp() + recorders = exp.list_recorders() + + # Case 2 + with R.start('test'): + exp = R.get_exp('test1') + + # Case 3 + exp = R.get_exp() -> a default experiment. + + # Case 4 + exp = R.get_exp(experiment_name='test') + + # Case 5 + exp = R.get_exp(create=False) -> Error + ``` + + Parameters + ---------- + experiment_id : str + id of the experiment. + experiment_name : str + name of the experiment. + create : boolean + decide whether to create an default experiment. + + Returns + ------- + An experiment instance with given id or name. + """ + return self.exp_manager.get_exp(experiment_id, experiment_name, create) + + def delete_exp(self, experiment_id=None, experiment_name=None): + """ + Method for deleting the experiment with given id or name. At least one of id or name must be given, + otherwise, error will occur. + + Use case: + --------- + ``` + R.delete_exp(experiment_name='test') + ``` + + Parameters + ---------- + experiment_id : str + id of the experiment. + experiment_name : str + name of the experiment. + """ + self.exp_manager.delete_exp(experiment_id, experiment_name) def get_uri(self): + """ + Method for retrieving the uri of current experiment manager. + + Use case: + --------- + ``` + uri = R.get_uri() + ``` + + Returns + ------- + The uri of current experiment manager. + """ return self.exp_manager.get_uri() - def get_recorder(self, recorder_id=None, recorder_name=None): - return self.exp_manager.active_experiment.get_recorder(recorder_id, recorder_name) + def get_recorder(self, recorder_id=None, recorder_name=None, experiment_name=None): + """ + Method for retrieving a recorder. + + If R's running: 1) no id or name specified, return the active recorder. 2) if id or name is + specified, return the specified recorder. + If R's not running: 1) no id or name specified, raise Error. 2) if id or name is specified, + and the corresponding experiment_name must be given, return the specified recorder. Otherwise, + raise Error. + + The recorder can be used for further process such as `save_object`, `load_object`, `log_params`, + `log_metrics`, etc. + + Use case: + --------- + ``` + # Case 1 + with R.start('test'): + recorder = R.get_recorder() + + # Case 2 + with R.start('test'): + recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d') + + # Case 3 + recorder = R.get_recorder() -> Error + + # Case 4 + recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d') -> Error + + # Case 5 + recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d', experiment_name='test') + ``` + + Parameters + ---------- + recorder_id : str + id of the recorder. + recorder_name : str + name of the recorder. + experiment_name : str + name of the experiment. + + + Returns + ------- + A recorder instance. + """ + return self.get_exp(experiment_name=experiment_name, create=False).get_recorder( + recorder_id, recorder_name, create=False + ) + + def delete_recorder(self, recorder_id=None, recorder_name=None): + """ + Method for deleting the recorders with given id or name. At least one of id or name must be given, + otherwise, error will occur. + + Use case: + --------- + ``` + R.delete_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d') + ``` + + Parameters + ---------- + recorder_id : str + id of the experiment. + recorder_name : str + name of the experiment. + """ + self.get_exp().delete_recorder(recorder_id, recorder_name) def save_objects(self, local_path=None, artifact_path=None, **kwargs): - self.exp_manager.active_experiment.active_recorder.save_objects(local_path, artifact_path, **kwargs) + """ + Method for saving objects as artifacts in the experiment to the uri. It supports either saving + from a local file/directory, or directly saving objects. - def load_object(self, name): - return self.exp_manager.active_experiment.active_recorder.load_object(name) + If R's running: it will save the objects through the running recorder. + If R's not running: the system will create a default experiment, and a new recorder and + save objects under it. + + If one wants to save objects with a specific recorder. It is recommended to first + get the specific recorder through `get_recorder` API and use the recorder the save objects. + The supported arguments are the same as this method. + + Use case: + --------- + ``` + # Case 1 + with R.start('test'): + pred = model.predict(dataset) + R.save_objects(data=pred, name='pred.pkl', artifact_path='prediction') + + # Case 2 + with R.start('test'): + pred1 = model1.predict(dataset) + pred2 = model2.predict(dataset) + dn_list = [(pred1, 'pred1.pkl'), (pred2, 'pred2.pkl')] + R.save_objects(data_name_list=dn_list) + + # Case 3 + with R.start('test'): + R.save_objects(local_path='results/pred.pkl') + ``` + + Parameters + ---------- + data : any type + the data to be saved. + name : str + name of the file to be saved. + data_name_list : list + list of (data, name) pairs + local_path : str + if provided, them save the file or directory to the artifact URI. + artifact_path=None : str + the relative path for the artifact to be stored in the URI. + """ + self.get_exp().get_recorder().save_objects(local_path, artifact_path, **kwargs) def log_params(self, **kwargs): - self.exp_manager.active_experiment.active_recorder.log_params(**kwargs) + """ + Method for logging parameters during an experiment. + + If R's running: it will log parameters through the running recorder. + If R's not running: the system will create a default experiment as well as a new recorder, and + log parameters under it. + + One can also log to a specific recorder after getting it with `get_recorder` API. + + Use case: + --------- + ``` + # Case 1 + with R.start('test'): + R.log_params(learning_rate=0.01) + + # Case 2 + R.log_params(learning_rate=0.01) + ``` + + Parameters + ---------- + keyword argument: + name1=value1, name2=value2, ... + """ + self.get_exp().get_recorder().log_params(**kwargs) def log_metrics(self, step=None, **kwargs): - self.exp_manager.active_experiment.active_recorder.log_metrics(step, **kwargs) + """ + Method for logging metrics during an experiment. + + If R's running: it will log metrics through the running recorder. + If R's not running: the system will create a default experiment as well as a new recorder, and + log metrics under it. + + One can also log to a specific recorder after getting it with `get_recorder` API. + + Use case: + --------- + ``` + # Case 1 + with R.start('test'): + R.log_metrics(train_loss=0.33, step=1) + + # Case 2 + R.log_metrics(train_loss=0.33, step=1) + ``` + + Parameters + ---------- + keyword argument: + name1=value1, name2=value2, ... + """ + self.get_exp().get_recorder().log_metrics(step, **kwargs) def set_tags(self, **kwargs): - self.exp_manager.active_experiment.active_recorder.set_tags(**kwargs) + """ + Method for setting tags for a recorder. - def delete_tag(self, *key): - self.exp_manager.active_experiment.active_recorder.delete_tag(*key) + If R's running: it will set tags through the running recorder. + If R's not running: the system will create a default experiment as well as a new recorder, and + set the tags under it. + + One can also set the tag to a specific recorder after getting it with `get_recorder` API. + + Use case: + --------- + ``` + # Case 1 + with R.start('test'): + R.set_tags(release_version=2.2.0) + + # Case 2 + R.set_tags(release_version=2.2.0) + ``` + + Parameters + ---------- + keyword argument: + name1=value1, name2=value2, ... + """ + self.get_exp().get_recorder().set_tags(**kwargs) # global record diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py index 432497fda..005b113df 100644 --- a/qlib/workflow/exp.py +++ b/qlib/workflow/exp.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. import mlflow +from datetime import datetime from pathlib import Path from .recorder import MLflowRecorder from ..log import get_module_logger @@ -11,12 +12,13 @@ logger = get_module_logger("workflow", "INFO") class Experiment: """ - Thie is the `Experiment` class for each experiment being run. The API is designed + Thie is the `Experiment` class for each experiment being run. The API is designed similar to mlflow. + (The link: https://mlflow.org/docs/latest/python_api/mlflow.html) """ - def __init__(self): - self.name = None - self.id = None + def __init__(self, id, name): + self.id = id + self.name = name self.active_recorder = None # only one recorder can running each time self.recorders = dict() # recorder id -> object @@ -32,16 +34,14 @@ class Experiment: output["class"] = "Experiment" output["id"] = self.id output["name"] = self.name - output["active_recorder"] = self.active_recorder.id + output["active_recorder"] = self.active_recorder.id if self.active_recorder is not None else None output["recorders"] = list(self.recorders.keys()) + return output def start(self): """ Start the experiment. - Parameters - ---------- - Returns ------- A running recorder instance. @@ -63,9 +63,6 @@ class Experiment: """ Create a recorder for each experiment. - Parameters - ---------- - Returns ------- A recorder object. @@ -124,13 +121,31 @@ class Experiment: """ raise NotImplementedError(f"Please implement the `get_recorder` method.") + def list_recorders(self): + """ + List all the existing recorders of this experiment. + + Returns + ------- + A dictionary (id -> recorder) of recorder information that being stored. + """ + raise NotImplementedError(f"Please implement the `list_recorders` method.") + class MLflowExperiment(Experiment): """ Use mlflow to implement Experiment. """ + def __init__(self, id, name, uri): + super(MLflowExperiment, self).__init__(id, name) + self._uri = uri + self._total_recorders = 0 + self._default_name = None + def start(self): + # get all the recorders of the experiment + self.recorders = self.list_recorders() # set up recorder recorder = self.create_recorder() self.active_recorder = recorder @@ -138,17 +153,22 @@ class MLflowExperiment(Experiment): run = self.active_recorder.start_run() # store the recorder self.recorders[self.active_recorder.id] = recorder + self._total_recorders += 1 # update recorder num + logger.info(f"Experiment {self.id} starts running ...") + return self.active_recorder def end(self, status): if self.active_recorder is not None: self.active_recorder.end_run(status) self.active_recorder = None + self._total_recorders -= 1 def create_recorder(self): num = len(self.recorders) name = "Recorder_{}".format(num + 1) - recorder = MLflowRecorder(name, self.id) + recorder = MLflowRecorder(name, self.id, self._uri) + return recorder def search_records(self, **kwargs): @@ -156,21 +176,92 @@ class MLflowExperiment(Experiment): run_view_type = 1 if kwargs.get("run_view_type") is None else kwargs.get("run_view_type") max_results = 100000 if kwargs.get("max_results") is None else kwargs.get("max_results") order_by = kwargs.get("order_by") + return mlflow.search_runs([self.id], filter_string, run_view_type, max_results, order_by) - def delete_recorder(self, recorder_id): - mlflow.delete_run(recorder_id) - self.recorders = [r for r in self.recorders if r.id == recorder_id] + def delete_recorder(self, recorder_id=None, recorder_name=None): + assert ( + recorder_id is not None or recorder_name is not None + ), "Please input a valid recorder id or name before deleting." + try: + if recorder_id is not None: + mlflow.delete_run(recorder_id) + self.recorders = [r for r in self.recorders if r == recorder_id] + else: + for r in self.recorders: + if self.recorders[r].name == recorder_name: + recorder_id = r + break + mlflow.delete_run(recorder_id) + except: + raise Exception( + "Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct." + ) - def get_recorder(self, recorder_id=None, recorder_name=None): - if recorder_id is not None: - return self.recorders[recorder_id] - elif recorder_name is not None: - for rid in self.recorders: - if self.recorders[rid].name == recorder_name: - return self.recorders[rid] - elif self.active_recorder is None: - raise Exception("No valid active recorder exists. Please make sure the experiment is running.") + def get_recorder(self, recorder_id=None, recorder_name=None, create=True): + if recorder_id is None and recorder_name is None: + if self.active_recorder: + return self.active_recorder + else: + if create: + self.start() + logger.warning( + f"Recorder {self.active_recorder.id} is running under the experiment with name {self.name}..." + ) + return self.active_recorder + else: + raise Exception( + "Something went wrong when retrieving recorders. Please check if QlibRecorder is running or the name/id of the recorder is correct." + ) else: - logger.info("No experiment id or name is given. Return the current active experiment.") - return self.active_recorder + if recorder_id is not None: + if recorder_id in self.recorders: + return self.recorders[recorder_id] + else: + # mlflow does not support create a run with given id + raise Exception( + "Something went wrong when retrieving recorders. Please check if QlibRecorder is running or the name/id of the recorder is correct." + ) + else: + for rid in self.recorders: + if self.recorders[rid].name == recorder_name: + return self.recorders[rid] + if create: + self.recorders = self.list_recorders() + logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.") + recorder = self.create_recorder() + recorder.name = recorder_name + recorder.start_run() + return recorder + else: + raise Exception( + "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." + ) + + def list_recorders(self): + client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) + runs = client.list_run_infos(self.id)[::-1] + recorders = dict() + self._total_recorders = len(runs) + for i in range(len(runs)): + rid = runs[i].run_id + status = runs[i].status + start_time = runs[i].start_time + end_time = runs[i].end_time + recorder = MLflowRecorder(f"Recorder_{i+1}", self.id, self._uri) + recorder.id = rid + recorder.status = status + recorder.start_time = ( + datetime.fromtimestamp(float(start_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S") + if start_time is not None + else None + ) + recorder.end_time = ( + datetime.fromtimestamp(float(end_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S") + if end_time is not None + else None + ) + recorder._uri = self._uri + recorders[rid] = recorder + + return recorders diff --git a/qlib/workflow/expm.py b/qlib/workflow/expm.py index f597d4a96..ebf6aeb7f 100644 --- a/qlib/workflow/expm.py +++ b/qlib/workflow/expm.py @@ -18,8 +18,9 @@ class ExpManager: (The link: https://mlflow.org/docs/latest/python_api/mlflow.html) """ - def __init__(self): - self.uri = None + def __init__(self, uri, default_exp_name): + self.uri = uri + self.default_exp_name = default_exp_name self.active_experiment = None # only one experiment can running each time self.experiments = dict() # store the experiment name --> Experiment object @@ -39,6 +40,7 @@ class ExpManager: controls whether run is nested in parent run. Returns + ------- An active recorder. """ raise NotImplementedError(f"Please implement the `start_exp` method.") @@ -112,7 +114,7 @@ class ExpManager: """ raise NotImplementedError(f"Please implement the `get_exp` method.") - def delete_exp(self, experiment_id): + def delete_exp(self, experiment_id=None, experiment_name=None): """ Delete an experiment. @@ -120,41 +122,51 @@ class ExpManager: ---------- experiment_id : str the experiment id. + experiment_name : str + the experiment name. """ - raise NotImplementedError(f"Please implement the `create_exp` method.") + raise NotImplementedError(f"Please implement the `delete_exp` method.") def get_uri(self): """ Get the default tracking URI or current URI. - Parameters - ---------- - Returns ------- The tracking URI string. """ return self.uri + def list_experiments(self): + """ + List all the existing experiments. + + Returns + ------- + A dictionary (name -> experiment) of experiments information that being stored. + """ + raise NotImplementedError(f"Please implement the `list_experiments` method.") + class MLflowExpManager(ExpManager): """ Use mlflow to implement ExpManager. """ - def __init__(self): - super(MLflowExpManager, self).__init__() - self.uri = None + def __init__(self, uri, default_exp_name): + super(MLflowExpManager, self).__init__(uri, default_exp_name) + self._total_exps = 0 + # get all the exps + self.experiments = self.list_experiments() def start_exp(self, experiment_name=None, uri=None): # create experiment experiment = self.create_exp(experiment_name, uri) # set up active experiment self.active_experiment = experiment - # store the experiment - self.experiments[experiment_name] = experiment # start the experiment self.active_experiment.start() + self._total_exps += 1 # update exp num return self.active_experiment @@ -162,10 +174,9 @@ class MLflowExpManager(ExpManager): if self.active_experiment is not None: self.active_experiment.end(status) self.active_experiment = None + self._total_exps -= 1 def create_exp(self, experiment_name=None, uri=None): - # init experiment - experiment = MLflowExperiment() # set the tracking uri if uri is None: logger.info( @@ -176,15 +187,19 @@ class MLflowExpManager(ExpManager): mlflow.set_tracking_uri(self.uri) # start the experiment if experiment_name is None: - logger.info("No experiment name provided. The default experiment name is set as `experiment`.") - experiment_id = mlflow.create_experiment("experiment") + logger.info( + f"No experiment name provided. The default experiment name is set as `{self.default_exp_name}`." + ) + experiment_id = mlflow.create_experiment(self.default_exp_name) # set the active experiment - mlflow.set_experiment("experiment") - experiment_name = "experiment" + mlflow.set_experiment(self.default_exp_name) + experiment_name = self.default_exp_name else: if experiment_name not in self.experiments: if mlflow.get_experiment_by_name(experiment_name) is not None: - logger.info("The experiment has already been created before. Try to resume the experiment...") + logger.info( + "The experiment has already been created before. Try to resume the experiment with a new recorder..." + ) experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id else: experiment_id = mlflow.create_experiment(experiment_name) @@ -193,9 +208,11 @@ class MLflowExpManager(ExpManager): experiment = self.experiments[experiment_name] # set the active experiment mlflow.set_experiment(experiment_name) - # set up experiment - experiment.id = experiment_id - experiment.name = experiment_name + # init experiment + experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) + experiment._default_name = self.default_exp_name + # store the experiment + self.experiments[experiment_name] = experiment return experiment @@ -206,19 +223,73 @@ class MLflowExpManager(ExpManager): order_by = kwargs.get("order_by") return mlflow.search_runs(experiment_ids, filter_string, run_view_type, max_results, order_by) - def get_exp(self, experiment_id=None, experiment_name=None): - if experiment_name is not None: - return self.experiments[experiment_name] - elif experiment_id is not None: - for name in self.experiments: - if self.experiments[name].id == experiment_id: - return self.experiments[name] - elif self.active_experiment is None: - raise Exception("No valid active experiment exists. Please make sure experiment manager is running.") + def get_exp(self, experiment_id=None, experiment_name=None, create=True): + if experiment_id is None and experiment_name is None: + if self.active_experiment: + return self.active_experiment + else: + if create: + logger.warning("QlibRecorder is not running. Use the Default experiment for further process.") + return self.start_exp() + else: + raise Exception( + "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." + ) else: - logger.info("No experiment id or name is given. Return the current active experiment.") - return self.active_experiment + if experiment_name is not None: + if experiment_name in self.experiments: + return self.experiments[experiment_name] + else: + if create: + logger.warning( + f"No valid experiment found. Create experiment with name {experiment_name} for further process." + ) + return self.start_exp(experiment_name) + else: + raise Exception( + "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." + ) + else: + for name in self.experiments: + if self.experiments[name].id == experiment_id: + return self.experiments[name] + if create: + logger.warning(f"No valid experiment found. Use the Default experiment for further process.") + return self.start_exp() + else: + raise Exception( + "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." + ) - def delete_exp(self, experiment_id): - mlflow.delete_experiment(experiment_id) - self.experiments = {key: val for key, val in self.experiments.items() if val.id != experiment_id} + def delete_exp(self, experiment_id=None, experiment_name=None): + assert ( + experiment_id is not None or experiment_name is not None + ), "Please input a valid experiment id or name before deleting." + try: + if experiment_id is not None: + mlflow.delete_experiment(experiment_id) + self.experiments = {key: val for key, val in self.experiments.items() if val.id != experiment_id} + else: + experiment_id = self.experiments[experiment_name].id + mlflow.delete_experiment(experiment_id) + except: + raise Exception( + "Something went wrong when deleting experiment. Please check if the name/id of the experiment is correct." + ) + + def list_experiments(self): + # retrieve all the existing experiments + client = mlflow.tracking.MlflowClient(tracking_uri=self.uri) + exps = client.list_experiments() + experiments = dict() + self._total_exps = len(exps) + for i in range(len(exps)): + eid = exps[i].experiment_id + ename = exps[i].name + experiment = MLflowExperiment(eid, ename, self.uri) + experiment.id = eid + experiment.name = ename + experiment._uri = self.uri + experiments[ename] = experiment + + return experiments diff --git a/qlib/workflow/recorder.py b/qlib/workflow/recorder.py index 68ce5432b..1adaa3f8a 100644 --- a/qlib/workflow/recorder.py +++ b/qlib/workflow/recorder.py @@ -2,7 +2,7 @@ # Licensed under the MIT License. import mlflow -import shutil, os, pickle, tempfile, codecs +import shutil, os, pickle, tempfile, codecs, datetime from pathlib import Path from ..utils.objm import FileManager @@ -19,6 +19,8 @@ class Recorder: self.id = None self.name = name self.experiment_id = experiment_id + self.start_time = None + self.end_time = None self.status = "SCHEDULED" def __repr__(self): @@ -34,7 +36,10 @@ class Recorder: output["id"] = self.id output["name"] = self.name output["experiment_id"] = self.experiment_id + output["start_time"] = self.start_time + output["end_time"] = self.end_time output["status"] = self.status + return output def set_recorder_name(self, rname): self.recorder_name = rname @@ -78,9 +83,6 @@ class Recorder: Start running or resuming the Recorder. The return value can be used as a context manager within a `with` block; otherwise, you must call end_run() to terminate the current run. (See `ActiveRun` class in mlflow) - Parameters - ---------- - Returns ------- An active running object (e.g. mlflow.ActiveRun object). @@ -139,7 +141,7 @@ class Recorder: def list_artifacts(self, artifact_path=None): """ - Delete some tags from a run. + List all the artifacts of a recorder. Parameters ---------- @@ -161,10 +163,13 @@ class MLflowRecorder(Recorder): use file manager to help maintain the objects in the project. """ - def __init__(self, name, experiment_id): + def __init__(self, name, experiment_id, uri): super(MLflowRecorder, self).__init__(name, experiment_id) - self.fm = None - self.temp_dir = None + self._uri = uri + self.artifact_uri = None + # set up file manager for saving objects + self.temp_dir = tempfile.mkdtemp() + self.fm = FileManager(Path(self.temp_dir).absolute()) def start_run(self): # start the run @@ -172,19 +177,21 @@ class MLflowRecorder(Recorder): # save the run id and artifact_uri self.id = run.info.run_id self.artifact_uri = run.info.artifact_uri - self._uri = mlflow.get_tracking_uri() # Fix!!! : this is not proper to have uri in recorder - # set up file manager for saving objects - self.temp_dir = tempfile.mkdtemp() - self.fm = FileManager(Path(self.temp_dir).absolute()) + self.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.status = "RUNNING" + return run def end_run(self, status): + assert status in ["SCHEDULED", "RUNNING", "FINISHED", "FAILED"], f"The status type {status} is not supported." mlflow.end_run(status) - self.status = status + self.end_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + if self.status is not "FINISHED": + self.status = status shutil.rmtree(self.temp_dir) def save_objects(self, data_name_list=None, local_path=None, artifact_path=None, **kwargs): + assert self._uri is not None, "Please start the experiment and recorder first before using recorder directly." client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) if local_path is not None: client.log_artifacts(self.id, local_path, artifact_path) @@ -200,6 +207,7 @@ class MLflowRecorder(Recorder): raise Exception("Please provide valid arguments in order to save object properly.") def load_object(self, name): + assert self._uri is not None, "Please start the experiment and recorder first before using recorder directly." client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) path = client.download_artifacts(self.id, name) try: @@ -235,12 +243,16 @@ class MLflowRecorder(Recorder): for count, key in enumerate(keys): mlflow.delete_tag(key) - def get_artifact_uri(self, artifact_path=None): + def get_artifact_uri(self): if self.artifact_uri is not None: return self.artifact_uri - return mlflow.get_artifact_uri(artifact_path) + else: + raise Exception( + "Please make sure the recorder has been created and started properly before getting artifact uri." + ) def list_artifacts(self, artifact_path=None): + assert self._uri is not None, "Please start the experiment and recorder first before using recorder directly." client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) artifacts = client.list_artifacts(self.id, artifact_path) return artifacts