From 447fed8e54be6bbb2f993415da5a2e4d04024942 Mon Sep 17 00:00:00 2001 From: Jactus Date: Tue, 16 Mar 2021 17:16:00 +0800 Subject: [PATCH] Update structure for resuming --- qlib/workflow/__init__.py | 20 +++++-- qlib/workflow/exp.py | 118 ++++++++++++++++++++------------------ qlib/workflow/expm.py | 25 ++++---- 3 files changed, 91 insertions(+), 72 deletions(-) diff --git a/qlib/workflow/__init__.py b/qlib/workflow/__init__.py index 203f49bc0..3d787562e 100644 --- a/qlib/workflow/__init__.py +++ b/qlib/workflow/__init__.py @@ -22,21 +22,25 @@ class QlibRecorder: @contextmanager def start( - self, experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None + self, + experiment_name: Optional[Text] = None, + recorder_name: Optional[Text] = None, + uri: Optional[Text] = None, + resume: bool = False, ): """ Method to start an experiment. This method can only be called within a Python's `with` statement. Here is the example code: .. code-block:: Python - # start new experimetn and recorder + # start new experiment and recorder with R.start('test', 'recorder_1'): model.fit(dataset) R.log... ... # further operations # resume previous experiment and recorder - with R.start('test', 'recorder_1'): # if users want to resume recorder, they have to specify the exact same name for experiment and recorder. + with R.start('test', 'recorder_1', resume=True): # if users want to resume recorder, they have to specify the exact same name for experiment and recorder. ... # further operations Parameters @@ -50,8 +54,10 @@ class QlibRecorder: The default uri is set in the qlib.config. Note that this uri argument will not change the one defined in the config file. Therefore, the next time when users call this function in the same experiment, they have to also specify this argument with the same value. Otherwise, inconsistent uri may occur. + resume : bool + whether to resume the specific recorder with given name under the given experiment. """ - run = self.start_exp(experiment_name, recorder_name, uri) + run = self.start_exp(experiment_name, recorder_name, uri, resume) try: yield run except Exception as e: @@ -59,7 +65,7 @@ class QlibRecorder: raise e self.end_exp(Recorder.STATUS_FI) - def start_exp(self, experiment_name=None, recorder_name=None, uri=None): + def start_exp(self, experiment_name=None, recorder_name=None, uri=None, resume=False): """ Lower level method for starting an experiment. When use this method, one should end the experiment manually and the status of the recorder may not be handled properly. Here is the example code: @@ -80,12 +86,14 @@ class QlibRecorder: uri : str the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored. The default uri are set in the qlib.config. + resume : bool + whether to resume the specific recorder with given name under the given experiment. Returns ------- An experiment instance being started. """ - return self.exp_manager.start_exp(experiment_name, recorder_name, uri) + return self.exp_manager.start_exp(experiment_name, recorder_name, uri, resume) def end_exp(self, recorder_status=Recorder.STATUS_FI): """ diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py index ce36cda19..12a8fc057 100644 --- a/qlib/workflow/exp.py +++ b/qlib/workflow/exp.py @@ -39,7 +39,7 @@ class Experiment: output["recorders"] = list(recorders.keys()) return output - def start(self, recorder_name=None): + def start(self, recorder_name=None, resume=False): """ Start the experiment and set it to be active. This method will also start a new recorder. @@ -47,6 +47,8 @@ class Experiment: ---------- recorder_name : str the name of the recorder to be created. + resume : bool + whether to resume the first recorder Returns ------- @@ -149,59 +151,6 @@ class Experiment: ------- A recorder object. """ - raise NotImplementedError(f"Please implement the `get_recorder` method.") - - def list_recorders(self): - """ - List all the existing recorders of this experiment. Please first get the experiment instance before calling this method. - If user want to use the method `R.list_recorders()`, please refer to the related API document in `QlibRecorder`. - - Returns - ------- - A dictionary (id -> recorder) of recorder information that being stored. - """ - raise NotImplementedError(f"Please implement the `list_recorders` method.") - - -class MLflowExperiment(Experiment): - """ - Use mlflow to implement Experiment. - """ - - def __init__(self, id, name, uri): - super(MLflowExperiment, self).__init__(id, name) - self._uri = uri - self._default_name = None - self._default_rec_name = "mlflow_recorder" - self._client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) - - def __repr__(self): - return "{name}(id={id}, info={info})".format(name=self.__class__.__name__, id=self.id, info=self.info) - - def start(self, recorder_name=None): - logger.info(f"Experiment {self.id} starts running ...") - # Get or create recorder - recorder, _ = self._get_or_create_rec(recorder_name=recorder_name) - # Set up active recorder - self.active_recorder = recorder - # Start the recorder - self.active_recorder.start_run() - - return self.active_recorder - - def end(self, recorder_status): - if self.active_recorder is not None: - self.active_recorder.end_run(recorder_status) - self.active_recorder = None - - def create_recorder(self, recorder_name=None): - if recorder_name is None: - recorder_name = self._default_rec_name - recorder = MLflowRecorder(self.id, self._uri, recorder_name) - - return recorder - - def get_recorder(self, recorder_id=None, recorder_name=None, create=True): # special case of getting the recorder if recorder_id is None and recorder_name is None: if self.active_recorder is not None: @@ -232,6 +181,63 @@ class MLflowExperiment(Experiment): logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.") return self.create_recorder(recorder_name), True + def list_recorders(self): + """ + List all the existing recorders of this experiment. Please first get the experiment instance before calling this method. + If user want to use the method `R.list_recorders()`, please refer to the related API document in `QlibRecorder`. + + Returns + ------- + A dictionary (id -> recorder) of recorder information that being stored. + """ + raise NotImplementedError(f"Please implement the `list_recorders` method.") + + +class MLflowExperiment(Experiment): + """ + Use mlflow to implement Experiment. + """ + + def __init__(self, id, name, uri): + super(MLflowExperiment, self).__init__(id, name) + self._uri = uri + self._default_name = None + self._default_rec_name = "mlflow_recorder" + self._client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) + + def __repr__(self): + return "{name}(id={id}, info={info})".format(name=self.__class__.__name__, id=self.id, info=self.info) + + def start(self, recorder_name=None, resume=False): + logger.info(f"Experiment {self.id} starts running ...") + # Get or create recorder + if recorder_name is None: + recorder_name = self._default_rec_name + # resume the recorder + if resume: + recorder, _ = self._get_or_create_rec(recorder_name=recorder_name) + # create a new recorder + else: + recorder = self.create_recorder(recorder_name) + # Set up active recorder + self.active_recorder = recorder + # Start the recorder + self.active_recorder.start_run() + + return self.active_recorder + + def end(self, recorder_status): + if self.active_recorder is not None: + self.active_recorder.end_run(recorder_status) + self.active_recorder = None + + def create_recorder(self, recorder_name=None): + if recorder_name is None: + recorder_name = self._default_rec_name + recorder = MLflowRecorder(self.id, self._uri, recorder_name) + + return recorder + def _get_recorder(self, recorder_id=None, recorder_name=None): """ Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will @@ -249,7 +255,7 @@ class MLflowExperiment(Experiment): raise ValueError("No valid recorder has been found, please make sure the input recorder id is correct.") elif recorder_name is not None: logger.warning( - f"Please make sure the recorder name {recorder_name} is unique, we will only return the first recorder if there exist several matched the given name." + f"Please make sure the recorder name {recorder_name} is unique, we will only return the latest recorder if there exist several matched the given name." ) recorders = self.list_recorders() for rid in recorders: @@ -283,7 +289,7 @@ class MLflowExperiment(Experiment): UNLIMITED = 50000 # FIXME: Mlflow can only list 50000 records at most!!!!!!! def list_recorders(self, max_results=UNLIMITED): - runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results)[::-1] + runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results) recorders = dict() for i in range(len(runs)): recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i]) diff --git a/qlib/workflow/expm.py b/qlib/workflow/expm.py index 56fe810c3..7f4f8462c 100644 --- a/qlib/workflow/expm.py +++ b/qlib/workflow/expm.py @@ -25,7 +25,7 @@ class ExpManager: def __init__(self, uri: Text, default_exp_name: Optional[Text]): self._current_uri = uri - self.default_exp_name = default_exp_name + self._default_exp_name = default_exp_name self.active_experiment = None # only one experiment can active each time def __repr__(self): @@ -36,6 +36,7 @@ class ExpManager: experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None, + resume: bool = False, **kwargs, ): """ @@ -50,6 +51,8 @@ class ExpManager: name of the recorder to be started. uri : str the current tracking URI. + resume : boolean + whether to resume the experiment and recorder. Returns ------- @@ -151,9 +154,7 @@ class ExpManager: if self.active_experiment is not None: return self.active_experiment # User don't want get active code now. - # Don't assume underlying code could handle the case of two None - if experiment_id is None and experiment_name is None: - experiment_name = self.default_exp_name + experiment_name = self._default_exp_name if create: exp, is_new = self._get_or_create_exp(experiment_id=experiment_id, experiment_name=experiment_name) @@ -171,12 +172,10 @@ class ExpManager: automatically create a new experiment based on the given id and name. """ try: - if experiment_id is None and experiment_name is None: - experiment_name = self.default_exp_name return self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False except ValueError: if experiment_name is None: - experiment_name = self.default_exp_name + experiment_name = self._default_exp_name logger.info(f"No valid experiment found. Create a new experiment with name {experiment_name}.") return self.create_exp(experiment_name), True @@ -291,16 +290,22 @@ class MLflowExpManager(ExpManager): return self._client def start_exp( - self, experiment_name: Optional[Text] = None, recorder_name: Optional[Text] = None, uri: Optional[Text] = None + self, + experiment_name: Optional[Text] = None, + recorder_name: Optional[Text] = None, + uri: Optional[Text] = None, + resume: bool, ): # Set the tracking uri self.set_uri(uri) # Create experiment + if experiment_name is None: + experiment_name = self._default_exp_name experiment, _ = self._get_or_create_exp(experiment_name=experiment_name) # Set up active experiment self.active_experiment = experiment # Start the experiment - self.active_experiment.start(recorder_name) + self.active_experiment.start(recorder_name, resume) return self.active_experiment @@ -316,7 +321,7 @@ class MLflowExpManager(ExpManager): # init experiment experiment_id = self.client.create_experiment(experiment_name) experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) - experiment._default_name = self.default_exp_name + experiment._default_name = self._default_exp_name return experiment