diff --git a/qlib/data/dataset/handler.py b/qlib/data/dataset/handler.py index c955a6fe5..6ead332d2 100644 --- a/qlib/data/dataset/handler.py +++ b/qlib/data/dataset/handler.py @@ -70,7 +70,8 @@ class DataHandler(Serializable): self.start_time = start_time self.end_time = end_time if init_data: - self.init() + with TimeInspector.logt("Init data"): + self.init() super().__init__() def init(self, enable_cache: bool = True): @@ -91,7 +92,8 @@ class DataHandler(Serializable): """ # Setup data. # _data may be with multiple column index level. The outer level indicates the feature set name - self._data = self.data_loader.load(self.instruments, self.start_time, self.end_time) + with TimeInspector.logt("Loading data"): + self._data = self.data_loader.load(self.instruments, self.start_time, self.end_time) # TODO: cache def _fetch_df_by_index( @@ -293,7 +295,8 @@ class DataHandlerLP(DataHandler): def fit(self): for proc in self.get_all_processors(): - proc.fit(self._data) + with TimeInspector.logt(f"{proc.__class__.__name__}"): + proc.fit(self._data) def fit_process_data(self): """ @@ -320,9 +323,10 @@ class DataHandlerLP(DataHandler): for proc in self.infer_processors: if not proc.is_for_infer(): raise TypeError("Only processors usable for inference can be used in `infer_processors` ") - if with_fit: - proc.fit(_infer_df) - _infer_df = proc(_infer_df) + with TimeInspector.logt(f"{proc.__class__.__name__}"): + if with_fit: + proc.fit(_infer_df) + _infer_df = proc(_infer_df) self._infer = _infer_df # data for learning @@ -337,9 +341,10 @@ class DataHandlerLP(DataHandler): if len(self.learn_processors) > 0: # avoid modifying the original data _learn_df = _learn_df.copy() for proc in self.learn_processors: - if with_fit: - proc.fit(_learn_df) - _learn_df = proc(_learn_df) + with TimeInspector.logt(f"{proc.__class__.__name__}"): + if with_fit: + proc.fit(_learn_df) + _learn_df = proc(_learn_df) self._learn = _learn_df # init type diff --git a/qlib/data/dataset/processor.py 
b/qlib/data/dataset/processor.py index 3fc91f52c..b52426d16 100644 --- a/qlib/data/dataset/processor.py +++ b/qlib/data/dataset/processor.py @@ -8,6 +8,7 @@ import copy from ...log import TimeInspector from ...utils.serial import Serializable +from ...utils.paral import datetime_groupby_apply EPS = 1e-12 @@ -99,7 +100,7 @@ class ProcessInf(Processor): df[col] = df[col].replace([np.inf, -np.inf], df[col][~np.isinf(df[col])].mean()) return df - data = data.groupby("datetime").apply(process_inf) + data = datetime_groupby_apply(data, process_inf) data.sort_index(inplace=True) return data diff --git a/qlib/log.py b/qlib/log.py index 1f06f87f5..422a4c00b 100644 --- a/qlib/log.py +++ b/qlib/log.py @@ -78,10 +78,10 @@ class TimeInspector(object): Info that will be log into stdout. """ cost_time = time() - cls.time_marks.pop() - cls.timer_logger.info("Time cost: {0:.5f} | {1}".format(cost_time, info)) + cls.timer_logger.info("Time cost: {0:.3f}s | {1}".format(cost_time, info)) - @contextmanager @classmethod + @contextmanager def logt(cls, name="", show_start=False): """logt. Log the time of the inside code @@ -94,13 +94,13 @@ class TimeInspector(object): show_start """ if show_start: - cls.timer_logger.info(f"Begin {name}") + cls.timer_logger.info(f"{name} Begin") cls.set_time_mark() try: yield None finally: pass - cls.log_cost_time() + cls.log_cost_time(info=f"{name} Done") def set_log_with_config(log_config: dict): diff --git a/qlib/utils/paral.py b/qlib/utils/paral.py new file mode 100644 index 000000000..c709047b9 --- /dev/null +++ b/qlib/utils/paral.py @@ -0,0 +1,37 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +from joblib import Parallel, delayed +import pandas as pd + + +def datetime_groupby_apply(df, apply_func, axis=0, level='datetime', resample_rule="M", n_jobs=-1, skip_group=False): + """ datetime_groupby_apply + This function will apply the `apply_func` on the datetime level index. 
+ + Parameters + ---------- + df : + DataFrame for processing + apply_func : + function applied to each datetime group + axis : + the axis on which the datetime level is located + level : + the index level that holds the datetime values + resample_rule : + How to resample the data into chunks for parallel calculation + n_jobs : + number of parallel jobs passed to joblib (1 disables parallelism) + Returns: + pd.DataFrame + """ + def _naive_group_apply(df): + return df.groupby(axis=axis, level=level).apply(apply_func) + + if n_jobs != 1: + dfs = Parallel(n_jobs=n_jobs)(delayed(_naive_group_apply)(sub_df) + for idx, sub_df in df.resample(resample_rule, axis=axis, level=level)) + return pd.concat(dfs, axis=axis).sort_index() + else: + return _naive_group_apply(df) diff --git a/setup.py b/setup.py index 47ddceaf8..7e1bc1583 100644 --- a/setup.py +++ b/setup.py @@ -55,6 +55,7 @@ REQUIRED = [ "loguru", "lightgbm", "tornado", + "joblib>=0.17.0" ] # Numpy include