1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 14:01:28 +08:00

Compare commits

...

2 Commits

Author SHA1 Message Date
Young
949d96d768 log environment automatically 2022-08-09 11:48:47 +08:00
Young
597359f98f Refine type hint and recorder 2022-08-09 11:12:06 +08:00
5 changed files with 156 additions and 29 deletions

View File

@@ -4,6 +4,8 @@
"""Commonly used types."""
import sys
from typing import Union
from pathlib import Path
__all__ = ["Literal", "TypedDict", "final"]
@@ -11,3 +13,51 @@ if sys.version_info >= (3, 8):
from typing import Literal, TypedDict, final # type: ignore # pylint: disable=no-name-in-module
else:
from typing_extensions import Literal, TypedDict, final
class InstDictConf(TypedDict, total=False):
    """
    InstDictConf is a Dict-based config to describe an instance.

        case 1)
        {
            'class': 'ClassName',
            'kwargs': dict,  # It is optional. {} will be used if not given
            'module_path': path,  # It is optional if module is given in the class
        }
        case 2)
        {
            'class': <The class itself>,
            'kwargs': dict,  # It is optional. {} will be used if not given
        }

    The TypedDict is declared with ``total=False`` because every key declared
    below is documented as optional; with the default ``total=True`` a static
    type checker would reject a config that omits ``kwargs`` or ``module_path``.
    """

    # class: str  # because class is a keyword of Python. We have to comment it
    kwargs: dict  # It is optional. {} will be used if not given
    module_path: str  # It is optional if module is given in the class
# Type alias covering every config form described in the string below:
# an InstDictConf dict, a str, an arbitrary object, or a pathlib.Path.
# NOTE(review): `object` is a member of this Union, so static type checkers
# will presumably accept any value here -- confirm that this is intended.
InstConf = Union[InstDictConf, str, object, Path]
"""
InstConf is a type to describe an instance; it will be passed into init_instance_by_config for Qlib
config : Union[str, dict, object, Path]
InstDictConf example.
please refer to the docs of InstDictConf
str example.
1) specify a pickle object
- path like 'file:///<path to pickle file>/obj.pkl'
2) specify a class name
- "ClassName": getattr(module, "ClassName")() will be used.
3) specify module path with class name
- "a.b.c.ClassName" getattr(<a.b.c.module>, "ClassName")() will be used.
object example:
instance of accept_types
Path example:
specify a pickle object
- it will be treated like 'file:///<path to pickle file>/obj.pkl'
"""

View File

