1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-01 01:51:18 +08:00
Files
qlib/qlib/data/filter.py
2021-02-19 22:23:22 +08:00

375 lines
14 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import print_function
from abc import abstractmethod
import re
import pandas as pd
import numpy as np
import abc
from .data import Cal, DatasetD
class BaseDFilter(abc.ABC):
    """Abstract base for dynamic instrument filters.

    Concrete filters subclass this type:

    - override ``__init__`` to receive the filter regulations;
    - override ``filter_main`` to apply those regulations to instruments.
    """

    def __init__(self):
        """Nothing to set up at this level; subclasses store their own rules."""

    @staticmethod
    def from_config(config):
        """Build a filter instance out of a config dict.

        Parameters
        ----------
        config : dict
            dict of config parameters.

        Raises
        ----------
        NotImplementedError
            always; subclasses are expected to supply their own version.
        """
        message = "Subclass of BaseDFilter must reimplement `from_config` method"
        raise NotImplementedError(message)

    @abc.abstractmethod
    def to_config(self):
        """Dump this filter instance into a config dict.

        Returns
        ----------
        dict
            the dict of config parameters.

        Raises
        ----------
        NotImplementedError
            when the base implementation itself is invoked.
        """
        message = "Subclass of BaseDFilter must reimplement `to_config` method"
        raise NotImplementedError(message)
