1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-05 03:50:57 +08:00

Update R and workflow

This commit is contained in:
Jactus
2020-11-17 22:05:18 +08:00
parent a8b46dd41d
commit 64ed43b791
20 changed files with 481 additions and 376 deletions

View File

@@ -59,7 +59,14 @@ Besides `provider_uri` and `region`, `qlib.init` has other parameters. The follo
If Qlib fails to connect redis via `redis_host` and `redis_port`, cache mechanism will not be used! Please refer to `Cache <../component/data.html#cache>`_ for details.
- `exp_manager`
Type: str, optional parameter(default: "MLflowExpManager"), the experiment manager to be used in qlib.
- `exp_uri`
Type: str, optional parameter(default: "mlruns" in local execution path), the tracking uri of the experiment manager.
It can either be a local path or a remote uri.
Type: dict, optional parameter, the setting of experiment manager to be used in qlib. Users can specify an experiment manager class, as well as the tracking URI for all the experiments. However, please be aware that we only support input of a dictionary in the following style for `exp_manager`.
::
{
"class": "MLflowExpManager",
"module_path": "qlib.workflow.expm",
"kwargs": {
"uri": "python_execution_path/mlruns"),
"default_exp_name": "Experiment",
}
}

View File

@@ -89,7 +89,7 @@ if __name__ == "__main__":
"kwargs": {
"topk": 50,
"n_drop": 5,
}
},
},
"backtest": {
"verbose": False,

View File

@@ -89,7 +89,7 @@ if __name__ == "__main__":
"kwargs": {
"topk": 50,
"n_drop": 5,
}
},
},
"backtest": {
"verbose": False,
@@ -113,7 +113,6 @@ if __name__ == "__main__":
R.save_objects(init_model=model)
rid = R.get_recorder().id
# Finetune model based on previous trained model
with R.start(experiment_name="finetune model"):
recorder = R.get_recorder(rid, experiment_name="init models")

View File

@@ -1,49 +0,0 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import sys
from pathlib import Path
import qlib
import fire
import yaml
import pandas as pd
from qlib.config import REG_CN
from qlib.utils import exists_qlib_data, init_instance_by_config
from qlib.workflow import R
from qlib.workflow.record_temp import SignalRecord, PortAnaRecord
# worflow handler function
def workflow(config_path):
with open(config_path) as fp:
config = yaml.load(fp, Loader=yaml.FullLoader)
provider_uri = config.get("PROVIDER_URI")
if not exists_qlib_data(provider_uri):
print(f"Qlib data is not found in {provider_uri}")
sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts")))
from get_data import GetData
GetData().qlib_data_cn(target_dir=provider_uri)
qlib.init(provider_uri=provider_uri, region=REG_CN)
# model initiaiton
model = init_instance_by_config(config.get("TASK")["model"])
dataset = init_instance_by_config(config.get("TASK")["dataset"])
# start exp
with R.start("workflow"):
model.fit(dataset)
# prediction
recorder = R.get_recorder()
sr = SignalRecord(model, dataset, recorder)
sr.generate()
# backtest
par = PortAnaRecord(recorder, config.get("PORT_ANALYSIS_CONFIG"))
par.generate()
if __name__ == "__main__":
fire.Fire(workflow)

View File

@@ -1,13 +1,29 @@
PROVIDER_URI: "~/.qlib/qlib_data/cn_data"
MARKET: &market csi300
BENCHMARK: &benchmark SH000300
DATA_HANDLER_CONFIG: &data_handerler_config
provider_uri: "~/.qlib/qlib_data/cn_data"
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
start_time: 2008-01-01
end_time: 2020-08-01
fit_start_time: 2008-01-01
fit_end_time: 2014-12-31
instruments: *market
TASK:
port_analysis_config: &port_analysis_config
strategy:
class: TopkDropoutStrategy
module_path: qlib.contrib.strategy.strategy
kwargs:
topk: 50
n_drop: 5
backtest:
verbose: False
limit_threshold: 0.095
account: 100000000
benchmark: *benchmark
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
task:
model:
class: LGBModel
module_path: qlib.contrib.model.gbdt
@@ -28,25 +44,16 @@ TASK:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
kwargs: *data_handerler_config
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
record: [SignalRecord, PortAnaRecord]
PORT_ANALYSIS_CONFIG:
strategy:
class: TopkDropoutStrategy
module_path: qlib.contrib.strategy.strategy
kwargs:
topk: 50
n_drop: 5
backtest:
verbose: False
limit_threshold: 0.095
account: 100000000
benchmark: *benchmark
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs: {}
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config

View File

@@ -9,15 +9,13 @@ import re
import sys
import copy
import yaml
import atexit
import signal
import logging
import platform
import subprocess
from pathlib import Path
from .utils import can_use_cache, init_instance_by_config, get_module_by_module_path
from .workflow.utils import experiment_exception_hook, experiment_kill_signal_handler
from .workflow.utils import experiment_exit_handler
# init qlib
def init(default_conf="client", **kwargs):
@@ -46,14 +44,9 @@ def init(default_conf="client", **kwargs):
C.set_region(kwargs.get("region", C["region"] if "region" in C else REG_CN))
for k, v in kwargs.items():
if k == "exp_manager":
C["exp_manager"].update({"class": v})
elif k == "exp_uri":
C["exp_manager"]["kwargs"].update({"uri": v})
else:
C[k] = v
if k not in C:
LOG.warning("Unrecognized config %s" % k)
C[k] = v
if k not in C:
LOG.warning("Unrecognized config %s" % k)
C.resolve_path()
@@ -93,9 +86,7 @@ def init(default_conf="client", **kwargs):
qr = QlibRecorder(exp_manager)
R.register(qr)
# clean up experiment when python program ends
atexit.register(R.end_exp, recorder_status="FINISHED") # will not take effect if experiment ends
signal.signal(signal.SIGINT, experiment_kill_signal_handler)
sys.excepthook = experiment_exception_hook
experiment_exit_handler()
def _mount_nfs_uri(C):

View File

@@ -9,6 +9,7 @@ from ...log import TimeInspector
from inspect import getfullargspec
import copy
class ALPHA360_Denoise(DataHandlerLP):
def __init__(self, instruments="csi500", start_time=None, end_time=None, fit_start_time=None, fit_end_time=None):
data_loader = {

View File

@@ -12,6 +12,7 @@ from ...data.dataset.handler import DataHandlerLP
class LGBModel(ModelFT):
"""LightGBM Model"""
def __init__(self, loss="mse", **kwargs):
if loss not in {"mse", "binary"}:
raise NotImplementedError
@@ -20,9 +21,9 @@ class LGBModel(ModelFT):
self.model = None
def _prepare_data(self, dataset: DatasetH):
df_train, df_valid = dataset.prepare(["train", "valid"],
col_set=["feature", "label"],
data_key=DataHandlerLP.DK_L)
df_train, df_valid = dataset.prepare(
["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L
)
x_train, y_train = df_train["feature"], df_train["label"]
x_valid, y_valid = df_valid["feature"], df_valid["label"]
@@ -36,23 +37,27 @@ class LGBModel(ModelFT):
dvalid = lgb.Dataset(x_valid.values, label=y_valid)
return dtrain, dvalid
def fit(self,
dataset: DatasetH,
num_boost_round=1000,
early_stopping_rounds=50,
verbose_eval=20,
evals_result=dict(),
**kwargs):
def fit(
self,
dataset: DatasetH,
num_boost_round=1000,
early_stopping_rounds=50,
verbose_eval=20,
evals_result=dict(),
**kwargs
):
dtrain, dvalid = self._prepare_data(dataset)
self.model = lgb.train(self.params,
dtrain,
num_boost_round=num_boost_round,
valid_sets=[dtrain, dvalid],
valid_names=["train", "valid"],
early_stopping_rounds=early_stopping_rounds,
verbose_eval=verbose_eval,
evals_result=evals_result,
**kwargs)
self.model = lgb.train(
self.params,
dtrain,
num_boost_round=num_boost_round,
valid_sets=[dtrain, dvalid],
valid_names=["train", "valid"],
early_stopping_rounds=early_stopping_rounds,
verbose_eval=verbose_eval,
evals_result=evals_result,
**kwargs
)
evals_result["train"] = list(evals_result["train"].values())[0]
evals_result["valid"] = list(evals_result["valid"].values())[0]
@@ -76,10 +81,12 @@ class LGBModel(ModelFT):
verbose level
"""
dtrain, _ = self._prepare_data(dataset)
self.model = lgb.train(self.params,
dtrain,
num_boost_round=num_boost_round,
init_model=self.model,
valid_sets=[dtrain],
valid_names=["train"],
verbose_eval=verbose_eval)
self.model = lgb.train(
self.params,
dtrain,
num_boost_round=num_boost_round,
init_model=self.model,
valid_sets=[dtrain],
valid_names=["train"],
verbose_eval=verbose_eval,
)

View File

@@ -50,7 +50,7 @@ class GRU(Model):
dropout=0.0,
n_epochs=200,
lr=0.001,
metric='IC',
metric="IC",
batch_size=2000,
early_stop=20,
loss="mse",
@@ -134,48 +134,48 @@ class GRU(Model):
os.environ["CUDA_VISIBLE_DEVICES"] = self.visible_GPU
def mse(self, pred, label):
loss = (pred - label)**2
loss = (pred - label) ** 2
return torch.mean(loss)
def loss_fn(self, pred, label):
mask = ~torch.isnan(label)
if self.loss == 'mse':
if self.loss == "mse":
return self.mse(pred[mask], label[mask])
raise ValueError('unknown loss `%s`'%self.loss)
raise ValueError("unknown loss `%s`" % self.loss)
def metric_fn(self, pred, label):
mask = torch.isfinite(label)
if self.metric == 'IC':
if self.metric == "IC":
return self.cal_ic(pred[mask], label[mask])
if self.metric == '' or self.metric == 'loss': # use loss
if self.metric == "" or self.metric == "loss": # use loss
return -self.loss_fn(pred[mask], label[mask])
raise ValueError('unknown metric `%s`'%self.metric)
raise ValueError("unknown metric `%s`" % self.metric)
def cal_ic(self, pred, label):
return torch.mean(pred * label)
return torch.mean(pred * label)
def train_epoch(self, x_train, y_train):
x_train_values = x_train.values
y_train_values = np.squeeze(y_train.values)*100
y_train_values = np.squeeze(y_train.values) * 100
self.gru_model.train()
indices = np.arange(len(x_train_values))
np.random.shuffle(indices)
for i in range(len(indices))[::self.batch_size]:
for i in range(len(indices))[:: self.batch_size]:
if len(indices) - i < self.batch_size:
break
feature = torch.from_numpy(x_train_values[indices[i:i+self.batch_size]]).float()
label = torch.from_numpy(y_train_values[indices[i:i+self.batch_size]]).float()
feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float()
label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float()
if self.use_gpu:
feature = feature.cuda()
@@ -186,10 +186,9 @@ class GRU(Model):
self.train_optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_value_(self.gru_model.parameters(), 3.)
torch.nn.utils.clip_grad_value_(self.gru_model.parameters(), 3.0)
self.train_optimizer.step()
def test_epoch(self, data_x, data_y):
# prepare training data
@@ -204,13 +203,13 @@ class GRU(Model):
indices = np.arange(len(x_values))
np.random.shuffle(indices)
for i in range(len(indices))[::self.batch_size]:
for i in range(len(indices))[:: self.batch_size]:
if len(indices) - i < self.batch_size:
break
feature = torch.from_numpy(x_values[indices[i:i+self.batch_size]]).float()
label = torch.from_numpy(y_values[indices[i:i+self.batch_size]]).float()
feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float()
label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float()
if self.use_gpu:
feature = feature.cuda()
@@ -255,13 +254,13 @@ class GRU(Model):
# return
for step in range(self.n_epochs):
self.logger.info('Epoch%d:', step)
self.logger.info('training...')
self.logger.info("Epoch%d:", step)
self.logger.info("training...")
self.train_epoch(x_train, y_train)
self.logger.info('evaluating...')
self.logger.info("evaluating...")
train_loss, train_score = self.test_epoch(x_train, y_train)
val_loss, val_score = self.test_epoch(x_valid, y_valid)
self.logger.info('train %.6f, valid %.6f'%(train_score, val_score))
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
evals_result["train"].append(train_score)
evals_result["valid"].append(val_score)
@@ -273,17 +272,16 @@ class GRU(Model):
else:
stop_steps += 1
if stop_steps >= self.early_stop:
self.logger.info('early stop')
self.logger.info("early stop")
break
self.logger.info('best score: %.6lf @ %d'%(best_score, best_epoch))
self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
self.gru_model.load_state_dict(best_param)
torch.save(best_param, save_path)
if self.use_gpu:
torch.cuda.empty_cache()
def predict(self, dataset):
if not self._fitted:
raise ValueError("model is not fitted yet!")
@@ -295,16 +293,15 @@ class GRU(Model):
sample_num = x_values.shape[0]
preds = []
for begin in range(sample_num)[::self.batch_size]:
for begin in range(sample_num)[:: self.batch_size]:
if sample_num-begin < self.batch_size:
if sample_num - begin < self.batch_size:
end = sample_num
else:
end = begin+self.batch_size
end = begin + self.batch_size
x_batch = torch.from_numpy(x_values[begin:end]).float()
if self.use_gpu:
x_batch = x_batch.cuda()

View File

@@ -89,11 +89,13 @@ class DropnaLabel(DropnaProcessor):
"""The samples are dropped according to label. So it is not usable for inference"""
return False
class TanhProcess(Processor):
""" Use tanh to process noise data"""
def __call__(self, df):
def tanh_denoise(data):
mask = data.columns.get_level_values(1).str.contains('LABEL')
mask = data.columns.get_level_values(1).str.contains("LABEL")
col = df.columns[~mask]
data[col] = data[col] - 1
data[col] = np.tanh(data[col])

View File

@@ -48,7 +48,7 @@ class Model(BaseModel):
class ModelFT(Model):
'''Model (F)ine(t)unable'''
"""Model (F)ine(t)unable"""
@abc.abstractmethod
def finetune(self, dataset: Dataset):

View File

@@ -643,7 +643,7 @@ def lexsort_index(df: pd.DataFrame) -> pd.DataFrame:
#################### Wrapper #####################
class Wrapper(object):
"""Data Provider Wrapper"""
"""Wrapper class for anything that needs to set up during qlib.init"""
def __init__(self):
self._provider = None

View File

@@ -32,14 +32,14 @@ class QlibRecorder:
self.exp_manager = exp_manager
@contextmanager
def start(self, experiment_name=None):
def start(self, experiment_name=None, recorder_name=None):
"""
Method to start an experiment. This method can only be called within a Python's `with` statement.
Use case:
---------
```
with R.start('test'):
with R.start('test', 'recorder_1'):
model.fit(dataset)
R.log...
... # further operations
@@ -49,8 +49,10 @@ class QlibRecorder:
----------
experiment_name : str
name of the experiment one wants to start.
recorder_name : str
name of the recorder under the experiment one wants to start.
"""
run = self.start_exp(experiment_name)
run = self.start_exp(experiment_name, recorder_name)
try:
yield run
except Exception as e:
@@ -58,7 +60,7 @@ class QlibRecorder:
raise e
self.end_exp(Recorder.STATUS_FI)
def start_exp(self, experiment_name=None, uri=None):
def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
"""
Lower level method for starting an experiment. When use this method, one should end the experiment manually
and the status of the recorder may not be handled properly.
@@ -66,7 +68,7 @@ class QlibRecorder:
Use case:
---------
```
R.start_exp(experiment_name='test')
R.start_exp(experiment_name='test', recorder_name='recorder_1')
... # further operations
R.end_exp('FINISHED') or R.end_exp(Recorder.STATUS_S)
```
@@ -75,14 +77,17 @@ class QlibRecorder:
----------
experiment_name : str
the name of the experiment to be started
recorder_name : str
name of the recorder under the experiment one wants to start.
uri : str
the tracking uri of the experiment, where all the artifacts/metrics etc. will be stored.
The default uri are set in the qlib.config.
Returns
-------
An experiment instance being started.
"""
return self.exp_manager.start_exp(experiment_name, uri)
return self.exp_manager.start_exp(experiment_name, recorder_name, uri)
def end_exp(self, recorder_status=Recorder.STATUS_FI):
"""
@@ -188,9 +193,9 @@ class QlibRecorder:
2) if id or name is specified, return the specified experiment. If no such exp found,
create a new experiment with given id or name, and the experiment is set to be running.
If R's not running:
1) no id or name specified, create a default experiment.
1) no id or name specified, create a default experiment, and the experiment is set to be running.
2) if id or name is specified, return the specified experiment. If no such exp found,
create a new experiment with given id or name, and the experiment is set to be running.
create a new experiment with given name or the default experiment, and the experiment is set to be running.
Else If `create` is False:
If R's running:
1) no id or name specified, return the active experiment.
@@ -367,8 +372,7 @@ class QlibRecorder:
# Case 1
with R.start('test'):
pred = model.predict(dataset)
kwargs = {"pred.pkl": pred}
R.save_objects(**kwargs, artifact_path='prediction')
R.save_objects(**{"pred.pkl": pred}, artifact_path='prediction')
# Case 2
with R.start('test'):

53
qlib/workflow/cli.py Normal file
View File

@@ -0,0 +1,53 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import sys
from pathlib import Path
import qlib
import fire
import yaml
import pandas as pd
from qlib.config import REG_CN
from qlib.utils import init_instance_by_config
from qlib.workflow import R
from qlib.workflow.record_temp import SignalRecord
# worflow handler function
def workflow(config_path):
with open(config_path) as fp:
config = yaml.load(fp, Loader=yaml.FullLoader)
provider_uri = config.get("provider_uri")
qlib.init(provider_uri=provider_uri, region=REG_CN)
# model initiaiton
model = init_instance_by_config(config.get("task")["model"])
dataset = init_instance_by_config(config.get("task")["dataset"])
# start exp
with R.start("workflow"):
model.fit(dataset)
recorder = R.get_recorder()
# generate records
for record in config.get("task")["record"]:
if record["class"] == SignalRecord.__name__:
srconf = {"model": model, "dataset": dataset, "recorder": recorder}
record["kwargs"].update(srconf)
sr = init_instance_by_config(record)
sr.generate()
else:
rconf = {"recorder": recorder}
record["kwargs"].update(rconf)
ar = init_instance_by_config(record)
ar.generate()
# function to run worklflow by config
def run():
fire.Fire(workflow)
if __name__ == "__main__":
run()

View File

@@ -2,7 +2,7 @@
# Licensed under the MIT License.
import mlflow
from datetime import datetime
from mlflow.exceptions import MlflowException
from pathlib import Path
from .recorder import Recorder, MLflowRecorder
from ..log import get_module_logger
@@ -38,10 +38,15 @@ class Experiment:
output["recorders"] = list(recorders.keys())
return output
def start(self):
def start(self, recorder_name=None):
"""
Start the experiment and set it to be active. This method will also start a new recorder.
Parameters
----------
recorder_name : str
the name of the recorder to be created.
Returns
-------
An active recorder.
@@ -59,10 +64,15 @@ class Experiment:
"""
raise NotImplementedError(f"Please implement the `end` method.")
def create_recorder(self):
def create_recorder(self, name=None):
"""
Create a recorder for each experiment.
Parameters
----------
name : str
the name of the recorder to be created.
Returns
-------
A recorder object.
@@ -126,6 +136,8 @@ class Experiment:
the id of the recorder to be deleted.
recorder_name : str
the name of the recorder to be deleted.
create : boolean
create the recorder if it hasn't been created before.
Returns
-------
@@ -155,12 +167,12 @@ class MLflowExperiment(Experiment):
self._default_name = None
self.client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
def start(self):
def start(self, recorder_name=None):
# set the active experiment
mlflow.set_experiment(self.name)
logger.info(f"Experiment {self.id} starts running ...")
# set up recorder
recorder = self.create_recorder()
recorder = self.create_recorder(recorder_name)
self.active_recorder = recorder
# start the recorder
run = self.active_recorder.start_run()
@@ -172,11 +184,12 @@ class MLflowExperiment(Experiment):
self.active_recorder.end_run(recorder_status)
self.active_recorder = None
def create_recorder(self):
recorders = self.list_recorders()
num = len(recorders)
name = "Recorder_{}".format(num + 1)
recorder = MLflowRecorder(name, self.id, self._uri)
def create_recorder(self, recorder_name=None):
if recorder_name is None:
recorders = self.list_recorders()
num = len(recorders)
recorder_name = "Recorder_{}".format(num + 1)
recorder = MLflowRecorder(recorder_name, self.id, self._uri)
return recorder
@@ -197,14 +210,67 @@ class MLflowExperiment(Experiment):
self.client.delete_run(recorder_id)
else:
recorders = self.list_recorders()
for r in recorders:
if recorders[r].name == recorder_name:
recorder_id = r
break
self.client.delete_run(recorder_id)
except:
recorder = self._get_recorder_by_name(recorder_name)
self.client.delete_run(recorder.id)
except MlflowException as e:
raise Exception(
"Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct."
f"Error: {e}. Something went wrong when deleting recorder. Please check if the name/id of the recorder is correct."
)
def _get_recorder_by_id(self, recorder_id=None, create=False):
"""
Get a recorder by its id. If the `create` is set to True, this method will also start to run the recorder.
Parameters
----------
recorder_id : str
the id of the recorder to be returned.
create : boolean
create the recorder if it hasn't been created before.
Returns
-------
The specific recorder with given id.
"""
recorders = self.list_recorders()
if recorder_id in recorders:
return recorders[recorder_id]
else:
if create:
logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
self.start(recorder_name)
return self.active_recorder
else:
raise Exception(
"Something went wrong when retrieving recorders. Please check if id of the recorder is correct."
)
def _get_recorder_by_name(self, recorder_name=None, create=False):
"""
Get a recorder by its name. If the `create` is set to True, this method will also start to run the recorder.
Parameters
----------
recorder_name : str
the name of the recorder to be returned.
create : boolean
create the recorder if it hasn't been created before.
Returns
-------
The specific recorder with given name.
"""
recorders = self.list_recorders()
for rid in recorders:
if recorders[rid].name == recorder_name:
return recorders[rid]
if create:
logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
self.start(recorder_name)
return self.active_recorder
else:
raise Exception(
"Something went wrong when retrieving recorders. Please check if the name of the experiment is correct."
)
def get_recorder(self, recorder_id=None, recorder_name=None, create=True):
@@ -213,68 +279,22 @@ class MLflowExperiment(Experiment):
is set to True, this method will not automatically create an active recorder.
"""
# retrive all the recorders under this experiment
recorders = self.list_recorders()
if recorder_id is None and recorder_name is None:
if self.active_recorder:
return self.active_recorder
else:
if create:
self.start()
logger.warning(
f"Recorder {self.active_recorder.id} is running under the experiment with name {self.name}..."
)
return self.active_recorder
else:
raise Exception(
"Something went wrong when retrieving recorders. Please check if QlibRecorder is running."
)
return self._get_recorder_by_name(create=create)
else:
if recorder_id is not None:
if recorder_id in recorders:
return recorders[recorder_id]
else:
# mlflow does not support create a run with given id
raise Exception(
"Something went wrong when retrieving recorders. Please check if id of the recorder is correct."
)
return self._get_recorder_by_id(recorder_id, create=create)
else:
for rid in recorders:
if recorders[rid].name == recorder_name:
return recorders[rid]
if create:
recorders = self.list_recorders()
logger.warning(f"No valid recorder found. Create a new recorder with name {recorder_name}.")
recorder = self.create_recorder()
recorder.name = recorder_name
recorder.start_run()
return recorder
else:
raise Exception(
"Something went wrong when retrieving experiments. Please check if the name of the experiment is correct."
)
return self._get_recorder_by_name(recorder_name, create=create)
def list_recorders(self):
runs = self.client.list_run_infos(self.id, run_view_type=1)[::-1]
runs = self.client.search_runs(self.id, run_view_type=1)[::-1]
recorders = dict()
for i in range(len(runs)):
rid = runs[i].run_id
status = runs[i].status
start_time = runs[i].start_time
end_time = runs[i].end_time
recorder = MLflowRecorder(f"Recorder_{i+1}", self.id, self._uri)
recorder.id = rid
recorder.status = status
recorder.start_time = (
datetime.fromtimestamp(float(start_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
if start_time is not None
else None
)
recorder.end_time = (
datetime.fromtimestamp(float(end_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
if end_time is not None
else None
)
recorder._uri = self._uri
recorders[rid] = recorder
recorder = MLflowRecorder(f"Recorder_{i+1}", self.id, self._uri, runs[i])
recorders[runs[i].info.run_id] = recorder
return recorders

View File

@@ -2,6 +2,7 @@
# Licensed under the MIT License.
import mlflow
from mlflow.exceptions import MlflowException
import os
from pathlib import Path
from contextlib import contextmanager
@@ -23,14 +24,17 @@ class ExpManager:
self.default_exp_name = default_exp_name
self.active_experiment = None # only one experiment can running each time
def start_exp(self, experiment_name=None, uri=None, **kwargs):
def start_exp(self, experiment_name=None, recorder_name=None, uri=None, **kwargs):
"""
Start an experiment.
Start an experiment. This method includes first get_or_create an experiment, and then
set it to be running.
Parameters
----------
experiment_name : str
name of the active experiment.
recorder_name : str
name of the recorder to be started.
uri : str
the current tracking URI.
@@ -38,14 +42,7 @@ class ExpManager:
-------
An active experiment.
"""
# create experiment
experiment = self.create_exp(experiment_name, uri)
# set up active experiment
self.active_experiment = experiment
# start the experiment
self.active_experiment.start()
return self.active_experiment
raise NotImplementedError(f"Please implement the `start_exp` method.")
def end_exp(self, recorder_status: str = Recorder.STATUS_S, **kwargs):
"""
@@ -58,9 +55,7 @@ class ExpManager:
recorder_status : str
the status of the active recorder of the experiment.
"""
if self.active_experiment is not None:
self.active_experiment.end(recorder_status)
self.active_experiment = None
raise NotImplementedError(f"Please implement the `end_exp` method.")
def search_records(self, experiment_ids=None, **kwargs):
"""
@@ -76,29 +71,15 @@ class ExpManager:
"""
raise NotImplementedError(f"Please implement the `search_records` method.")
def create_exp(self, experiment_name=None, uri=None):
def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True, run: bool = False):
"""
Create an experiment.
Retrieve an experiment. This method includes getting an active experiment, and get_or_create a specific experiment.
The returned experiment will be running.
Parameters
----------
experiment_name : str
the experiment name, which must be unique.
uri : str
the tracking uri of the experiment.
Returns
-------
An experiment object.
"""
raise NotImplementedError(f"Please implement the `create_exp` method.")
def get_exp(self, experiment_id=None, experiment_name=None, create: bool = True):
"""
Retrieve an experiment. When user specify experiment id and name, the method will try to return the
specific experiment. When user does not provide recorder id or name, the method will try to return the current
active experiment. The `create` argument determines whether the method will automatically create a new experiment
according to user's specification if the experiment hasn't been created before
When user specify experiment id and name, the method will try to return the specific experiment.
When user does not provide recorder id or name, the method will try to return the current active experiment.
The `create` argument determines whether the method will automatically create a new experiment according
to user's specification if the experiment hasn't been created before
If `create` is True:
If R's running:
@@ -122,9 +103,13 @@ class ExpManager:
Parameters
----------
experiment_id : str
the experiment id to return.
id of the experiment to return.
experiment_name : str
name of the experiment to return.
create : boolean
create the experiment if hasn't been created before.
create the experiment it if hasn't been created before.
run : boolean
run the experiment when it is created for the first time.
Returns
-------
@@ -175,9 +160,13 @@ class MLflowExpManager(ExpManager):
super(MLflowExpManager, self).__init__(uri, default_exp_name)
self.client = mlflow.tracking.MlflowClient(tracking_uri=self.uri)
def create_exp(self, experiment_name=None, uri=None):
# retrieve all created experiments
experiments = self.list_experiments()
def start_exp(self, experiment_name=None, recorder_name=None, uri=None):
# create experiment
experiment = self.get_exp(experiment_name=experiment_name, run=False)
# set up active experiment
self.active_experiment = experiment
# start the experiment
self.active_experiment.start(recorder_name)
# set the tracking uri
if uri is None:
logger.info(
@@ -186,35 +175,102 @@ class MLflowExpManager(ExpManager):
else:
self.uri = uri
mlflow.set_tracking_uri(self.uri)
# start the experiment
if experiment_name is None:
logger.info(
f"No experiment name provided. The default experiment name is set as `{self.default_exp_name}`."
)
if self.default_exp_name not in experiments:
experiment_id = self.client.create_experiment(self.default_exp_name)
else:
experiment_id = self.client.get_experiment_by_name(self.default_exp_name).experiment_id
# set the active experiment
mlflow.set_experiment(self.default_exp_name)
experiment_name = self.default_exp_name
else:
if experiment_name not in experiments:
if self.client.get_experiment_by_name(experiment_name) is not None:
logger.info(
"The experiment has already been created before. Try to resume the experiment with a new recorder..."
)
experiment_id = self.client.get_experiment_by_name(experiment_name).experiment_id
else:
experiment_id = self.client.create_experiment(experiment_name)
else:
experiment_id = experiments[experiment_name].id
experiment = experiments[experiment_name]
# init experiment
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri)
experiment._default_name = self.default_exp_name
return experiment
return self.active_experiment
def end_exp(self, recorder_status: str = Recorder.STATUS_S):
if self.active_experiment is not None:
self.active_experiment.end(recorder_status)
self.active_experiment = None
def __get_exp_by_id(self, experiment_id=None, create=False, run=False):
"""
Method for retrieving an experiment by its id. If the `create` is set to True, this method will also start to run the experiment.
Parameters
----------
experiment_id : str
the id of the experiment to be returned.
create : boolean
create the experiment if it hasn't been created before.
Returns
-------
The specific experiment with given id.
"""
# retrive all created experiments
experiments = self.list_experiments()
for name in experiments:
if experiments[name].id == experiment_id:
return experiments[name]
if create:
logger.warning(f"No valid experiment found. Use the Default experiment for further process.")
return self.__get_exp_by_name(create=create, run=True)
else:
raise Exception(
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
)
def __get_exp_by_name(self, experiment_name=None, create=False, run=False):
"""
Method for retrieving an experiment by its name. If the `create` is set to True, this method will also start to run the experiment.
Parameters
----------
experiment_name : str
the name of the experiment to be returned.
create : boolean
create the experiment if it hasn't been created before.
Returns
-------
The specific experiment with given name.
"""
# retrive all created experiments
experiments = self.list_experiments()
if experiment_name in experiments:
return experiments[experiment_name]
if create:
if experiment_name is None:
logger.info(
f"No experiment name provided. Create experiment with name {self.default_exp_name} for further process."
)
experiment_name = self.default_exp_name
if self.client.get_experiment_by_name(experiment_name) is not None:
logger.info(
"The experiment has already been created before and deleted. Try to restore the experiment with a new recorder..."
)
experiment_id = self.client.get_experiment_by_name(experiment_name).experiment_id
self.client.restore_experiment(experiment_id)
else:
experiment_id = self.client.create_experiment(experiment_name)
# init experiment
experiment = MLflowExperiment(experiment_id, experiment_name, self.uri)
experiment._default_name = self.default_exp_name
if run:
self.active_experiment = experiment
self.active_experiment.start()
return experiment
else:
if experiment_name is None and self.default_exp_name in experiments:
return experiments[self.default_exp_name]
raise Exception(
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
)
def get_exp(self, experiment_id=None, experiment_name=None, create=True, run=True):
if experiment_id is None and experiment_name is None:
if self.active_experiment:
return self.active_experiment
else:
return self.__get_exp_by_name(create=create, run=run)
else:
if experiment_name is not None:
return self.__get_exp_by_name(experiment_name, create=create, run=run)
else:
return self.__get_exp_by_id(experiment_id, create=create, run=run)
def search_records(self, experiment_ids, **kwargs):
filter_string = "" if kwargs.get("filter_string") is None else kwargs.get("filter_string")
@@ -223,48 +279,6 @@ class MLflowExpManager(ExpManager):
order_by = kwargs.get("order_by")
return self.client.search_runs(experiment_ids, filter_string, run_view_type, max_results, order_by)
def get_exp(self, experiment_id=None, experiment_name=None, create=True):
# retrive all created experiments
experiments = self.list_experiments()
if experiment_id is None and experiment_name is None:
if self.active_experiment:
return self.active_experiment
else:
if create:
logger.warning("QlibRecorder is not running. Use the Default experiment for further process.")
return self.start_exp()
else:
if self.default_exp_name in experiments:
return experiments[self.default_exp_name]
raise Exception(
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
)
else:
if experiment_name is not None:
if experiment_name in experiments:
return experiments[experiment_name]
else:
if create:
logger.warning(
f"No valid experiment found. Create experiment with name {experiment_name} for further process."
)
return self.start_exp(experiment_name)
else:
raise Exception(
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
)
else:
for name in experiments:
if experiments[name].id == experiment_id:
return experiments[name]
if create:
logger.warning(f"No valid experiment found. Use the Default experiment for further process.")
return self.start_exp()
else:
raise Exception(
"Something went wrong when retrieving experiments. Please check if QlibRecorder is running or the name/id of the experiment is correct."
)
def delete_exp(self, experiment_id=None, experiment_name=None):
assert (
experiment_id is not None or experiment_name is not None
@@ -275,22 +289,19 @@ class MLflowExpManager(ExpManager):
else:
experiment = self.client.get_experiment_by_name(experiment_name)
self.client.delete_experiment(experiment.experiment_id)
except:
except MlflowException as e:
raise Exception(
"Something went wrong when deleting experiment. Please check if the name/id of the experiment is correct."
f"Error: {e}. Something went wrong when deleting experiment. Please check if the name/id of the experiment is correct."
)
def list_experiments(self):
# retrieve all the existing experiments
exps = self.client.list_experiments(view_type=1)
experiments = dict()
for i in range(len(exps)):
eid = exps[i].experiment_id
ename = exps[i].name
for exp in exps:
eid = exp.experiment_id
ename = exp.name
experiment = MLflowExperiment(eid, ename, self.uri)
experiment.id = eid
experiment.name = ename
experiment._uri = self.uri
experiments[ename] = experiment
return experiments

View File

@@ -3,6 +3,7 @@
import pandas as pd
from pathlib import Path
from pprint import pprint
from ..contrib.evaluate import (
backtest as normal_backtest,
risk_analysis,
@@ -83,14 +84,14 @@ class SignalRecord(RecordTemp):
logger.info(
f"Signal record 'pred.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}"
)
# print out results
pprint(f"The following are prediction results of the {type(self.model).__name__} model.")
pprint(pred.head(5))
def load(self):
# try to load the saved object
try:
pred = self.recorder.load_object("pred.pkl")
return pred
except:
raise Exception("Something went wrong when loading the saved object.")
pred = self.recorder.load_object("pred.pkl")
return pred
def check(self, **kwargs):
artifacts = self.recorder.list_artifacts()
@@ -148,19 +149,51 @@ class PortAnaRecord(SignalRecord):
analysis["excess_return_with_cost"] = risk_analysis(
report_normal["return"] - report_normal["bench"] - report_normal["cost"]
)
# log metrics
self.recorder.log_metrics(
excess_return_without_cost_mean=analysis["excess_return_without_cost"]["risk"]["mean"]
)
self.recorder.log_metrics(excess_return_without_cost_std=analysis["excess_return_without_cost"]["risk"]["std"])
self.recorder.log_metrics(
excess_return_without_cost_annualized_return=analysis["excess_return_without_cost"]["risk"][
"annualized_return"
]
)
self.recorder.log_metrics(
excess_return_without_cost_information_ratio=analysis["excess_return_without_cost"]["risk"][
"information_ratio"
]
)
self.recorder.log_metrics(
excess_return_without_cost_max_drawdown=analysis["excess_return_without_cost"]["risk"]["max_drawdown"]
)
self.recorder.log_metrics(excess_return_with_cost_mean=analysis["excess_return_with_cost"]["risk"]["mean"])
self.recorder.log_metrics(excess_return_with_cost_std=analysis["excess_return_with_cost"]["risk"]["std"])
self.recorder.log_metrics(
excess_return_with_cost_annualized_return=analysis["excess_return_with_cost"]["risk"]["annualized_return"]
)
self.recorder.log_metrics(
excess_return_with_cost_information_ratio=analysis["excess_return_with_cost"]["risk"]["information_ratio"]
)
self.recorder.log_metrics(
excess_return_with_cost_max_drawdown=analysis["excess_return_with_cost"]["risk"]["max_drawdown"]
)
# save portfolio analysis results
analysis_df = pd.concat(analysis) # type: pd.DataFrame
self.recorder.save_objects(**{"port_analysis.pkl": analysis_df}, artifact_path=self.artifact_path)
logger.info(
f"Portfolio analysis record 'port_analysis.pkl' has been saved as the artifact of the Experiment {self.recorder.experiment_id}"
)
# print out results
pprint("The following are analysis results of the excess return without cost.")
pprint(analysis["excess_return_without_cost"])
pprint("The following are analysis results of the excess return with cost.")
pprint(analysis["excess_return_with_cost"])
def load(self):
# try to load the saved object
try:
pred = self.recorder.load_object(self.artifact_path / "port_analysis.pkl")
return pred
except:
raise Exception("Something went wrong when loading the saved object.")
pred = self.recorder.load_object(self.artifact_path / "port_analysis.pkl")
return pred
def check(self):
artifacts = self.recorder.list_artifacts(self.artifact_path)

View File

@@ -2,8 +2,9 @@
# Licensed under the MIT License.
import mlflow
import shutil, os, pickle, tempfile, codecs, datetime
import shutil, os, pickle, tempfile, codecs
from pathlib import Path
from datetime import datetime
from ..utils.objm import FileManager
from ..log import get_module_logger
@@ -167,7 +168,7 @@ class MLflowRecorder(Recorder):
use file manager to help maintain the objects in the project.
"""
def __init__(self, name, experiment_id, uri):
def __init__(self, name, experiment_id, uri, mlflow_run=None):
super(MLflowRecorder, self).__init__(name, experiment_id)
self._uri = uri
self.artifact_uri = None
@@ -175,6 +176,22 @@ class MLflowRecorder(Recorder):
self.temp_dir = tempfile.mkdtemp()
self.fm = FileManager(Path(self.temp_dir).absolute())
self.client = mlflow.tracking.MlflowClient(tracking_uri=self._uri)
# construct from mlflow run
if mlflow_run is not None:
assert isinstance(mlflow_run, mlflow.entities.run.Run), "Please input with a MLflow Run object."
self.name = mlflow_run.data.tags["mlflow.runName"] if mlflow_run.data.tags["mlflow.runName"] != "" else name
self.id = mlflow_run.info.run_id
self.status = mlflow_run.info.status
self.start_time = (
datetime.fromtimestamp(float(mlflow_run.info.start_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
if mlflow_run.info.start_time is not None
else None
)
self.end_time = (
datetime.fromtimestamp(float(mlflow_run.info.end_time) / 1000.0).strftime("%Y-%m-%d %H:%M:%S")
if mlflow_run.info.end_time is not None
else None
)
def start_run(self):
# start the run
@@ -182,7 +199,7 @@ class MLflowRecorder(Recorder):
# save the run id and artifact_uri
self.id = run.info.run_id
self.artifact_uri = run.info.artifact_uri
self.start_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.status = Recorder.STATUS_R
logger.info(f"Recorder {self.id} starts running under Experiment {self.experiment_id} ...")
@@ -196,7 +213,7 @@ class MLflowRecorder(Recorder):
Recorder.STATUS_FA,
], f"The status type {status} is not supported."
mlflow.end_run(status)
self.end_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
self.end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if self.status != Recorder.STATUS_S:
self.status = status
shutil.rmtree(self.temp_dir)
@@ -213,31 +230,23 @@ class MLflowRecorder(Recorder):
def load_object(self, name):
assert self._uri is not None, "Please start the experiment and recorder first before using recorder directly."
path = self.client.download_artifacts(self.id, name)
try:
with Path(path).open("rb") as f:
f.seek(0)
return pickle.load(f)
except:
with codecs.open(path, mode="r", encoding="utf-8") as f:
return f.read()
with Path(path).open("rb") as f:
return pickle.load(f)
def log_params(self, **kwargs):
keys = list(kwargs.keys())
for name, data in kwargs.items():
self.client.log_param(self.id, name, data)
def log_metrics(self, step=None, **kwargs):
keys = list(kwargs.keys())
for name, data in kwargs.items():
self.client.log_metric(self.id, name, data)
def set_tags(self, **kwargs):
keys = list(kwargs.keys())
for name, data in kwargs.items():
self.client.set_tag(self.id, name, data)
def delete_tags(self, *keys):
for count, key in enumerate(keys):
for key in keys:
self.client.delete_tag(self.id, key)
def get_artifact_uri(self):

View File

@@ -1,13 +1,25 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import sys, traceback, signal
import sys, traceback, signal, atexit
from . import R
from .recorder import Recorder
from ..log import get_module_logger
logger = get_module_logger("workflow", "INFO")
# function to handle the experiment when unusual program ending occurs
def experiment_exit_handler():
"""
Method for handling the experiment when any unusual program ending occurs.
The `atexit` handler should be put in the last, since, as long as the program ends, it will be called.
Thus, if any exception or user interuption occurs beforehead, we should handle them first. Once `R` is
ended, another call of `R.end_exp` will not take effect.
"""
signal.signal(signal.SIGINT, experiment_kill_signal_handler) # handle user keyboard interupt
sys.excepthook = experiment_exception_hook # handle uncaught exception
atexit.register(R.end_exp, recorder_status=Recorder.STATUS_FI) # will not take effect if experiment ends
def experiment_exception_hook(type, value, tb):
"""
@@ -19,15 +31,16 @@ def experiment_exception_hook(type, value, tb):
value: Exception's value
tb: Exception's traceback
"""
error_msg = "An exception has been raised.\n" f"Type: {type}\n" f"Value: {value}\n"
error_msg = "An exception has been raised.\n" f"Type: {type}\n"
logger.error(error_msg)
traceback.print_tb(tb)
logger.error(f"Value: {value}")
R.end_exp(recorder_status=Recorder.STATUS_FA)
def experiment_kill_signal_handler(signum, frame):
"""
End an experiment when user kill the program (CTRL+C, etc.).
End an experiment when user kill the program through keyboard (CTRL+C, etc.).
"""
R.end_exp(recorder_status=Recorder.STATUS_FA)

View File

@@ -100,7 +100,7 @@ setup(
entry_points={
# 'console_scripts': ['mycli=mymodule:cli'],
"console_scripts": [
"estimator=qlib.contrib.estimator.launcher:run",
"workflow_by_config=qlib.workflow.cli:run",
],
},
ext_modules=extensions,