1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-30 17:41:18 +08:00

Update structure for resuming

This commit is contained in:
Jactus
2021-03-16 17:16:00 +08:00
parent 08b44ed727
commit 447fed8e54
3 changed files with 91 additions and 72 deletions

View File

@@ -22,21 +22,25 @@ class QlibRecorder:
@contextmanager
def start(
self, experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None
self,
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool = False,
):
"""
Method to start an experiment. This method can only be called within a Python's `with` statement. Here is the example code:
.. code-block:: Python
# start new experimetn and recorder
# start new experiment and recorder
with R.start('test', 'recorder_1'):
model.fit(dataset)
R.log...
... # further operations
# resume previous experiment and recorder
with R.start('test', 'recorder_1'): # if users want to resume recorder, they have to specify the exact same name for experiment and recorder.
with R.start('test', 'recorder_1', resume=True): # if users want to resume recorder, they have to specify the exact same name for experiment and recorder.
... # further operations
Parameters
@@ -50,8 +54,10 @@ class QlibRecorder:
The default uri is set in the qlib.config. Note that this uri argument will not change the one defined in the config file.
Therefore, the next time when users call this function in the same experiment,
they have to also specify this argument with the same value. Otherwise, inconsistent uri may occur.
resume : bool
whether to resume the specific recorder with given name under the given experiment.
"""
run = self.start_exp(experiment_name, recorder_name, uri)
run = self.start_exp(experiment_name, recorder_name, uri, resume)
try:
yield run
except Exception as e:
@@ -59,7 +65,7 @@ class QlibRecorder:
raise e
self.end_exp(Recorder.STATUS_FI)
def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
def start_exp(self, experiment_name=None, recorder_name=None, uri=None, resume=False):
"""
Lower level method for starting an experiment. When use this method, one should end the experiment manually
and the status of the recorder may not be handled properly. Here is the example code:
@@ -80,12 +86,14 @@ class QlibRecorder:
uri : str
the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored.
The default uri are set in the qlib.config.
resume : bool
whether to resume the specific recorder with given name under the given experiment.
Returns
-------
An experiment instance being started.
"""
return self.exp_manager.start_exp(experiment_name, recorder_name, uri)
return self.exp_manager.start_exp(experiment_name, recorder_name, uri, resume)
def end_exp(self, recorder_status=Recorder.STATUS_FI):
"""

View File

@@ -39,7 +39,7 @@ class Experiment:
output["recorders"] = list(recorders.keys())
return output
def start(self, recorder_name=None):
def start(self, recorder_name=None, resume=False):
"""
Start the experiment and set it to be active. This method will also start a new recorder.
@@ -47,6 +47,8 @@ class Experiment:
----------
recorder_name : str
the name of the recorder to be created.
resume : bool
whether to resume the first recorder
Returns
-------
@@ -149,59 +151,6 @@ class Experiment:
-------
A recorder object.
"""
raise NotImplementedError(f"Please implement the `get_recorder` method.")
def list_recorders(self):
"""
List all the existing recorders of this experiment. Please first get the experiment instance before calling this method.
If user want to use the method `R.list_recorders()`, please refer to the related API document in `QlibRecorder`.
Returns
-------
A dictionary (id -> recorder) of recorder information that being stored.
"""
raise NotImplementedError(f"Please implement the `list_recorders` method.")
class MLflowExperiment(Experiment):
"""
Use mlflow to implement Experiment.
"""
def __init__(self, id, name, uri):
super(MLflowExperiment, self).__init__(id, name)
self._uri = uri
self._default_name = None
self._default_rec_name = "mlflow_recorder"
self._client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
def __repr__(self):
return "{name}(id={id}, info={info})".format(name=self.__class__.__name__, id=self.id, info=self.info)
def start(self, recorder_name=None):
logger.info(f"Experiment {self.id} starts running ...")
# Get or create recorder
recorder, _ = self._get_or_create_rec(recorder_name=recorder_name)
# Set up active recorder
self.active_recorder = recorder
# Start the recorder
self.active_recorder.start_run()
return self.active_recorder
def end(self, recorder_status):
if self.active_recorder is not None:
self.active_recorder.end_run(recorder_status)
self.active_recorder = None
def create_recorder(self, recorder_name=None):
if recorder_name is None:
recorder_name = self._default_rec_name
recorder = MLflowRecorder(self.id, self._uri, recorder_name)
return recorder
def get_recorder(self, recorder_id=None, recorder_name=None, create=True):
# special case of getting the recorder
if recorder_id is None and recorder_name is None:
if self.active_recorder is not None:
@@ -232,6 +181,63 @@ class MLflowExperiment(Experiment):
logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
return self.create_recorder(recorder_name), True
def list_recorders(self):
"""
List all the existing recorders of this experiment. Please first get the experiment instance before calling this method.
If user want to use the method `R.list_recorders()`, please refer to the related API document in `QlibRecorder`.
Returns
-------
A dictionary (id -> recorder) of recorder information that being stored.
"""
raise NotImplementedError(f"Please implement the `list_recorders` method.")
class MLflowExperiment(Experiment):
"""
Use mlflow to implement Experiment.
"""
def __init__(self, id, name, uri):
super(MLflowExperiment, self).__init__(id, name)
self._uri = uri
self._default_name = None
self._default_rec_name = "mlflow_recorder"
self._client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
def __repr__(self):
return "{name}(id={id}, info={info})".format(name=self.__class__.__name__, id=self.id, info=self.info)
def start(self, recorder_name=None, resume=False):
logger.info(f"Experiment {self.id} starts running ...")
# Get or create recorder
if recorder_name is None:
recorder_name = self._default_rec_name
# resume the recorder
if resume:
recorder, _ = self._get_or_create_rec(recorder_name=recorder_name)
# create a new recorder
else:
recorder = self.create_recorder(recorder_name)
# Set up active recorder
self.active_recorder = recorder
# Start the recorder
self.active_recorder.start_run()
return self.active_recorder
def end(self, recorder_status):
if self.active_recorder is not None:
self.active_recorder.end_run(recorder_status)
self.active_recorder = None
def create_recorder(self, recorder_name=None):
if recorder_name is None:
recorder_name = self._default_rec_name
recorder = MLflowRecorder(self.id, self._uri, recorder_name)
return recorder
def _get_recorder(self, recorder_id=None, recorder_name=None):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
@@ -249,7 +255,7 @@ class MLflowExperiment(Experiment):
raise ValueError("No valid recorder has been found, please make sure the input recorder id is correct.")
elif recorder_name is not None:
logger.warning(
f"Please make sure the recorder name {recorder_name} is unique, we will only return the first recorder if there exist several matched the given name."
f"Please make sure the recorder name {recorder_name} is unique, we will only return the latest recorder if there exist several matched the given name."
)
recorders = self.list_recorders()
for rid in recorders:
@@ -283,7 +289,7 @@ class MLflowExperiment(Experiment):
UNLIMITED = 50000 # FIXME: Mlflow can only list 50000 records at most!!!!!!!
def list_recorders(self, max_results=UNLIMITED):
runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results)[::-1]
runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results)
recorders = dict()
for i in range(len(runs)):
recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i])

View File

@@ -25,7 +25,7 @@ class ExpManager:
def __init__(self, uri: Text, default_exp_name: Optional[Text]):
self._current_uri = uri
self.default_exp_name = default_exp_name
self._default_exp_name = default_exp_name
self.active_experiment = None # only one experiment can active each time
def __repr__(self):
@@ -36,6 +36,7 @@ class ExpManager:
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool = False,
**kwargs,
):
"""
@@ -50,6 +51,8 @@ class ExpManager:
name of the recorder to be started.
uri : str
the current tracking URI.
resume : boolean
whether to resume the experiment and recorder.
Returns
-------
@@ -151,9 +154,7 @@ class ExpManager:
if self.active_experiment is not None:
return self.active_experiment
# User don't want get active code now.
# Don't assume underlying code could handle the case of two None
if experiment_id is None and experiment_name is None:
experiment_name = self.default_exp_name
experiment_name = self._default_exp_name
if create:
exp, is_new = self._get_or_create_exp(experiment_id=experiment_id, experiment_name=experiment_name)
@@ -171,12 +172,10 @@ class ExpManager:
automatically create a new experiment based on the given id and name.
"""
try:
if experiment_id is None and experiment_name is None:
experiment_name = self.default_exp_name
return self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False
except ValueError:
if experiment_name is None:
experiment_name = self.default_exp_name
experiment_name = self._default_exp_name
logger.info(f"No valid experiment found. Create a new experiment with name {experiment_name}.")
return self.create_exp(experiment_name), True
@@ -291,16 +290,22 @@ class MLflowExpManager(ExpManager):
return self._client
def start_exp(
self, experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None
self,
experiment_name: Optional[Text] = None,
recorder_name: Optional[Text] = None,
uri: Optional[Text] = None,
resume: bool,
):
# Set the tracking uri
self.set_uri(uri)
# Create experiment
if experiment_name is None:
experiment_name = self._default_exp_name
experiment, _ = self._get_or_create_exp(experiment_name=experiment_name)
# Set up active experiment
self.active_experiment = experiment
# Start the experiment
self.active_experiment.start(recorder_name)
self.active_experiment.start(recorder_name, resume)
return self.active_experiment
@@ -316,7 +321,7 @@ class MLflowExpManager(ExpManager):
# init experiment
experiment_id = self.client.create_experiment(experiment_name)
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri)
experiment._default_name = self.default_exp_name
experiment._default_name = self._default_exp_name
return experiment