class SeriesDFilter(BaseDFilter):
    """Dynamic Instruments Filter Abstract class to filter a series of certain features

    Filters should provide parameters:

    - filter start time
    - filter end time
    - filter rule

    Override __init__ to assign a certain rule to filter the series.

    Override _getFilterSeries to use the rule to filter the series and get a dict of {inst => series},
    or override filter_main for more advanced series filter rule
    """

    def __init__(self, fstart_time=None, fend_time=None, keep=False):
        """Init function for filter base class.

        Filter a set of instruments based on a certain rule within a certain period assigned
        by fstart_time and fend_time.

        Parameters
        ----------
        fstart_time: str
            the time for the filter rule to start filter the instruments.
        fend_time: str
            the time for the filter rule to stop filter the instruments.
        keep: bool
            value used by `filter_main` for instruments that are missing from the result of
            `_getFilterSeries`: True keeps them over the whole filter span, False drops them.
        """
        super(SeriesDFilter, self).__init__()
        self.filter_start_time = pd.Timestamp(fstart_time) if fstart_time else None
        self.filter_end_time = pd.Timestamp(fend_time) if fend_time else None
        # `filter_main` reads `self.keep`; define it here so that every subclass has it,
        # not only the ones that happen to assign it themselves.
        self.keep = keep

    def _getTimeBound(self, instruments):
        """Get time bound for all instruments.

        Parameters
        ----------
        instruments: dict
            the dict of instruments in the form {instrument_name => list of timestamp tuple}.

        Returns
        ----------
        pd.Timestamp, pd.Timestamp
            the lower time bound and upper time bound of all the instruments.
        """
        trange = Cal.calendar(freq=self.filter_freq)
        # The bounds deliberately start "inverted" (lower bound at the calendar end, upper
        # bound at the calendar start) so that the min/max updates below can only widen them.
        lbound, ubound = trange[-1], trange[0]
        for spans in instruments.values():
            if spans:
                first_start = spans[0][0]
                last_end = spans[-1][-1]
                lbound = first_start if first_start < lbound else lbound
                ubound = last_end if last_end > ubound else ubound
        return lbound, ubound

    def _toSeries(self, time_range, target_timestamp):
        """Convert the target timestamp to a pandas series of bool value within a time range.

        Make the time inside the target_timestamp range TRUE, others FALSE.

        Parameters
        ----------
        time_range : D.calendar
            the time range of the instruments.
        target_timestamp : list
            the list of tuple (timestamp, timestamp).

        Returns
        ----------
        pd.Series
            the series of bool value for an instrument.
        """
        # Start from an all-False {date => bool} map covering the whole range
        timestamp_series = pd.Series({timestamp: False for timestamp in time_range})
        # Fill the dates within each (start, end) span with True
        for start, end in target_timestamp:
            timestamp_series[Cal.calendar(start_time=start, end_time=end, freq=self.filter_freq)] = True
        return timestamp_series

    def _filterSeries(self, timestamp_series, filter_series):
        """Filter the timestamp series with filter series by using element-wise AND operation of the two series.

        Note that `timestamp_series` is modified in place and also returned.

        Parameters
        ----------
        timestamp_series : pd.Series
            the series of bool value indicating existing time.
        filter_series : pd.Series
            the series of bool value indicating filter feature.

        Returns
        ----------
        pd.Series
            the series of bool value indicating whether the date satisfies the filter condition
            and exists in target timestamp.
        """
        # Only the span covered by the filter series is touched; everything else is left as is
        fstart, fend = filter_series.index[0], filter_series.index[-1]
        filter_series = filter_series.astype("bool")  # Make sure the filter_series is boolean
        timestamp_series[fstart:fend] = timestamp_series[fstart:fend] & filter_series
        return timestamp_series

    def _toTimestamp(self, timestamp_series):
        """Convert the timestamp series to a list of tuple (timestamp, timestamp) indicating a continuous range of TRUE.

        Parameters
        ----------
        timestamp_series: pd.Series
            the series of bool value after being filtered.

        Returns
        ----------
        list
            the list of tuple (timestamp, timestamp).
        """
        # `sort_index` returns a new series, so the result has to be kept; the input is untouched
        ordered = timestamp_series.sort_index()
        timestamp = []
        run_start = None  # first timestamp of the TRUE run currently open, if any
        last_seen = None  # timestamp visited in the previous iteration
        for _ts, _bool in ordered.items():
            # There is likely to be NaN when the filter series doesn't have the bool value.
            # NaN never compares equal to anything (itself included), so it has to be
            # detected with `pd.isna` and is then treated as False.
            flag = False if pd.isna(_bool) else bool(_bool)
            if flag:
                if run_start is None:
                    run_start = _ts
            elif run_start is not None:
                # The run ended on the previous timestamp
                timestamp.append((run_start, last_seen))
                run_start = None
            last_seen = _ts
        # Close a run that extends up to the end of the series
        if run_start is not None:
            timestamp.append((run_start, last_seen))
        return timestamp

    def __call__(self, instruments, start_time=None, end_time=None, freq="day"):
        """Call this filter to get filtered instruments list

        `freq` is stored on the instance as `filter_freq` before delegating to `filter_main`,
        which (together with the helper methods) reads it for every calendar lookup.
        """
        self.filter_freq = freq
        return self.filter_main(instruments, start_time, end_time)

    @abstractmethod
    def _getFilterSeries(self, instruments, fstart, fend):
        """Get filter series based on the rules assigned during the initialization and the input time range.

        Parameters
        ----------
        instruments : dict
            the dict of instruments to be filtered.
        fstart : pd.Timestamp
            start time of filter.
        fend : pd.Timestamp
            end time of filter.

        .. note:: fstart/fend indicates the intersection of instruments start/end time and filter start/end time.

        Returns
        ----------
        mapping
            an object supporting ``inst in result`` and ``result[inst]``, where every value is
            a series of {pd.Timestamp => bool}.
        """
        raise NotImplementedError("Subclass of SeriesDFilter must reimplement `_getFilterSeries` method")

    def filter_main(self, instruments, start_time=None, end_time=None):
        """Implement this method to filter the instruments.

        `self.filter_freq` must be set beforehand; `__call__` takes care of that.

        Parameters
        ----------
        instruments: dict
            input instruments to be filtered.
        start_time: str
            start of the time range.
        end_time: str
            end of the time range.

        Returns
        ----------
        dict
            filtered instruments, same structure as input instruments.
        """
        lbound, ubound = self._getTimeBound(instruments)
        start_time = pd.Timestamp(start_time or lbound)
        end_time = pd.Timestamp(end_time or ubound)
        _instruments_filtered = {}
        _all_calendar = Cal.calendar(start_time=start_time, end_time=end_time, freq=self.filter_freq)
        cal_first, cal_last = _all_calendar[0], _all_calendar[-1]
        # Clip the configured filter span to the calendar actually in use
        if self.filter_start_time is not None:
            filter_start = max(self.filter_start_time, cal_first)
        else:
            filter_start = cal_first
        if self.filter_end_time is not None:
            filter_end = min(self.filter_end_time, cal_last)
        else:
            filter_end = cal_last
        _filter_calendar = Cal.calendar(start_time=filter_start, end_time=filter_end, freq=self.filter_freq)
        _all_filter_series = self._getFilterSeries(instruments, _filter_calendar[0], _filter_calendar[-1])
        # Constant series used for instruments the rule returned nothing for
        keep_missing = bool(self.keep)
        for inst, timestamp in instruments.items():
            # Construct a whole map of date
            _timestamp_series = self._toSeries(_all_calendar, timestamp)
            # Get filter series
            if inst in _all_filter_series:
                _filter_series = _all_filter_series[inst]
            else:
                _filter_series = pd.Series({ts: keep_missing for ts in _filter_calendar})
            # Calculate bool value within the range of filter
            _timestamp_series = self._filterSeries(_timestamp_series, _filter_series)
            # Reform the map to (start_timestamp, end_timestamp) format
            _timestamp = self._toTimestamp(_timestamp_series)
            # Remove empty timestamp
            if _timestamp:
                _instruments_filtered[inst] = _timestamp
        return _instruments_filtered
