1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 19:10:58 +08:00

Add Recorder and ExpManager

This commit is contained in:
Jactus
2020-10-27 14:30:24 +08:00
parent aee507d5dd
commit 1a9ee6cef8
3 changed files with 765 additions and 0 deletions

View File

@@ -0,0 +1,157 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from contextlib import contextmanager
from .record import MLflowRecorder
from .exp import MLflowExpManager
class Record:
    """
    Interface that exposes experiment management and run logging in one place.

    Every method of this base class is a placeholder that raises
    `NotImplementedError`; a concrete subclass is expected to override them.
    """

    def __init__(self):
        pass

    @contextmanager
    def start_exp(self, experiment_name=None, uri=None, project_path=None, artifact_location=None, nested=False):
        """
        Start an experiment; intended to be used in a `with` statement.

        The body contains no `yield`, so the error below is raised as soon as
        the method is called, not when the `with` block is entered.
        """
        raise NotImplementedError("Please implement the `start_exp` method.")

    def search_runs(self, experiment_ids=None, filter_string='', run_view_type=1, max_results=100000, order_by=None):
        """Search the runs that fit the given criteria."""
        raise NotImplementedError("Please implement the `search_runs` method.")

    def get_exp(self, experiment_id):
        """Retrieve an experiment by its id."""
        raise NotImplementedError("Please implement the `get_exp` method.")

    def get_exp_by_name(self, experiment_name):
        """Retrieve an experiment by its name."""
        raise NotImplementedError("Please implement the `get_exp_by_name` method.")

    def create_exp(self, experiment_name, artifact_location=None):
        """Create an experiment."""
        raise NotImplementedError("Please implement the `create_exp` method.")

    def set_exp(self, experiment_name):
        """Set the experiment to be active."""
        raise NotImplementedError("Please implement the `set_exp` method.")

    def delete_exp(self, experiment_id):
        """Delete an experiment."""
        # The message used to name `create_exp`, pointing implementers at the wrong method.
        raise NotImplementedError("Please implement the `delete_exp` method.")

    def set_tracking_uri(self, uri):
        """Set the tracking URI."""
        raise NotImplementedError("Please implement the `set_tracking_uri` method.")

    def get_tracking_uri(self):
        """Get the tracking URI."""
        raise NotImplementedError("Please implement the `get_tracking_uri` method.")

    def get_recorder(self):
        """Get the recorder."""
        raise NotImplementedError("Please implement the `get_recorder` method.")

    def save_object(self, name, data):
        """Save one object under the given name."""
        raise NotImplementedError("Please implement the `save_object` method.")

    def save_objects(self, name_data_list):
        """Save several objects at once."""
        raise NotImplementedError("Please implement the `save_objects` method.")

    def load_object(self, name):
        """Load a previously saved object by name."""
        raise NotImplementedError("Please implement the `load_object` method.")

    def log_param(self, key, value):
        """Log a single parameter."""
        raise NotImplementedError("Please implement the `log_param` method.")

    def log_params(self, params):
        """Log a batch of parameters."""
        raise NotImplementedError("Please implement the `log_params` method.")

    def log_metric(self, key, value, step=None):
        """Log a single metric."""
        raise NotImplementedError("Please implement the `log_metric` method.")

    def log_metrics(self, metrics, step=None):
        """Log a batch of metrics."""
        raise NotImplementedError("Please implement the `log_metrics` method.")

    def set_tag(self, key, value):
        """Set a single tag."""
        raise NotImplementedError("Please implement the `set_tag` method.")

    def set_tags(self, tags):
        """Set a batch of tags."""
        # The message used to name a non-existent `log_tags` method.
        raise NotImplementedError("Please implement the `set_tags` method.")

    def delete_tag(self, key):
        """Delete a tag."""
        raise NotImplementedError("Please implement the `delete_tag` method.")

    def log_artifact(self, local_path, artifact_path=None):
        """Log a local path as an artifact."""
        raise NotImplementedError("Please implement the `log_artifact` method.")

    def log_artifacts(self, local_dir, artifact_path=None):
        """Log the contents of a local directory as artifacts."""
        raise NotImplementedError("Please implement the `log_artifacts` method.")

    def get_artifact_uri(self, artifact_path=None):
        """Get the URI of an artifact."""
        raise NotImplementedError("Please implement the `get_artifact_uri` method.")
