1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

Update expm and exp

This commit is contained in:
Jactus
2020-11-18 17:55:45 +08:00
parent 64ed43b791
commit 58bd2339c0
7 changed files with 176 additions and 207 deletions

View File

@@ -20,6 +20,7 @@ import requests
import tempfile
import importlib
import contextlib
import collections
import numpy as np
import pandas as pd
from pathlib import Path
@@ -641,6 +642,30 @@ def lexsort_index(df: pd.DataFrame) -> pd.DataFrame:
return df.sort_index()
def flatten_dict(d, parent_key="", sep="."):
    """Flatten a nested mapping into a single-level dict with joined keys.

    Nested mappings are walked recursively and their keys are joined to the
    parent key with ``sep``. Values that are not mappings (lists included) are
    kept untouched. An empty nested mapping contributes no entries.

    >>> flatten_dict({'a': 1, 'c': {'a': 2, 'b': {'x': 5, 'y': 10}}, 'd': [1, 2, 3]})
    {'a': 1, 'c.a': 2, 'c.b.x': 5, 'c.b.y': 10, 'd': [1, 2, 3]}

    Parameters
    ----------
    d : Mapping
        the (possibly nested) mapping to flatten.
    parent_key : str
        prefix prepended to every key of ``d``; empty for the top level.
    sep : str
        separator placed between a parent key and a child key.

    Returns
    -------
    dict
        a new flat dict; ``d`` itself is not modified.
    """
    # Imported locally so the block does not depend on `collections.abc` having
    # been loaded elsewhere. The `collections.MutableMapping` alias was removed
    # in Python 3.10; the ABCs live in `collections.abc`.
    from collections.abc import MutableMapping

    items = []
    for k, v in d.items():
        # Formatting (rather than `+`) lets nested non-string keys be joined
        # instead of raising TypeError; top-level keys are kept as-is.
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, MutableMapping):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
#################### Wrapper #####################
class Wrapper(object):
"""Wrapper class for anything that needs to set up during qlib.init"""

View File

@@ -323,7 +323,6 @@ class QlibRecorder:
experiment_name : str
name of the experiment.
Returns
-------
A recorder instance.

View File

@@ -165,6 +165,7 @@ class MLflowExperiment(Experiment):
super(MLflowExperiment, self).__init__(id, name)
self._uri = uri
self._default_name = None
self._default_rec_name = "mlflow_recorder"
self.client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
def start(self, recorder_name=None):
@@ -175,7 +176,7 @@ class MLflowExperiment(Experiment):
recorder = self.create_recorder(recorder_name)
self.active_recorder = recorder
# start the recorder
run = self.active_recorder.start_run()
self.active_recorder.start_run()
return self.active_recorder
@@ -186,13 +187,66 @@ class MLflowExperiment(Experiment):
def create_recorder(self, recorder_name=None):
    """Build a new MLflowRecorder bound to this experiment.

    The recorder is only constructed here; this method does not start it.

    Parameters
    ----------
    recorder_name : str
        name of the new recorder. Falls back to ``self._default_rec_name``
        when omitted.

    Returns
    -------
    A MLflowRecorder instance.
    """
    if recorder_name is None:
        recorder_name = self._default_rec_name
    # Argument order follows the current MLflowRecorder signature:
    # (experiment_id, uri, name). The older (name, experiment_id, uri) call
    # and the "Recorder_<n>" naming it relied on are dropped.
    return MLflowRecorder(self.id, self._uri, recorder_name)
def get_recorder(self, recorder_id=None, recorder_name=None, create=True):
    """Return a recorder of this experiment, optionally creating it.

    Parameters
    ----------
    recorder_id : str
        id of the recorder to return.
    recorder_name : str
        name of the recorder to return.
    create : boolean
        when True, a missing recorder is created via `_get_or_create_rec`;
        when False, only `_get_recorder` is used and its errors propagate.

    Returns
    -------
    The active recorder when neither id nor name is given and one is active;
    otherwise the recorder that was found or created. A newly created
    recorder becomes the active recorder and its run is started.
    """
    nothing_requested = recorder_id is None and recorder_name is None
    if nothing_requested:
        if self.active_recorder is not None:
            return self.active_recorder
        # no active recorder: fall back to the default recorder name
        recorder_name = self._default_rec_name

    if not create:
        # lookup only: nothing is activated or started on this path
        return self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name)

    recorder, is_new = self._get_or_create_rec(recorder_id=recorder_id, recorder_name=recorder_name)
    if is_new:
        mlflow.set_experiment(self.name)
        self.active_recorder = recorder
        # start the recorder
        self.active_recorder.start_run()
    return recorder
