mirror of
https://github.com/microsoft/qlib.git
synced 2026-06-06 05:51:17 +08:00
Update R related codes
This commit is contained in:
@@ -87,14 +87,8 @@ if __name__ == "__main__":
|
||||
},
|
||||
"segments": {
|
||||
"train": ("2008-01-01", "2014-12-31"),
|
||||
"valid": (
|
||||
"2015-01-01",
|
||||
"2016-12-31",
|
||||
),
|
||||
"test": (
|
||||
"2017-01-01",
|
||||
"2020-08-01",
|
||||
),
|
||||
"valid": ("2015-01-01", "2016-12-31"),
|
||||
"test": ("2017-01-01", "2020-08-01"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -85,14 +85,8 @@ if __name__ == "__main__":
|
||||
},
|
||||
"segments": {
|
||||
"train": ("2008-01-01", "2014-12-31"),
|
||||
"valid": (
|
||||
"2015-01-01",
|
||||
"2016-12-31",
|
||||
),
|
||||
"test": (
|
||||
"2017-01-01",
|
||||
"2020-08-01",
|
||||
),
|
||||
"valid": ("2015-01-01", "2016-12-31"),
|
||||
"test": ("2017-01-01", "2020-08-01"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import re
|
||||
import subprocess
|
||||
import platform
|
||||
import yaml
|
||||
import atexit
|
||||
from pathlib import Path
|
||||
|
||||
from .utils import can_use_cache, init_instance_by_config, get_module_by_module_path
|
||||
@@ -63,12 +64,10 @@ def init(default_conf="client", **kwargs):
|
||||
if not os.path.exists(C["provider_uri"]):
|
||||
if C["auto_mount"]:
|
||||
LOG.error(
|
||||
"Invalid provider uri: {}, please check if a valid provider uri has been set. This path does not exist.".format(
|
||||
C["provider_uri"]
|
||||
)
|
||||
f"Invalid provider uri: {C['provider_uri']}, please check if a valid provider uri has been set. This path does not exist."
|
||||
)
|
||||
else:
|
||||
LOG.warning("auto_path is False, please make sure {} is mounted".format(C["mount_path"]))
|
||||
LOG.warning(f"auto_path is False, please make sure {C['mount_path']} is mounted")
|
||||
elif C.get_uri_type() == QlibConfig.NFS_URI:
|
||||
_mount_nfs_uri(C)
|
||||
else:
|
||||
@@ -83,10 +82,11 @@ def init(default_conf="client", **kwargs):
|
||||
LOG.info(f"flask_server={C['flask_server']}, flask_port={C['flask_port']}")
|
||||
|
||||
# set up QlibRecorder
|
||||
module = get_module_by_module_path("qlib.workflow.expm")
|
||||
exp_manager = init_instance_by_config(C["exp_manager"], module)
|
||||
exp_manager = init_instance_by_config(C["exp_manager"])
|
||||
qr = QlibRecorder(exp_manager)
|
||||
R.register(qr)
|
||||
# clean up experiment when python program ends
|
||||
atexit.register(R.end_exp, status="FAILED") # will not take effect if experiment ends
|
||||
|
||||
|
||||
def _mount_nfs_uri(C):
|
||||
@@ -102,9 +102,7 @@ def _mount_nfs_uri(C):
|
||||
if not C["auto_mount"]:
|
||||
if not os.path.exists(C["mount_path"]):
|
||||
raise FileNotFoundError(
|
||||
"Invalid mount path: {}! Please mount manually: {} or Set init parameter `auto_mount=True`".format(
|
||||
C["mount_path"], mount_command
|
||||
)
|
||||
f"Invalid mount path: {C['mount_path']}! Please mount manually: {mount_command} or Set init parameter `auto_mount=True`"
|
||||
)
|
||||
else:
|
||||
# Judging system type
|
||||
@@ -161,9 +159,7 @@ def _mount_nfs_uri(C):
|
||||
os.makedirs(C["mount_path"], exist_ok=True)
|
||||
except Exception:
|
||||
raise OSError(
|
||||
"Failed to create directory {}, please create {} manually!".format(
|
||||
C["mount_path"], C["mount_path"]
|
||||
)
|
||||
f"Failed to create directory {C['mount_path']}, please create {C['mount_path']} manually!"
|
||||
)
|
||||
|
||||
# check nfs-common
|
||||
@@ -175,17 +171,15 @@ def _mount_nfs_uri(C):
|
||||
command_status = os.system(mount_command)
|
||||
if command_status == 256:
|
||||
raise OSError(
|
||||
"mount {} on {} error! Needs SUDO! Please mount manually: {}".format(
|
||||
C["provider_uri"], C["mount_path"], mount_command
|
||||
)
|
||||
f"mount {C['provider_uri']} on {C['mount_path']} error! Needs SUDO! Please mount manually: {mount_command}"
|
||||
)
|
||||
elif command_status == 32512:
|
||||
# LOG.error("Command error")
|
||||
raise OSError("mount {} on {} error! Command error".format(C["provider_uri"], C["mount_path"]))
|
||||
raise OSError(f"mount {C['provider_uri']} on {C['mount_path']} error! Command error")
|
||||
elif command_status == 0:
|
||||
LOG.info("Mount finished")
|
||||
else:
|
||||
LOG.warning("{} on {} is already mounted".format(_remote_uri, _mount_path))
|
||||
LOG.warning(f"{_remote_uri} on {_mount_path} is already mounted")
|
||||
|
||||
|
||||
def init_from_yaml_conf(conf_path):
|
||||
|
||||
@@ -126,8 +126,14 @@ _default_config = {
|
||||
"loggers": {"qlib": {"level": "DEBUG", "handlers": ["console"]}},
|
||||
},
|
||||
# Defatult config for experiment manager
|
||||
"exp_manager": {"class": "MLflowExpManager", "kwargs": {}},
|
||||
"exp_uri": str(Path(os.getcwd()).resolve() / "mlruns"),
|
||||
"exp_manager": {
|
||||
"class": "MLflowExpManager",
|
||||
"module_path": "qlib.workflow.expm",
|
||||
"kwargs": {
|
||||
"uri": str(Path(os.getcwd()).resolve() / "mlruns"),
|
||||
"default_exp_name": "Experiment",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
MODE_CONF = {
|
||||
|
||||
@@ -294,7 +294,6 @@ class GRU(Model):
|
||||
return pd.Series(preds, index=index)
|
||||
|
||||
|
||||
|
||||
class AverageMeter(object):
|
||||
"""Computes and stores the average and current value"""
|
||||
|
||||
|
||||
@@ -4,20 +4,51 @@
|
||||
from contextlib import contextmanager
|
||||
from .expm import MLflowExpManager
|
||||
from ..utils import Wrapper
|
||||
from ..config import C
|
||||
|
||||
|
||||
class QlibRecorder:
|
||||
"""
|
||||
A global system that helps to manage the experiments.
|
||||
|
||||
The components of the system:
|
||||
1) ExperimentManager: a class managing experiments.
|
||||
2) Experiment: a class of experiment, and each instance of it is responsible for a single experiment.
|
||||
3) Recorder: a class of recorder, and each instance of it is responsible for a single run.
|
||||
|
||||
The general structure of the system:
|
||||
ExperimentManager
|
||||
- Experiment 1
|
||||
- Recorder 1
|
||||
- Recorder 2
|
||||
- ...
|
||||
- Experiment 2
|
||||
- ...
|
||||
- ...
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, exp_manager):
|
||||
self.exp_manager = exp_manager
|
||||
self.uri = C["exp_uri"]
|
||||
|
||||
@contextmanager
|
||||
def start(self, experiment_name):
|
||||
"""
|
||||
Method to start an experiment. This method can only be called within a Python's `with` statement.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
with R.start('test'):
|
||||
model.fit(dataset)
|
||||
R.log...
|
||||
... # further operations
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
experiment_name : str
|
||||
name of the experiment one wants to start.
|
||||
"""
|
||||
run = self.start_exp(experiment_name)
|
||||
try:
|
||||
yield run
|
||||
@@ -26,44 +57,425 @@ class QlibRecorder:
|
||||
raise e
|
||||
self.end_exp("FINISHED")
|
||||
|
||||
def start_exp(self, experiment_name=None):
|
||||
return self.exp_manager.start_exp(experiment_name, self.uri)
|
||||
def start_exp(self, experiment_name=None, uri=None):
|
||||
"""
|
||||
Lower leverl method for starting an experiment. When use this method, one should end the experiment manually
|
||||
and the status of the recorder may not be handled properly.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
R.start_exp(experiment_name='test')
|
||||
... # further operations
|
||||
R.end_exp('FINISHED')
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
experiment_name : str
|
||||
the name of the experiment to be started
|
||||
uri : str
|
||||
the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored.
|
||||
|
||||
Returns
|
||||
-------
|
||||
An experiment instance being started.
|
||||
"""
|
||||
return self.exp_manager.start_exp(experiment_name, uri)
|
||||
|
||||
def end_exp(self, status):
|
||||
"""
|
||||
Method for ending an experiment manually. It will end the current active experiment, as well as its
|
||||
active recorder with the specified `status` type.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
R.start_exp(experiment_name='test')
|
||||
... # further operations
|
||||
R.end_exp('FINISHED')
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
status : str
|
||||
The status of a recorder, which can be SCHEDULED, RUNNING, FINISHED, FAILED.
|
||||
"""
|
||||
self.exp_manager.end_exp(status)
|
||||
|
||||
def search_records(self, experiment_ids, **kwargs):
|
||||
"""
|
||||
Get a pandas DataFrame of records that fit the search criteria.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
R.log_metrics(m=2.50, step=0)
|
||||
records = R.search_runs([experiment_id], order_by=["metrics.m DESC"])
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
experiment_ids : list
|
||||
list of experiment IDs.
|
||||
filter_string : str
|
||||
filter query string, defaults to searching all runs.
|
||||
run_view_type : int
|
||||
one of enum values ACTIVE_ONLY, DELETED_ONLY, or ALL (e.g. in mlflow.entities.ViewType).
|
||||
max_results : int
|
||||
the maximum number of runs to put in the dataframe.
|
||||
order_by : list
|
||||
list of columns to order by (e.g., “metrics.rmse”).
|
||||
|
||||
Returns
|
||||
-------
|
||||
A pandas.DataFrame of records, where each metric, parameter, and tag
|
||||
are expanded into their own columns named metrics.*, params.*, and tags.*
|
||||
respectively. For records that don't have a particular metric, parameter, or tag, their
|
||||
value will be (NumPy) Nan, None, or None respectively.
|
||||
"""
|
||||
return self.exp_manager.search_records(experiment_ids, **kwargs)
|
||||
|
||||
def get_exp(self, experiment_id=None, experiment_name=None):
|
||||
return self.exp_manager.get_exp(experiment_id, experiment_name)
|
||||
def list_experiments(self):
|
||||
"""
|
||||
Method for listing all the existing experiments (except for those being deleted.)
|
||||
|
||||
def delete_exp(self, experiment_id):
|
||||
self.exp_manager.delete_exp(experiment_id)
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
exps = R.list_experiments()
|
||||
```
|
||||
|
||||
Returns
|
||||
-------
|
||||
A dictionary (name -> experiment) of experiments information that being stored.
|
||||
"""
|
||||
return self.exp_manager.list_experiments()
|
||||
|
||||
def list_recorders(self, experiment_id=None, experiment_name=None):
|
||||
"""
|
||||
Method for listing all the recorders of experiment with given id or name.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
recorders = R.list_recorders(experiment_name='test')
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
experiment_id : str
|
||||
id of the experiment.
|
||||
experiment_name : str
|
||||
name of the experiment.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A dictionary (id -> recorder) of recorder information that being stored.
|
||||
"""
|
||||
return self.get_exp(experiment_id, experiment_name).list_recorders()
|
||||
|
||||
def get_exp(self, experiment_id=None, experiment_name=None, create=True):
|
||||
"""
|
||||
Method for retrieving an experiment with given id or name. Once the `create` argument is set to
|
||||
True, if no valid experiment is found, this method will create one for you. Otherwise, it will
|
||||
only retrieve a specific experiment or raise an Error.
|
||||
|
||||
If `create` is True:
|
||||
If R's running:
|
||||
1) no id or name specified, return the active experiment.
|
||||
2) if id or name is specified, return the specified experiment. If no such exp found,
|
||||
create a new experiment with given id or name.
|
||||
If R's not running:
|
||||
1) no id or name specified, create a default experiment.
|
||||
2) if id or name is specified, return the specified experiment. If no such exp found,
|
||||
create a new experiment with given id or name.
|
||||
Else If `create` is False:
|
||||
If R's running:
|
||||
1) no id or name specified, return the active experiment.
|
||||
2) if id or name is specified, return the specified experiment. If no such exp found,
|
||||
raise Error.
|
||||
If R's not running:
|
||||
1) no id or name specified, raise Error.
|
||||
2) if id or name is specified, return the specified experiment. If no such exp found,
|
||||
raise Error.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
# Case 1
|
||||
with R.start('test'):
|
||||
exp = R.get_exp()
|
||||
recorders = exp.list_recorders()
|
||||
|
||||
# Case 2
|
||||
with R.start('test'):
|
||||
exp = R.get_exp('test1')
|
||||
|
||||
# Case 3
|
||||
exp = R.get_exp() -> a default experiment.
|
||||
|
||||
# Case 4
|
||||
exp = R.get_exp(experiment_name='test')
|
||||
|
||||
# Case 5
|
||||
exp = R.get_exp(create=False) -> Error
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
experiment_id : str
|
||||
id of the experiment.
|
||||
experiment_name : str
|
||||
name of the experiment.
|
||||
create : boolean
|
||||
decide whether to create an default experiment.
|
||||
|
||||
Returns
|
||||
-------
|
||||
An experiment instance with given id or name.
|
||||
"""
|
||||
return self.exp_manager.get_exp(experiment_id, experiment_name, create)
|
||||
|
||||
def delete_exp(self, experiment_id=None, experiment_name=None):
|
||||
"""
|
||||
Method for deleting the experiment with given id or name. At least one of id or name must be given,
|
||||
otherwise, error will occur.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
R.delete_exp(experiment_name='test')
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
experiment_id : str
|
||||
id of the experiment.
|
||||
experiment_name : str
|
||||
name of the experiment.
|
||||
"""
|
||||
self.exp_manager.delete_exp(experiment_id, experiment_name)
|
||||
|
||||
def get_uri(self):
|
||||
"""
|
||||
Method for retrieving the uri of current experiment manager.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
uri = R.get_uri()
|
||||
```
|
||||
|
||||
Returns
|
||||
-------
|
||||
The uri of current experiment manager.
|
||||
"""
|
||||
return self.exp_manager.get_uri()
|
||||
|
||||
def get_recorder(self, recorder_id=None, recorder_name=None):
|
||||
return self.exp_manager.active_experiment.get_recorder(recorder_id, recorder_name)
|
||||
def get_recorder(self, recorder_id=None, recorder_name=None, experiment_name=None):
|
||||
"""
|
||||
Method for retrieving a recorder.
|
||||
|
||||
If R's running: 1) no id or name specified, return the active recorder. 2) if id or name is
|
||||
specified, return the specified recorder.
|
||||
If R's not running: 1) no id or name specified, raise Error. 2) if id or name is specified,
|
||||
and the corresponding experiment_name must be given, return the specified recorder. Otherwise,
|
||||
raise Error.
|
||||
|
||||
The recorder can be used for further process such as `save_object`, `load_object`, `log_params`,
|
||||
`log_metrics`, etc.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
# Case 1
|
||||
with R.start('test'):
|
||||
recorder = R.get_recorder()
|
||||
|
||||
# Case 2
|
||||
with R.start('test'):
|
||||
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')
|
||||
|
||||
# Case 3
|
||||
recorder = R.get_recorder() -> Error
|
||||
|
||||
# Case 4
|
||||
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d') -> Error
|
||||
|
||||
# Case 5
|
||||
recorder = R.get_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d', experiment_name='test')
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
recorder_id : str
|
||||
id of the recorder.
|
||||
recorder_name : str
|
||||
name of the recorder.
|
||||
experiment_name : str
|
||||
name of the experiment.
|
||||
|
||||
|
||||
Returns
|
||||
-------
|
||||
A recorder instance.
|
||||
"""
|
||||
return self.get_exp(experiment_name=experiment_name, create=False).get_recorder(
|
||||
recorder_id, recorder_name, create=False
|
||||
)
|
||||
|
||||
def delete_recorder(self, recorder_id=None, recorder_name=None):
|
||||
"""
|
||||
Method for deleting the recorders with given id or name. At least one of id or name must be given,
|
||||
otherwise, error will occur.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
R.delete_recorder(recorder_id='2e7a4efd66574fa49039e00ffaefa99d')
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
recorder_id : str
|
||||
id of the experiment.
|
||||
recorder_name : str
|
||||
name of the experiment.
|
||||
"""
|
||||
self.get_exp().delete_recorder(recorder_id, recorder_name)
|
||||
|
||||
def save_objects(self, local_path=None, artifact_path=None, **kwargs):
|
||||
self.exp_manager.active_experiment.active_recorder.save_objects(local_path, artifact_path, **kwargs)
|
||||
"""
|
||||
Method for saving objects as artifacts in the experiment to the uri. It supports either saving
|
||||
from a local file/directory, or directly saving objects.
|
||||
|
||||
def load_object(self, name):
|
||||
return self.exp_manager.active_experiment.active_recorder.load_object(name)
|
||||
If R's running: it will save the objects through the running recorder.
|
||||
If R's not running: the system will create a default experiment, and a new recorder and
|
||||
save objects under it.
|
||||
|
||||
If one wants to save objects with a specific recorder. It is recommended to first
|
||||
get the specific recorder through `get_recorder` API and use the recorder the save objects.
|
||||
The supported arguments are the same as this method.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
# Case 1
|
||||
with R.start('test'):
|
||||
pred = model.predict(dataset)
|
||||
R.save_objects(data=pred, name='pred.pkl', artifact_path='prediction')
|
||||
|
||||
# Case 2
|
||||
with R.start('test'):
|
||||
pred1 = model1.predict(dataset)
|
||||
pred2 = model2.predict(dataset)
|
||||
dn_list = [(pred1, 'pred1.pkl'), (pred2, 'pred2.pkl')]
|
||||
R.save_objects(data_name_list=dn_list)
|
||||
|
||||
# Case 3
|
||||
with R.start('test'):
|
||||
R.save_objects(local_path='results/pred.pkl')
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
data : any type
|
||||
the data to be saved.
|
||||
name : str
|
||||
name of the file to be saved.
|
||||
data_name_list : list
|
||||
list of (data, name) pairs
|
||||
local_path : str
|
||||
if provided, them save the file or directory to the artifact URI.
|
||||
artifact_path=None : str
|
||||
the relative path for the artifact to be stored in the URI.
|
||||
"""
|
||||
self.get_exp().get_recorder().save_objects(local_path, artifact_path, **kwargs)
|
||||
|
||||
def log_params(self, **kwargs):
|
||||
self.exp_manager.active_experiment.active_recorder.log_params(**kwargs)
|
||||
"""
|
||||
Method for logging parameters during an experiment.
|
||||
|
||||
If R's running: it will log parameters through the running recorder.
|
||||
If R's not running: the system will create a default experiment as well as a new recorder, and
|
||||
log parameters under it.
|
||||
|
||||
One can also log to a specific recorder after getting it with `get_recorder` API.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
# Case 1
|
||||
with R.start('test'):
|
||||
R.log_params(learning_rate=0.01)
|
||||
|
||||
# Case 2
|
||||
R.log_params(learning_rate=0.01)
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
keyword argument:
|
||||
name1=value1, name2=value2, ...
|
||||
"""
|
||||
self.get_exp().get_recorder().log_params(**kwargs)
|
||||
|
||||
def log_metrics(self, step=None, **kwargs):
|
||||
self.exp_manager.active_experiment.active_recorder.log_metrics(step, **kwargs)
|
||||
"""
|
||||
Method for logging metrics during an experiment.
|
||||
|
||||
If R's running: it will log metrics through the running recorder.
|
||||
If R's not running: the system will create a default experiment as well as a new recorder, and
|
||||
log metrics under it.
|
||||
|
||||
One can also log to a specific recorder after getting it with `get_recorder` API.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
# Case 1
|
||||
with R.start('test'):
|
||||
R.log_metrics(train_loss=0.33, step=1)
|
||||
|
||||
# Case 2
|
||||
R.log_metrics(train_loss=0.33, step=1)
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
keyword argument:
|
||||
name1=value1, name2=value2, ...
|
||||
"""
|
||||
self.get_exp().get_recorder().log_metrics(step, **kwargs)
|
||||
|
||||
def set_tags(self, **kwargs):
|
||||
self.exp_manager.active_experiment.active_recorder.set_tags(**kwargs)
|
||||
"""
|
||||
Method for setting tags for a recorder.
|
||||
|
||||
def delete_tag(self, *key):
|
||||
self.exp_manager.active_experiment.active_recorder.delete_tag(*key)
|
||||
If R's running: it will set tags through the running recorder.
|
||||
If R's not running: the system will create a default experiment as well as a new recorder, and
|
||||
set the tags under it.
|
||||
|
||||
One can also set the tag to a specific recorder after getting it with `get_recorder` API.
|
||||
|
||||
Use case:
|
||||
---------
|
||||
```
|
||||
# Case 1
|
||||
with R.start('test'):
|
||||
R.set_tags(release_version=2.2.0)
|
||||
|
||||
# Case 2
|
||||
R.set_tags(release_version=2.2.0)
|
||||
```
|
||||
|
||||
Parameters
|
||||
----------
|
||||
keyword argument:
|
||||
name1=value1, name2=value2, ...
|
||||
"""
|
||||
self.get_exp().get_recorder().set_tags(**kwargs)
|
||||
|
||||
|
||||
# global record
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import mlflow
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from .recorder import MLflowRecorder
|
||||
from ..log import get_module_logger
|
||||
@@ -11,12 +12,13 @@ logger = get_module_logger("workflow", "INFO")
|
||||
|
||||
class Experiment:
|
||||
"""
|
||||
Thie is the `Experiment` class for each experiment being run. The API is designed
|
||||
Thie is the `Experiment` class for each experiment being run. The API is designed similar to mlflow.
|
||||
(The link: https://mlflow.org/docs/latest/python_api/mlflow.html)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.name = None
|
||||
self.id = None
|
||||
def __init__(self, id, name):
|
||||
self.id = id
|
||||
self.name = name
|
||||
self.active_recorder = None # only one recorder can running each time
|
||||
self.recorders = dict() # recorder id -> object
|
||||
|
||||
@@ -32,16 +34,14 @@ class Experiment:
|
||||
output["class"] = "Experiment"
|
||||
output["id"] = self.id
|
||||
output["name"] = self.name
|
||||
output["active_recorder"] = self.active_recorder.id
|
||||
output["active_recorder"] = self.active_recorder.id if self.active_recorder is not None else None
|
||||
output["recorders"] = list(self.recorders.keys())
|
||||
return output
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start the experiment.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
Returns
|
||||
-------
|
||||
A running recorder instance.
|
||||
@@ -63,9 +63,6 @@ class Experiment:
|
||||
"""
|
||||
Create a recorder for each experiment.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
Returns
|
||||
-------
|
||||
A recorder object.
|
||||
@@ -124,13 +121,31 @@ class Experiment:
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `get_recorder` method.")
|
||||
|
||||
def list_recorders(self):
|
||||
"""
|
||||
List all the existing recorders of this experiment.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A dictionary (id -> recorder) of recorder information that being stored.
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `list_recorders` method.")
|
||||
|
||||
|
||||
class MLflowExperiment(Experiment):
|
||||
"""
|
||||
Use mlflow to implement Experiment.
|
||||
"""
|
||||
|
||||
def __init__(self, id, name, uri):
|
||||
super(MLflowExperiment, self).__init__(id, name)
|
||||
self._uri = uri
|
||||
self._total_recorders = 0
|
||||
self._default_name = None
|
||||
|
||||
def start(self):
|
||||
# get all the recorders of the experiment
|
||||
self.recorders = self.list_recorders()
|
||||
# set up recorder
|
||||
recorder = self.create_recorder()
|
||||
self.active_recorder = recorder
|
||||
@@ -138,17 +153,22 @@ class MLflowExperiment(Experiment):
|
||||
run = self.active_recorder.start_run()
|
||||
# store the recorder
|
||||
self.recorders[self.active_recorder.id] = recorder
|
||||
self._total_recorders += 1 # update recorder num
|
||||
logger.info(f"Experiment {self.id} starts running ...")
|
||||
|
||||
return self.active_recorder
|
||||
|
||||
def end(self, status):
|
||||
if self.active_recorder is not None:
|
||||
self.active_recorder.end_run(status)
|
||||
self.active_recorder = None
|
||||
self._total_recorders -= 1
|
||||
|
||||
def create_recorder(self):
|
||||
num = len(self.recorders)
|
||||
name = "Recorder_{}".format(num + 1)
|
||||
recorder = MLflowRecorder(name, self.id)
|
||||
recorder = MLflowRecorder(name, self.id, self._uri)
|
||||
|
||||
return recorder
|
||||
|
||||
def search_records(self, **kwargs):
|
||||
@@ -156,21 +176,92 @@ class MLflowExperiment(Experiment):
|
||||
run_view_type = 1 if kwargs.get("run_view_type") is None else kwargs.get("run_view_type")
|
||||
max_results = 100000 if kwargs.get("max_results") is None else kwargs.get("max_results")
|
||||
order_by = kwargs.get("order_by")
|
||||
|
||||
return mlflow.search_runs([self.id], filter_string, run_view_type, max_results, order_by)
|
||||
|
||||
def delete_recorder(self, recorder_id):
|
||||
mlflow.delete_run(recorder_id)
|
||||
self.recorders = [r for r in self.recorders if r.id == recorder_id]
|
||||
def delete_recorder(self, recorder_id=None, recorder_name=None):
|
||||
assert (
|
||||
recorder_id is not None or recorder_name is not None
|
||||
), "Please input a valid recorder id or name before deleting."
|
||||
try:
|
||||
if recorder_id is not None:
|
||||
mlflow.delete_run(recorder_id)
|
||||
self.recorders = [r for r in self.recorders if r == recorder_id]
|
||||
else:
|
||||
for r in self.recorders:
|
||||
if self.recorders[r].name == recorder_name:
|
||||
recorder_id = r
|
||||
break
|
||||
mlflow.delete_run(recorder_id)
|
||||
except:
|
||||
raise Exception(
|
||||
"Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct."
|
||||
)
|
||||
|
||||
def get_recorder(self, recorder_id=None, recorder_name=None):
|
||||
if recorder_id is not None:
|
||||
return self.recorders[recorder_id]
|
||||
elif recorder_name is not None:
|
||||
for rid in self.recorders:
|
||||
if self.recorders[rid].name == recorder_name:
|
||||
return self.recorders[rid]
|
||||
elif self.active_recorder is None:
|
||||
raise Exception("No valid active recorder exists. Please make sure the experiment is running.")
|
||||
def get_recorder(self, recorder_id=None, recorder_name=None, create=True):
|
||||
if recorder_id is None and recorder_name is None:
|
||||
if self.active_recorder:
|
||||
return self.active_recorder
|
||||
else:
|
||||
if create:
|
||||
self.start()
|
||||
logger.warning(
|
||||
f"Recorder {self.active_recorder.id} is running under the experiment with name {self.name}..."
|
||||
)
|
||||
return self.active_recorder
|
||||
else:
|
||||
raise Exception(
|
||||
"Something went wrong when retrieving recorders. Please check if QlibRecorder is running or the name/id of the recorder is correct."
|
||||
)
|
||||
else:
|
||||
logger.info("No experiment id or name is given. Return the current active experiment.")
|
||||
return self.active_recorder
|
||||
if recorder_id is not None:
|
||||
if recorder_id in self.recorders:
|
||||
return self.recorders[recorder_id]
|
||||
else:
|
||||
# mlflow does not support create a run with given id
|
||||
raise Exception(
|
||||
"Something went wrong when retrieving recorders. Please check if QlibRecorder is running or the name/id of the recorder is correct."
|
||||
)
|
||||
else:
|
||||
for rid in self.recorders:
|
||||
if self.recorders[rid].name == recorder_name:
|
||||
return self.recorders[rid]
|
||||
if create:
|
||||
self.recorders = self.list_recorders()
|
||||
logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
|
||||
recorder = self.create_recorder()
|
||||
recorder.name = recorder_name
|
||||
recorder.start_run()
|
||||
return recorder
|
||||
else:
|
||||
raise Exception(
|
||||
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
|
||||
)
|
||||
|
||||
def list_recorders(self):
|
||||
client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
|
||||
runs = client.list_run_infos(self.id)[::-1]
|
||||
recorders = dict()
|
||||
self._total_recorders = len(runs)
|
||||
for i in range(len(runs)):
|
||||
rid = runs[i].run_id
|
||||
status = runs[i].status
|
||||
start_time = runs[i].start_time
|
||||
end_time = runs[i].end_time
|
||||
recorder = MLflowRecorder(f"Recorder_{i+1}", self.id, self._uri)
|
||||
recorder.id = rid
|
||||
recorder.status = status
|
||||
recorder.start_time = (
|
||||
datetime.fromtimestamp(float(start_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
|
||||
if start_time is not None
|
||||
else None
|
||||
)
|
||||
recorder.end_time = (
|
||||
datetime.fromtimestamp(float(end_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
|
||||
if end_time is not None
|
||||
else None
|
||||
)
|
||||
recorder._uri = self._uri
|
||||
recorders[rid] = recorder
|
||||
|
||||
return recorders
|
||||
|
||||
@@ -18,8 +18,9 @@ class ExpManager:
|
||||
(The link: https://mlflow.org/docs/latest/python_api/mlflow.html)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.uri = None
|
||||
def __init__(self, uri, default_exp_name):
|
||||
self.uri = uri
|
||||
self.default_exp_name = default_exp_name
|
||||
self.active_experiment = None # only one experiment can running each time
|
||||
self.experiments = dict() # store the experiment name --> Experiment object
|
||||
|
||||
@@ -39,6 +40,7 @@ class ExpManager:
|
||||
controls whether run is nested in parent run.
|
||||
|
||||
Returns
|
||||
-------
|
||||
An active recorder.
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `start_exp` method.")
|
||||
@@ -112,7 +114,7 @@ class ExpManager:
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `get_exp` method.")
|
||||
|
||||
def delete_exp(self, experiment_id):
|
||||
def delete_exp(self, experiment_id=None, experiment_name=None):
|
||||
"""
|
||||
Delete an experiment.
|
||||
|
||||
@@ -120,41 +122,51 @@ class ExpManager:
|
||||
----------
|
||||
experiment_id : str
|
||||
the experiment id.
|
||||
experiment_name : str
|
||||
the experiment name.
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `create_exp` method.")
|
||||
raise NotImplementedError(f"Please implement the `delete_exp` method.")
|
||||
|
||||
def get_uri(self):
|
||||
"""
|
||||
Get the default tracking URI or current URI.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
Returns
|
||||
-------
|
||||
The tracking URI string.
|
||||
"""
|
||||
return self.uri
|
||||
|
||||
def list_experiments(self):
|
||||
"""
|
||||
List all the existing experiments.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A dictionary (name -> experiment) of experiments information that being stored.
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `list_experiments` method.")
|
||||
|
||||
|
||||
class MLflowExpManager(ExpManager):
|
||||
"""
|
||||
Use mlflow to implement ExpManager.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(MLflowExpManager, self).__init__()
|
||||
self.uri = None
|
||||
def __init__(self, uri, default_exp_name):
|
||||
super(MLflowExpManager, self).__init__(uri, default_exp_name)
|
||||
self._total_exps = 0
|
||||
# get all the exps
|
||||
self.experiments = self.list_experiments()
|
||||
|
||||
def start_exp(self, experiment_name=None, uri=None):
|
||||
# create experiment
|
||||
experiment = self.create_exp(experiment_name, uri)
|
||||
# set up active experiment
|
||||
self.active_experiment = experiment
|
||||
# store the experiment
|
||||
self.experiments[experiment_name] = experiment
|
||||
# start the experiment
|
||||
self.active_experiment.start()
|
||||
self._total_exps += 1 # update exp num
|
||||
|
||||
return self.active_experiment
|
||||
|
||||
@@ -162,10 +174,9 @@ class MLflowExpManager(ExpManager):
|
||||
if self.active_experiment is not None:
|
||||
self.active_experiment.end(status)
|
||||
self.active_experiment = None
|
||||
self._total_exps -= 1
|
||||
|
||||
def create_exp(self, experiment_name=None, uri=None):
|
||||
# init experiment
|
||||
experiment = MLflowExperiment()
|
||||
# set the tracking uri
|
||||
if uri is None:
|
||||
logger.info(
|
||||
@@ -176,15 +187,19 @@ class MLflowExpManager(ExpManager):
|
||||
mlflow.set_tracking_uri(self.uri)
|
||||
# start the experiment
|
||||
if experiment_name is None:
|
||||
logger.info("No experiment name provided. The default experiment name is set as `experiment`.")
|
||||
experiment_id = mlflow.create_experiment("experiment")
|
||||
logger.info(
|
||||
f"No experiment name provided. The default experiment name is set as `{self.default_exp_name}`."
|
||||
)
|
||||
experiment_id = mlflow.create_experiment(self.default_exp_name)
|
||||
# set the active experiment
|
||||
mlflow.set_experiment("experiment")
|
||||
experiment_name = "experiment"
|
||||
mlflow.set_experiment(self.default_exp_name)
|
||||
experiment_name = self.default_exp_name
|
||||
else:
|
||||
if experiment_name not in self.experiments:
|
||||
if mlflow.get_experiment_by_name(experiment_name) is not None:
|
||||
logger.info("The experiment has already been created before. Try to resume the experiment...")
|
||||
logger.info(
|
||||
"The experiment has already been created before. Try to resume the experiment with a new recorder..."
|
||||
)
|
||||
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
|
||||
else:
|
||||
experiment_id = mlflow.create_experiment(experiment_name)
|
||||
@@ -193,9 +208,11 @@ class MLflowExpManager(ExpManager):
|
||||
experiment = self.experiments[experiment_name]
|
||||
# set the active experiment
|
||||
mlflow.set_experiment(experiment_name)
|
||||
# set up experiment
|
||||
experiment.id = experiment_id
|
||||
experiment.name = experiment_name
|
||||
# init experiment
|
||||
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri)
|
||||
experiment._default_name = self.default_exp_name
|
||||
# store the experiment
|
||||
self.experiments[experiment_name] = experiment
|
||||
|
||||
return experiment
|
||||
|
||||
@@ -206,19 +223,73 @@ class MLflowExpManager(ExpManager):
|
||||
order_by = kwargs.get("order_by")
|
||||
return mlflow.search_runs(experiment_ids, filter_string, run_view_type, max_results, order_by)
|
||||
|
||||
def get_exp(self, experiment_id=None, experiment_name=None):
|
||||
if experiment_name is not None:
|
||||
return self.experiments[experiment_name]
|
||||
elif experiment_id is not None:
|
||||
for name in self.experiments:
|
||||
if self.experiments[name].id == experiment_id:
|
||||
return self.experiments[name]
|
||||
elif self.active_experiment is None:
|
||||
raise Exception("No valid active experiment exists. Please make sure experiment manager is running.")
|
||||
def get_exp(self, experiment_id=None, experiment_name=None, create=True):
|
||||
if experiment_id is None and experiment_name is None:
|
||||
if self.active_experiment:
|
||||
return self.active_experiment
|
||||
else:
|
||||
if create:
|
||||
logger.warning("QlibRecorder is not running. Use the Default experiment for further process.")
|
||||
return self.start_exp()
|
||||
else:
|
||||
raise Exception(
|
||||
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
|
||||
)
|
||||
else:
|
||||
logger.info("No experiment id or name is given. Return the current active experiment.")
|
||||
return self.active_experiment
|
||||
if experiment_name is not None:
|
||||
if experiment_name in self.experiments:
|
||||
return self.experiments[experiment_name]
|
||||
else:
|
||||
if create:
|
||||
logger.warning(
|
||||
f"No valid experiment found. Create experiment with name {experiment_name} for further process."
|
||||
)
|
||||
return self.start_exp(experiment_name)
|
||||
else:
|
||||
raise Exception(
|
||||
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
|
||||
)
|
||||
else:
|
||||
for name in self.experiments:
|
||||
if self.experiments[name].id == experiment_id:
|
||||
return self.experiments[name]
|
||||
if create:
|
||||
logger.warning(f"No valid experiment found. Use the Default experiment for further process.")
|
||||
return self.start_exp()
|
||||
else:
|
||||
raise Exception(
|
||||
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
|
||||
)
|
||||
|
||||
def delete_exp(self, experiment_id):
|
||||
mlflow.delete_experiment(experiment_id)
|
||||
self.experiments = {key: val for key, val in self.experiments.items() if val.id != experiment_id}
|
||||
def delete_exp(self, experiment_id=None, experiment_name=None):
|
||||
assert (
|
||||
experiment_id is not None or experiment_name is not None
|
||||
), "Please input a valid experiment id or name before deleting."
|
||||
try:
|
||||
if experiment_id is not None:
|
||||
mlflow.delete_experiment(experiment_id)
|
||||
self.experiments = {key: val for key, val in self.experiments.items() if val.id != experiment_id}
|
||||
else:
|
||||
experiment_id = self.experiments[experiment_name].id
|
||||
mlflow.delete_experiment(experiment_id)
|
||||
except:
|
||||
raise Exception(
|
||||
"Something went wrong when deleting experiment. Please check if the name/id of the experiment is correct."
|
||||
)
|
||||
|
||||
def list_experiments(self):
|
||||
# retrieve all the existing experiments
|
||||
client = mlflow.tracking.MlflowClient(tracking_uri=self.uri)
|
||||
exps = client.list_experiments()
|
||||
experiments = dict()
|
||||
self._total_exps = len(exps)
|
||||
for i in range(len(exps)):
|
||||
eid = exps[i].experiment_id
|
||||
ename = exps[i].name
|
||||
experiment = MLflowExperiment(eid, ename, self.uri)
|
||||
experiment.id = eid
|
||||
experiment.name = ename
|
||||
experiment._uri = self.uri
|
||||
experiments[ename] = experiment
|
||||
|
||||
return experiments
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import mlflow
|
||||
import shutil, os, pickle, tempfile, codecs
|
||||
import shutil, os, pickle, tempfile, codecs, datetime
|
||||
from pathlib import Path
|
||||
from ..utils.objm import FileManager
|
||||
|
||||
@@ -19,6 +19,8 @@ class Recorder:
|
||||
self.id = None
|
||||
self.name = name
|
||||
self.experiment_id = experiment_id
|
||||
self.start_time = None
|
||||
self.end_time = None
|
||||
self.status = "SCHEDULED"
|
||||
|
||||
def __repr__(self):
|
||||
@@ -34,7 +36,10 @@ class Recorder:
|
||||
output["id"] = self.id
|
||||
output["name"] = self.name
|
||||
output["experiment_id"] = self.experiment_id
|
||||
output["start_time"] = self.start_time
|
||||
output["end_time"] = self.end_time
|
||||
output["status"] = self.status
|
||||
return output
|
||||
|
||||
def set_recorder_name(self, rname):
|
||||
self.recorder_name = rname
|
||||
@@ -78,9 +83,6 @@ class Recorder:
|
||||
Start running or resuming the Recorder. The return value can be used as a context manager within a `with` block;
|
||||
otherwise, you must call end_run() to terminate the current run. (See `ActiveRun` class in mlflow)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
|
||||
Returns
|
||||
-------
|
||||
An active running object (e.g. mlflow.ActiveRun object).
|
||||
@@ -139,7 +141,7 @@ class Recorder:
|
||||
|
||||
def list_artifacts(self, artifact_path=None):
|
||||
"""
|
||||
Delete some tags from a run.
|
||||
List all the artifacts of a recorder.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
@@ -161,10 +163,13 @@ class MLflowRecorder(Recorder):
|
||||
use file manager to help maintain the objects in the project.
|
||||
"""
|
||||
|
||||
def __init__(self, name, experiment_id):
|
||||
def __init__(self, name, experiment_id, uri):
|
||||
super(MLflowRecorder, self).__init__(name, experiment_id)
|
||||
self.fm = None
|
||||
self.temp_dir = None
|
||||
self._uri = uri
|
||||
self.artifact_uri = None
|
||||
# set up file manager for saving objects
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.fm = FileManager(Path(self.temp_dir).absolute())
|
||||
|
||||
def start_run(self):
|
||||
# start the run
|
||||
@@ -172,19 +177,21 @@ class MLflowRecorder(Recorder):
|
||||
# save the run id and artifact_uri
|
||||
self.id = run.info.run_id
|
||||
self.artifact_uri = run.info.artifact_uri
|
||||
self._uri = mlflow.get_tracking_uri() # Fix!!! : this is not proper to have uri in recorder
|
||||
# set up file manager for saving objects
|
||||
self.temp_dir = tempfile.mkdtemp()
|
||||
self.fm = FileManager(Path(self.temp_dir).absolute())
|
||||
self.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
self.status = "RUNNING"
|
||||
|
||||
return run
|
||||
|
||||
def end_run(self, status):
|
||||
assert status in ["SCHEDULED", "RUNNING", "FINISHED", "FAILED"], f"The status type {status} is not supported."
|
||||
mlflow.end_run(status)
|
||||
self.status = status
|
||||
self.end_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||
if self.status is not "FINISHED":
|
||||
self.status = status
|
||||
shutil.rmtree(self.temp_dir)
|
||||
|
||||
def save_objects(self, data_name_list=None, local_path=None, artifact_path=None, **kwargs):
|
||||
assert self._uri is not None, "Please start the experiment and recorder first before using recorder directly."
|
||||
client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
|
||||
if local_path is not None:
|
||||
client.log_artifacts(self.id, local_path, artifact_path)
|
||||
@@ -200,6 +207,7 @@ class MLflowRecorder(Recorder):
|
||||
raise Exception("Please provide valid arguments in order to save object properly.")
|
||||
|
||||
def load_object(self, name):
|
||||
assert self._uri is not None, "Please start the experiment and recorder first before using recorder directly."
|
||||
client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
|
||||
path = client.download_artifacts(self.id, name)
|
||||
try:
|
||||
@@ -235,12 +243,16 @@ class MLflowRecorder(Recorder):
|
||||
for count, key in enumerate(keys):
|
||||
mlflow.delete_tag(key)
|
||||
|
||||
def get_artifact_uri(self, artifact_path=None):
|
||||
def get_artifact_uri(self):
|
||||
if self.artifact_uri is not None:
|
||||
return self.artifact_uri
|
||||
return mlflow.get_artifact_uri(artifact_path)
|
||||
else:
|
||||
raise Exception(
|
||||
"Please make sure the recorder has been created and started properly before getting artifact uri."
|
||||
)
|
||||
|
||||
def list_artifacts(self, artifact_path=None):
|
||||
assert self._uri is not None, "Please start the experiment and recorder first before using recorder directly."
|
||||
client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
|
||||
artifacts = client.list_artifacts(self.id, artifact_path)
|
||||
return artifacts
|
||||
|
||||
Reference in New Issue
Block a user