class MLflowRecord(Record):
    """
    `Record` implementation that delegates to an `MLflowExpManager`.

    Experiment-level calls are forwarded to `self.exp_manager`; logging and
    object-storage calls are forwarded to `self.exp_manager.active_recorder`,
    which is only set once an experiment has been started.
    """

    def __init__(self):
        super().__init__()
        self.exp_manager = MLflowExpManager()

    @contextmanager
    def start_exp(self, experiment_name=None, uri=None, project_path=None, artifact_location=None, nested=False):
        """
        Start an experiment for the duration of a `with` block.

        Yields whatever the experiment manager's `start_exp` returns. The
        run is ended when the block exits, whether normally or through an
        exception, so it no longer stays active after the block.
        """
        run = self.exp_manager.start_exp(experiment_name, uri, project_path, artifact_location, nested)
        # Capture the recorder now: a nested `start_exp` inside the block
        # replaces `exp_manager.active_recorder`.
        recorder = self.exp_manager.active_recorder
        try:
            yield run
        finally:
            recorder.end_run()

    def search_runs(self, experiment_ids=None, filter_string='', run_view_type=1, max_results=100000, order_by=None):
        """Return the experiment manager's search result for the given criteria."""
        return self.exp_manager.search_runs(experiment_ids, filter_string, run_view_type, max_results, order_by)

    def get_exp(self, experiment_id):
        """Return the experiment with the given id."""
        return self.exp_manager.get_exp(experiment_id)

    def get_exp_by_name(self, experiment_name):
        """Return the experiment with the given name."""
        return self.exp_manager.get_exp_by_name(experiment_name)

    def create_exp(self, experiment_name, artifact_location=None):
        """Create an experiment and return what the manager returns (the new experiment id)."""
        # The id used to be discarded here; returning it is backward-compatible.
        return self.exp_manager.create_exp(experiment_name, artifact_location)

    def set_exp(self, experiment_name):
        """Set the experiment to be active."""
        self.exp_manager.set_exp(experiment_name)

    def delete_exp(self, experiment_id):
        """Delete the experiment with the given id."""
        self.exp_manager.delete_exp(experiment_id)

    def set_tracking_uri(self, uri):
        """Set the tracking URI."""
        self.exp_manager.set_tracking_uri(uri)

    def get_tracking_uri(self):
        """Return the tracking URI."""
        return self.exp_manager.get_tracking_uri()

    def get_recorder(self):
        """Return the experiment manager's current recorder."""
        return self.exp_manager.get_recorder()

    def save_object(self, name, data):
        """Save `data` under `name` through the active recorder."""
        self.exp_manager.active_recorder.save_object(name, data)

    def save_objects(self, name_data_list):
        """Save a list of (name, data) pairs through the active recorder."""
        self.exp_manager.active_recorder.save_objects(name_data_list)

    def load_object(self, name):
        """Load the object saved under `name` through the active recorder."""
        return self.exp_manager.active_recorder.load_object(name)

    def log_param(self, key, value):
        """Log one parameter through the active recorder."""
        self.exp_manager.active_recorder.log_param(key, value)

    def log_params(self, params):
        """Log a batch of parameters through the active recorder."""
        self.exp_manager.active_recorder.log_params(params)

    def log_metric(self, key, value, step=None):
        """Log one metric through the active recorder."""
        self.exp_manager.active_recorder.log_metric(key, value, step)

    def log_metrics(self, metrics, step=None):
        """Log a batch of metrics through the active recorder."""
        self.exp_manager.active_recorder.log_metrics(metrics, step)

    def set_tag(self, key, value):
        """Set one tag through the active recorder."""
        self.exp_manager.active_recorder.set_tag(key, value)

    def set_tags(self, tags):
        """Set a batch of tags through the active recorder."""
        self.exp_manager.active_recorder.set_tags(tags)

    def delete_tag(self, key):
        """Delete a tag through the active recorder."""
        self.exp_manager.active_recorder.delete_tag(key)

    def log_artifact(self, local_path, artifact_path=None):
        """Log a local path as an artifact through the active recorder."""
        self.exp_manager.active_recorder.log_artifact(local_path, artifact_path)

    def log_artifacts(self, local_dir, artifact_path=None):
        """Log a local directory as artifacts through the active recorder."""
        self.exp_manager.active_recorder.log_artifacts(local_dir, artifact_path)

    def get_artifact_uri(self, artifact_path=None):
        """Return the artifact URI reported by the active recorder."""
        return self.exp_manager.active_recorder.get_artifact_uri(artifact_path)