def _get_or_create_rec(self, recorder_id=None, recorder_name=None) -> (object, bool):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
automatically create a new recorder based on the given id and name.
"""
try:
return self._get_recorder(recorder_id=recorder_id, recorder_name=recorder_name), False
except ValueError:
if recorder_name is None:
recorder_name = self._default_rec_name
logger.info(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
return self.create(recorder_name), True
def _get_recorder(self, recorder_id=None, recorder_name=None):
"""
Method for getting or creating a recorder. It will try to first get a valid recorder, if exception occurs, it will
raise errors.
"""
assert (
recorder_id is not None or recorder_name is not None
), "Please input at least one of recorder id or name before retrieving recorder."
if recorder_id is not None:
try:
run = self.client.get_run(recorder_id)
recorder = MLflowRecorder(self.id, self._uri, mlflow_run=run)
return recorder
except MlflowException as e:
raise ValueError("No valid recorder has been found, please make sure the input recorder id is correct.")
elif recorder_name is not None:
logger.warning(
f"Please make sure the recorder name {recorder_name} is unique, we will only return the first recorder if there exist several matched the given name."
)
recorders = self.list_recorders()
for rid in recorders:
if recorders[rid].name == recorder_name:
return recorders[rid]
raise ValueError("No valid recorder has been found, please make sure the input recorder name is correct.")
def search_records(self, **kwargs):
filter_string = "" if kwargs.get("filter_string") is None else kwargs.get("filter_string")
run_view_type = 1 if kwargs.get("run_view_type") is None else kwargs.get("run_view_type")
@@ -209,7 +263,6 @@ class MLflowExperiment(Experiment):
if recorder_id is not None:
self.client.delete_run(recorder_id)
else:
recorders = self.list_recorders()
recorder = self._get_recorder_by_name(recorder_name)
self.client.delete_run(recorder.id)
except MlflowException as e:
@@ -217,84 +270,11 @@ class MLflowExperiment(Experiment):
f"Error: {e}. Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct."
)
def _get_recorder_by_id(self, recorder_id=None, create=False):
"""
Get a recorder by its id. If the `create` is set to True, this method will also start to run the recorder.
Parameters
----------
recorder_id : str
the id of the recorder to be returned.
create : boolean
create the recorder if it hasn't been created before.
Returns
-------
The specific recorder with given id.
"""
recorders = self.list_recorders()
if recorder_id in recorders:
return recorders[recorder_id]
else:
if create:
logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
self.start(recorder_name)
return self.active_recorder
else:
raise Exception(
"Something went wrong when retrieving recorders. Please check if id of the recorder is correct."
)
def _get_recorder_by_name(self, recorder_name=None, create=False):
"""
Get a recorder by its name. If the `create` is set to True, this method will also start to run the recorder.
Parameters
----------
recorder_name : str
the name of the recorder to be returned.
create : boolean
create the recorder if it hasn't been created before.
Returns
-------
The specific recorder with given name.
"""
recorders = self.list_recorders()
for rid in recorders:
if recorders[rid].name == recorder_name:
return recorders[rid]
if create:
logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
self.start(recorder_name)
return self.active_recorder
else:
raise Exception(
"Something went wrong when retrieving recorders. Please check if the name of the experiment is correct."
)
def get_recorder(self, recorder_id=None, recorder_name=None, create=True):
    """
    Return a recorder selected by id, by name, or the active one.

    An id takes precedence over a name. With neither given, the active
    recorder is returned when there is one; otherwise the lookup is delegated
    to `_get_recorder_by_name` without a name. `create` is forwarded to the
    helper that performs the lookup.
    """
    if recorder_id is not None:
        return self._get_recorder_by_id(recorder_id, create=create)
    if recorder_name is not None:
        return self._get_recorder_by_name(recorder_name, create=create)
    # neither id nor name: prefer whatever is currently active
    if self.active_recorder:
        return self.active_recorder
    return self._get_recorder_by_name(create=create)
def list_recorders(self):
    """Return the recorders of this experiment, keyed by MLflow run id.

    Runs are fetched with ``self.client.search_runs`` and visited in the
    reverse of the order the client returns them.

    Returns
    -------
    dict
        maps each run id to a MLflowRecorder built from that run.
    """
    # NOTE(review): run_view_type=1 is presumably mlflow's ViewType.ACTIVE_ONLY - confirm.
    runs = self.client.search_runs(self.id, run_view_type=1)[::-1]
    # Each recorder takes its name from the run itself, so the positional
    # "Recorder_<n>" constructor call in the old argument order is dropped.
    return {run.info.run_id: MLflowRecorder(self.id, self._uri, mlflow_run=run) for run in runs}

View File

@@ -57,6 +57,21 @@ class ExpManager:
"""
raise NotImplementedError(f"Please implement the `end_exp` method.")
def create_exp(self, experiment_name=None):
    """
    Create an experiment. Subclasses are expected to override this method;
    this implementation always raises.

    Parameters
    ----------
    experiment_name : str
        the experiment name, which must be unique.

    Returns
    -------
    An experiment object.

    Raises
    ------
    NotImplementedError
        always.
    """
    message = "Please implement the `create_exp` method."
    raise NotImplementedError(message)
