1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-30 17:41:18 +08:00

monitor initial version

This commit is contained in:
Young
2021-01-25 13:58:48 +00:00
parent 4ff0c4fb0f
commit cddaf90ef5
4 changed files with 384 additions and 0 deletions

154
examples/data/monitor.py Normal file
View File

@@ -0,0 +1,154 @@
"""
This script demonstrates the implementation of the following requirements.
NOTE: A lot of details are not considered in this script
- Corner cases that will raise errors (e.g. std == 0)
· Transformer:
1) Basic statistics on different slices of the DataFrame df:
§ The statistics include:
· STD, Mean, Skewness, Kurtosis
§ The above statistics can be calculated on the following data slices:
· df.groupby(['datetime'])
· df.groupby(['datetime', 'industry' ])
· df.groupby(['instrument', 'factor'])
· df.apply("<expression>").groupby([..]), in which [..] could be any one of the above slicing rules.
2) Advanced statistics on different slices of the DataFrame df:
§ Auto-correlation:
· Calculate corr(df.loc[t, :, :], df.loc[t-w, :, :]), w=1, 2, ….
§ Correlation between factors:
· For any pair of factors (i, j): calculate corr(df.loc[t, :, i], df.loc[t, :, j]). The result is a correlation matrix with each element corresponds to a correlation value between a pair of factors.
§ The data slices are the same as those in 1).
· Monitor:
1) Algorithms:
§ Basic checks: NaN.
§ Point anomaly detection.
§ Segment anomaly detection.
2) Scenarios:
§ Online anomaly detection: monitoring streaming data.
§ Offline anomaly detection: verifying whole historical data.
"""
# AUTO download data
# The imports below are interleaved with executable statements on purpose:
# the data-existence check and `qlib.init()` run before the remaining
# qlib modules are imported.
from qlib.utils import exists_qlib_data
from qlib.tests.data import GetData
from qlib.config import REG_CN

provider_uri = "~/.qlib/qlib_data/cn_data"  # target_dir
# Fetch the dataset into `provider_uri` only when it is not already there.
if not exists_qlib_data(provider_uri):
    print(f"Qlib data is not found in {provider_uri}")
    GetData().qlib_data(target_dir=provider_uri, region=REG_CN)

import qlib

# NOTE(review): called with no arguments, so `provider_uri` above is not
# passed in — presumably qlib's default location matches it; confirm.
qlib.init()

import pandas as pd
from qlib.contrib.data.handler import Alpha158
from qlib.data.dataset.loader import QlibDataLoader
from qlib.data.monitor.metric import format_conv
from qlib.data.monitor.metric import MeanM, SkewM, KurtM, StdM, AutoCM, CorrM
from qlib.data.monitor.detector import NDDetector, SWNDD, ThresholdD
from qlib.data import D
def get_factor_df(col_idx=0):
    """
    Load Alpha158 features and reshape a single factor for monitoring.

    Parameters
    ----------
    col_idx : int
        Positional index of the feature column to select.

    Returns
    -------
    The selected column after `format_conv`, with a synthetic "industry"
    grouping attached.
    """
    handler = Alpha158(instruments="csi300", infer_processors=[], learn_processors=[], start_time="20200101")
    features = handler.fetch()
    print(features.head())

    # We don't have industries in the dataframe, so fake ones are generated
    # from the first two characters of each instrument code.
    instruments = features.index.get_level_values("instrument")
    fake_industry = pd.Series(instruments.str.slice(stop=2).to_list(), index=features.index)

    # select a factor
    selected = features.iloc[:, col_idx]
    factor_df = format_conv(selected, industry=fake_industry)
    print(f"Selected metric: {features.columns[col_idx]}")
    print(factor_df)
    return factor_df
def case_1_1():
    """
    Case 1.1: per-row mean of the factor, checked with `NDDetector`.

    Corresponds to the df.groupby(["datetime"]) slice in the module notes.
    The detector is fitted and checked on the same series.
    """
    factor_df = get_factor_df()

    # 1) Extract metrics
    # 1.1) df.groupby(["datetime"])
    daily_mean = MeanM().extract(factor_df)
    print(daily_mean)

    detector = NDDetector()
    detector.fit(daily_mean)  # use historical data to fit detector
    flags = detector.check(daily_mean)  # detecting on new data or historical data
    print(flags)
    print(flags.value_counts())
