diff --git a/docs/start/initialization.rst b/docs/start/initialization.rst index cffe11f52..bcb09925e 100644 --- a/docs/start/initialization.rst +++ b/docs/start/initialization.rst @@ -59,7 +59,14 @@ Besides `provider_uri` and `region`, `qlib.init` has other parameters. The follo If Qlib fails to connect redis via `redis_host` and `redis_port`, cache mechanism will not be used! Please refer to `Cache <../component/data.html#cache>`_ for details. - `exp_manager` - Type: str, optional parameter(default: "MLflowExpManager"), the experiment manager to be used in qlib. -- `exp_uri` - Type: str, optional parameter(default: "mlruns" in local execution path), the tracking uri of the experiment manager. - It can either be a local path or a remote uri. \ No newline at end of file + Type: dict, optional parameter, the setting of experiment manager to be used in qlib. Users can specify an experiment manager class, as well as the tracking URI for all the experiments. However, please be aware that we only support input of a dictionary in the following style for `exp_manager`. 
+ :: + + { + "class": "MLflowExpManager", + "module_path": "qlib.workflow.expm", + "kwargs": { + "uri": "python_execution_path/mlruns", + "default_exp_name": "Experiment", + } + } \ No newline at end of file diff --git a/examples/workflow_by_code.py b/examples/workflow_by_code.py index cae890672..b70a9e963 100644 --- a/examples/workflow_by_code.py +++ b/examples/workflow_by_code.py @@ -89,7 +89,7 @@ if __name__ == "__main__": "kwargs": { "topk": 50, "n_drop": 5, - } + }, }, "backtest": { "verbose": False, diff --git a/examples/workflow_by_code_finetune.py b/examples/workflow_by_code_finetune.py index 041e23b83..6df8c9821 100644 --- a/examples/workflow_by_code_finetune.py +++ b/examples/workflow_by_code_finetune.py @@ -89,7 +89,7 @@ if __name__ == "__main__": "kwargs": { "topk": 50, "n_drop": 5, - } + }, }, "backtest": { "verbose": False, @@ -113,7 +113,6 @@ if __name__ == "__main__": R.save_objects(init_model=model) rid = R.get_recorder().id - # Finetune model based on previous trained model with R.start(experiment_name="finetune model"): recorder = R.get_recorder(rid, experiment_name="init models") diff --git a/examples/workflow_by_config.py b/examples/workflow_by_config.py deleted file mode 100644 index 7955d29d0..000000000 --- a/examples/workflow_by_config.py +++ /dev/null @@ -1,49 +0,0 @@ -# Copyright (c) Microsoft Corporation. -# Licensed under the MIT License. 
- -import sys -from pathlib import Path - -import qlib -import fire -import yaml -import pandas as pd -from qlib.config import REG_CN -from qlib.utils import exists_qlib_data, init_instance_by_config -from qlib.workflow import R -from qlib.workflow.record_temp import SignalRecord, PortAnaRecord - -# worflow handler function -def workflow(config_path): - with open(config_path) as fp: - config = yaml.load(fp, Loader=yaml.FullLoader) - - provider_uri = config.get("PROVIDER_URI") - if not exists_qlib_data(provider_uri): - print(f"Qlib data is not found in {provider_uri}") - sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts"))) - from get_data import GetData - - GetData().qlib_data_cn(target_dir=provider_uri) - - qlib.init(provider_uri=provider_uri, region=REG_CN) - - # model initiaiton - model = init_instance_by_config(config.get("TASK")["model"]) - dataset = init_instance_by_config(config.get("TASK")["dataset"]) - - # start exp - with R.start("workflow"): - model.fit(dataset) - - # prediction - recorder = R.get_recorder() - sr = SignalRecord(model, dataset, recorder) - sr.generate() - - # backtest - par = PortAnaRecord(recorder, config.get("PORT_ANALYSIS_CONFIG")) - par.generate() - -if __name__ == "__main__": - fire.Fire(workflow) \ No newline at end of file diff --git a/examples/workflow_config.yaml b/examples/workflow_config.yaml index 2698423df..212558044 100644 --- a/examples/workflow_config.yaml +++ b/examples/workflow_config.yaml @@ -1,13 +1,29 @@ -PROVIDER_URI: "~/.qlib/qlib_data/cn_data" -MARKET: &market csi300 -BENCHMARK: &benchmark SH000300 -DATA_HANDLER_CONFIG: &data_handerler_config +provider_uri: "~/.qlib/qlib_data/cn_data" +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config start_time: 2008-01-01 end_time: 2020-08-01 fit_start_time: 2008-01-01 fit_end_time: 2014-12-31 instruments: *market -TASK: +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy 
+ module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: model: class: LGBModel module_path: qlib.contrib.model.gbdt @@ -28,25 +44,16 @@ TASK: handler: class: Alpha158 module_path: qlib.contrib.data.handler - kwargs: *data_handerler_config + kwargs: *data_handler_config segments: train: [2008-01-01, 2014-12-31] valid: [2015-01-01, 2016-12-31] test: [2017-01-01, 2020-08-01] - record: [SignalRecord, PortAnaRecord] -PORT_ANALYSIS_CONFIG: - strategy: - class: TopkDropoutStrategy - module_path: qlib.contrib.strategy.strategy - kwargs: - topk: 50 - n_drop: 5 - backtest: - verbose: False - limit_threshold: 0.095 - account: 100000000 - benchmark: *benchmark - deal_price: close - open_cost: 0.0005 - close_cost: 0.0015 - min_cost: 5 \ No newline at end of file + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config \ No newline at end of file diff --git a/qlib/__init__.py b/qlib/__init__.py index c9159dadf..ed6ae1543 100644 --- a/qlib/__init__.py +++ b/qlib/__init__.py @@ -9,15 +9,13 @@ import re import sys import copy import yaml -import atexit -import signal import logging import platform import subprocess from pathlib import Path from .utils import can_use_cache, init_instance_by_config, get_module_by_module_path -from .workflow.utils import experiment_exception_hook, experiment_kill_signal_handler +from .workflow.utils import experiment_exit_handler # init qlib def init(default_conf="client", **kwargs): @@ -46,14 +44,9 @@ def init(default_conf="client", **kwargs): C.set_region(kwargs.get("region", C["region"] if "region" in C else REG_CN)) for k, v in kwargs.items(): - if k == "exp_manager": - C["exp_manager"].update({"class": v}) - 
elif k == "exp_uri": - C["exp_manager"]["kwargs"].update({"uri": v}) - else: - C[k] = v - if k not in C: - LOG.warning("Unrecognized config %s" % k) + C[k] = v + if k not in C: + LOG.warning("Unrecognized config %s" % k) C.resolve_path() @@ -93,9 +86,7 @@ def init(default_conf="client", **kwargs): qr = QlibRecorder(exp_manager) R.register(qr) # clean up experiment when python program ends - atexit.register(R.end_exp, recorder_status="FINISHED") # will not take effect if experiment ends - signal.signal(signal.SIGINT, experiment_kill_signal_handler) - sys.excepthook = experiment_exception_hook + experiment_exit_handler() def _mount_nfs_uri(C): diff --git a/qlib/contrib/data/handler.py b/qlib/contrib/data/handler.py index c69345173..99a601b9e 100644 --- a/qlib/contrib/data/handler.py +++ b/qlib/contrib/data/handler.py @@ -9,6 +9,7 @@ from ...log import TimeInspector from inspect import getfullargspec import copy + class ALPHA360_Denoise(DataHandlerLP): def __init__(self, instruments="csi500", start_time=None, end_time=None, fit_start_time=None, fit_end_time=None): data_loader = { diff --git a/qlib/contrib/model/gbdt.py b/qlib/contrib/model/gbdt.py index 535e9b453..41b773756 100644 --- a/qlib/contrib/model/gbdt.py +++ b/qlib/contrib/model/gbdt.py @@ -12,6 +12,7 @@ from ...data.dataset.handler import DataHandlerLP class LGBModel(ModelFT): """LightGBM Model""" + def __init__(self, loss="mse", **kwargs): if loss not in {"mse", "binary"}: raise NotImplementedError @@ -20,9 +21,9 @@ class LGBModel(ModelFT): self.model = None def _prepare_data(self, dataset: DatasetH): - df_train, df_valid = dataset.prepare(["train", "valid"], - col_set=["feature", "label"], - data_key=DataHandlerLP.DK_L) + df_train, df_valid = dataset.prepare( + ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L + ) x_train, y_train = df_train["feature"], df_train["label"] x_valid, y_valid = df_valid["feature"], df_valid["label"] @@ -36,23 +37,27 @@ class LGBModel(ModelFT): dvalid 
= lgb.Dataset(x_valid.values, label=y_valid) return dtrain, dvalid - def fit(self, - dataset: DatasetH, - num_boost_round=1000, - early_stopping_rounds=50, - verbose_eval=20, - evals_result=dict(), - **kwargs): + def fit( + self, + dataset: DatasetH, + num_boost_round=1000, + early_stopping_rounds=50, + verbose_eval=20, + evals_result=dict(), + **kwargs + ): dtrain, dvalid = self._prepare_data(dataset) - self.model = lgb.train(self.params, - dtrain, - num_boost_round=num_boost_round, - valid_sets=[dtrain, dvalid], - valid_names=["train", "valid"], - early_stopping_rounds=early_stopping_rounds, - verbose_eval=verbose_eval, - evals_result=evals_result, - **kwargs) + self.model = lgb.train( + self.params, + dtrain, + num_boost_round=num_boost_round, + valid_sets=[dtrain, dvalid], + valid_names=["train", "valid"], + early_stopping_rounds=early_stopping_rounds, + verbose_eval=verbose_eval, + evals_result=evals_result, + **kwargs + ) evals_result["train"] = list(evals_result["train"].values())[0] evals_result["valid"] = list(evals_result["valid"].values())[0] @@ -76,10 +81,12 @@ class LGBModel(ModelFT): verbose level """ dtrain, _ = self._prepare_data(dataset) - self.model = lgb.train(self.params, - dtrain, - num_boost_round=num_boost_round, - init_model=self.model, - valid_sets=[dtrain], - valid_names=["train"], - verbose_eval=verbose_eval) + self.model = lgb.train( + self.params, + dtrain, + num_boost_round=num_boost_round, + init_model=self.model, + valid_sets=[dtrain], + valid_names=["train"], + verbose_eval=verbose_eval, + ) diff --git a/qlib/contrib/model/pytorch_gru.py b/qlib/contrib/model/pytorch_gru.py index a78ee27af..4cc7f9852 100755 --- a/qlib/contrib/model/pytorch_gru.py +++ b/qlib/contrib/model/pytorch_gru.py @@ -50,7 +50,7 @@ class GRU(Model): dropout=0.0, n_epochs=200, lr=0.001, - metric='IC', + metric="IC", batch_size=2000, early_stop=20, loss="mse", @@ -134,48 +134,48 @@ class GRU(Model): os.environ["CUDA_VISIBLE_DEVICES"] = self.visible_GPU def 
mse(self, pred, label): - loss = (pred - label)**2 + loss = (pred - label) ** 2 return torch.mean(loss) def loss_fn(self, pred, label): mask = ~torch.isnan(label) - if self.loss == 'mse': + if self.loss == "mse": return self.mse(pred[mask], label[mask]) - raise ValueError('unknown loss `%s`'%self.loss) + raise ValueError("unknown loss `%s`" % self.loss) def metric_fn(self, pred, label): mask = torch.isfinite(label) - if self.metric == 'IC': + if self.metric == "IC": return self.cal_ic(pred[mask], label[mask]) - if self.metric == '' or self.metric == 'loss': # use loss + if self.metric == "" or self.metric == "loss": # use loss return -self.loss_fn(pred[mask], label[mask]) - raise ValueError('unknown metric `%s`'%self.metric) + raise ValueError("unknown metric `%s`" % self.metric) def cal_ic(self, pred, label): - return torch.mean(pred * label) + return torch.mean(pred * label) def train_epoch(self, x_train, y_train): x_train_values = x_train.values - y_train_values = np.squeeze(y_train.values)*100 + y_train_values = np.squeeze(y_train.values) * 100 self.gru_model.train() indices = np.arange(len(x_train_values)) np.random.shuffle(indices) - for i in range(len(indices))[::self.batch_size]: + for i in range(len(indices))[:: self.batch_size]: if len(indices) - i < self.batch_size: break - feature = torch.from_numpy(x_train_values[indices[i:i+self.batch_size]]).float() - label = torch.from_numpy(y_train_values[indices[i:i+self.batch_size]]).float() + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float() + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float() if self.use_gpu: feature = feature.cuda() @@ -186,10 +186,9 @@ class GRU(Model): self.train_optimizer.zero_grad() loss.backward() - torch.nn.utils.clip_grad_value_(self.gru_model.parameters(), 3.) 
+ torch.nn.utils.clip_grad_value_(self.gru_model.parameters(), 3.0) self.train_optimizer.step() - def test_epoch(self, data_x, data_y): # prepare training data @@ -204,13 +203,13 @@ class GRU(Model): indices = np.arange(len(x_values)) np.random.shuffle(indices) - for i in range(len(indices))[::self.batch_size]: + for i in range(len(indices))[:: self.batch_size]: if len(indices) - i < self.batch_size: break - feature = torch.from_numpy(x_values[indices[i:i+self.batch_size]]).float() - label = torch.from_numpy(y_values[indices[i:i+self.batch_size]]).float() + feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float() + label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float() if self.use_gpu: feature = feature.cuda() @@ -255,13 +254,13 @@ class GRU(Model): # return for step in range(self.n_epochs): - self.logger.info('Epoch%d:', step) - self.logger.info('training...') + self.logger.info("Epoch%d:", step) + self.logger.info("training...") self.train_epoch(x_train, y_train) - self.logger.info('evaluating...') + self.logger.info("evaluating...") train_loss, train_score = self.test_epoch(x_train, y_train) val_loss, val_score = self.test_epoch(x_valid, y_valid) - self.logger.info('train %.6f, valid %.6f'%(train_score, val_score)) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) evals_result["train"].append(train_score) evals_result["valid"].append(val_score) @@ -273,17 +272,16 @@ class GRU(Model): else: stop_steps += 1 if stop_steps >= self.early_stop: - self.logger.info('early stop') + self.logger.info("early stop") break - self.logger.info('best score: %.6lf @ %d'%(best_score, best_epoch)) + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) self.gru_model.load_state_dict(best_param) torch.save(best_param, save_path) - + if self.use_gpu: torch.cuda.empty_cache() - def predict(self, dataset): if not self._fitted: raise ValueError("model is not fitted yet!") @@ -295,16 +293,15 @@ class 
GRU(Model): sample_num = x_values.shape[0] preds = [] - for begin in range(sample_num)[::self.batch_size]: + for begin in range(sample_num)[:: self.batch_size]: - if sample_num-begin < self.batch_size: + if sample_num - begin < self.batch_size: end = sample_num else: - end = begin+self.batch_size + end = begin + self.batch_size x_batch = torch.from_numpy(x_values[begin:end]).float() - if self.use_gpu: x_batch = x_batch.cuda() diff --git a/qlib/data/dataset/processor.py b/qlib/data/dataset/processor.py index 400574320..3970c8a0a 100755 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -89,11 +89,13 @@ class DropnaLabel(DropnaProcessor): """The samples are dropped according to label. So it is not usable for inference""" return False + class TanhProcess(Processor): """ Use tanh to process noise data""" + def __call__(self, df): def tanh_denoise(data): - mask = data.columns.get_level_values(1).str.contains('LABEL') + mask = data.columns.get_level_values(1).str.contains("LABEL") col = df.columns[~mask] data[col] = data[col] - 1 data[col] = np.tanh(data[col]) diff --git a/qlib/model/base.py b/qlib/model/base.py index 719b69581..3fe83445c 100644 --- a/qlib/model/base.py +++ b/qlib/model/base.py @@ -48,7 +48,7 @@ class Model(BaseModel): class ModelFT(Model): - '''Model (F)ine(t)unable''' + """Model (F)ine(t)unable""" @abc.abstractmethod def finetune(self, dataset: Dataset): diff --git a/qlib/utils/__init__.py b/qlib/utils/__init__.py index c469829d2..30483af2e 100644 --- a/qlib/utils/__init__.py +++ b/qlib/utils/__init__.py @@ -643,7 +643,7 @@ def lexsort_index(df: pd.DataFrame) -> pd.DataFrame: #################### Wrapper ##################### class Wrapper(object): - """Data Provider Wrapper""" + """Wrapper class for anything that needs to set up during qlib.init""" def __init__(self): self._provider = None diff --git a/qlib/workflow/__init__.py b/qlib/workflow/__init__.py index 457dc4acd..6a8b857fc 100644 --- a/qlib/workflow/__init__.py +++ 
b/qlib/workflow/__init__.py @@ -32,14 +32,14 @@ class QlibRecorder: self.exp_manager = exp_manager @contextmanager - def start(self, experiment_name=None): + def start(self, experiment_name=None, recorder_name=None): """ Method to start an experiment. This method can only be called within a Python's `with` statement. Use case: --------- ``` - with R.start('test'): + with R.start('test', 'recorder_1'): model.fit(dataset) R.log... ... # further operations @@ -49,8 +49,10 @@ class QlibRecorder: ---------- experiment_name : str name of the experiment one wants to start. + recorder_name : str + name of the recorder under the experiment one wants to start. """ - run = self.start_exp(experiment_name) + run = self.start_exp(experiment_name, recorder_name) try: yield run except Exception as e: @@ -58,7 +60,7 @@ class QlibRecorder: raise e self.end_exp(Recorder.STATUS_FI) - def start_exp(self, experiment_name=None, uri=None): + def start_exp(self, experiment_name=None, recorder_name=None, uri=None): """ Lower level method for starting an experiment. When use this method, one should end the experiment manually and the status of the recorder may not be handled properly. @@ -66,7 +68,7 @@ class QlibRecorder: Use case: --------- ``` - R.start_exp(experiment_name='test') + R.start_exp(experiment_name='test', recorder_name='recorder_1') ... # further operations R.end_exp('FINISHED') or R.end_exp(Recorder.STATUS_S) ``` @@ -75,14 +77,17 @@ class QlibRecorder: ---------- experiment_name : str the name of the experiment to be started + recorder_name : str + name of the recorder under the experiment one wants to start. uri : str the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored. + The default uri are set in the qlib.config. Returns ------- An experiment instance being started. 
""" - return self.exp_manager.start_exp(experiment_name, uri) + return self.exp_manager.start_exp(experiment_name, recorder_name, uri) def end_exp(self, recorder_status=Recorder.STATUS_FI): """ @@ -188,9 +193,9 @@ class QlibRecorder: 2) if id or name is specified, return the specified experiment. If no such exp found, create a new experiment with given id or name, and the experiment is set to be running. If R's not running: - 1) no id or name specified, create a default experiment. + 1) no id or name specified, create a default experiment, and the experiment is set to be running. 2) if id or name is specified, return the specified experiment. If no such exp found, - create a new experiment with given id or name, and the experiment is set to be running. + create a new experiment with given name or the default experiment, and the experiment is set to be running. Else If `create` is False: If R's running: 1) no id or name specified, return the active experiment. @@ -367,8 +372,7 @@ class QlibRecorder: # Case 1 with R.start('test'): pred = model.predict(dataset) - kwargs = {"pred.pkl": pred} - R.save_objects(**kwargs, artifact_path='prediction') + R.save_objects(**{"pred.pkl": pred}, artifact_path='prediction') # Case 2 with R.start('test'): diff --git a/qlib/workflow/cli.py b/qlib/workflow/cli.py new file mode 100644 index 000000000..6acbee66e --- /dev/null +++ b/qlib/workflow/cli.py @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +import sys +from pathlib import Path + +import qlib +import fire +import yaml +import pandas as pd +from qlib.config import REG_CN +from qlib.utils import init_instance_by_config +from qlib.workflow import R +from qlib.workflow.record_temp import SignalRecord + +# workflow handler function +def workflow(config_path): + with open(config_path) as fp: + config = yaml.load(fp, Loader=yaml.FullLoader) + + provider_uri = config.get("provider_uri") + qlib.init(provider_uri=provider_uri, region=REG_CN) + + # model initialization + model = init_instance_by_config(config.get("task")["model"]) + dataset = init_instance_by_config(config.get("task")["dataset"]) + + # start exp + with R.start("workflow"): + model.fit(dataset) + recorder = R.get_recorder() + + # generate records + for record in config.get("task")["record"]: + if record["class"] == SignalRecord.__name__: + srconf = {"model": model, "dataset": dataset, "recorder": recorder} + record["kwargs"].update(srconf) + sr = init_instance_by_config(record) + sr.generate() + else: + rconf = {"recorder": recorder} + record["kwargs"].update(rconf) + ar = init_instance_by_config(record) + ar.generate() + + +# function to run workflow by config +def run(): + fire.Fire(workflow) + + +if __name__ == "__main__": + run() diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py index b64b1544c..b7ef160df 100644 --- a/qlib/workflow/exp.py +++ b/qlib/workflow/exp.py @@ -2,7 +2,7 @@ # Licensed under the MIT License. import mlflow -from datetime import datetime +from mlflow.exceptions import MlflowException from pathlib import Path from .recorder import Recorder, MLflowRecorder from ..log import get_module_logger @@ -38,10 +38,15 @@ class Experiment: output["recorders"] = list(recorders.keys()) return output - def start(self): + def start(self, recorder_name=None): """ Start the experiment and set it to be active. This method will also start a new recorder. 
+ Parameters + ---------- + recorder_name : str + the name of the recorder to be created. + Returns ------- An active recorder. @@ -59,10 +64,15 @@ class Experiment: """ raise NotImplementedError(f"Please implement the `end` method.") - def create_recorder(self): + def create_recorder(self, name=None): """ Create a recorder for each experiment. + Parameters + ---------- + name : str + the name of the recorder to be created. + Returns ------- A recorder object. @@ -126,6 +136,8 @@ class Experiment: the id of the recorder to be deleted. recorder_name : str the name of the recorder to be deleted. + create : boolean + create the recorder if it hasn't been created before. Returns ------- @@ -155,12 +167,12 @@ class MLflowExperiment(Experiment): self._default_name = None self.client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) - def start(self): + def start(self, recorder_name=None): # set the active experiment mlflow.set_experiment(self.name) logger.info(f"Experiment {self.id} starts running ...") # set up recorder - recorder = self.create_recorder() + recorder = self.create_recorder(recorder_name) self.active_recorder = recorder # start the recorder run = self.active_recorder.start_run() @@ -172,11 +184,12 @@ class MLflowExperiment(Experiment): self.active_recorder.end_run(recorder_status) self.active_recorder = None - def create_recorder(self): - recorders = self.list_recorders() - num = len(recorders) - name = "Recorder_{}".format(num + 1) - recorder = MLflowRecorder(name, self.id, self._uri) + def create_recorder(self, recorder_name=None): + if recorder_name is None: + recorders = self.list_recorders() + num = len(recorders) + recorder_name = "Recorder_{}".format(num + 1) + recorder = MLflowRecorder(recorder_name, self.id, self._uri) return recorder @@ -197,14 +210,67 @@ class MLflowExperiment(Experiment): self.client.delete_run(recorder_id) else: recorders = self.list_recorders() - for r in recorders: - if recorders[r].name == recorder_name: - recorder_id 
= r - break - self.client.delete_run(recorder_id) - except: + recorder = self._get_recorder_by_name(recorder_name) + self.client.delete_run(recorder.id) + except MlflowException as e: raise Exception( - "Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct." + f"Error: {e}. Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct." + ) + + def _get_recorder_by_id(self, recorder_id=None, create=False): + """ + Get a recorder by its id. If the `create` is set to True, this method will also start to run the recorder. + + Parameters + ---------- + recorder_id : str + the id of the recorder to be returned. + create : boolean + create the recorder if it hasn't been created before. + + Returns + ------- + The specific recorder with given id. + """ + recorders = self.list_recorders() + if recorder_id in recorders: + return recorders[recorder_id] + else: + if create: + logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.") + self.start(recorder_name) + return self.active_recorder + else: + raise Exception( + "Something went wrong when retrieving recorders. Please check if id of the recorder is correct." + ) + + def _get_recorder_by_name(self, recorder_name=None, create=False): + """ + Get a recorder by its name. If the `create` is set to True, this method will also start to run the recorder. + + Parameters + ---------- + recorder_name : str + the name of the recorder to be returned. + create : boolean + create the recorder if it hasn't been created before. + + Returns + ------- + The specific recorder with given name. + """ + recorders = self.list_recorders() + for rid in recorders: + if recorders[rid].name == recorder_name: + return recorders[rid] + if create: + logger.warning(f"No valid recorder found. 
Create a new recorder with name {recorder_name}.") + self.start(recorder_name) + return self.active_recorder + else: + raise Exception( + "Something went wrong when retrieving recorders. Please check if the name of the experiment is correct." ) def get_recorder(self, recorder_id=None, recorder_name=None, create=True): @@ -213,68 +279,22 @@ class MLflowExperiment(Experiment): is set to True, this method will not automatically create an active recorder. """ # retrive all the recorders under this experiment - recorders = self.list_recorders() if recorder_id is None and recorder_name is None: if self.active_recorder: return self.active_recorder else: - if create: - self.start() - logger.warning( - f"Recorder {self.active_recorder.id} is running under the experiment with name {self.name}..." - ) - return self.active_recorder - else: - raise Exception( - "Something went wrong when retrieving recorders. Please check if QlibRecorder is running." - ) + return self._get_recorder_by_name(create=create) else: if recorder_id is not None: - if recorder_id in recorders: - return recorders[recorder_id] - else: - # mlflow does not support create a run with given id - raise Exception( - "Something went wrong when retrieving recorders. Please check if id of the recorder is correct." - ) + return self._get_recorder_by_id(recorder_id, create=create) else: - for rid in recorders: - if recorders[rid].name == recorder_name: - return recorders[rid] - if create: - recorders = self.list_recorders() - logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.") - recorder = self.create_recorder() - recorder.name = recorder_name - recorder.start_run() - return recorder - else: - raise Exception( - "Something went wrong when retrieving experiments. Please check if the name of the experiment is correct." 
- ) + return self._get_recorder_by_name(recorder_name, create=create) def list_recorders(self): - runs = self.client.list_run_infos(self.id, run_view_type=1)[::-1] + runs = self.client.search_runs(self.id, run_view_type=1)[::-1] recorders = dict() for i in range(len(runs)): - rid = runs[i].run_id - status = runs[i].status - start_time = runs[i].start_time - end_time = runs[i].end_time - recorder = MLflowRecorder(f"Recorder_{i+1}", self.id, self._uri) - recorder.id = rid - recorder.status = status - recorder.start_time = ( - datetime.fromtimestamp(float(start_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S") - if start_time is not None - else None - ) - recorder.end_time = ( - datetime.fromtimestamp(float(end_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S") - if end_time is not None - else None - ) - recorder._uri = self._uri - recorders[rid] = recorder + recorder = MLflowRecorder(f"Recorder_{i+1}", self.id, self._uri, runs[i]) + recorders[runs[i].info.run_id] = recorder return recorders diff --git a/qlib/workflow/expm.py b/qlib/workflow/expm.py index 1fe345577..00637e29d 100644 --- a/qlib/workflow/expm.py +++ b/qlib/workflow/expm.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. import mlflow +from mlflow.exceptions import MlflowException import os from pathlib import Path from contextlib import contextmanager @@ -23,14 +24,17 @@ class ExpManager: self.default_exp_name = default_exp_name self.active_experiment = None # only one experiment can running each time - def start_exp(self, experiment_name=None, uri=None, **kwargs): + def start_exp(self, experiment_name=None, recorder_name=None, uri=None, **kwargs): """ - Start an experiment. + Start an experiment. This method includes first get_or_create an experiment, and then + set it to be running. Parameters ---------- experiment_name : str name of the active experiment. + recorder_name : str + name of the recorder to be started. uri : str the current tracking URI. 
@@ -38,14 +42,7 @@ class ExpManager: ------- An active experiment. """ - # create experiment - experiment = self.create_exp(experiment_name, uri) - # set up active experiment - self.active_experiment = experiment - # start the experiment - self.active_experiment.start() - - return self.active_experiment + raise NotImplementedError(f"Please implement the `start_exp` method.") def end_exp(self, recorder_status: str = Recorder.STATUS_S, **kwargs): """ @@ -58,9 +55,7 @@ class ExpManager: recorder_status : str the status of the active recorder of the experiment. """ - if self.active_experiment is not None: - self.active_experiment.end(recorder_status) - self.active_experiment = None + raise NotImplementedError(f"Please implement the `end_exp` method.") def search_records(self, experiment_ids=None, **kwargs): """ @@ -76,29 +71,15 @@ class ExpManager: """ raise NotImplementedError(f"Please implement the `search_records` method.") - def create_exp(self, experiment_name=None, uri=None): + def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True, run: bool = False): """ - Create an experiment. + Retrieve an experiment. This method includes getting an active experiment, and get_or_create a specific experiment. + The returned experiment will be running. - Parameters - ---------- - experiment_name : str - the experiment name, which must be unique. - uri : str - the tracking uri of the experiment. - - Returns - ------- - An experiment object. - """ - raise NotImplementedError(f"Please implement the `create_exp` method.") - - def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True): - """ - Retrieve an experiment. When user specify experiment id and name, the method will try to return the - specific experiment. When user does not provide recorder id or name, the method will try to return the current - active experiment. 
The `create` argument determines whether the method will automatically create a new experiment - according to user's specification if the experiment hasn't been created before + When user specify experiment id and name, the method will try to return the specific experiment. + When user does not provide recorder id or name, the method will try to return the current active experiment. + The `create` argument determines whether the method will automatically create a new experiment according + to user's specification if the experiment hasn't been created before If `create` is True: If R's running: @@ -122,9 +103,13 @@ class ExpManager: Parameters ---------- experiment_id : str - the experiment id to return. + id of the experiment to return. + experiment_name : str + name of the experiment to return. create : boolean - create the experiment if hasn't been created before. + create the experiment it if hasn't been created before. + run : boolean + run the experiment when it is created for the first time. Returns ------- @@ -175,9 +160,13 @@ class MLflowExpManager(ExpManager): super(MLflowExpManager, self).__init__(uri, default_exp_name) self.client = mlflow.tracking.MlflowClient(tracking_uri=self.uri) - def create_exp(self, experiment_name=None, uri=None): - # retrieve all created experiments - experiments = self.list_experiments() + def start_exp(self, experiment_name=None, recorder_name=None, uri=None): + # create experiment + experiment = self.get_exp(experiment_name=experiment_name, run=False) + # set up active experiment + self.active_experiment = experiment + # start the experiment + self.active_experiment.start(recorder_name) # set the tracking uri if uri is None: logger.info( @@ -186,35 +175,102 @@ class MLflowExpManager(ExpManager): else: self.uri = uri mlflow.set_tracking_uri(self.uri) - # start the experiment - if experiment_name is None: - logger.info( - f"No experiment name provided. The default experiment name is set as `{self.default_exp_name}`." 
- ) - if self.default_exp_name not in experiments: - experiment_id = self.client.create_experiment(self.default_exp_name) - else: - experiment_id = self.client.get_experiment_by_name(self.default_exp_name).experiment_id - # set the active experiment - mlflow.set_experiment(self.default_exp_name) - experiment_name = self.default_exp_name - else: - if experiment_name not in experiments: - if self.client.get_experiment_by_name(experiment_name) is not None: - logger.info( - "The experiment has already been created before. Try to resume the experiment with a new recorder..." - ) - experiment_id = self.client.get_experiment_by_name(experiment_name).experiment_id - else: - experiment_id = self.client.create_experiment(experiment_name) - else: - experiment_id = experiments[experiment_name].id - experiment = experiments[experiment_name] - # init experiment - experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) - experiment._default_name = self.default_exp_name - return experiment + return self.active_experiment + + def end_exp(self, recorder_status: str = Recorder.STATUS_S): + if self.active_experiment is not None: + self.active_experiment.end(recorder_status) + self.active_experiment = None + + def __get_exp_by_id(self, experiment_id=None, create=False, run=False): + """ + Method for retrieving an experiment by its id. If the `create` is set to True, this method will also start to run the experiment. + + Parameters + ---------- + experiment_id : str + the id of the experiment to be returned. + create : boolean + create the experiment if it hasn't been created before. + + Returns + ------- + The specific experiment with given id. + """ + # retrive all created experiments + experiments = self.list_experiments() + for name in experiments: + if experiments[name].id == experiment_id: + return experiments[name] + if create: + logger.warning(f"No valid experiment found. 
Use the Default experiment for further process.") + return self.__get_exp_by_name(create=create, run=True) + else: + raise Exception( + "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." + ) + + def __get_exp_by_name(self, experiment_name=None, create=False, run=False): + """ + Method for retrieving an experiment by its name. If the `create` is set to True, this method will also start to run the experiment. + + Parameters + ---------- + experiment_name : str + the name of the experiment to be returned. + create : boolean + create the experiment if it hasn't been created before. + + Returns + ------- + The specific experiment with given name. + """ + # retrive all created experiments + experiments = self.list_experiments() + if experiment_name in experiments: + return experiments[experiment_name] + if create: + if experiment_name is None: + logger.info( + f"No experiment name provided. Create experiment with name {self.default_exp_name} for further process." + ) + experiment_name = self.default_exp_name + if self.client.get_experiment_by_name(experiment_name) is not None: + logger.info( + "The experiment has already been created before and deleted. Try to restore the experiment with a new recorder..." + ) + experiment_id = self.client.get_experiment_by_name(experiment_name).experiment_id + self.client.restore_experiment(experiment_id) + else: + experiment_id = self.client.create_experiment(experiment_name) + + # init experiment + experiment = MLflowExperiment(experiment_id, experiment_name, self.uri) + experiment._default_name = self.default_exp_name + if run: + self.active_experiment = experiment + self.active_experiment.start() + + return experiment + else: + if experiment_name is None and self.default_exp_name in experiments: + return experiments[self.default_exp_name] + raise Exception( + "Something went wrong when retrieving experiments. 
Please check if QlibRecorder is running or the name/id of the experiment is correct." + ) + + def get_exp(self, experiment_id=None, experiment_name=None, create=True, run=True): + if experiment_id is None and experiment_name is None: + if self.active_experiment: + return self.active_experiment + else: + return self.__get_exp_by_name(create=create, run=run) + else: + if experiment_name is not None: + return self.__get_exp_by_name(experiment_name, create=create, run=run) + else: + return self.__get_exp_by_id(experiment_id, create=create, run=run) def search_records(self, experiment_ids, **kwargs): filter_string = "" if kwargs.get("filter_string") is None else kwargs.get("filter_string") @@ -223,48 +279,6 @@ class MLflowExpManager(ExpManager): order_by = kwargs.get("order_by") return self.client.search_runs(experiment_ids, filter_string, run_view_type, max_results, order_by) - def get_exp(self, experiment_id=None, experiment_name=None, create=True): - # retrive all created experiments - experiments = self.list_experiments() - if experiment_id is None and experiment_name is None: - if self.active_experiment: - return self.active_experiment - else: - if create: - logger.warning("QlibRecorder is not running. Use the Default experiment for further process.") - return self.start_exp() - else: - if self.default_exp_name in experiments: - return experiments[self.default_exp_name] - raise Exception( - "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." - ) - else: - if experiment_name is not None: - if experiment_name in experiments: - return experiments[experiment_name] - else: - if create: - logger.warning( - f"No valid experiment found. Create experiment with name {experiment_name} for further process." - ) - return self.start_exp(experiment_name) - else: - raise Exception( - "Something went wrong when retrieving experiments. 
Please check if QlibRecorder is running or the name/id of the experiment is correct." - ) - else: - for name in experiments: - if experiments[name].id == experiment_id: - return experiments[name] - if create: - logger.warning(f"No valid experiment found. Use the Default experiment for further process.") - return self.start_exp() - else: - raise Exception( - "Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct." - ) - def delete_exp(self, experiment_id=None, experiment_name=None): assert ( experiment_id is not None or experiment_name is not None @@ -275,22 +289,19 @@ class MLflowExpManager(ExpManager): else: experiment = self.client.get_experiment_by_name(experiment_name) self.client.delete_experiment(experiment.experiment_id) - except: + except MlflowException as e: raise Exception( - "Something went wrong when deleting experiment. Please check if the name/id of the experiment is correct." + f"Error: {e}. Something went wrong when deleting experiment. Please check if the name/id of the experiment is correct." 
) def list_experiments(self): # retrieve all the existing experiments exps = self.client.list_experiments(view_type=1) experiments = dict() - for i in range(len(exps)): - eid = exps[i].experiment_id - ename = exps[i].name + for exp in exps: + eid = exp.experiment_id + ename = exp.name experiment = MLflowExperiment(eid, ename, self.uri) - experiment.id = eid - experiment.name = ename - experiment._uri = self.uri experiments[ename] = experiment return experiments diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index 0c20f6efc..e3e19bd10 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -3,6 +3,7 @@ import pandas as pd from pathlib import Path +from pprint import pprint from ..contrib.evaluate import ( backtest as normal_backtest, risk_analysis, @@ -83,14 +84,14 @@ class SignalRecord(RecordTemp): logger.info( f"Signal record 'pred.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}" ) + # print out results + pprint(f"The following are prediction results of the {type(self.model).__name__} model.") + pprint(pred.head(5)) def load(self): # try to load the saved object - try: - pred = self.recorder.load_object("pred.pkl") - return pred - except: - raise Exception("Something went wrong when loading the saved object.") + pred = self.recorder.load_object("pred.pkl") + return pred def check(self, **kwargs): artifacts = self.recorder.list_artifacts() @@ -148,19 +149,51 @@ class PortAnaRecord(SignalRecord): analysis["excess_return_with_cost"] = risk_analysis( report_normal["return"] - report_normal["bench"] - report_normal["cost"] ) + # log metrics + self.recorder.log_metrics( + excess_return_without_cost_mean=analysis["excess_return_without_cost"]["risk"]["mean"] + ) + self.recorder.log_metrics(excess_return_without_cost_std=analysis["excess_return_without_cost"]["risk"]["std"]) + self.recorder.log_metrics( + 
excess_return_without_cost_annualized_return=analysis["excess_return_without_cost"]["risk"][ + "annualized_return" + ] + ) + self.recorder.log_metrics( + excess_return_without_cost_information_ratio=analysis["excess_return_without_cost"]["risk"][ + "information_ratio" + ] + ) + self.recorder.log_metrics( + excess_return_without_cost_max_drawdown=analysis["excess_return_without_cost"]["risk"]["max_drawdown"] + ) + self.recorder.log_metrics(excess_return_with_cost_mean=analysis["excess_return_with_cost"]["risk"]["mean"]) + self.recorder.log_metrics(excess_return_with_cost_std=analysis["excess_return_with_cost"]["risk"]["std"]) + self.recorder.log_metrics( + excess_return_with_cost_annualized_return=analysis["excess_return_with_cost"]["risk"]["annualized_return"] + ) + self.recorder.log_metrics( + excess_return_with_cost_information_ratio=analysis["excess_return_with_cost"]["risk"]["information_ratio"] + ) + self.recorder.log_metrics( + excess_return_with_cost_max_drawdown=analysis["excess_return_with_cost"]["risk"]["max_drawdown"] + ) + # save portfolio analysis results analysis_df = pd.concat(analysis) # type: pd.DataFrame self.recorder.save_objects(**{"port_analysis.pkl": analysis_df}, artifact_path=self.artifact_path) logger.info( f"Portfolio analysis record 'port_analysis.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}" ) + # print out results + pprint("The following are analysis results of the excess return without cost.") + pprint(analysis["excess_return_without_cost"]) + pprint("The following are analysis results of the excess return with cost.") + pprint(analysis["excess_return_with_cost"]) def load(self): # try to load the saved object - try: - pred = self.recorder.load_object(self.artifact_path / "port_analysis.pkl") - return pred - except: - raise Exception("Something went wrong when loading the saved object.") + pred = self.recorder.load_object(self.artifact_path / "port_analysis.pkl") + return pred def check(self): 
artifacts = self.recorder.list_artifacts(self.artifact_path) diff --git a/qlib/workflow/recorder.py b/qlib/workflow/recorder.py index 6c83641a9..a2cddadcb 100644 --- a/qlib/workflow/recorder.py +++ b/qlib/workflow/recorder.py @@ -2,8 +2,9 @@ # Licensed under the MIT License. import mlflow -import shutil, os, pickle, tempfile, codecs, datetime +import shutil, os, pickle, tempfile, codecs from pathlib import Path +from datetime import datetime from ..utils.objm import FileManager from ..log import get_module_logger @@ -167,7 +168,7 @@ class MLflowRecorder(Recorder): use file manager to help maintain the objects in the project. """ - def __init__(self, name, experiment_id, uri): + def __init__(self, name, experiment_id, uri, mlflow_run=None): super(MLflowRecorder, self).__init__(name, experiment_id) self._uri = uri self.artifact_uri = None @@ -175,6 +176,22 @@ class MLflowRecorder(Recorder): self.temp_dir = tempfile.mkdtemp() self.fm = FileManager(Path(self.temp_dir).absolute()) self.client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) + # construct from mlflow run + if mlflow_run is not None: + assert isinstance(mlflow_run, mlflow.entities.run.Run), "Please input with a MLflow Run object." 
+ self.name = mlflow_run.data.tags["mlflow.runName"] if mlflow_run.data.tags["mlflow.runName"] != "" else name + self.id = mlflow_run.info.run_id + self.status = mlflow_run.info.status + self.start_time = ( + datetime.fromtimestamp(float(mlflow_run.info.start_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S") + if mlflow_run.info.start_time is not None + else None + ) + self.end_time = ( + datetime.fromtimestamp(float(mlflow_run.info.end_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S") + if mlflow_run.info.end_time is not None + else None + ) def start_run(self): # start the run @@ -182,7 +199,7 @@ class MLflowRecorder(Recorder): # save the run id and artifact_uri self.id = run.info.run_id self.artifact_uri = run.info.artifact_uri - self.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.status = Recorder.STATUS_R logger.info(f"Recorder {self.id} starts running under Experiment {self.experiment_id} ...") @@ -196,7 +213,7 @@ class MLflowRecorder(Recorder): Recorder.STATUS_FA, ], f"The status type {status} is not supported." mlflow.end_run(status) - self.end_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + self.end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if self.status != Recorder.STATUS_S: self.status = status shutil.rmtree(self.temp_dir) @@ -213,31 +230,23 @@ class MLflowRecorder(Recorder): def load_object(self, name): assert self._uri is not None, "Please start the experiment and recorder first before using recorder directly." 
path = self.client.download_artifacts(self.id, name) - try: - with Path(path).open("rb") as f: - f.seek(0) - return pickle.load(f) - except: - with codecs.open(path, mode="r", encoding="utf-8") as f: - return f.read() + with Path(path).open("rb") as f: + return pickle.load(f) def log_params(self, **kwargs): - keys = list(kwargs.keys()) for name, data in kwargs.items(): self.client.log_param(self.id, name, data) def log_metrics(self, step=None, **kwargs): - keys = list(kwargs.keys()) for name, data in kwargs.items(): self.client.log_metric(self.id, name, data) def set_tags(self, **kwargs): - keys = list(kwargs.keys()) for name, data in kwargs.items(): self.client.set_tag(self.id, name, data) def delete_tags(self, *keys): - for count, key in enumerate(keys): + for key in keys: self.client.delete_tag(self.id, key) def get_artifact_uri(self): diff --git a/qlib/workflow/utils.py b/qlib/workflow/utils.py index 196669fea..d4594d28e 100644 --- a/qlib/workflow/utils.py +++ b/qlib/workflow/utils.py @@ -1,13 +1,25 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -import sys, traceback, signal +import sys, traceback, signal, atexit from . import R from .recorder import Recorder from ..log import get_module_logger logger = get_module_logger("workflow", "INFO") +# function to handle the experiment when unusual program ending occurs +def experiment_exit_handler(): + """ + Method for handling the experiment when any unusual program ending occurs. + The `atexit` handler should be put in the last, since, as long as the program ends, it will be called. + Thus, if any exception or user interuption occurs beforehead, we should handle them first. Once `R` is + ended, another call of `R.end_exp` will not take effect. 
+ """ + signal.signal(signal.SIGINT, experiment_kill_signal_handler) # handle user keyboard interupt + sys.excepthook = experiment_exception_hook # handle uncaught exception + atexit.register(R.end_exp, recorder_status=Recorder.STATUS_FI) # will not take effect if experiment ends + def experiment_exception_hook(type, value, tb): """ @@ -19,15 +31,16 @@ def experiment_exception_hook(type, value, tb): value: Exception's value tb: Exception's traceback """ - error_msg = "An exception has been raised.\n" f"Type: {type}\n" f"Value: {value}\n" + error_msg = "An exception has been raised.\n" f"Type: {type}\n" logger.error(error_msg) traceback.print_tb(tb) + logger.error(f"Value: {value}") R.end_exp(recorder_status=Recorder.STATUS_FA) def experiment_kill_signal_handler(signum, frame): """ - End an experiment when user kill the program (CTRL+C, etc.). + End an experiment when user kill the program through keyboard (CTRL+C, etc.). """ R.end_exp(recorder_status=Recorder.STATUS_FA) diff --git a/setup.py b/setup.py index 38a84ef7c..8ad124750 100644 --- a/setup.py +++ b/setup.py @@ -100,7 +100,7 @@ setup( entry_points={ # 'console_scripts': ['mycli=mymodule:cli'], "console_scripts": [ - "estimator=qlib.contrib.estimator.launcher:run", + "workflow_by_config=qlib.workflow.cli:run", ], }, ext_modules=extensions,