From 1a9ee6cef89c7ddf7c3813437f47c8f14ae0070d Mon Sep 17 00:00:00 2001 From: Jactus Date: Tue, 27 Oct 2020 14:30:24 +0800 Subject: [PATCH] Add Recorder and ExpManager --- qlib/workflow/__init__.py | 157 +++++++++++++++++ qlib/workflow/exp.py | 265 +++++++++++++++++++++++++++++ qlib/workflow/record.py | 343 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 765 insertions(+) create mode 100644 qlib/workflow/exp.py create mode 100644 qlib/workflow/record.py diff --git a/qlib/workflow/__init__.py b/qlib/workflow/__init__.py index e69de29bb..76d5e7d4c 100644 --- a/qlib/workflow/__init__.py +++ b/qlib/workflow/__init__.py @@ -0,0 +1,157 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from contextlib import contextmanager +from .record import MLflowRecorder +from .exp import MLflowExpManager + +class Record: + def __init__(self): + pass + + @contextmanager + def start_exp(self, experiment_name=None, uri=None, project_path=None, artifact_location=None, nested=False): + raise NotImplementedError(f"Please implement the `start_exp` method.") + + def search_runs(self, experiment_ids=None, filter_string='', run_view_type=1, max_results=100000, order_by=None): + raise NotImplementedError(f"Please implement the `search_runs` method.") + + def get_exp(self, experiment_id): + raise NotImplementedError(f"Please implement the `get_exp` method.") + + def get_exp_by_name(self, experiment_name): + raise NotImplementedError(f"Please implement the `get_exp_by_name` method.") + + def create_exp(self, experiment_name, artifact_location=None): + raise NotImplementedError(f"Please implement the `create_exp` method.") + + def set_exp(self, experiment_name): + raise NotImplementedError(f"Please implement the `set_exp` method.") + + def delete_exp(self, experiment_id): + raise NotImplementedError(f"Please implement the `create_exp` method.") + + def set_tracking_uri(self, uri): + raise NotImplementedError(f"Please implement the `set_tracking_uri` method.") + + def get_tracking_uri(self): + raise NotImplementedError(f"Please implement the `get_tracking_uri` method.") + + def get_recorder(self): + raise NotImplementedError(f"Please implement the `get_recorder` method.") + + def save_object(self, name, data): + raise NotImplementedError(f"Please implement the `save_object` method.") + + def save_objects(self, name_data_list): + raise NotImplementedError(f"Please implement the `save_objects` method.") + + def load_object(self, name): + raise NotImplementedError(f"Please implement the `load_object` method.") + + def log_param(self, key, value): + raise NotImplementedError(f"Please implement the `log_param` method.") + + def log_params(self, params): + raise NotImplementedError(f"Please implement the `log_params` method.") + + def log_metric(self, key, value, step=None): + raise NotImplementedError(f"Please implement the `log_metric` method.") + + def log_metrics(self, metrics, step=None): + raise NotImplementedError(f"Please implement the `log_metrics` method.") + + def set_tag(self, key, value): + raise NotImplementedError(f"Please implement the `set_tag` method.") + + def set_tags(self, tags): + raise NotImplementedError(f"Please implement the `log_tags` method.") + + def delete_tag(self, key): + raise NotImplementedError(f"Please implement the `delete_tag` method.") + + def log_artifact(self, local_path, artifact_path=None): + raise NotImplementedError(f"Please implement the `log_artifact` method.") + + def log_artifacts(self, local_dir, artifact_path=None): + raise NotImplementedError(f"Please implement the `log_artifacts` method.") + + def get_artifact_uri(self, artifact_path=None): + raise NotImplementedError(f"Please implement the `get_artifact_uri` method.") + +class MLflowRecord(Record): + def __init__(self): + self.exp_manager = MLflowExpManager() + + @contextmanager + def start_exp(self, experiment_name=None, uri=None, project_path=None, artifact_location=None, nested=False): + yield self.exp_manager.start_exp(experiment_name, uri, project_path, artifact_location, nested) + + def search_runs(self, experiment_ids=None, filter_string='', run_view_type=1, max_results=100000, order_by=None): + return self.exp_manager.search_runs(experiment_ids, filter_string, run_view_type, max_results, order_by) + + def get_exp(self, experiment_id): + return self.exp_manager.get_exp(experiment_id) + + def get_exp_by_name(self, experiment_name): + return self.exp_manager.get_exp_by_name(experiment_name) + + def create_exp(self, experiment_name, artifact_location=None): + self.exp_manager.create_exp(experiment_name, artifact_location) + + def set_exp(self, experiment_name): + self.exp_manager.set_exp(experiment_name) + + def delete_exp(self, experiment_id): + self.exp_manager.delete_exp(experiment_id) + + def set_tracking_uri(self, uri): + self.exp_manager.set_tracking_uri(uri) + + def get_tracking_uri(self): + return self.exp_manager.get_tracking_uri() + + def get_recorder(self): + return self.exp_manager.get_recorder() + + def save_object(self, name, data): + self.exp_manager.active_recorder.save_object(name, data) + + def save_objects(self, name_data_list): + self.exp_manager.active_recorder.save_objects(name_data_list) + + def load_object(self, name): + return self.exp_manager.active_recorder.load_object(name) + + def log_param(self, key, value): + self.exp_manager.active_recorder.log_param(key, value) + + def log_params(self, params): + self.exp_manager.active_recorder.log_params(params) + + def log_metric(self, key, value, step=None): + self.exp_manager.active_recorder.log_metric(key, value, step) + + def log_metrics(self, metrics, step=None): + self.exp_manager.active_recorder.log_metrics(metrics, step) + + def set_tag(self, key, value): + self.exp_manager.active_recorder.set_tag(key, value) + + def set_tags(self, tags): + self.exp_manager.active_recorder.set_tags(tags) + + def delete_tag(self, key): + self.exp_manager.active_recorder.delete_tag(key) + + def log_artifact(self, local_path, artifact_path=None): + self.exp_manager.active_recorder.log_artifact(local_path, artifact_path) + + def log_artifacts(self, local_dir, artifact_path=None): + self.exp_manager.active_recorder.log_artifacts(local_dir, artifact_path) + + def get_artifact_uri(self, artifact_path=None): + return self.exp_manager.active_recorder.get_artifact_uri(artifact_path) + +# global record +R = MLflowRecord() \ No newline at end of file diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py new file mode 100644 index 000000000..f3cedea90 --- /dev/null +++ b/qlib/workflow/exp.py @@ -0,0 +1,265 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import mlflow +from contextlib import contextmanager +from .record import MLflowRecorder + +class ExpManager: + """ + This is the `ExpManager` class for managing the experiments. The API is designed similar to mlflow. + (The link: https://mlflow.org/docs/latest/python_api/mlflow.html) + """ + def __init__(self): + self.active_recorder = None + self.experiments = dict() # store the experiment names -> list of recorders. + self.exp_ids = list() + + def _store_exp(self, id, name): + """ + Store the experiments in the experiments holder. + """ + if id in self.exp_ids: + raise Exception('Something went wrong when creating the experiment. Please check if the experiment is already created.') + if name in self.experiments: + assert int(id) == int(self.experiments[name][0]), 'Experiment id and name are not consistent when storing the experiment.' + else: + self.exp_ids.append(id) + self.experiments[name] = [id] + + def start_exp(self, project_path, experiment_name=None, uri=None, artifact_location=None, nested=False): + """ + Start running an experiment. This method can only work in the `with` statement. + + Parameters + ---------- + project_path : str + path for the project. + experiment_name : str + name of the active experiment. + uri : str + the current tracking URI. + artifact_location : str + the location to store all the artifacts. + nested : boolean + controls whether run is nested in parent run. + + Returns + None + """ + raise NotImplementedError(f"Please implement the `start_exp` method.") + + def end_exp(self): + """ + End an active experiment. + """ + raise NotImplementedError(f"Please implement the `end_exp` method.") + + def search_runs(self, experiment_ids=None, filter_string='', run_view_type=1, max_results=100000, order_by=None): + """ + Get a pandas DataFrame of runs that fit the search criteria. + + Parameters + ---------- + experiment_ids : list + list of experiment IDs. + filter_string : str + filter query string, defaults to searching all runs. + run_view_type : int + one of enum values ACTIVE_ONLY, DELETED_ONLY, or ALL (e.g. in mlflow.entities.ViewType). + max_results : int + the maximum number of runs to put in the dataframe. + order_by : list + list of columns to order by (e.g., “metrics.rmse”). + + Returns + ------- + A pandas.DataFrame of runs. + """ + raise NotImplementedError(f"Please implement the `search_runs` method.") + + def get_exp(self, experiment_id): + """ + Retrieve an experiment by experiment_id from the backend store. + + Parameters + ---------- + experiment_id : str + the experiment id to return. + + Returns + ------- + An experiment object (e.g. mlflow.entities.Experiment). + """ + raise NotImplementedError(f"Please implement the `get_exp` method.") + + def get_exp_by_name(self, experiment_name): + """ + Retrieve an experiment by experiment name from the backend store. + + Parameters + ---------- + experiment_name : str + the experiment name to return. + + Returns + ------- + An experiment object (e.g. mlflow.entities.Experiment). + """ + raise NotImplementedError(f"Please implement the `get_exp_by_name` method.") + + def create_exp(self, experiment_name, artifact_location=None): + """ + Create an experiment. + + Parameters + ---------- + experiment_name : str + the experiment name, which must be unique. + artifact_location : str + the location to store run artifacts. + + Returns + ------- + String id of created experiment. + """ + raise NotImplementedError(f"Please implement the `create_exp` method.") + + def set_exp(self, experiment_name): + """ + Set the experiment to be active. + + Parameters + ---------- + experiment_name : str + the experiment name, which must be unique. + + Returns + ------- + String id of created experiment. + """ + raise NotImplementedError(f"Please implement the `set_exp` method.") + + def delete_exp(self, experiment_id): + """ + Delete an experiment. + + Parameters + ---------- + experiment_id : str + the experiment id. + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `create_exp` method.") + + def set_tracking_uri(self, uri): + """ + Set the tracking server URI. + + Parameters + ---------- + uri : str + the uri of the tracking server, can be An empty string, or a local file path, prefixed with file:/. + or An HTTP URI or A Databricks workspace. + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `set_tracking_uri` method.") + + def get_tracking_uri(self): + """ + Get the tracking server URI. + + Parameters + ---------- + + Returns + ------- + The tracking URI. + """ + raise NotImplementedError(f"Please implement the `get_tracking_uri` method.") + + def get_recorder(self): + """ + Get the current active Recorder. + + Parameters + ---------- + + Returns + ------- + An Recorder object. + """ + raise NotImplementedError(f"Please implement the `get_recorder` method.") + + +class MLflowExpManager(ExpManager): + ''' + Use mlflow to implement ExpManager. + ''' + def start_exp(self, experiment_name=None, uri=None, project_path=None, artifact_location=None, nested=False): + # set the tracking uri + if uri is None: + assert project_path is not None, "Please provide the project_path if no uri is provided in order to set a proper tracking uri." + print('No tracking URI is provided. The default tracking URI is set as `mlruns` under the project path.') + mlflow.set_tracking_uri(str(project_path / "mlruns")) + else: + mlflow.set_tracking_uri(uri) + # start the experiment + if experiment_name is None: + print('No experiment name provided. The default experiment name is set as `experiment`.') + experiment_id = self.create_exp('experiment', artifact_location) + # set the active experiment + self.set_exp('experiment') + experiment_name = 'experiment' + else: + if experiment_name not in self.experiments: + if self.get_exp_by_name(experiment_name) is not None: + raise Exception('The experiment has already been created before. Please pick another name or delete the files under tracking uri.') + experiment_id = self.create_exp(experiment_name, artifact_location) + else: + experiment_id = self.experiments(experiment_name)[0] + # set the active experiment + self.set_exp(experiment_name) + + # store the id and name + self._store_exp(experiment_id, experiment_name) + # set up recorder + recorder = MLflowRecorder(experiment_id) + self.active_recorder = recorder + # store the recorder + self.experiments[experiment_name].append(self.active_recorder) + + return self.active_recorder.start_run(experiment_id=experiment_id, nested=nested) + + def search_runs(self, experiment_ids=None, filter_string='', run_view_type=1, max_results=100000, order_by=None): + return mlflow.search_runs(experiment_ids, filter_string, run_view_type, max_results, order_by) + + def get_exp(self, experiment_id): + return mlflow.get_experiment(experiment_id) + + def get_exp_by_name(self, experiment_name): + return mlflow.get_experiment_by_name(experiment_name) + + def create_exp(self, experiment_name, artifact_location=None): + return mlflow.create_experiment(experiment_name, artifact_location) + + def set_exp(self, experiment_name): + mlflow.set_experiment(experiment_name) + + def delete_exp(self, experiment_id): + mlflow.delete_experiment(experiment_id) + self.experiments = {key:val for key, val in self.experiments.items() if val[0] != experiment_id} + + def set_tracking_uri(self, uri): + mlflow.set_tracking_uri(uri) + + def get_tracking_uri(self): + return mlflow.get_tracking_uri() + + def get_recorder(self): + return self.active_recorder \ No newline at end of file diff --git a/qlib/workflow/record.py b/qlib/workflow/record.py new file mode 100644 index 000000000..7895cf0fb --- /dev/null +++ b/qlib/workflow/record.py @@ -0,0 +1,343 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import mlflow +import shutil +from pathlib import Path +from ..utils.objm import FileManager + +class Recorder: + """ + This is the `Recorder` class for logging the experiments. The API is designed similar to mlflow. + (The link: https://mlflow.org/docs/latest/python_api/mlflow.html) + """ + + def __init__(self, experiment_id, project_path=None): + self.experiment_id = experiment_id + self.recorder_id = None + self.recorder_name = None + self.fm = None + self.artifact_uri = None + + def set_recorder_name(self, rname): + self.recorder_name = rname + + def save_object(self, name, data): + """ + Save object such as prediction file or model checkpoints. + + Parameters + ---------- + name : str + name of the file to be saved. + data : any type + the data to be saved. + + Returns + ------- + None. + """ + raise NotImplementedError(f"Please implement the `save_object` method.") + + def save_objects(self, name_data_list): + """ + Save objects such as prediction file or model checkpoints. + + Parameters + ---------- + name_data_list : list + list of (name, data) pairs + + Returns + ------- + None. + """ + raise NotImplementedError(f"Please implement the `save_objects` method.") + + def load_object(self, name): + """ + Load objects such as prediction file or model checkpoints. + + Parameters + ---------- + name : str + name of the file to be loaded. + + Returns + ------- + The saved object. + """ + raise NotImplementedError(f"Please implement the `load_object` method.") + + def start_run(self, run_id=None, experiment_id=None, + run_name=None, nested=False): + """ + Start running the Recorder. The return value can be used as a context manager within a `with` block; + otherwise, you must call end_run() to terminate the current run. (See `ActiveRun` class in mlflow) + + Parameters + ---------- + run_id : str + id of the active Recorder. + experiment_id : str + id of the active experiment. + run_name : str + name of the Recorder. + nested : boolean + controls whether run is nested in parent run. + + Returns + ------- + An active running object (e.g. mlflow.ActiveRun object). + """ + raise NotImplementedError(f"Please implement the `start_run` method.") + + def end_run(self): + """ + End an active Recorder. + """ + raise NotImplementedError(f"Please implement the `end_run` method.") + + def log_param(self, key, value): + """ + Log a parameter under the current run. + + Parameters + ---------- + key : str + the name of the parameter + value : str + the value of the parameter + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `log_param` method.") + + def log_params(self, params): + """ + Log a batch of params for the current run. + + Parameters + ---------- + params : dict + dictionary of param_name: String -> value: String. + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `log_params` method.") + + def log_metric(self, key, value, step=None): + """ + Log a metric under the current run. + + Parameters + ---------- + key : str + the name of the metric + value : float + the value of the metric + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `log_metric` method.") + + def log_metrics(self, metrics, step=None): + """ + Log multiple metrics for the current run. + + Parameters + ---------- + metrics : dict + dictionary of metric_name: String -> value: Float. + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `log_metrics` method.") + + def set_tag(self, key, value): + """ + Set a tag under the current run. + + Parameters + ---------- + key : str + the name of the tag + value : str + the value of the tag + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `set_tag` method.") + + def set_tags(self, tags): + """ + Log a batch of tags for the current run. + + Parameters + ---------- + tags : dict + dictionary of tag_name: String -> value: String. + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `log_tags` method.") + + def delete_tag(self, key): + """ + Delete a tag from a run. + + Parameters + ---------- + key : str + the name of the tag to be deleted. + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `delete_tag` method.") + + def log_artifact(self, local_path, artifact_path=None): + """ + Log a local file or directory as an artifact of the currently active run. + + Parameters + ---------- + local_path : str + path to the file to write. + artifact_path : str + the directory in `artifact_uri` to write to. + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `log_artifact` method.") + + def log_artifacts(self, local_dir, artifact_path=None): + """ + Log all the contents of a local directory as artifacts of the run. + + Parameters + ---------- + local_dir : str + path to the directory of files to write. + artifact_path : str + the directory in `artifact_uri` to write to. + + Returns + ------- + None + """ + raise NotImplementedError(f"Please implement the `log_artifacts` method.") + + def get_artifact_uri(self, artifact_path=None): + """ + Get the absolute URI of the specified artifact in the currently active run. + + Parameters + ---------- + artifact_path : str + the directory in `artifact_uri` to write to. + + Returns + ------- + An absolute URI referring to the specified artifact or currently active Recorder. + """ + raise NotImplementedError(f"Please implement the `get_artifact_uri` method.") + + +class MLflowRecorder(Recorder): + ''' + Use mlflow to implement a Recorder. + ''' + def start_run(self, run_id=None, experiment_id=None, + run_name=None, nested=False): + if run_id is None: + run_id = self.recorder_id + if experiment_id is None: + experiment_id = self.experiment_id + if run_name is None: + run_name = self.recorder_name + # start the run + run = mlflow.start_run(run_id, experiment_id, run_name, nested) + # save the run id and artifact_uri + self.recorder_id = run.info.run_id + self.artifact_uri = run.info.artifact_uri + # set up file manager for saving objects + if self.artifact_uri.startswith('file:/'): + self.fm = FileManager(Path(urllib.parse.urlparse(self.artifact_uri).path)) + else: + self.fm = FileManager(Path(self.artifact_uri)) + print(self.artifact_uri) + return run + + def end_run(self): + mlflow.end_run() + + def save_object(self, name, data): + self.fm.save_obj(data, name) + import urllib + print(urllib.parse.urlparse(self.artifact_uri).scheme) + try: + self.log_artifact(self.fm.path / name) + except shutil.SameFileError: + pass + except Exception as e: + print(e) + + def save_objects(self, name_data_list): + self.fm.save_objs(name_data_list) + try: + self.log_artifacts(self.fm.path) + except shutil.SameFileError: + pass + except Exception as e: + print(e) + + def load_object(self, name): + return self.fm.load_obj(name) + + def log_param(self, key, value): + mlflow.log_param(key, value) + + def log_params(self, params): + mlflow.log_params(params) + + def log_metric(self, key, value, step=None): + mlflow.log_metric(key, value, step) + + def log_metrics(self, metrics, step=None): + mlflow.log_metrics(metrics, step) + + def set_tag(self, key, value): + mlflow.set_tag(key, value) + + def set_tags(self, tags): + mlflow.set_tags(tags) + + def delete_tag(self, key): + mlflow.delete_tag(key) + + def log_artifact(self, local_path, artifact_path=None): + mlflow.log_artifact(local_path, artifact_path) + + def log_artifacts(self, local_dir, artifact_path=None): + mlflow.log_artifacts(local_dir, artifact_path) + + def get_artifact_uri(self, artifact_path=None): + if self.artifact_uri is not None: + return self.artifact_uri + return mlflow.get_artifact_uri(artifact_path) \ No newline at end of file