def case_1_2():
    """
    Case 1.2: mean of the factor grouped by industry, checked per group.

    Corresponds to the df.groupby(["datetime", "industry"]) slice in the
    module notes. Each column of the grouped metric gets its own
    `NDDetector`, fitted and checked on that column.
    """
    factor_df = get_factor_df()
    # 1.2) df.groupby("datetime", "industry")
    mtrc = MeanM(group=["industry"])
    m_multi = mtrc.extract(factor_df)
    print(m_multi)
    # `DataFrame.iteritems()` was removed in pandas 2.0; `items()` yields the
    # same (column name, Series) pairs on both old and new versions.
    for col_name, s in m_multi.items():
        print(col_name)
        ndd = NDDetector()
        ndd.fit(s)  # use historical data to fit detector
        check_res = ndd.check(s)  # detecting on new data or historical data
        print(check_res)
        print(check_res.value_counts())
def case_1_3_1_4():
    """
    Cases 1.3 and 1.4: sliding-window check on an expression-derived series.

    Loads the expression "$close/Ref($close, 1) - 1" (labelled "return")
    for SH600519 and checks it with `SWNDD` using a window of 20.
    """
    # case 1.3 and case 1.4
    loader = QlibDataLoader(config=(["$close/Ref($close, 1) - 1"], ["return"]))
    raw = loader.load(instruments=["SH600519"], start_time="20200101")
    wide = format_conv(raw)
    series = wide.iloc[:, 0]
    print(series)

    detector = SWNDD(window=20)
    # fit use historical data (TODO: updating will be supported in the future)
    detector.fit(series)
    flags = detector.check(series)
    print(flags)
    print(flags.value_counts())
    # show only the entries where the flag is True
    print(flags[flags])
def case_2_1():
    """
    Case 2.1: auto-correlation of the factor, checked against a threshold.

    Calculates corr(df.loc[t, :, :], df.loc[t-w, :, :]) via `AutoCM` with
    its default settings, then applies `ThresholdD(0.0, reverse=True)`.
    """
    factor_df = get_factor_df()

    auto_corr = AutoCM().extract(factor_df)
    print(auto_corr)

    threshold_detector = ThresholdD(0.0, reverse=True)
    flags = threshold_detector.check(auto_corr)
    print(flags)
    print(flags.value_counts())
def case_2_2():
    """
    Case 2.2: correlation between two factors, checked against a threshold.

    Uses feature columns 0 and 1, computes their correlation with `CorrM`,
    then applies `ThresholdD(0.0, reverse=True)`.
    """
    first_factor = get_factor_df(0)
    second_factor = get_factor_df(1)

    pair_corr = CorrM().extract(first_factor, second_factor)
    print(pair_corr)

    threshold_detector = ThresholdD(0.0, reverse=True)
    flags = threshold_detector.check(pair_corr)
    print(flags)
    print(flags.value_counts())
if __name__ == "__main__":
    # Run every demo scenario in the order it appears in this file.
    for demo_case in (case_1_1, case_1_2, case_1_3_1_4, case_2_1, case_2_2):
        demo_case()

View File

View File

@@ -0,0 +1,103 @@
import pandas as pd
import abc
from typing import Union
class Detector(metaclass=abc.ABCMeta):
    """
    Base class for anomaly detectors.

    Detector is stateful.
    The input of a detector is a Series with shape <time> or a DataFrame
    with shape <time, factor>.
    Subclasses must implement `check`; `fit` and `update` are optional and
    raise `NotImplementedError` unless overridden.
    """

    @abc.abstractmethod
    def check(self, df: Union[pd.Series, pd.DataFrame]) -> pd.Series:
        """
        Check the given values for abnormalities.

        Parameters
        ----------
        df : Union[pd.Series, pd.DataFrame]
            Data to be checked

        Returns
        -------
        pd.Series:
            True: Abnormalities detected.
            False: No abnormality detected.
        """

    def fit(self, df: pd.DataFrame):
        """
        Fit the detector state; not supported unless a subclass overrides it.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError("This type of input is not supported")

    def update(self, df: pd.DataFrame):
        """
        The state of detector can be updated gradually.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError("This type of input is not supported")
class NDDetector(Detector):
    """
    Normal Distribution Detector

    Flags values that fall outside mean ± n * std, where mean and std are
    taken from the series given to `fit`.
    This will be used more in offline detection.
    """

    def __init__(self, n=3):
        """
        Parameters
        ----------
        n : number
            Half-width of the accepted range in standard deviations.
            The detection range is mean ± n * std.
        """
        self.n = n

    def fit(self, s: pd.Series):
        """Store the mean and standard deviation of `s` as detector state."""
        self.mean, self.std = s.mean(), s.std()

    def check(self, s: pd.Series):
        """
        Return a boolean Series that is True where `s` lies outside the
        fitted range. `fit` must have been called first.
        """
        lower = self.mean - self.std * self.n
        upper = self.mean + self.std * self.n
        inside = s.between(lower, upper)
        return ~inside
