diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 547991159..d5db7940a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -43,7 +43,7 @@ jobs: - name: Lint with Black run: | cd .. - python -m black qlib -l 120 + python -m black qlib -l 120 --check - name: Unit tests with Pytest run: | diff --git a/examples/workflow_by_code.py b/examples/workflow_by_code.py index bc5f9337e..a959d6ea1 100644 --- a/examples/workflow_by_code.py +++ b/examples/workflow_by_code.py @@ -22,7 +22,6 @@ from qlib.utils import init_instance_by_config if __name__ == "__main__": - # use default data provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir if not exists_qlib_data(provider_uri): @@ -37,15 +36,14 @@ if __name__ == "__main__": MARKET = "csi300" BENCHMARK = "SH000300" - ################################### # train model ################################### DATA_HANDLER_CONFIG = { "start_time": "2008-01-01", "end_time": "2020-08-01", - "fit_start_time":"2008-01-01", - "fit_end_time":"2014-12-31", + "fit_start_time": "2008-01-01", + "fit_end_time": "2014-12-31", "instruments": MARKET, } @@ -72,31 +70,37 @@ if __name__ == "__main__": "max_depth": 8, "num_leaves": 210, "num_threads": 20, - } + }, }, "dataset": { "class": "DatasetH", "module_path": "qlib.data.dataset", "kwargs": { - 'handler': { + "handler": { "class": "Alpha158", "module_path": "qlib.contrib.data.handler", - "kwargs": DATA_HANDLER_CONFIG + "kwargs": DATA_HANDLER_CONFIG, }, - 'segments': { - 'train': ("2008-01-01", "2014-12-31"), - 'valid': ("2015-01-01", "2016-12-31",), - 'test': ("2017-01-01", "2020-08-01",), - } - } + "segments": { + "train": ("2008-01-01", "2014-12-31"), + "valid": ( + "2015-01-01", + "2016-12-31", + ), + "test": ( + "2017-01-01", + "2020-08-01", + ), + }, + }, } # You shoud record the data in specific sequence # "record": ['SignalRecord', 'SigAnaRecord', 'PortAnaRecord'], } # model = train_model(task) - model = init_instance_by_config(task['model']) - dataset = init_instance_by_config(task['dataset']) + model = init_instance_by_config(task["model"]) + dataset = init_instance_by_config(task["dataset"]) model.fit(dataset) diff --git a/qlib/contrib/data/handler.py b/qlib/contrib/data/handler.py index bd07fff7e..b2fd0515d 100644 --- a/qlib/contrib/data/handler.py +++ b/qlib/contrib/data/handler.py @@ -281,7 +281,6 @@ class Alpha158(DataHandlerLP): class Alpha158vwap(Alpha158): - def get_feature_config(self): conf = { "kbar": {}, diff --git a/qlib/contrib/model/catboost_model.py b/qlib/contrib/model/catboost_model.py index 8a55d0385..d53a6db41 100644 --- a/qlib/contrib/model/catboost_model.py +++ b/qlib/contrib/model/catboost_model.py @@ -9,17 +9,17 @@ from ...data.dataset.handler import DataHandlerLP class CatBoostModel(Model): - """CatBoost Model""" + """CatBoost Model""" - def __init__(self, loss="RMSE", **kwargs): - # There are more options - if loss not in {"RMSE", "Logloss"}: - raise NotImplementedError - self._params = {"loss_function": loss} - self._params.update(kwargs) - self.model = None + def __init__(self, loss="RMSE", **kwargs): + # There are more options + if loss not in {"RMSE", "Logloss"}: + raise NotImplementedError + self._params = {"loss_function": loss} + self._params.update(kwargs) + self.model = None - def fit( + def fit( self, dataset: DatasetH, num_boost_round=1000, @@ -27,48 +27,42 @@ class CatBoostModel(Model): verbose_eval=20, evals_result=dict(), **kwargs - ): - df_train, df_valid = dataset.prepare( - ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L + ): + df_train, df_valid = dataset.prepare( + ["train", "valid"], col_set=["feature", "label"], data_key=DataHandlerLP.DK_L ) - x_train, y_train = df_train["feature"], df_train["label"] - x_valid, y_valid = df_valid["feature"], df_valid["label"] + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] - # CatBoost needs 1D array as its label - if y_train.values.ndim == 2 and y_train.values.shape[1] == 1: - y_train_1d, y_valid_1d = np.squeeze(y_train.values), np.squeeze(y_valid.values) - else: - raise ValueError("CatBoost doesn't support multi-label training") + # CatBoost needs 1D array as its label + if y_train.values.ndim == 2 and y_train.values.shape[1] == 1: + y_train_1d, y_valid_1d = np.squeeze(y_train.values), np.squeeze(y_valid.values) + else: + raise ValueError("CatBoost doesn't support multi-label training") - train_pool = Pool(data = x_train, label = y_train_1d) - valid_pool = Pool(data = x_valid, label = y_valid_1d) + train_pool = Pool(data=x_train, label=y_train_1d) + valid_pool = Pool(data=x_valid, label=y_valid_1d) - #Initialize the catboost model - self._params['iterations'] = num_boost_round - self._params['early_stopping_rounds'] = early_stopping_rounds - self._params['verbose_eval'] = verbose_eval - self._params['task_type'] = "GPU" if get_gpu_device_count() > 0 else "CPU" - self.model = CatBoost(self._params, **kwargs) + # Initialize the catboost model + self._params["iterations"] = num_boost_round + self._params["early_stopping_rounds"] = early_stopping_rounds + self._params["verbose_eval"] = verbose_eval + self._params["task_type"] = "GPU" if get_gpu_device_count() > 0 else "CPU" + self.model = CatBoost(self._params, **kwargs) - #train the model - self.model.fit( - train_pool, - eval_set = valid_pool, - use_best_model = True, - **kwargs - ) - - evals_result = self.model.get_evals_result() - evals_result["train"] = list(evals_result["learn"].values())[0] - evals_result["valid"] = list(evals_result["validation"].values())[0] + # train the model + self.model.fit(train_pool, eval_set=valid_pool, use_best_model=True, **kwargs) + + evals_result = self.model.get_evals_result() + evals_result["train"] = list(evals_result["learn"].values())[0] + evals_result["valid"] = list(evals_result["validation"].values())[0] + + def predict(self, dataset): + if self.model is None: + raise ValueError("model is not fitted yet!") + x_test = dataset.prepare("test", col_set="feature") + return pd.Series(self.model.predict(np.squeeze(x_test.values)), index=x_test.index) - def predict(self, dataset): - if self.model is None: - raise ValueError("model is not fitted yet!") - x_test = dataset.prepare("test", col_set="feature") - return pd.Series(self.model.predict(np.squeeze(x_test.values)), index=x_test.index) - - -if __name__ == '__main__': - cat = CatBoostModel() \ No newline at end of file +if __name__ == "__main__": + cat = CatBoostModel() diff --git a/qlib/contrib/model/pytorch_nn.py b/qlib/contrib/model/pytorch_nn.py index 1acb5c843..1835fb617 100644 --- a/qlib/contrib/model/pytorch_nn.py +++ b/qlib/contrib/model/pytorch_nn.py @@ -159,9 +159,7 @@ class DNNModelPytorch(Model): x_valid, y_valid = df_valid["feature"], df_valid["label"] try: - wdf_train, wdf_valid = dataset.prepare( - ["train", "valid"], col_set=["weight"], data_key=DataHandlerLP.DK_L - ) + wdf_train, wdf_valid = dataset.prepare(["train", "valid"], col_set=["weight"], data_key=DataHandlerLP.DK_L) w_train, w_valid = wdf_train["weight"], wdf_valid["weight"] except: w_train = pd.DataFrame(np.ones_like(y_train.values), index=y_train.index) diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index 78d19d005..13d3465c7 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -65,8 +65,9 @@ class DataHandler(Serializable): self.data_loader = init_instance_by_config( data_loader, - None if (isinstance(data_loader, dict) and 'module_path' in data_loader) else data_loader_module, - accept_types=DataLoader) + None if (isinstance(data_loader, dict) and "module_path" in data_loader) else data_loader_module, + accept_types=DataLoader, + ) self.instruments = instruments self.start_time = start_time diff --git a/qlib/data/dataset/loader.py b/qlib/data/dataset/loader.py index 0d1e7be2e..564a7e5d5 100644 --- a/qlib/data/dataset/loader.py +++ b/qlib/data/dataset/loader.py @@ -14,6 +14,7 @@ class DataLoader(abc.ABC): """ DataLoader is designed for loading raw data from original data source. """ + @abc.abstractmethod def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame: """ @@ -53,6 +54,7 @@ class DLWParser(DataLoader): Extracting this class so that QlibDataLoader and other dataloaders(such as QdbDataLoader) can share the fields """ + def __init__(self, config: Tuple[list, tuple, dict]): """ Parameters @@ -113,7 +115,8 @@ class DLWParser(DataLoader): grp: self.load_group_df(instruments, exprs, names, start_time, end_time) for grp, (exprs, names) in self.fields.items() }, - axis=1) + axis=1, + ) else: exprs, names = self.fields df = self.load_group_df(instruments, exprs, names, start_time, end_time) @@ -122,6 +125,7 @@ class DLWParser(DataLoader): class QlibDataLoader(DLWParser): """Same as QlibDataLoader. The fields can be define by config""" + def __init__(self, config: Tuple[list, tuple, dict], filter_pipe=None): """ Parameters diff --git a/qlib/utils/__init__.py b/qlib/utils/__init__.py index 8d16798d6..d9ae98bd5 100644 --- a/qlib/utils/__init__.py +++ b/qlib/utils/__init__.py @@ -195,7 +195,7 @@ def get_cls_kwargs(config: Union[dict, str], module) -> (type, dict): def init_instance_by_config( - config: Union[str, dict, object], module=None, accept_types: Union[type, Tuple[type]] = tuple([]), **kwargs + config: Union[str, dict, object], module=None, accept_types: Union[type, Tuple[type]] = tuple([]), **kwargs ) -> object: """ get initialized instance with config diff --git a/qlib/utils/paral.py b/qlib/utils/paral.py index c709047b9..a640b04ea 100644 --- a/qlib/utils/paral.py +++ b/qlib/utils/paral.py @@ -5,8 +5,8 @@ from joblib import Parallel, delayed import pandas as pd -def datetime_groupby_apply(df, apply_func, axis=0, level='datetime', resample_rule="M", n_jobs=-1, skip_group=False): - """ datetime_groupby_apply +def datetime_groupby_apply(df, apply_func, axis=0, level="datetime", resample_rule="M", n_jobs=-1, skip_group=False): + """datetime_groupby_apply This function will apply the `apply_func` on the datetime level index. Parameters @@ -26,12 +26,14 @@ def datetime_groupby_apply(df, apply_func, axis=0, level='datetime', resample_ru Returns: pd.DataFrame """ + def _naive_group_apply(df): return df.groupby(axis=axis, level=level).apply(apply_func) if n_jobs != 1: - dfs = Parallel(n_jobs=n_jobs)(delayed(_naive_group_apply)(sub_df) - for idx, sub_df in df.resample(resample_rule, axis=axis, level=level)) + dfs = Parallel(n_jobs=n_jobs)( + delayed(_naive_group_apply)(sub_df) for idx, sub_df in df.resample(resample_rule, axis=axis, level=level) + ) return pd.concat(dfs, axis=axis).sort_index() else: return _naive_group_apply(df) diff --git a/qlib/workflow/__init__.py b/qlib/workflow/__init__.py index 5ac673a30..b801da880 100644 --- a/qlib/workflow/__init__.py +++ b/qlib/workflow/__init__.py @@ -6,6 +6,7 @@ from .expm import MLflowExpManager from ..utils import Wrapper from ..config import C + class QlibRecorder: """ A global system that helps to manage the experiments. diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py index e4ef6d8a6..432497fda 100644 --- a/qlib/workflow/exp.py +++ b/qlib/workflow/exp.py @@ -8,6 +8,7 @@ from ..log import get_module_logger logger = get_module_logger("workflow", "INFO") + class Experiment: """ Thie is the `Experiment` class for each experiment being run. The API is designed @@ -17,22 +18,22 @@ class Experiment: self.name = None self.id = None self.active_recorder = None # only one recorder can running each time - self.recorders = dict() # recorder id -> object + self.recorders = dict() # recorder id -> object def __repr__(self): return str(self.info) - + def __str__(self): - return str(self.info) + return str(self.info) @property def info(self): output = dict() - output['class'] = "Experiment" - output['id'] = self.id - output['name'] = self.name - output['active_recorder'] = self.active_recorder.id - output['recorders'] = list(self.recorders.keys()) + output["class"] = "Experiment" + output["id"] = self.id + output["name"] = self.name + output["active_recorder"] = self.active_recorder.id + output["recorders"] = list(self.recorders.keys()) def start(self): """ @@ -137,7 +138,6 @@ class MLflowExperiment(Experiment): run = self.active_recorder.start_run() # store the recorder self.recorders[self.active_recorder.id] = recorder - return self.active_recorder def end(self, status): @@ -147,7 +147,7 @@ class MLflowExperiment(Experiment): def create_recorder(self): num = len(self.recorders) - name = "Recorder_{}".format(num+1) + name = "Recorder_{}".format(num + 1) recorder = MLflowRecorder(name, self.id) return recorder @@ -170,9 +170,7 @@ class MLflowExperiment(Experiment): if self.recorders[rid].name == recorder_name: return self.recorders[rid] elif self.active_recorder is None: - raise Exception('No valid active recorder exists. Please make sure the experiment is running.') + raise Exception("No valid active recorder exists. Please make sure the experiment is running.") else: - logger.info( - "No experiment id or name is given. Return the current active experiment." - ) - return self.active_recorder \ No newline at end of file + logger.info("No experiment id or name is given. Return the current active experiment.") + return self.active_recorder diff --git a/qlib/workflow/expm.py b/qlib/workflow/expm.py index 2afdee279..f597d4a96 100644 --- a/qlib/workflow/expm.py +++ b/qlib/workflow/expm.py @@ -184,9 +184,7 @@ class MLflowExpManager(ExpManager): else: if experiment_name not in self.experiments: if mlflow.get_experiment_by_name(experiment_name) is not None: - logger.info( - "The experiment has already been created before. Try to resume the experiment..." - ) + logger.info("The experiment has already been created before. Try to resume the experiment...") experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id else: experiment_id = mlflow.create_experiment(experiment_name) @@ -216,11 +214,9 @@ class MLflowExpManager(ExpManager): if self.experiments[name].id == experiment_id: return self.experiments[name] elif self.active_experiment is None: - raise Exception('No valid active experiment exists. Please make sure experiment manager is running.') + raise Exception("No valid active experiment exists. Please make sure experiment manager is running.") else: - logger.info( - "No experiment id or name is given. Return the current active experiment." - ) + logger.info("No experiment id or name is given. Return the current active experiment.") return self.active_experiment def delete_exp(self, experiment_id): diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index d92f836a8..3cef8b5d3 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -12,7 +12,7 @@ from ..utils import init_instance_by_config, get_module_by_module_path class RecordTemp: """ - This is the Records Template class that enables user to generate experiment results such as IC and + This is the Records Template class that enables user to generate experiment results such as IC and backtest in a certain format. """ @@ -116,8 +116,8 @@ class PortAnaRecord(SignalRecord): def __init__(self, recorder, config, **kwargs): self.recorder = recorder - self.strategy_config = config['strategy'] - self.backtest_config = config['backtest'] + self.strategy_config = config["strategy"] + self.backtest_config = config["backtest"] self.strategy = init_instance_by_config(self.strategy_config) self.artifact_path = "portfolio_analysis" diff --git a/qlib/workflow/recorder.py b/qlib/workflow/recorder.py index 89b16e9f1..68ce5432b 100644 --- a/qlib/workflow/recorder.py +++ b/qlib/workflow/recorder.py @@ -20,21 +20,21 @@ class Recorder: self.name = name self.experiment_id = experiment_id self.status = "SCHEDULED" - + def __repr__(self): return str(self.info) - + def __str__(self): - return str(self.info) + return str(self.info) @property def info(self): output = dict() - output['class'] = "Recorder" - output['id'] = self.id - output['name'] = self.name - output['experiment_id'] = self.experiment_id - output['status'] = self.status + output["class"] = "Recorder" + output["id"] = self.id + output["name"] = self.name + output["experiment_id"] = self.experiment_id + output["status"] = self.status def set_recorder_name(self, rname): self.recorder_name = rname @@ -188,16 +188,16 @@ class MLflowRecorder(Recorder): client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) if local_path is not None: client.log_artifacts(self.id, local_path, artifact_path) - elif kwargs.get('data') is not None and kwargs.get('name') is not None: - data, name = kwargs.get('data'), kwargs.get('name') + elif kwargs.get("data") is not None and kwargs.get("name") is not None: + data, name = kwargs.get("data"), kwargs.get("name") self.fm.save_obj(data, name) client.log_artifact(self.id, self.fm.path / name, artifact_path) - elif kwargs.get('data_name_list') is not None: - data_name_list = kwargs.get('data_name_list') + elif kwargs.get("data_name_list") is not None: + data_name_list = kwargs.get("data_name_list") self.fm.save_objs(data_name_list) client.log_artifacts(self.id, self.fm.path, artifact_path) else: - raise Exception('Please provide valid arguments in order to save object properly.') + raise Exception("Please provide valid arguments in order to save object properly.") def load_object(self, name): client = mlflow.tracking.MlflowClient(tracking_uri=self._uri) diff --git a/setup.py b/setup.py index 7e1bc1583..22e806d8d 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ REQUIRED = [ "loguru", "lightgbm", "tornado", - "joblib>=0.17.0" + "joblib>=0.17.0", ] # Numpy include