def search_records(self, experiment_ids=None, **kwargs):
"""
Get a pandas DataFrame of records that fit the search criteria of the experiment.
@@ -71,7 +86,7 @@ class ExpManager:
"""
raise NotImplementedError(f"Please implement the `search_records` method.")
def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True, run: bool = False):
def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True):
"""
Retrieve an experiment. This method includes getting an active experiment, and get_or_create a specific experiment.
The returned experiment will be running.
@@ -108,8 +123,6 @@ class ExpManager:
name of the experiment to return.
create : boolean
create the experiment if it hasn't been created before.
run : boolean
run the experiment when it is created for the first time.
Returns
-------
@@ -162,7 +175,7 @@ class MLflowExpManager(ExpManager):
def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
# create experiment
experiment = self.get_exp(experiment_name=experiment_name, run=False)
experiment, _ = self._get_or_create_exp(experiment_name=experiment_name)
# set up active experiment
self.active_experiment = experiment
# start the experiment
@@ -183,94 +196,72 @@ class MLflowExpManager(ExpManager):
self.active_experiment.end(recorder_status)
self.active_experiment = None
def __get_exp_by_id(self, experiment_id=None, create=False, run=False):
"""
Method for retrieving an experiment by its id. If the `create` is set to True, this method will also start to run the experiment.
def create_exp(self, experiment_name=None):
# init experiment
experiment_id = self.client.create_experiment(experiment_name)
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri)
experiment._default_name = self.default_exp_name
Parameters
----------
experiment_id : str
the id of the experiment to be returned.
create : boolean
create the experiment if it hasn't been created before.
return experiment
Returns
-------
The specific experiment with given id.
"""
# retrieve all created experiments
experiments = self.list_experiments()
for name in experiments:
if experiments[name].id == experiment_id:
return experiments[name]
if create:
logger.warning(f"No valid experiment found. Use the Default experiment for further process.")
return self.__get_exp_by_name(create=create, run=True)
else:
raise Exception(
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
)
def __get_exp_by_name(self, experiment_name=None, create=False, run=False):
"""
Method for retrieving an experiment by its name. If the `create` is set to True, this method will also start to run the experiment.
Parameters
----------
experiment_name : str
the name of the experiment to be returned.
create : boolean
create the experiment if it hasn't been created before.
Returns
-------
The specific experiment with given name.
"""
# retrieve all created experiments
experiments = self.list_experiments()
if experiment_name in experiments:
return experiments[experiment_name]
if create:
if experiment_name is None:
logger.info(
f"No experiment name provided. Create experiment with name {self.default_exp_name} for further process."
)
experiment_name = self.default_exp_name
if self.client.get_experiment_by_name(experiment_name) is not None:
logger.info(
"The experiment has already been created before and deleted. Try to restore the experiment with a new recorder..."
)
experiment_id = self.client.get_experiment_by_name(experiment_name).experiment_id
self.client.restore_experiment(experiment_id)
else:
experiment_id = self.client.create_experiment(experiment_name)
# init experiment
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri)
experiment._default_name = self.default_exp_name
if run:
self.active_experiment = experiment
self.active_experiment.start()
return experiment
else:
if experiment_name is None and self.default_exp_name in experiments:
return experiments[self.default_exp_name]
raise Exception(
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
)
def get_exp(self, experiment_id=None, experiment_name=None, create=True):
    """Return an experiment, optionally creating and starting it.

    Parameters
    ----------
    experiment_id : str
        id of the experiment to return.
    experiment_name : str
        name of the experiment to return.
    create : boolean
        when True, a missing experiment is created via `_get_or_create_exp`;
        when False, only `_get_exp` is used and its errors propagate.

    Returns
    -------
    The active experiment when neither id nor name is given and one is
    active; otherwise the experiment that was found or created. A newly
    created experiment becomes the active experiment and is started.
    """
    # special case of getting experiment
    if experiment_id is None and experiment_name is None:
        if self.active_experiment is not None:
            return self.active_experiment
        # Without a default name the lookup would stop at the assertion in
        # `_get_exp`, which `_get_or_create_exp` does not catch.
        experiment_name = self.default_exp_name
    if create:
        exp, is_new = self._get_or_create_exp(experiment_id=experiment_id, experiment_name=experiment_name)
    else:
        exp, is_new = self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False
    if is_new:
        self.active_experiment = exp
        # start the experiment
        self.active_experiment.start()
    return exp
def _get_or_create_exp(self, experiment_id=None, experiment_name=None) -> (object, bool):
"""
Method for getting or creating an experiment. It will try to first get a valid experiment, if exception occurs, it will
automatically create a new experiment based on the given id and name.
"""
try:
return self._get_exp(experiment_id=experiment_id, experiment_name=experiment_name), False
except ValueError:
if experiment_name is None:
experiment = self.default_exp_name
logger.info(f"No valid experiment found. Create a new experiment with name {experiment_name}.")
return self.create_exp(experiment_name), True
def _get_exp(self, experiment_id=None, experiment_name=None):
"""
Method for getting or creating an experiment. It will try to first get a valid experiment, if exception occurs, it will
raise errors.
"""
assert (
experiment_id is not None or experiment_name is not None
), "Please input at least one of experiment/recorder id or name before retrieving experiment/recorder."
if experiment_id is not None:
try:
exp = self.client.get_experiment(experiment_id)
if exp.lifecycle_stage.upper() == "DELETED":
raise MlflowException("No valid experiment has been found.")
experiment = MLflowExperiment(exp.experiment_id, exp.name, self.uri)
return experiment
except MlflowException as e:
raise ValueError(
"No valid experiment has been found, please make sure the input experiment id is correct."
)
elif experiment_name is not None:
try:
exp = self.client.get_experiment_by_name(experiment_name)
if exp is None or exp.lifecycle_stage.upper() == "DELETED":
raise MlflowException("No valid experiment has been found.")
experiment = MLflowExperiment(exp.experiment_id, experiment_name, self.uri)
return experiment
except MlflowException as e:
raise ValueError(
"No valid experiment has been found, please make sure the input experiment name is correct."
)
def search_records(self, experiment_ids, **kwargs):
filter_string = "" if kwargs.get("filter_string") is None else kwargs.get("filter_string")
@@ -288,6 +279,8 @@ class MLflowExpManager(ExpManager):
self.client.delete_experiment(experiment_id)
else:
experiment = self.client.get_experiment_by_name(experiment_name)
if experiment is None:
raise MlflowException("No valid experiment has been found.")
self.client.delete_experiment(experiment.experiment_id)
except MlflowException as e:
raise Exception(
@@ -299,9 +292,7 @@ class MLflowExpManager(ExpManager):
exps = self.client.list_experiments(view_type=1)
experiments = dict()
for exp in exps:
eid = exp.experiment_id
ename = exp.name
experiment = MLflowExperiment(eid, ename, self.uri)
experiment = MLflowExperiment(exp.experiment_id, exp.name, self.uri)
experiments[ename] = experiment
return experiments

View File

@@ -10,6 +10,7 @@ from ..contrib.evaluate import (
)
from ..utils import init_instance_by_config, get_module_by_module_path
from ..log import get_module_logger
from ..utils import flatten_dict
logger = get_module_logger("workflow", "INFO")
@@ -149,37 +150,11 @@ class PortAnaRecord(SignalRecord):
analysis["excess_return_with_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"] - report_normal["cost"]
)
# log metrics
self.recorder.log_metrics(
excess_return_without_cost_mean=analysis["excess_return_without_cost"]["risk"]["mean"]
)
self.recorder.log_metrics(excess_return_without_cost_std=analysis["excess_return_without_cost"]["risk"]["std"])
self.recorder.log_metrics(
excess_return_without_cost_annualized_return=analysis["excess_return_without_cost"]["risk"][
"annualized_return"
]
)
self.recorder.log_metrics(
excess_return_without_cost_information_ratio=analysis["excess_return_without_cost"]["risk"][
"information_ratio"
]
)
self.recorder.log_metrics(
excess_return_without_cost_max_drawdown=analysis["excess_return_without_cost"]["risk"]["max_drawdown"]
)
self.recorder.log_metrics(excess_return_with_cost_mean=analysis["excess_return_with_cost"]["risk"]["mean"])
self.recorder.log_metrics(excess_return_with_cost_std=analysis["excess_return_with_cost"]["risk"]["std"])
self.recorder.log_metrics(
excess_return_with_cost_annualized_return=analysis["excess_return_with_cost"]["risk"]["annualized_return"]
)
self.recorder.log_metrics(
excess_return_with_cost_information_ratio=analysis["excess_return_with_cost"]["risk"]["information_ratio"]
)
self.recorder.log_metrics(
excess_return_with_cost_max_drawdown=analysis["excess_return_with_cost"]["risk"]["max_drawdown"]
)
# save portfolio analysis results
analysis_df = pd.concat(analysis) # type: pd.DataFrame
# log metrics
self.recorder.log_metrics(**flatten_dict(analysis_df["risk"].unstack().T.to_dict()))
# save results
self.recorder.save_objects(**{"port_analysis.pkl": analysis_df}, artifact_path=self.artifact_path)
logger.info(
f"Portfolio analysis record 'port_analysis.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}"

View File

@@ -25,7 +25,7 @@ class Recorder:
STATUS_FI = "FINISHED"
STATUS_FA = "FAILED"
def __init__(self, name, experiment_id):
def __init__(self, experiment_id, name):
self.id = None
self.name = name
self.experiment_id = experiment_id
@@ -168,8 +168,8 @@ class MLflowRecorder(Recorder):
use file manager to help maintain the objects in the project.
"""
def __init__(self, name, experiment_id, uri, mlflow_run=None):
super(MLflowRecorder, self).__init__(name, experiment_id)
def __init__(self, experiment_id, uri, name=None, mlflow_run=None):
super(MLflowRecorder, self).__init__(experiment_id, name)
self._uri = uri
self.artifact_uri = None
# set up file manager for saving objects
@@ -179,7 +179,7 @@ class MLflowRecorder(Recorder):
# construct from mlflow run
if mlflow_run is not None:
assert isinstance(mlflow_run, mlflow.entities.run.Run), "Please input with a MLflow Run object."
self.name = mlflow_run.data.tags["mlflow.runName"] if mlflow_run.data.tags["mlflow.runName"] != "" else name
self.name = mlflow_run.data.tags["mlflow.runName"]
self.id = mlflow_run.info.run_id
self.status = mlflow_run.info.status
self.start_time = (

View File

@@ -31,10 +31,9 @@ def experiment_exception_hook(type, value, tb):
value: Exception's value
tb: Exception's traceback
"""
error_msg = "An exception has been raised.\n" f"Type: {type}\n"
logger.error(error_msg)
logger.error("An exception has been raised.")
traceback.print_tb(tb)
logger.error(f"Value: {value}")
print(f"{type}: {value}")
R.end_exp(recorder_status=Recorder.STATUS_FA)