@@ -11,6 +11,7 @@ import re
import sys
import copy
import json
from qlib.typehint import InstConf
import yaml
import redis
import bisect
@@ -291,7 +292,11 @@ def get_module_by_module_path(module_path: Union[str, ModuleType]):
:param module_path:
:return:
:raises: ModuleNotFoundError
"""
if module_path is None:
raise ModuleNotFoundError("None is passed in as parameters as module_path")
if isinstance(module_path, ModuleType):
module = module_path
else:
@@ -324,7 +329,7 @@ def split_module_path(module_path: str) -> Tuple[str, str]:
return m_path, cls
def get_callable_kwargs(config: Union[dict, str], default_module: Union[str, ModuleType] = None) -> (type, dict):
def get_callable_kwargs(config: InstConf, default_module: Union[str, ModuleType] = None) -> (type, dict):
"""
extract class/func and kwargs from config info
@@ -343,6 +348,10 @@ def get_callable_kwargs(config: Union[dict, str], default_module: Union[str, Mod
-------
(type, dict):
the class/func object and it's arguments.
Raises
------
ModuleNotFoundError
"""
if isinstance(config, dict):
key = "class" if "class" in config else "func"
@@ -376,7 +385,7 @@ get_cls_kwargs = get_callable_kwargs # NOTE: this is for compatibility for the
def init_instance_by_config(
config: Union[str, dict, object, Path], # TODO: use a user-defined type to replace this Union.
config: InstConf,
default_module=None,
accept_types: Union[type, Tuple[type]] = (),
try_kwargs: Dict = {},
@@ -387,31 +396,8 @@ def init_instance_by_config(
Parameters
----------
config : Union[str, dict, object]
dict example.
case 1)
{
'class': 'ClassName',
'kwargs': dict, # It is optional. {} will be used if not given
'model_path': path, # It is optional if module is given
}
case 2)
{
'class': <The class it self>,
'kwargs': dict, # It is optional. {} will be used if not given
}
str example.
1) specify a pickle object
- path like 'file:///<path to pickle file>/obj.pkl'
2) specify a class name
- "ClassName": getattr(module, "ClassName")() will be used.
3) specify module path with class name
- "a.b.c.ClassName" getattr(<a.b.c.module>, "ClassName")() will be used.
object example:
instance of accept_types
Path example:
specify a pickle object
- it will be treated like 'file:///<path to pickle file>/obj.pkl'
config : InstConf
default_module : Python module
Optional. It should be a python module.
NOTE: the "module_path" will be override by `module` arguments

View File

@@ -575,6 +575,44 @@ class QlibRecorder:
"""
self.get_exp(start=True).get_recorder(start=True).log_metrics(step, **kwargs)
def log_artifact(self, local_path: str, artifact_path: Optional[str] = None):
    """
    Log a local file or directory as an artifact of the currently active run.

    - If `active recorder` exists: it will log the artifact through the active recorder.
    - If `active recorder` not exists: the system will create a default experiment as well as a new recorder, and log the artifact under it.

    Parameters
    ----------
    local_path : str
        Path to the file to write.
    artifact_path : Optional[str]
        If provided, the directory in ``artifact_uri`` to write to.
    """
    # Resolve the experiment first, then its recorder, and delegate to it.
    experiment = self.get_exp(start=True)
    recorder = experiment.get_recorder(start=True)
    recorder.log_artifact(local_path, artifact_path)
def download_artifact(self, path: str, dst_path: Optional[str] = None) -> str:
    """
    Download an artifact file or directory from a run to a local directory if applicable,
    and return a local path for it.

    The call is delegated to the recorder obtained via
    ``self.get_exp(start=True).get_recorder(start=True)``.

    Parameters
    ----------
    path : str
        Relative source path to the desired artifact.
    dst_path : Optional[str]
        Absolute path of the local filesystem destination directory to which to
        download the specified artifacts. This directory must already exist.
        If unspecified, the artifacts will be downloaded to a new
        uniquely-named directory on the local filesystem.

    Returns
    -------
    str
        Local path of desired artifact.
    """
    # The recorder's result must be returned; previously it was dropped and
    # callers always received None despite the `-> str` contract.
    return self.get_exp(start=True).get_recorder(start=True).download_artifact(path, dst_path)
def set_tags(self, **kwargs):
"""
Method for setting tags for a recorder. In addition to using ``R``, one can also set the tag to a specific recorder after getting it with `get_recorder` API.
@@ -611,7 +649,7 @@ class RecorderWrapper(Wrapper):
expm = getattr(self._provider, "exp_manager")
if expm.active_experiment is not None:
raise RecorderInitializationError(
"Please don't reinitialize Qlib if QlibRecorder is already acivated. Otherwise, the experiment stored location will be modified."
"Please don't reinitialize Qlib if QlibRecorder is already activated. Otherwise, the experiment stored location will be modified."
)
self._provider = provider

View File

@@ -111,7 +111,7 @@ class Experiment:
"""
raise NotImplementedError(f"Please implement the `delete_recorder` method.")
def get_recorder(self, recorder_id=None, recorder_name=None, create: bool = True, start: bool = False):
def get_recorder(self, recorder_id=None, recorder_name=None, create: bool = True, start: bool = False) -> Recorder:
"""
Retrieve a Recorder for user. When user specify recorder id and name, the method will try to return the
specific recorder. When user does not provide recorder id or name, the method will try to return the current

View File

@@ -3,6 +3,7 @@
import os
import sys
from typing import Optional
import mlflow
import logging
import shutil
@@ -138,6 +139,19 @@ class Recorder:
"""
raise NotImplementedError(f"Please implement the `log_metrics` method.")
def log_artifact(self, local_path: str, artifact_path: Optional[str] = None):
    """
    Log a local file or directory as an artifact of the currently active run.

    This base implementation only raises; subclasses are expected to override it.

    Parameters
    ----------
    local_path : str
        Path to the file to write.
    artifact_path : Optional[str]
        If provided, the directory in ``artifact_uri`` to write to.

    Raises
    ------
    NotImplementedError
        Always, in this base implementation.
    """
    # The message must name this method (it previously said `log_metrics`).
    raise NotImplementedError("Please implement the `log_artifact` method.")
def set_tags(self, **kwargs):
"""
Log a batch of tags for the current run.
@@ -175,6 +189,28 @@ class Recorder:
"""
raise NotImplementedError(f"Please implement the `list_artifacts` method.")
def download_artifact(self, path: str, dst_path: Optional[str] = None) -> str:
    """
    Download an artifact file or directory from a run to a local directory if applicable,
    and return a local path for it.

    This base implementation only raises; subclasses are expected to override it.

    Parameters
    ----------
    path : str
        Relative source path to the desired artifact.
    dst_path : Optional[str]
        Absolute path of the local filesystem destination directory to which to
        download the specified artifacts. This directory must already exist.
        If unspecified, the artifacts will be downloaded to a new
        uniquely-named directory on the local filesystem.

    Returns
    -------
    str
        Local path of desired artifact.

    Raises
    ------
    NotImplementedError
        Always, in this base implementation.
    """
    # The message must name this method (it previously said `list_artifacts`).
    raise NotImplementedError("Please implement the `download_artifact` method.")
def list_metrics(self):
"""
List all the metrics of a recorder.
@@ -212,6 +248,14 @@ class MLflowRecorder(Recorder):
Due to the fact that mlflow will only log artifact from a file or directory, we decide to
use file manager to help maintain the objects in the project.
Instead of using mlflow directly, we use another interface wrapping mlflow to log experiments.
Although it takes extra effort, it brings users benefits for the following reasons.
- It will be more convenient to change the experiment logging backend without changing any code at the upper level
- We can provide more convenience to automatically do some extra things and make the interface easier. For example:
- Automatically logging the uncommitted code
- Automatically logging part of environment variables
- User can control several different runs by just creating different Recorder (in mlflow, you always have to switch artifact_uri and pass in run ids frequently)
"""
def __init__(self, experiment_id, uri, name=None, mlflow_run=None):
@@ -304,6 +348,9 @@ class MLflowRecorder(Recorder):
self._log_uncommitted_code()
self.log_params(**{"cmd-sys.argv": " ".join(sys.argv)}) # log the command to produce current experiment
self.log_params(
**{k: v for k, v in os.environ.items() if k.startswith("_QLIB_")}
) # Log necessary environment variables
return run
def _log_uncommitted_code(self):
@@ -398,6 +445,9 @@ class MLflowRecorder(Recorder):
for name, data in kwargs.items():
self.client.log_metric(self.id, name, data, step=step)
def log_artifact(self, local_path, artifact_path: Optional[str] = None):
    """
    Log a local file or directory as an artifact of this recorder's run.

    The call is forwarded to ``self.client.log_artifact`` using ``self.id``
    as the run identifier.

    Parameters
    ----------
    local_path
        Path of the local file or directory to log.
    artifact_path : Optional[str]
        Passed through to the client as ``artifact_path``.
    """
    client = self.client
    client.log_artifact(self.id, local_path=local_path, artifact_path=artifact_path)
@AsyncCaller.async_dec(ac_attr="async_log")
def set_tags(self, **kwargs):
for name, data in kwargs.items():
@@ -420,6 +470,9 @@ class MLflowRecorder(Recorder):
artifacts = self.client.list_artifacts(self.id, artifact_path)
return [art.path for art in artifacts]
def download_artifact(self, path: str, dst_path: Optional[str] = None) -> str:
    """
    Download an artifact of this recorder's run and return what the client returns.

    The call is forwarded to ``self.client.download_artifacts`` using
    ``self.id`` as the run identifier.

    Parameters
    ----------
    path : str
        Source path of the desired artifact, passed through to the client.
    dst_path : Optional[str]
        Destination directory, passed through to the client.

    Returns
    -------
    str
        The value returned by ``self.client.download_artifacts``.
    """
    client = self.client
    local_path = client.download_artifacts(self.id, path, dst_path)
    return local_path
def list_metrics(self):
    """
    Return the metrics of this recorder's run.

    The run is fetched with ``self.client.get_run(self.id)`` and its
    ``data.metrics`` attribute is returned as-is.
    """
    return self.client.get_run(self.id).data.metrics