# Global record: a single module-level `MLflowRecord` instance, created when
# this module is imported.
R = MLflowRecord()

265
qlib/workflow/exp.py Normal file
View File

@@ -0,0 +1,265 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import mlflow
from contextlib import contextmanager
from .record import MLflowRecorder
class ExpManager:
    """
    This is the `ExpManager` class for managing the experiments. The API is designed similar to mlflow.
    (The link: https://mlflow.org/docs/latest/python_api/mlflow.html)

    Apart from the bookkeeping in `__init__` and `_store_exp`, every method
    raises `NotImplementedError` and is expected to be overridden.
    """

    def __init__(self):
        self.active_recorder = None  # no recorder until an experiment is started
        self.experiments = dict()  # experiment name -> list whose first item is the experiment id
        self.exp_ids = list()  # ids of the experiments registered through `_store_exp`

    def _store_exp(self, id, name):
        """
        Store the experiments in the experiments holder.

        Storing a known name again with the id it already has is a no-op, so
        the same experiment can be registered more than once.

        Parameters
        ----------
        id : str
            id of the experiment.
        name : str
            name of the experiment.

        Raises
        ------
        ValueError
            if `name` is already stored with a different id.
        Exception
            if `id` is already stored under another name.
        """
        # Check the name first: testing the id first rejected every repeated
        # registration and left the consistency check unreachable.
        if name in self.experiments:
            if int(id) != int(self.experiments[name][0]):
                raise ValueError('Experiment id and name are not consistent when storing the experiment.')
            return
        if id in self.exp_ids:
            raise Exception('Something went wrong when creating the experiment. Please check if the experiment is already created.')
        self.exp_ids.append(id)
        self.experiments[name] = [id]

    def start_exp(self, project_path, experiment_name=None, uri=None, artifact_location=None, nested=False):
        """
        Start running an experiment. This method can only work in the `with` statement.

        NOTE(review): the parameter order differs from
        `MLflowExpManager.start_exp`, which takes `experiment_name` first.

        Parameters
        ----------
        project_path : str
            path for the project.
        experiment_name : str
            name of the active experiment.
        uri : str
            the current tracking URI.
        artifact_location : str
            the location to store all the artifacts.
        nested : boolean
            controls whether run is nested in parent run.

        Returns
        -------
        An active running object (e.g. mlflow.ActiveRun object).
        """
        raise NotImplementedError("Please implement the `start_exp` method.")

    def end_exp(self):
        """
        End an active experiment.
        """
        raise NotImplementedError("Please implement the `end_exp` method.")

    def search_runs(self, experiment_ids=None, filter_string='', run_view_type=1, max_results=100000, order_by=None):
        """
        Get a pandas DataFrame of runs that fit the search criteria.

        Parameters
        ----------
        experiment_ids : list
            list of experiment IDs.
        filter_string : str
            filter query string, defaults to searching all runs.
        run_view_type : int
            one of enum values ACTIVE_ONLY, DELETED_ONLY, or ALL (e.g. in mlflow.entities.ViewType).
        max_results : int
            the maximum number of runs to put in the dataframe.
        order_by : list
            list of columns to order by (e.g., "metrics.rmse").

        Returns
        -------
        A pandas.DataFrame of runs.
        """
        raise NotImplementedError("Please implement the `search_runs` method.")

    def get_exp(self, experiment_id):
        """
        Retrieve an experiment by experiment_id from the backend store.

        Parameters
        ----------
        experiment_id : str
            the experiment id to return.

        Returns
        -------
        An experiment object (e.g. mlflow.entities.Experiment).
        """
        raise NotImplementedError("Please implement the `get_exp` method.")

    def get_exp_by_name(self, experiment_name):
        """
        Retrieve an experiment by experiment name from the backend store.

        Parameters
        ----------
        experiment_name : str
            the experiment name to return.

        Returns
        -------
        An experiment object (e.g. mlflow.entities.Experiment).
        """
        raise NotImplementedError("Please implement the `get_exp_by_name` method.")

    def create_exp(self, experiment_name, artifact_location=None):
        """
        Create an experiment.

        Parameters
        ----------
        experiment_name : str
            the experiment name, which must be unique.
        artifact_location : str
            the location to store run artifacts.

        Returns
        -------
        String id of created experiment.
        """
        raise NotImplementedError("Please implement the `create_exp` method.")

    def set_exp(self, experiment_name):
        """
        Set the experiment to be active.

        Parameters
        ----------
        experiment_name : str
            the experiment name, which must be unique.

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `set_exp` method.")

    def delete_exp(self, experiment_id):
        """
        Delete an experiment.

        Parameters
        ----------
        experiment_id : str
            the experiment id.

        Returns
        -------
        None
        """
        # The message used to name `create_exp` instead of this method.
        raise NotImplementedError("Please implement the `delete_exp` method.")

    def set_tracking_uri(self, uri):
        """
        Set the tracking server URI.

        Parameters
        ----------
        uri : str
            the uri of the tracking server: an empty string, a local file path
            prefixed with file:/, an HTTP URI, or a Databricks workspace.

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `set_tracking_uri` method.")

    def get_tracking_uri(self):
        """
        Get the tracking server URI.

        Returns
        -------
        The tracking URI.
        """
        raise NotImplementedError("Please implement the `get_tracking_uri` method.")

    def get_recorder(self):
        """
        Get the current active Recorder.

        Returns
        -------
        A Recorder object.
        """
        raise NotImplementedError("Please implement the `get_recorder` method.")
