mirror of
https://github.com/microsoft/qlib.git
synced 2026-06-06 05:51:17 +08:00
Auto log uncommmitted code (#1167)
* Auto log uncommmitted code * Support set record name & trainer; * Update recorder.py
This commit is contained in:
@@ -12,7 +12,7 @@ In ``DelayTrainer``, the first step is only to save some necessary info to model
|
||||
"""
|
||||
|
||||
import socket
|
||||
from typing import Callable, List
|
||||
from typing import Callable, List, Optional
|
||||
|
||||
from tqdm.auto import tqdm
|
||||
|
||||
@@ -219,7 +219,13 @@ class TrainerR(Trainer):
|
||||
STATUS_BEGIN = "begin_task_train"
|
||||
STATUS_END = "end_task_train"
|
||||
|
||||
def __init__(self, experiment_name: str = None, train_func: Callable = task_train, call_in_subproc: bool = False):
|
||||
def __init__(
|
||||
self,
|
||||
experiment_name: Optional[str] = None,
|
||||
train_func: Callable = task_train,
|
||||
call_in_subproc: bool = False,
|
||||
default_rec_name: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Init TrainerR.
|
||||
|
||||
@@ -230,6 +236,7 @@ class TrainerR(Trainer):
|
||||
"""
|
||||
super().__init__()
|
||||
self.experiment_name = experiment_name
|
||||
self.default_rec_name = default_rec_name
|
||||
self.train_func = train_func
|
||||
self._call_in_subproc = call_in_subproc
|
||||
|
||||
@@ -259,7 +266,7 @@ class TrainerR(Trainer):
|
||||
if self._call_in_subproc:
|
||||
get_module_logger("TrainerR").info("running models in sub process (for forcing release memroy).")
|
||||
train_func = call_in_subproc(train_func, C)
|
||||
rec = train_func(task, experiment_name, **kwargs)
|
||||
rec = train_func(task, experiment_name, recorder_name=self.default_rec_name, **kwargs)
|
||||
rec.set_tags(**{self.STATUS_KEY: self.STATUS_BEGIN})
|
||||
recs.append(rec)
|
||||
return recs
|
||||
@@ -286,7 +293,9 @@ class DelayTrainerR(TrainerR):
|
||||
A delayed implementation based on TrainerR, which means `train` method may only do some preparation and `end_train` method can do the real model fitting.
|
||||
"""
|
||||
|
||||
def __init__(self, experiment_name: str = None, train_func=begin_task_train, end_train_func=end_task_train):
|
||||
def __init__(
|
||||
self, experiment_name: str = None, train_func=begin_task_train, end_train_func=end_task_train, **kwargs
|
||||
):
|
||||
"""
|
||||
Init TrainerRM.
|
||||
|
||||
@@ -295,7 +304,7 @@ class DelayTrainerR(TrainerR):
|
||||
train_func (Callable, optional): default train method. Defaults to `begin_task_train`.
|
||||
end_train_func (Callable, optional): default end_train method. Defaults to `end_task_train`.
|
||||
"""
|
||||
super().__init__(experiment_name, train_func)
|
||||
super().__init__(experiment_name, train_func, **kwargs)
|
||||
self.end_train_func = end_train_func
|
||||
self.delay = True
|
||||
|
||||
@@ -344,7 +353,12 @@ class TrainerRM(Trainer):
|
||||
TM_ID = "_id in TaskManager"
|
||||
|
||||
def __init__(
|
||||
self, experiment_name: str = None, task_pool: str = None, train_func=task_train, skip_run_task: bool = False
|
||||
self,
|
||||
experiment_name: str = None,
|
||||
task_pool: str = None,
|
||||
train_func=task_train,
|
||||
skip_run_task: bool = False,
|
||||
default_rec_name: Optional[str] = None,
|
||||
):
|
||||
"""
|
||||
Init TrainerR.
|
||||
@@ -363,6 +377,7 @@ class TrainerRM(Trainer):
|
||||
self.task_pool = task_pool
|
||||
self.train_func = train_func
|
||||
self.skip_run_task = skip_run_task
|
||||
self.default_rec_name = default_rec_name
|
||||
|
||||
def train(
|
||||
self,
|
||||
@@ -371,6 +386,7 @@ class TrainerRM(Trainer):
|
||||
experiment_name: str = None,
|
||||
before_status: str = TaskManager.STATUS_WAITING,
|
||||
after_status: str = TaskManager.STATUS_DONE,
|
||||
default_rec_name: Optional[str] = None,
|
||||
**kwargs,
|
||||
) -> List[Recorder]:
|
||||
"""
|
||||
@@ -398,6 +414,8 @@ class TrainerRM(Trainer):
|
||||
train_func = self.train_func
|
||||
if experiment_name is None:
|
||||
experiment_name = self.experiment_name
|
||||
if default_rec_name is None:
|
||||
default_rec_name = self.default_rec_name
|
||||
task_pool = self.task_pool
|
||||
if task_pool is None:
|
||||
task_pool = experiment_name
|
||||
@@ -412,6 +430,7 @@ class TrainerRM(Trainer):
|
||||
experiment_name=experiment_name,
|
||||
before_status=before_status,
|
||||
after_status=after_status,
|
||||
recorder_name=default_rec_name,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@@ -480,6 +499,7 @@ class DelayTrainerRM(TrainerRM):
|
||||
train_func=begin_task_train,
|
||||
end_train_func=end_task_train,
|
||||
skip_run_task: bool = False,
|
||||
**kwargs,
|
||||
):
|
||||
"""
|
||||
Init DelayTrainerRM.
|
||||
@@ -494,7 +514,7 @@ class DelayTrainerRM(TrainerRM):
|
||||
Only run_task in the worker. Otherwise skip run_task.
|
||||
E.g. Starting trainer on a CPU VM and then waiting tasks to be finished on GPU VMs.
|
||||
"""
|
||||
super().__init__(experiment_name, task_pool, train_func)
|
||||
super().__init__(experiment_name, task_pool, train_func, **kwargs)
|
||||
self.end_train_func = end_train_func
|
||||
self.delay = True
|
||||
self.skip_run_task = skip_run_task
|
||||
|
||||
@@ -2,11 +2,13 @@
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import os
|
||||
import sys
|
||||
import mlflow
|
||||
import logging
|
||||
import shutil
|
||||
import pickle
|
||||
import tempfile
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
|
||||
@@ -296,8 +298,32 @@ class MLflowRecorder(Recorder):
|
||||
# - This may cause delay when uploading results
|
||||
# - The logging time may not be accurate
|
||||
self.async_log = AsyncCaller()
|
||||
|
||||
# TODO: currently, this is only supported in MLflowRecorder.
|
||||
# Maybe we can make this feature more general.
|
||||
self._log_uncommitted_code()
|
||||
|
||||
self.log_params(**{"cmd-sys.argv": " ".join(sys.argv)}) # log the command to produce current experiment
|
||||
return run
|
||||
|
||||
def _log_uncommitted_code(self):
|
||||
"""
|
||||
Mlflow only log the commit id of the current repo. But usually, user will have a lot of uncommitted changes.
|
||||
So this tries to automatically to log them all.
|
||||
"""
|
||||
# TODO: the sub-directories maybe git repos.
|
||||
# So it will be better if we can walk the sub-directories and log the uncommitted changes.
|
||||
for cmd, fname in [
|
||||
("git diff", "code_diff.txt"),
|
||||
("git status", "code_status.txt"),
|
||||
("git diff --cached", "code_cached.txt"),
|
||||
]:
|
||||
try:
|
||||
out = subprocess.check_output(cmd, shell=True)
|
||||
self.client.log_text(self.id, out.decode(), fname) # this behaves same as above
|
||||
except subprocess.CalledProcessError:
|
||||
logger.info(f"Fail to log the uncommitted code of $CWD when run `{cmd}`")
|
||||
|
||||
def end_run(self, status: str = Recorder.STATUS_S):
|
||||
assert status in [
|
||||
Recorder.STATUS_S,
|
||||
|
||||
Reference in New Issue
Block a user