diff --git a/qlib/model/trainer.py b/qlib/model/trainer.py index 4bfa6337b..84ae2a6c6 100644 --- a/qlib/model/trainer.py +++ b/qlib/model/trainer.py @@ -12,7 +12,7 @@ In ``DelayTrainer``, the first step is only to save some necessary info to model """ import socket -from typing import Callable, List +from typing import Callable, List, Optional from tqdm.auto import tqdm @@ -219,7 +219,13 @@ class TrainerR(Trainer): STATUS_BEGIN = "begin_task_train" STATUS_END = "end_task_train" - def __init__(self, experiment_name: str = None, train_func: Callable = task_train, call_in_subproc: bool = False): + def __init__( + self, + experiment_name: Optional[str] = None, + train_func: Callable = task_train, + call_in_subproc: bool = False, + default_rec_name: Optional[str] = None, + ): """ Init TrainerR. @@ -230,6 +236,7 @@ class TrainerR(Trainer): """ super().__init__() self.experiment_name = experiment_name + self.default_rec_name = default_rec_name self.train_func = train_func self._call_in_subproc = call_in_subproc @@ -259,7 +266,7 @@ class TrainerR(Trainer): if self._call_in_subproc: get_module_logger("TrainerR").info("running models in sub process (for forcing release memroy).") train_func = call_in_subproc(train_func, C) - rec = train_func(task, experiment_name, **kwargs) + rec = train_func(task, experiment_name, recorder_name=self.default_rec_name, **kwargs) rec.set_tags(**{self.STATUS_KEY: self.STATUS_BEGIN}) recs.append(rec) return recs @@ -286,7 +293,9 @@ class DelayTrainerR(TrainerR): A delayed implementation based on TrainerR, which means `train` method may only do some preparation and `end_train` method can do the real model fitting. """ - def __init__(self, experiment_name: str = None, train_func=begin_task_train, end_train_func=end_task_train): + def __init__( + self, experiment_name: str = None, train_func=begin_task_train, end_train_func=end_task_train, **kwargs + ): """ Init TrainerRM. 
@@ -295,7 +304,7 @@ class DelayTrainerR(TrainerR): train_func (Callable, optional): default train method. Defaults to `begin_task_train`. end_train_func (Callable, optional): default end_train method. Defaults to `end_task_train`. """ - super().__init__(experiment_name, train_func) + super().__init__(experiment_name, train_func, **kwargs) self.end_train_func = end_train_func self.delay = True @@ -344,7 +353,12 @@ class TrainerRM(Trainer): TM_ID = "_id in TaskManager" def __init__( - self, experiment_name: str = None, task_pool: str = None, train_func=task_train, skip_run_task: bool = False + self, + experiment_name: str = None, + task_pool: str = None, + train_func=task_train, + skip_run_task: bool = False, + default_rec_name: Optional[str] = None, ): """ Init TrainerR. @@ -363,6 +377,7 @@ class TrainerRM(Trainer): self.task_pool = task_pool self.train_func = train_func self.skip_run_task = skip_run_task + self.default_rec_name = default_rec_name def train( self, @@ -371,6 +386,7 @@ class TrainerRM(Trainer): experiment_name: str = None, before_status: str = TaskManager.STATUS_WAITING, after_status: str = TaskManager.STATUS_DONE, + default_rec_name: Optional[str] = None, **kwargs, ) -> List[Recorder]: """ @@ -398,6 +414,8 @@ class TrainerRM(Trainer): train_func = self.train_func if experiment_name is None: experiment_name = self.experiment_name + if default_rec_name is None: + default_rec_name = self.default_rec_name task_pool = self.task_pool if task_pool is None: task_pool = experiment_name @@ -412,6 +430,7 @@ class TrainerRM(Trainer): experiment_name=experiment_name, before_status=before_status, after_status=after_status, + recorder_name=default_rec_name, **kwargs, ) @@ -480,6 +499,7 @@ class DelayTrainerRM(TrainerRM): train_func=begin_task_train, end_train_func=end_task_train, skip_run_task: bool = False, + **kwargs, ): """ Init DelayTrainerRM. @@ -494,7 +514,7 @@ class DelayTrainerRM(TrainerRM): Only run_task in the worker. Otherwise skip run_task. E.g. 
Starting trainer on a CPU VM and then waiting tasks to be finished on GPU VMs. """ - super().__init__(experiment_name, task_pool, train_func) + super().__init__(experiment_name, task_pool, train_func, **kwargs) self.end_train_func = end_train_func self.delay = True self.skip_run_task = skip_run_task diff --git a/qlib/workflow/recorder.py b/qlib/workflow/recorder.py index 94cbac425..9d6e03b4e 100644 --- a/qlib/workflow/recorder.py +++ b/qlib/workflow/recorder.py @@ -2,11 +2,13 @@ # Licensed under the MIT License. import os +import sys import mlflow import logging import shutil import pickle import tempfile +import subprocess from pathlib import Path from datetime import datetime @@ -296,8 +298,32 @@ class MLflowRecorder(Recorder): # - This may cause delay when uploading results # - The logging time may not be accurate self.async_log = AsyncCaller() + + # TODO: currently, this is only supported in MLflowRecorder. + # Maybe we can make this feature more general. + self._log_uncommitted_code() + + self.log_params(**{"cmd-sys.argv": " ".join(sys.argv)}) # log the command that produced the current experiment return run + def _log_uncommitted_code(self): + """ + MLflow only logs the commit id of the current repo, but users usually have a lot of uncommitted changes. + So this tries to log them all automatically. + """ + # TODO: the sub-directories may be git repos. + # So it will be better if we can walk the sub-directories and log the uncommitted changes. + for cmd, fname in [ + ("git diff", "code_diff.txt"), + ("git status", "code_status.txt"), + ("git diff --cached", "code_cached.txt"), + ]: + try: + out = subprocess.check_output(cmd, shell=True) + self.client.log_text(self.id, out.decode(), fname) # this behaves same as above + except subprocess.CalledProcessError: + logger.info(f"Fail to log the uncommitted code of $CWD when run `{cmd}`") + def end_run(self, status: str = Recorder.STATUS_S): assert status in [ Recorder.STATUS_S,