class MLflowExpManager(ExpManager):
    """
    Use mlflow to implement ExpManager.
    """

    def start_exp(self, experiment_name=None, uri=None, project_path=None, artifact_location=None, nested=False):
        """
        Configure the tracking URI, select (or create) the experiment and start a run in it.

        Parameters
        ----------
        experiment_name : str
            name of the experiment; defaults to `experiment` when omitted.
        uri : str
            the tracking URI; when omitted, `mlruns` under `project_path` is used.
        project_path : str or pathlib.Path
            path for the project, required when `uri` is not given.
        artifact_location : str
            the location to store all the artifacts (used only when the experiment is created).
        nested : boolean
            controls whether run is nested in parent run.

        Returns
        -------
        The value returned by the new recorder's `start_run`.

        Raises
        ------
        ValueError
            if neither `uri` nor `project_path` is provided.
        Exception
            if the experiment already exists in the backend store but was not
            created by this manager.
        """
        # Imported here so the method does not depend on module-level imports.
        from pathlib import Path

        # set the tracking uri
        if uri is None:
            if project_path is None:
                raise ValueError("Please provide the project_path if no uri is provided in order to set a proper tracking uri.")
            print('No tracking URI is provided. The default tracking URI is set as `mlruns` under the project path.')
            # Wrap in Path so that plain string paths are accepted as well.
            mlflow.set_tracking_uri(str(Path(project_path) / "mlruns"))
        else:
            mlflow.set_tracking_uri(uri)

        # The default name goes through the same create-or-reuse logic below,
        # so starting the default experiment twice no longer re-creates it.
        if experiment_name is None:
            print('No experiment name provided. The default experiment name is set as `experiment`.')
            experiment_name = 'experiment'

        if experiment_name in self.experiments:
            # already registered by this manager: reuse the stored id
            experiment_id = self.experiments[experiment_name][0]
        else:
            if self.get_exp_by_name(experiment_name) is not None:
                raise Exception('The experiment has already been created before. Please pick another name or delete the files under tracking uri.')
            experiment_id = self.create_exp(experiment_name, artifact_location)
            # store the id and name (only for newly created experiments)
            self._store_exp(experiment_id, experiment_name)

        # set the active experiment
        self.set_exp(experiment_name)

        # set up and remember the recorder
        recorder = MLflowRecorder(experiment_id)
        self.active_recorder = recorder
        self.experiments[experiment_name].append(recorder)
        return recorder.start_run(experiment_id=experiment_id, nested=nested)

    def end_exp(self):
        """
        End the run of the active recorder, if there is one.
        """
        if self.active_recorder is not None:
            self.active_recorder.end_run()

    def search_runs(self, experiment_ids=None, filter_string='', run_view_type=1, max_results=100000, order_by=None):
        """Forward the search to `mlflow.search_runs` and return its result."""
        return mlflow.search_runs(experiment_ids, filter_string, run_view_type, max_results, order_by)

    def get_exp(self, experiment_id):
        """Return the result of `mlflow.get_experiment` for the given id."""
        return mlflow.get_experiment(experiment_id)

    def get_exp_by_name(self, experiment_name):
        """Return the result of `mlflow.get_experiment_by_name` for the given name."""
        return mlflow.get_experiment_by_name(experiment_name)

    def create_exp(self, experiment_name, artifact_location=None):
        """Create the experiment through `mlflow.create_experiment` and return its result."""
        return mlflow.create_experiment(experiment_name, artifact_location)

    def set_exp(self, experiment_name):
        """Make the experiment active through `mlflow.set_experiment`."""
        mlflow.set_experiment(experiment_name)

    def delete_exp(self, experiment_id):
        """Delete the experiment in mlflow and drop it from the local bookkeeping."""
        mlflow.delete_experiment(experiment_id)
        self.experiments = {
            name: entry for name, entry in self.experiments.items() if entry[0] != experiment_id
        }
        # Keep the id list in sync with the name mapping.
        self.exp_ids = [known_id for known_id in self.exp_ids if known_id != experiment_id]

    def set_tracking_uri(self, uri):
        """Set the tracking URI through `mlflow.set_tracking_uri`."""
        mlflow.set_tracking_uri(uri)

    def get_tracking_uri(self):
        """Return the result of `mlflow.get_tracking_uri`."""
        return mlflow.get_tracking_uri()

    def get_recorder(self):
        """Return the recorder created by the latest `start_exp` call (None before that)."""
        return self.active_recorder

