1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-04 03:21:00 +08:00

add parallel processor

This commit is contained in:
Young
2020-11-06 10:24:21 +00:00
parent eead71fcb5
commit 9a826eefa3
5 changed files with 58 additions and 14 deletions

View File

@@ -70,7 +70,8 @@ class DataHandler(Serializable):
self.start_time = start_time
self.end_time = end_time
if init_data:
self.init()
with TimeInspector.logt("Init data"):
self.init()
super().__init__()
def init(self, enable_cache: bool = True):
@@ -91,7 +92,8 @@ class DataHandler(Serializable):
"""
# Setup data.
# _data may be with multiple column index level. The outer level indicates the feature set name
self._data = self.data_loader.load(self.instruments, self.start_time, self.end_time)
with TimeInspector.logt("Loading data"):
self._data = self.data_loader.load(self.instruments, self.start_time, self.end_time)
# TODO: cache
def _fetch_df_by_index(
@@ -293,7 +295,8 @@ class DataHandlerLP(DataHandler):
def fit(self):
for proc in self.get_all_processors():
proc.fit(self._data)
with TimeInspector.logt(f"{proc.__class__.__name__}"):
proc.fit(self._data)
def fit_process_data(self):
"""
@@ -320,9 +323,10 @@ class DataHandlerLP(DataHandler):
for proc in self.infer_processors:
if not proc.is_for_infer():
raise TypeError("Only processors usable for inference can be used in `infer_processors` ")
if with_fit:
proc.fit(_infer_df)
_infer_df = proc(_infer_df)
with TimeInspector.logt(f"{proc.__class__.__name__}"):
if with_fit:
proc.fit(_infer_df)
_infer_df = proc(_infer_df)
self._infer = _infer_df
# data for learning
@@ -337,9 +341,10 @@ class DataHandlerLP(DataHandler):
if len(self.learn_processors) > 0: # avoid modifying the original data
_learn_df = _learn_df.copy()
for proc in self.learn_processors:
if with_fit:
proc.fit(_learn_df)
_learn_df = proc(_learn_df)
with TimeInspector.logt(f"{proc.__class__.__name__}"):
if with_fit:
proc.fit(_learn_df)
_learn_df = proc(_learn_df)
self._learn = _learn_df
# init type

View File

@@ -8,6 +8,7 @@ import copy
from ...log import TimeInspector
from ...utils.serial import Serializable
from ...utils.paral import datetime_groupby_apply
EPS = 1e-12
@@ -99,7 +100,7 @@ class ProcessInf(Processor):
df[col] = df[col].replace([np.inf, -np.inf], df[col][~np.isinf(df[col])].mean())
return df
data = data.groupby("datetime").apply(process_inf)
data = datetime_groupby_apply(data, process_inf)
data.sort_index(inplace=True)
return data

View File

@@ -78,10 +78,10 @@ class TimeInspector(object):
Info that will be log into stdout.
"""
cost_time = time() - cls.time_marks.pop()
cls.timer_logger.info("Time cost: {0:.5f} | {1}".format(cost_time, info))
cls.timer_logger.info("Time cost: {0:.3f}s | {1}".format(cost_time, info))
@contextmanager
@classmethod
@contextmanager
def logt(cls, name="", show_start=False):
"""logt.
Log the time of the inside code
@@ -94,13 +94,13 @@ class TimeInspector(object):
show_start
"""
if show_start:
cls.timer_logger.info(f"Begin {name}")
cls.timer_logger.info(f"{name} Begin")
cls.set_time_mark()
try:
yield None
finally:
pass
cls.log_cost_time()
cls.log_cost_time(info=f"{name} Done")
def set_log_with_config(log_config: dict):

37
qlib/utils/paral.py Normal file
View File

@@ -0,0 +1,37 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from joblib import Parallel, delayed
import pandas as pd
def datetime_groupby_apply(df, apply_func, axis=0, level='datetime', resample_rule="M", n_jobs=-1, skip_group=False):
""" datetime_groupby_apply
This function will apply the `apply_func` on the datetime level index.
Parameters
----------
df :
DataFrame for processing
apply_func :
apply_func for processing the data
axis :
which axis is the datetime level located
level :
which level is the datetime level
resample_rule :
How to resample the data to calculating parallel
n_jobs :
n_jobs for joblib
Returns:
pd.DataFrame
"""
def _naive_group_apply(df):
return df.groupby(axis=axis, level=level).apply(apply_func)
if n_jobs != 1:
dfs = Parallel(n_jobs=n_jobs)(delayed(_naive_group_apply)(sub_df)
for idx, sub_df in df.resample(resample_rule, axis=axis, level=level))
return pd.concat(dfs, axis=axis).sort_index()
else:
return _naive_group_apply(df)

View File

@@ -55,6 +55,7 @@ REQUIRED = [
"loguru",
"lightgbm",
"tornado",
"joblib>=0.17.0"
]
# Numpy include