class NameDFilter(SeriesDFilter):
    """Dynamic instrument filter driven by the instrument name.

    An instrument passes when its name matches (from the beginning, as done by
    `re.match`) the regular expression given at construction time.
    """

    def __init__(self, name_rule_re, fstart_time=None, fend_time=None):
        """Set up the name filter.

        Parameters
        ----------
        name_rule_re: str
            regular expression for the name rule.
        fstart_time: str
            the time for the filter rule to start filter the instruments.
        fend_time: str
            the time for the filter rule to stop filter the instruments.
        """
        super().__init__(fstart_time, fend_time)
        self.name_rule_re = name_rule_re

    def _getFilterSeries(self, instruments, fstart, fend):
        """Build one constant bool series per instrument over the filter calendar.

        The series is all True when the instrument name matches `name_rule_re`
        and all False otherwise.
        """
        calendar = Cal.calendar(start_time=fstart, end_time=fend, freq=self.filter_freq)
        series_by_inst = {}
        for inst, _spans in instruments.items():
            name_matches = re.match(self.name_rule_re, inst) is not None
            series_by_inst[inst] = pd.Series({day: name_matches for day in calendar})
        return series_by_inst

    @staticmethod
    def from_config(config):
        """Create a `NameDFilter` from a dict shaped like the output of `to_config`."""
        rule = config["name_rule_re"]
        start = config["filter_start_time"]
        end = config["filter_end_time"]
        return NameDFilter(name_rule_re=rule, fstart_time=start, fend_time=end)

    def to_config(self):
        """Describe this filter as a plain dict; set time bounds are rendered with `str`."""
        start, end = self.filter_start_time, self.filter_end_time
        config = {"filter_type": "NameDFilter"}
        config["name_rule_re"] = self.name_rule_re
        # Unset bounds are passed through unchanged instead of being stringified
        config["filter_start_time"] = str(start) if start else start
        config["filter_end_time"] = str(end) if end else end
        return config
class ExpressionDFilter(SeriesDFilter):
    """Dynamic instrument filter driven by a feature expression.

    The rule is an expression over feature fields; it is evaluated through
    `DatasetD.dataset` and the resulting values are used as the filter series.

    Examples
    ----------
    - *basic features filter* : rule_expression = '$close/$open>5'
    - *cross-sectional features filter* : rule_expression = '$rank($close)<10'
    - *time-sequence features filter* : rule_expression = '$Ref($close, 3)>100'
    """

    def __init__(self, rule_expression, fstart_time=None, fend_time=None, keep=False):
        """Set up the expression filter.

        Parameters
        ----------
        rule_expression: str
            an input expression for the rule.
        fstart_time: str
            filter the feature starting from this time.
        fend_time: str
            filter the feature ending by this time.
        keep: bool
            whether to keep the instruments of which features don't exist in the filter time span.
        """
        super().__init__(fstart_time, fend_time)
        self.keep = keep
        self.rule_expression = rule_expression

    def _getFilterSeries(self, instruments, fstart, fend):
        """Evaluate the rule expression and return the data of its first (only requested) field."""
        try:
            # do not use dataset cache
            features = DatasetD.dataset(
                instruments, [self.rule_expression], fstart, fend, freq=self.filter_freq, disk_cache=0
            )
        except TypeError:
            # use LocalDatasetProvider
            features = DatasetD.dataset(
                instruments, [self.rule_expression], fstart, fend, freq=self.filter_freq
            )
        field_names = list(features.keys())
        return features[field_names[0]]

    @staticmethod
    def from_config(config):
        """Create an `ExpressionDFilter` from a dict shaped like the output of `to_config`."""
        expression = config["rule_expression"]
        start = config["filter_start_time"]
        end = config["filter_end_time"]
        keep_flag = config["keep"]
        return ExpressionDFilter(rule_expression=expression, fstart_time=start, fend_time=end, keep=keep_flag)

    def to_config(self):
        """Describe this filter as a plain dict; set time bounds are rendered with `str`."""
        start, end = self.filter_start_time, self.filter_end_time
        config = {"filter_type": "ExpressionDFilter"}
        config["rule_expression"] = self.rule_expression
        # Unset bounds are passed through unchanged instead of being stringified
        config["filter_start_time"] = str(start) if start else start
        config["filter_end_time"] = str(end) if end else end
        config["keep"] = self.keep
        return config