343
qlib/workflow/record.py Normal file
View File

@@ -0,0 +1,343 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import mlflow
import shutil
from pathlib import Path
from ..utils.objm import FileManager
class Recorder:
    """
    This is the `Recorder` class for logging the experiments. The API is designed similar to mlflow.
    (The link: https://mlflow.org/docs/latest/python_api/mlflow.html)

    Apart from `__init__` and `set_recorder_name`, every method raises
    `NotImplementedError` and is expected to be overridden.
    """

    def __init__(self, experiment_id, project_path=None):
        # NOTE: `project_path` is accepted but not used by this base class.
        self.experiment_id = experiment_id
        self.recorder_id = None
        self.recorder_name = None
        self.fm = None
        self.artifact_uri = None

    def set_recorder_name(self, rname):
        """
        Set the name of this Recorder.

        Parameters
        ----------
        rname : str
            the new name.
        """
        self.recorder_name = rname

    def save_object(self, name, data):
        """
        Save object such as prediction file or model checkpoints.

        Parameters
        ----------
        name : str
            name of the file to be saved.
        data : any type
            the data to be saved.

        Returns
        -------
        None.
        """
        raise NotImplementedError("Please implement the `save_object` method.")

    def save_objects(self, name_data_list):
        """
        Save objects such as prediction file or model checkpoints.

        Parameters
        ----------
        name_data_list : list
            list of (name, data) pairs

        Returns
        -------
        None.
        """
        raise NotImplementedError("Please implement the `save_objects` method.")

    def load_object(self, name):
        """
        Load objects such as prediction file or model checkpoints.

        Parameters
        ----------
        name : str
            name of the file to be loaded.

        Returns
        -------
        The saved object.
        """
        raise NotImplementedError("Please implement the `load_object` method.")

    def start_run(self, run_id=None, experiment_id=None,
                  run_name=None, nested=False):
        """
        Start running the Recorder. The return value can be used as a context manager within a `with` block;
        otherwise, you must call end_run() to terminate the current run. (See `ActiveRun` class in mlflow)

        Parameters
        ----------
        run_id : str
            id of the active Recorder.
        experiment_id : str
            id of the active experiment.
        run_name : str
            name of the Recorder.
        nested : boolean
            controls whether run is nested in parent run.

        Returns
        -------
        An active running object (e.g. mlflow.ActiveRun object).
        """
        raise NotImplementedError("Please implement the `start_run` method.")

    def end_run(self):
        """
        End an active Recorder.
        """
        raise NotImplementedError("Please implement the `end_run` method.")

    def log_param(self, key, value):
        """
        Log a parameter under the current run.

        Parameters
        ----------
        key : str
            the name of the parameter
        value : str
            the value of the parameter

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `log_param` method.")

    def log_params(self, params):
        """
        Log a batch of params for the current run.

        Parameters
        ----------
        params : dict
            dictionary of param_name: String -> value: String.

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `log_params` method.")

    def log_metric(self, key, value, step=None):
        """
        Log a metric under the current run.

        Parameters
        ----------
        key : str
            the name of the metric
        value : float
            the value of the metric
        step : int, optional
            the step the metric belongs to; passed on to the backend as given.

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `log_metric` method.")

    def log_metrics(self, metrics, step=None):
        """
        Log multiple metrics for the current run.

        Parameters
        ----------
        metrics : dict
            dictionary of metric_name: String -> value: Float.
        step : int, optional
            the step the metrics belong to; passed on to the backend as given.

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `log_metrics` method.")

    def set_tag(self, key, value):
        """
        Set a tag under the current run.

        Parameters
        ----------
        key : str
            the name of the tag
        value : str
            the value of the tag

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `set_tag` method.")

    def set_tags(self, tags):
        """
        Log a batch of tags for the current run.

        Parameters
        ----------
        tags : dict
            dictionary of tag_name: String -> value: String.

        Returns
        -------
        None
        """
        # The message used to name a non-existent `log_tags` method.
        raise NotImplementedError("Please implement the `set_tags` method.")

    def delete_tag(self, key):
        """
        Delete a tag from a run.

        Parameters
        ----------
        key : str
            the name of the tag to be deleted.

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `delete_tag` method.")

    def log_artifact(self, local_path, artifact_path=None):
        """
        Log a local file or directory as an artifact of the currently active run.

        Parameters
        ----------
        local_path : str
            path to the file to write.
        artifact_path : str
            the directory in `artifact_uri` to write to.

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `log_artifact` method.")

    def log_artifacts(self, local_dir, artifact_path=None):
        """
        Log all the contents of a local directory as artifacts of the run.

        Parameters
        ----------
        local_dir : str
            path to the directory of files to write.
        artifact_path : str
            the directory in `artifact_uri` to write to.

        Returns
        -------
        None
        """
        raise NotImplementedError("Please implement the `log_artifacts` method.")

    def get_artifact_uri(self, artifact_path=None):
        """
        Get the absolute URI of the specified artifact in the currently active run.

        Parameters
        ----------
        artifact_path : str
            the directory in `artifact_uri` to write to.

        Returns
        -------
        An absolute URI referring to the specified artifact or currently active Recorder.
        """
        raise NotImplementedError("Please implement the `get_artifact_uri` method.")
class MLflowRecorder(Recorder):
    """
    Use mlflow to implement a Recorder.

    Logging calls go through the module-level `mlflow` functions. Objects
    are saved and loaded with a `FileManager` that `start_run` creates for
    the run's artifact location.
    """

    def start_run(self, run_id=None, experiment_id=None,
                  run_name=None, nested=False):
        """
        Start an mlflow run and prepare the file manager used for saving objects.

        Arguments left as None fall back to the values stored on this
        recorder (`recorder_id`, `experiment_id`, `recorder_name`).

        Returns
        -------
        The object returned by `mlflow.start_run`.
        """
        # Imported here: the module never imported `urllib` at top level, so
        # the `file:/` branch below used to fail with a NameError.
        from urllib.parse import urlparse
        from urllib.request import url2pathname

        if run_id is None:
            run_id = self.recorder_id
        if experiment_id is None:
            experiment_id = self.experiment_id
        if run_name is None:
            run_name = self.recorder_name

        # start the run
        run = mlflow.start_run(run_id, experiment_id, run_name, nested)

        # save the run id and artifact_uri
        self.recorder_id = run.info.run_id
        self.artifact_uri = run.info.artifact_uri

        # set up file manager for saving objects
        if self.artifact_uri.startswith('file:/'):
            # Turn the file URI into a local path (this also decodes
            # percent-escapes such as %20).
            local_dir = url2pathname(urlparse(self.artifact_uri).path)
        else:
            local_dir = self.artifact_uri
        self.fm = FileManager(Path(local_dir))
        return run

    def end_run(self):
        """End the currently active mlflow run."""
        mlflow.end_run()

    def save_object(self, name, data):
        """
        Save `data` under `name` with the file manager, then log the file as an artifact.

        Logging is best-effort: `shutil.SameFileError` is ignored and any
        other exception is printed instead of raised.
        """
        self.fm.save_obj(data, name)
        try:
            self.log_artifact(self.fm.path / name)
        except shutil.SameFileError:
            # the file manager already wrote the file to the artifact location
            pass
        except Exception as e:
            print(e)

    def save_objects(self, name_data_list):
        """
        Save a list of (name, data) pairs, then log the file manager's directory as artifacts.

        Logging is best-effort: `shutil.SameFileError` is ignored and any
        other exception is printed instead of raised.
        """
        self.fm.save_objs(name_data_list)
        try:
            self.log_artifacts(self.fm.path)
        except shutil.SameFileError:
            # the file manager already wrote the files to the artifact location
            pass
        except Exception as e:
            print(e)

    def load_object(self, name):
        """Load the object stored under `name` with the file manager."""
        return self.fm.load_obj(name)

    def log_param(self, key, value):
        """Log a parameter through `mlflow.log_param`."""
        mlflow.log_param(key, value)

    def log_params(self, params):
        """Log a batch of parameters through `mlflow.log_params`."""
        mlflow.log_params(params)

    def log_metric(self, key, value, step=None):
        """Log a metric through `mlflow.log_metric`."""
        mlflow.log_metric(key, value, step)

    def log_metrics(self, metrics, step=None):
        """Log a batch of metrics through `mlflow.log_metrics`."""
        mlflow.log_metrics(metrics, step)

    def set_tag(self, key, value):
        """Set a tag through `mlflow.set_tag`."""
        mlflow.set_tag(key, value)

    def set_tags(self, tags):
        """Set a batch of tags through `mlflow.set_tags`."""
        mlflow.set_tags(tags)

    def delete_tag(self, key):
        """Delete a tag through `mlflow.delete_tag`."""
        mlflow.delete_tag(key)

    def log_artifact(self, local_path, artifact_path=None):
        """Log a local path as an artifact through `mlflow.log_artifact`."""
        mlflow.log_artifact(local_path, artifact_path)

    def log_artifacts(self, local_dir, artifact_path=None):
        """Log a local directory as artifacts through `mlflow.log_artifacts`."""
        mlflow.log_artifacts(local_dir, artifact_path)

    def get_artifact_uri(self, artifact_path=None):
        """
        Return the artifact URI of this recorder's run, or of `artifact_path` inside it.

        Before a run has been started (no stored URI) the call is delegated
        to `mlflow.get_artifact_uri`.
        """
        if self.artifact_uri is None:
            return mlflow.get_artifact_uri(artifact_path)
        if artifact_path is None:
            return self.artifact_uri
        # `artifact_path` used to be ignored once a URI was stored; append it
        # to the run's root URI instead.
        return self.artifact_uri.rstrip('/') + '/' + str(artifact_path).lstrip('/')