class SWNDD(Detector):
    """
    (S)liding (W)indow (N)ormal (D)istribution (D)etector

    Same rule as a mean ± n * std range, but the mean and std are rolling
    statistics computed in `fit` with the given rolling arguments.
    """

    def __init__(self, n=3, **rolling_args):
        """
        Parameters
        ----------
        n : number
            Half-width of the accepted range in rolling standard deviations.
        **rolling_args
            Keyword arguments forwarded to `Series.rolling` (e.g. `window`).
        """
        self.n = n
        self.rolling_args = rolling_args

    def fit(self, s: pd.Series):
        """Compute and store the rolling mean and rolling std of `s`."""
        # TODO: pd.Dataframe is not supported now
        windowed = s.rolling(**self.rolling_args)
        self.mean = windowed.mean()
        self.std = windowed.std()

    def check(self, s: pd.Series):
        """
        Return a boolean Series indexed like `s`, True where the value lies
        outside the rolling range.

        Positions where the rolling mean or std is NaN are reported as
        False rather than as anomalies.
        """
        lower = self.mean - self.std * self.n
        upper = self.mean + self.std * self.n
        outside = ~s.between(lower, upper)
        # Without this mask every NaN bound would count as "outside".
        flagged = outside & self.mean.notna() & self.std.notna()
        return flagged.reindex(s.index)
# TODO: daily normalization detector.
class CountD(Detector):
    """
    Count detector

    Placeholder: no detection logic is implemented here yet. `check` is not
    overridden, so the abstract `check` declared on `Detector` still applies.
    """
# TODO: check if the number of counts is enough
# TODO: This is a instance of Count Detector.
class ThresholdD(Detector):
    """
    Threshold (D)etector

    Flags values that are strictly on one side of a fixed threshold.
    """

    def __init__(self, threshold, reverse=False):
        """
        Parameters
        ----------
        threshold :
            Value every element is compared against.
        reverse : bool
            False: flag values greater than the threshold.
            True: flag values less than the threshold.
        """
        self.reverse = reverse
        self.threshold = threshold

    def check(self, s: pd.Series):
        """Return the element-wise comparison of `s` with the threshold."""
        if self.reverse:
            return s < self.threshold
        return s > self.threshold

127
qlib/data/monitor/metric.py Normal file
View File

@@ -0,0 +1,127 @@
import pandas as pd
from typing import Union
from abc import abstractmethod, ABCMeta
def format_conv(df: pd.Series, **col_group):
    """
    Convert long-format data into a table with one row per "datetime".

    Parameters
    ----------
    df : pd.Series
        Data whose index has a level named "datetime" plus other levels.
    **col_group
        Optional name -> Series mappings. Each Series is reindexed to
        `df.index` and appended to the index as an extra level.

    Returns
    -------
    The data with every index level except "datetime" unstacked into the
    columns. The input object is not modified.
    """
    # TODO: col_group
    converted = df.copy()  # performance problems here
    if col_group:
        aligned = {}
        for name, group in col_group.items():
            aligned[name] = group.reindex(converted.index)
        group_frame = pd.DataFrame(aligned).reindex(converted.index)
        # merge all the groups into the index
        group_levels = group_frame.columns.to_list()
        converted.index = group_frame.set_index(keys=group_levels, append=True).index

    to_unstack = []
    for level_name in converted.index.names:
        if level_name != "datetime":
            to_unstack.append(level_name)
    return converted.unstack(to_unstack)
class MetricExt(metaclass=ABCMeta):
    """
    Metric Extractor

    Current design:
    - The input data are assumed to be in qlib format.
    - The extracted information is time-series like. Columns may be a
      multiple index.
    """

    @abstractmethod
    def extract(self, df: Union[pd.Series, pd.DataFrame]) -> Union[pd.Series, pd.DataFrame]:
        """Compute the metric from `df`; must be implemented by subclasses."""
# overall metrics
class AggMetrics(MetricExt):
    """
    Aggregation metric applied along each row of the input.

    TODO: this metric assumes a daily layout (the operation is applied on
    each row).
    """

    def __init__(self, group=None):
        """
        Parameters
        ----------
        group : str, list or None
            Column level name(s) to group by before aggregating. A single
            string is wrapped into a list. None means no grouping.
        """
        self.group = [group] if isinstance(group, str) else group

    def extract(self, df: pd.DataFrame) -> pd.Series:
        """
        Apply `agg` to every row of `df`, or to every column group of `df`
        when `group` was given.
        """
        if self.group is not None:
            grouped_columns = df.groupby(self.group, axis=1)
            return grouped_columns.apply(self.agg, axis=1)
        return df.apply(self.agg, axis=1)

    @abstractmethod
    def agg(self, *args, **kwargs):
        """Aggregation to apply; must be implemented by subclasses."""
class StdM(AggMetrics):
    """Standard-deviation metric."""

    def agg(self, s, *args, **kwargs):
        """Return `s.std(...)`, forwarding any extra arguments."""
        result = s.std(*args, **kwargs)
        return result
class MeanM(AggMetrics):
    """Mean metric."""

    def agg(self, s, *args, **kwargs):
        """Return `s.mean(...)`, forwarding any extra arguments."""
        result = s.mean(*args, **kwargs)
        return result
class SkewM(AggMetrics):
    """Skewness metric."""

    def agg(self, s, *args, **kwargs):
        """Return `s.skew(...)`, forwarding any extra arguments."""
        result = s.skew(*args, **kwargs)
        return result
class KurtM(AggMetrics):
    """Kurtosis metric."""

    def agg(self, s, *args, **kwargs):
        """
        Return `s.kurt(...)`, forwarding any extra arguments.

        `s` may be a Series or a DataFrame. The method is called on `s`
        itself, as the sibling metrics do, rather than through the unbound
        `pd.DataFrame.kurt`, which is not guaranteed to accept a Series.
        """
        return s.kurt(*args, **kwargs)
# sliding window metrics
class SWMetrics(MetricExt):
    """
    (S)liding (W)indow Metrics

    Applies `agg` over a rolling window of the input.
    TODO: testing this class
    """

    def __init__(self, **rolling_args):
        """
        Parameters
        ----------
        **rolling_args
            Keyword arguments forwarded to pandas `rolling` (e.g. `window`).
        """
        self.rolling_args = rolling_args

    def extract(self, df: Union[pd.Series, pd.DataFrame]) -> Union[pd.Series, pd.DataFrame]:
        """
        Apply `agg` on every rolling window of `df`.

        Parameters
        ----------
        df : Union[pd.Series, pd.DataFrame]
            Time-indexed data. For a DataFrame the window is applied to
            each column.

        Raises
        ------
        NotImplementedError
            If `df` is neither a Series nor a DataFrame.
        """
        # The original called `isinstance` with only the type, which always
        # raises TypeError; the object being tested must be passed as well.
        if not isinstance(df, (pd.Series, pd.DataFrame)):
            raise NotImplementedError("This type of input is not supported")
        # `Rolling.apply` has no `axis` keyword, so both input types share
        # the same call.
        return df.rolling(**self.rolling_args).apply(self.agg)

    @abstractmethod
    def agg(self, *args, **kwargs):
        """Aggregation applied to each window; must be implemented by subclasses."""
## TODO: more metrics is ignored: mean, std, skew, kurt
def calc_corr(df1: pd.DataFrame, df2: pd.DataFrame, mode):
    """
    Correlate the two frames row by row.

    Rows are paired by position, and each pair must carry the same index
    label.

    Parameters
    ----------
    df1, df2 : pd.DataFrame
        Frames to compare.
    mode :
        Passed as `method` to `Series.corr`.

    Returns
    -------
    pd.Series
        One correlation value per row label.
    """
    per_row = {}
    for left, right in zip(df1.iterrows(), df2.iterrows()):
        label, row1 = left
        other_label, row2 = right
        assert label == other_label
        per_row[label] = row1.corr(row2, method=mode)
    return pd.Series(per_row)
class AutoCM(MetricExt):
    """
    (A)uto (C)orrelation (M)etrics

    Correlates each row of the input with the row `shift` positions earlier.
    """

    def __init__(self, mode="pearson", shift=1):
        """
        Parameters
        ----------
        mode : str
            Correlation method handed to `calc_corr`.
        shift : int
            Number of periods passed to `DataFrame.shift`.
        """
        self.shift = shift
        self.mode = mode

    def extract(self, df: pd.DataFrame):
        """Return the row-wise correlation between `df` and its shifted copy."""
        lagged = df.shift(self.shift)
        return calc_corr(df, lagged, self.mode)
class CorrM(MetricExt):
    """Correlation extractor for a pair of frames."""

    def __init__(self, mode="pearson"):
        """
        Parameters
        ----------
        mode : str
            Correlation method handed to `calc_corr`.
        """
        self.mode = mode

    def extract(self, df1: pd.DataFrame, df2: pd.DataFrame):
        """Return the row-wise correlation between `df1` and `df2`."""
        row_corr = calc_corr(df1, df2, self.mode)
        return row_corr