1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 11:00:57 +08:00

draft design

This commit is contained in:
Young
2021-08-31 02:33:44 +00:00
committed by you-n-g
parent 43a8f502ed
commit d39c8de800
8 changed files with 467 additions and 315 deletions

View File

@@ -20,7 +20,7 @@ from ..utils import init_instance_by_config
from ..log import get_module_logger
from ..config import C
# make import more user-friendly by enable `from qlib.backtest import STH`
# make import more user-friendly by adding `from qlib.backtest import STH`
logger = get_module_logger("backtest caller")

View File

@@ -424,7 +424,7 @@ class Exchange:
else:
raise NotImplementedError(f"This type of input is not supported")
deal_price = self.quote.get_data(stock_id, start_time, end_time, field=pstr, method=method)
if method is not None and (np.isclose(deal_price, 0.0) or np.isnan(deal_price)):
if method is not None and (deal_price is None or np.isclose(deal_price, 0.0) or np.isnan(deal_price)):
self.logger.warning(f"(stock_id:{stock_id}, trade_time:{(start_time, end_time)}, {pstr}): {deal_price}!!!")
self.logger.warning(f"setting deal_price to close price")
deal_price = self.get_close(stock_id, start_time, end_time, method)

View File

@@ -15,6 +15,7 @@ from ..utils.index_data import IndexData, SingleData
from ..utils.resam import resam_ts_data, ts_data_last
from ..log import get_module_logger
from ..utils.time import is_single_value
import qlib.utils.index_data as idd
class BaseQuote:
@@ -61,7 +62,9 @@ class BaseQuote:
this function is used for three case:
1. method is not None. It returns int/float/bool.
1. method is not None. It returns int/float/bool/None.
- It will return None in one case, the method return None
print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-06", field="$close", method="last"))
85.713585
@@ -87,8 +90,9 @@ class BaseQuote:
Return
----------
Union[None, int, float, bool, IndexData]
None means there is no stock data from data source.
please refer to Example as following.
it will return None in following cases
- There is no stock data which meet the query criterion from data source.
- The `method` returns None
"""
raise NotImplementedError(f"Please implement the `get_data` method")
@@ -112,7 +116,7 @@ class PandasQuote(BaseQuote):
elif isinstance(stock_data, (bool, np.bool_, int, float, np.number)):
return stock_data
elif isinstance(stock_data, pd.Series):
return IndexData.Series(stock_data)
return idd.SingleData(stock_data)
else:
raise ValueError(f"stock data from resam_ts_data must be a number, pd.Series or pd.DataFrame")
@@ -130,7 +134,8 @@ class CN1minNumpyQuote(BaseQuote):
super().__init__(quote_df=quote_df)
quote_dict = {}
for stock_id, stock_val in quote_df.groupby(level="instrument"):
quote_dict[stock_id] = IndexData.DataFrame(stock_val.droplevel(level="instrument"))
quote_dict[stock_id] = idd.MultiData(stock_val.droplevel(level="instrument"))
quote_dict[stock_id].sort_index() # To support more flexible slicing, we must sort data first
self.data = quote_dict
self.freq = pd.Timedelta(minutes=1)
@@ -145,32 +150,22 @@ class CN1minNumpyQuote(BaseQuote):
# single data
# If it don't consider the classification of single data, it will consume a lot of time.
if is_single_value(start_time, end_time, self.freq):
now_index_map = self.data[stock_id].index_map
now_columns_map = self.data[stock_id].columns_map
if start_time not in now_index_map:
if is_single_value(start_time, end_time, self.freq) and method is not None:
# this is a very special case.
# skip aggregating function to speed-up the query calculation
try:
self.data[stock_id].loc[start_time, field]
except KeyError:
return None
else:
return self.data[stock_id].values[now_index_map[start_time], now_columns_map[field]]
# multi data
else:
if method is None:
stock_data = self.data[stock_id].loc(start_time, end_time, field)
if stock_data.empty:
return None
else:
return stock_data
else:
stock_data = self.data[stock_id].loc(start_time, end_time, field)
if stock_data.empty:
return None
elif len(stock_data) == 1:
return stock_data[0]
else:
return self._agg_data(stock_data.values, method)
data = self.data[stock_id].loc[start_time:end_time, field]
if data.empty:
return None
if method is not None:
data = self._agg_data(data, method)
return data
def _agg_data(self, data, method):
def _agg_data(self, data: IndexData, method):
"""Agg data by specific method."""
if method == "sum":
return np.nansum(data)
@@ -183,11 +178,11 @@ class CN1minNumpyQuote(BaseQuote):
elif method == "any":
return data.any()
elif method == ts_data_last:
valid_data = data[data != np.NaN]
valid_data = data.loc[~data.isna().data.astype(bool)]
if len(valid_data) == 0:
return None
else:
return valid_data[0]
return valid_data.iloc[-1]
else:
raise ValueError(f"{method} is not supported")
@@ -259,9 +254,6 @@ class BaseSingleMetric:
def abs(self) -> "BaseSingleMetric":
raise NotImplementedError(f"Please implement the `abs` method")
def astype(self, dtype: type) -> "BaseSingleMetric":
raise NotImplementedError(f"Please implement the `astype` method")
@property
def empty(self) -> bool:
"""If metric is empty, return True."""
@@ -332,7 +324,7 @@ class BaseOrderIndicator:
the kwargs of func will be replaced with metric data by name in this function.
e.g.
def func(pa):
return (pa > 0).astype(int).sum() / pa.count()
return (pa > 0).sum() / pa.count()
new_col : str, optional
New metric will be assigned in the data if new_col is not None, by default None.
@@ -513,9 +505,6 @@ class PandasSingleMetric(SingleMetric):
def abs(self):
return self.__class__(self.metric.abs())
def astype(self, dtype):
return self.__class__(self.metric.astype(dtype))
@property
def empty(self):
return self.metric.empty
@@ -552,9 +541,9 @@ class PandasOrderIndicator(BaseOrderIndicator):
def get_index_data(self, metric):
if metric in self.data:
return IndexData.Series(self.data[metric].metric)
return idd.SingleData(self.data[metric].metric)
else:
return IndexData.Series()
return idd.SingleData()
def get_metric_series(self, metric: str) -> Union[pd.Series]:
if metric in self.data:
@@ -579,7 +568,7 @@ class PandasOrderIndicator(BaseOrderIndicator):
class NumpyOrderIndicator(BaseOrderIndicator):
"""
The data structure is OrderedDict(str: SingleData).
Each IndexData.Series is one metric.
Each idd.SingleData is one metric.
Str is the name of metric.
"""
@@ -587,13 +576,13 @@ class NumpyOrderIndicator(BaseOrderIndicator):
self.data: Dict[str, SingleData] = OrderedDict()
def assign(self, col: str, metric: dict):
self.data[col] = IndexData.Series(metric)
self.data[col] = idd.SingleData(metric)
def get_index_data(self, metric):
if metric in self.data:
return self.data[metric]
else:
return IndexData.Series()
return idd.SingleData()
def get_metric_series(self, metric: str) -> Union[pd.Series]:
return self.data[metric].to_series()
@@ -609,7 +598,7 @@ class NumpyOrderIndicator(BaseOrderIndicator):
if isinstance(metrics, str):
metrics = [metrics]
for metric in metrics:
tmp_metric = IndexData.Series()
tmp_metric = IndexData.SingleData()
for indicator in indicators:
tmp_metric = tmp_metric.add(indicator.data[metric], fill_value)
order_indicator.data[metric] = tmp_metric

View File

@@ -12,10 +12,10 @@ import pandas as pd
from qlib.backtest.exchange import Exchange
from qlib.backtest.order import BaseTradeDecision, Order, OrderDir
from .high_performance_ds import PandasOrderIndicator, NumpyOrderIndicator, SingleMetric
from ..utils.index_data import IndexData, SingleData
from ..tests.config import CSI300_BENCH
from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data
from .order import IdxTradeRange
import qlib.utils.index_data as idd
class Report:
@@ -386,8 +386,8 @@ class Indicator:
return None, None
if isinstance(price_s, (int, float, np.number)):
price_s = IndexData.Series(price_s, [trade_start_time])
elif isinstance(price_s, SingleData):
price_s = idd.SingleData(price_s, [trade_start_time])
elif isinstance(price_s, idd.SingleData):
pass
else:
raise NotImplementedError(f"This type of input is not supported")
@@ -401,10 +401,10 @@ class Indicator:
if agg == "vwap":
volume_s = trade_exchange.get_volume(inst, trade_start_time, trade_end_time, method=None)
if isinstance(volume_s, (int, float, np.number)):
volume_s = IndexData.Series(volume_s, [trade_start_time])
volume_s = idd.SingleData(volume_s, [trade_start_time])
volume_s = volume_s.reindex(price_s.index)
elif agg == "twap":
volume_s = IndexData.Series(1, price_s.index)
volume_s = idd.SingleData(1, price_s.index)
else:
raise NotImplementedError(f"This type of input is not supported")
@@ -414,7 +414,7 @@ class Indicator:
def _agg_base_price(
self,
inner_order_indicators: List[Dict[str, Union[SingleMetric, SingleData]]],
inner_order_indicators: List[Dict[str, Union[SingleMetric, idd.SingleData]]],
decision_list: List[Tuple[BaseTradeDecision, pd.Timestamp, pd.Timestamp]],
trade_exchange: Exchange,
pa_config: dict = {},
@@ -467,12 +467,12 @@ class Indicator:
else:
bp_new[inst], bv_new[inst] = pr, v
bp_new = IndexData.Series(bp_new)
bv_new = IndexData.Series(bv_new)
bp_new = idd.SingleData(bp_new)
bv_new = idd.SingleData(bv_new)
bp_all.append(bp_new)
bv_all.append(bv_new)
bp_all = IndexData.concat(bp_all, axis=1)
bv_all = IndexData.concat(bv_all, axis=1)
bp_all = idd.concat(bp_all, axis=1)
bv_all = idd.concat(bv_all, axis=1)
base_volume = bv_all.sum(axis=1)
self.order_indicator.assign("base_volume", base_volume.to_dict())
@@ -550,7 +550,7 @@ class Indicator:
def _cal_trade_positive_rate(self):
def func(pa):
return (pa > 0).astype(int).sum() / pa.count()
return (pa > 0).sum() / pa.count()
return self.order_indicator.transfer(func)

View File

@@ -1405,7 +1405,7 @@ class Corr(PairRolling):
super(Corr, self).__init__(feature_left, feature_right, N, "corr")
def _load_internal(self, instrument, start_index, end_index, freq):
res = super(Corr, self)._load_internal(instrument, start_index, end_index, freq)
res: pd.Series = super(Corr, self)._load_internal(instrument, start_index, end_index, freq)
# NOTE: Load uses MemCache, so calling load again will not cause performance degradation
series_left = self.feature_left.load(instrument, start_index, end_index, freq)

View File

@@ -1,178 +1,334 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Motivation of index_data
- Pandas has a lot of user-friendly interfaces. However, integrating too much features in a single tool bring to much overhead and makes it much slower than numpy.
Some users just want a simple numpy dataframe with indices and don't want such a complicated tools.
Such users are the target of `index_data`
`index_data` try to behave like pandas (some API will be different because we try to be simpler and more intuitive) but don't compromize the performance. It provides the basic numpy data and simple indexing feature. If users call APIs which may compromize the performance, index_data will raise Errors.
"""
from typing import Union, Callable
from typing import Tuple, Union, Callable, List
import bisect
import numpy as np
import pandas as pd
class IndexData:
"""This is a simplified version of pandas which is faster based on numpy."""
def concat(data_list: Union["SingleData"], axis=0) -> "MultiData":
"""concat all SingleData by index.
TODO: now just for SingleData.
@staticmethod
def Series(
data: Union[dict, pd.Series, int, float, np.floating, list, np.ndarray] = [], index: Union[list, pd.Index] = []
):
if isinstance(data, dict):
return SingleData(list(data.values()), list(data.keys()))
elif isinstance(data, pd.Series):
return SingleData(data.values, data.index)
Parameters
----------
index_data_list : List[SingleData]
the list of all SingleData to concat.
Returns
-------
MultiData
the MultiData with ndim == 2
"""
if axis == 0:
raise NotImplementedError(f"please implement this func when axis == 0")
elif axis == 1:
# get all index and row
all_index = set()
for index_data in data_list:
all_index = all_index | set(index_data.index)
all_index = list(all_index)
all_index.sort()
all_index_map = dict(zip(all_index, range(len(all_index))))
# concat all
tmp_data = np.full((len(all_index), len(data_list)), np.NaN)
for data_id, index_data in enumerate(data_list):
assert isinstance(index_data, SingleData)
now_data_map = [all_index_map[index] for index in index_data.index]
tmp_data[now_data_map, data_id] = index_data.data
return MultiData(tmp_data, all_index)
else:
raise ValueError(f"axis must be 0 or 1")
class Index:
"""
This is for indexing(rows or columns)
Read-only operations has higher priorities than others.
So this class is designed in a **read-only** way to shared data for queries.
Modifications will results in new Index.
NOTE: the indexing has following flaws
- duplicated index value is not well supported (only the first appearance will be considered)
- The order of the index is not considered!!!! So the slicing will not behave like pandas when indexings are ordered
"""
def __init__(self, idx_list: Union[List, pd.Index, "Index", int]):
self.idx_list: np.ndarray = None # using array type for index list will make things easier
if isinstance(idx_list, Index):
# Fast read-only copy
self.idx_list = idx_list.idx_list
self.index_map = idx_list.index_map
self._is_sorted = idx_list._is_sorted
elif isinstance(idx_list, int):
self.index_map = self.idx_list = np.arange(idx_list)
self._is_sorted = True
else:
return SingleData(data, index)
self.idx_list = np.array(idx_list)
# NOTE: only the first appearance is indexed
self.index_map = dict(zip(self.idx_list, range(len(self))))
self._is_sorted = False
@staticmethod
def DataFrame(
data: Union[pd.DataFrame, list, np.ndarray] = [[]],
index: Union[list, pd.Index] = [],
columns: Union[list, pd.Index] = [],
):
if isinstance(data, pd.DataFrame):
return MultiData(data.values, data.index, data.columns)
else:
return MultiData(data, index, columns)
def __getitem__(self, i: int):
return self.idx_list[i]
@staticmethod
def concat(data_list: Union["SingleData"], axis=0) -> "MultiData":
"""concat all SingleData by index.
TODO: now just for SingleData.
def index(self, item) -> int:
"""
Given the index value, get the integer index
Parameters
----------
index_data_list : List[SingleData]
the list of all SingleData to concat.
"""
return self.index_map[item]
def __eq__(self, other: "Index"):
# NOTE: np.nan is not supported in the index
return (self.idx_list == other.idx_list).all()
def __len__(self):
return len(self.idx_list)
def is_sorted(self):
return self._is_sorted
def sort(self) -> Tuple["Index", np.ndarray]:
"""
sort the index
Returns
-------
MultiData
the MultiData with ndim == 2
Tuple["Index", np.ndarray]:
the sorted Index and the changed index
"""
if axis == 0:
raise NotImplementedError(f"please implement this func when axis == 0")
elif axis == 1:
# get all index and row
all_index = set()
for index_data in data_list:
all_index = all_index | set(index_data.index)
all_index = list(all_index)
all_index.sort()
all_index_map = dict(zip(all_index, range(len(all_index))))
sorted_idx = np.argsort(self.idx_list)
idx = Index(self.idx_list[sorted_idx])
idx._is_sorted = True
return idx, sorted_idx
# concat all
tmp_data = np.full((len(all_index), len(data_list)), np.NaN)
for data_id, index_data in enumerate(data_list):
assert isinstance(index_data, SingleData)
now_data_map = [all_index_map[index] for index in index_data.index]
tmp_data[now_data_map, data_id] = index_data.data
return MultiData(tmp_data, all_index)
class LocIndexer:
"""
`Indexer` will behave like the `LocIndexer` in Pandas
Read-only operations has higher priorities than others.
So this class is designed in a read-only way to shared data for queries.
Modifications will results in new Index.
"""
def __init__(self, index_data: "IndexData", indices: List[Index], int_loc: bool = False):
self._indices: List[Index] = indices
self._bind_id = index_data # bind index data
self._int_loc = int_loc
assert self._bind_id.data.ndim == len(self._indices)
@staticmethod
def proc_idx_l(indices: List[Union[List, pd.Index, Index]], data_shape: Tuple = None) -> List[Index]:
""" process the indices from user and output a list of `Index` """
res = []
for i, idx in enumerate(indices):
res.append(Index(data_shape[i] if len(idx) == 0 else idx))
return res
def _slc_convert(self, index: Index, indexing: slice) -> slice:
"""
convert value-based indexing to integer-based indexing.
Parameters
----------
index : Index
index data.
indexing : slice
value based indexing data with slice type for indexing.
Returns
-------
slice:
the integer based slicing
"""
if index.is_sorted():
int_start = None if indexing.start is None else bisect.bisect_left(index, indexing.start)
int_stop = None if indexing.stop is None else bisect.bisect_right(index, indexing.stop)
else:
raise ValueError(f"axis must be 0 or 1")
int_start = None if indexing.start is None else index.index(indexing.start)
int_stop = None if indexing.stop is None else index.index(indexing.stop) + 1
return slice(int_start, int_stop)
def __getitem__(self, indexing):
"""
Parameters
----------
indexing :
query for data
Raises
------
KeyError:
If the non-slice index is queried but does not exist, `KeyError` is raised.
"""
# 1) convert slices to int loc
if not isinstance(indexing, tuple):
# NOTE: tuple is not supported for indexing
indexing = (indexing, )
# TODO: create a subclass for single value query
assert len(indexing) <= len(self._indices)
int_indexing = []
for dim, index in enumerate(self._indices):
if dim < len(indexing):
_indexing = indexing[dim]
if not self._int_loc: # type converting is only necessary when it is not `iloc`
if isinstance(_indexing, slice):
_indexing = self._slc_convert(index, _indexing)
elif isinstance(_indexing, (IndexData, np.ndarray)):
if isinstance(_indexing, IndexData):
_indexing = _indexing.data
assert _indexing.ndim == 1
if _indexing.dtype != np.bool:
_indexing = np.array(list(index.index(i) for i in _indexing))
else:
_indexing = index.index(_indexing)
else:
_indexing = slice(None)
int_indexing.append(_indexing)
# 2) select data and index
new_data = self._bind_id.data[tuple(int_indexing)]
new_indices = [idx[indexing] for idx, indexing in zip(self._indices, int_indexing)]
# 3) squash dimensions
new_indices = [idx for idx in new_indices if isinstance(idx, np.ndarray) and idx.ndim > 0] # squash the zero dim indexing
if new_data.ndim == 0:
return new_data
else:
if new_data.ndim == 1:
cls = SingleData
elif new_data.ndim == 2:
cls = MultiData
else:
raise ValueError("Not supported")
return cls(new_data, *new_indices)
class BaseData:
"""Base data structure of SingleData and MultiData."""
class IndexData:
"""
Base data structure of SingleData and MultiData.
def __init__(self):
self.index_columns = self._get_index_columns()
NOTE:
- For performance issue, only **np.floating** is supported in the underlayer data !!!
- Boolean based on np.floating is also supported. Here are some examples
def _get_index_columns(self):
index_columns = []
if hasattr(self, "index"):
index_columns.append(self.index)
if hasattr(self, "columns"):
index_columns.append(self.columns)
return index_columns
.. code-block:: python
def _align_index(self, other):
np.array([ np.nan]).any() -> True
np.array([ np.nan]).all() -> True
np.array([1. , 0.]).any() -> True
np.array([1. , 0.]).all() -> False
"""
loc_idx_cls = LocIndexer
def __init__(self, data: np.ndarray, *indices: Union[List, pd.Index, Index]):
self.data = data
self.indices = indices
# get the expected data shape
# - The index has higher priority
self.data = np.array(data)
expected_dim = max(self.data.ndim, len(indices))
data_shape = []
for i in range(expected_dim):
idx_l = indices[i] if len(indices) > i else []
if len(idx_l) == 0:
data_shape.append(self.data.shape[i])
else:
data_shape.append(len(idx_l))
data_shape = tuple(data_shape)
# broadcast the data to expected shape
self.data = np.broadcast_to(self.data, data_shape)
self.data = self.data.astype(np.float64)
# Please notice following cases when converting the type
# - np.array([None, 1]).astype(np.float64) -> array([nan, 1.])
# create index from user's index data.
self.indices: List[Index] = self.loc_idx_cls.proc_idx_l(indices, data_shape)
for dim in range(expected_dim):
assert self.data.shape[dim] == len(self.indices[dim])
self.ndim = expected_dim
# indexing related methods
@property
def loc(self):
return self.loc_idx_cls(index_data=self, indices=self.indices)
@property
def iloc(self):
return self.loc_idx_cls(index_data=self, indices=self.indices, int_loc=True)
@property
def index(self):
return self.indices[0]
@property
def columns(self):
return self.indices[1]
def _align_indices(self, other):
"""Align index before performing the four arithmetic operations."""
raise NotImplementedError(f"please implement _align_index func")
raise NotImplementedError(f"please implement _align_indices func")
def __add__(self, other):
if isinstance(other, (int, float, np.number)):
return self.__class__(self.data + other, *self.index_columns)
elif isinstance(other, self.__class__):
tmp_data1, tmp_data2 = self._align_index(other)
return self.__class__(tmp_data1.data + tmp_data2.data, *tmp_data1.index_columns)
else:
return NotImplemented
def sort_index(self, axis=0, inplace=True):
assert inplace, "Only support sorting inplace now"
self.indices[axis], sorted_idx = self.indices[axis].sort()
self.data = np.take(self.data, sorted_idx, axis=axis)
def __sub__(self, other):
if isinstance(other, (int, float, np.number)):
return self.__class__(self.data - other, *self.index_columns)
elif isinstance(other, self.__class__):
tmp_data1, tmp_data2 = self._align_index(other)
return self.__class__(tmp_data1.data - tmp_data2.data, *tmp_data1.index_columns)
else:
return NotImplemented
# calculation related methods
def __getattribute__(self, attr_name: str):
# 1) use a unified operation for the basic operation
def __rsub__(self, other):
if isinstance(other, (int, float, np.number)):
return self.__class__(other - self.data, *self.index_columns)
elif isinstance(other, self.__class__):
tmp_data1, tmp_data2 = self._align_index(other)
return self.__class__(tmp_data2.data - tmp_data1.data, *tmp_data1.index_columns)
else:
return NotImplemented
def _basic_binary_ops(other):
self_data_method = getattr(self.data, attr_name)
def __mul__(self, other):
if isinstance(other, (int, float, np.number)):
return self.__class__(self.data * other, *self.index_columns)
elif isinstance(other, self.__class__):
tmp_data1, tmp_data2 = self._align_index(other)
return self.__class__(tmp_data1.data * tmp_data2.data, *tmp_data1.index_columns)
else:
return NotImplemented
if isinstance(other, (int, float, np.number)):
return self.__class__(self_data_method(other))
elif isinstance(other, self.__class__):
# TODO: bad interface
tmp_data1, tmp_data2 = self._align_indices(other)
return self.__class__(self_data_method(tmp_data2.data), *self.indices)
else:
return NotImplemented
def __truediv__(self, other):
if isinstance(other, (int, float, np.number)):
return self.__class__(self.data / other, *self.index_columns)
elif isinstance(other, self.__class__):
tmp_data1, tmp_data2 = self._align_index(other)
return self.__class__(tmp_data1.data / tmp_data2.data, *tmp_data1.index_columns)
else:
return NotImplemented
if attr_name in {"__add__", "__sub__", "__rsub__", "__mul__", "__truediv__", "__eq__", "__gt__", "__lt__"}:
return _basic_binary_ops
def __eq__(self, other):
if isinstance(other, (int, float, np.number)):
return self.__class__(self.data == other, *self.index_columns)
elif isinstance(other, self.__class__):
tmp_data1, tmp_data2 = self._align_index(other)
return self.__class__(tmp_data1.data == tmp_data2.data, *tmp_data1.index_columns)
else:
return NotImplemented
def __gt__(self, other):
if isinstance(other, (int, float, np.number)):
return self.__class__(self.data > other, *self.index_columns)
elif isinstance(other, self.__class__):
tmp_data1, tmp_data2 = self._align_index(other)
return self.__class__(tmp_data1.data > tmp_data2.data, *tmp_data1.index_columns)
else:
return NotImplemented
def __lt__(self, other):
if isinstance(other, (int, float, np.number)):
return self.__class__(self.data < other, *self.index_columns)
elif isinstance(other, self.__class__):
tmp_data1, tmp_data2 = self._align_index(other)
return self.__class__(tmp_data1.data < tmp_data2.data, *tmp_data1.index_columns)
else:
return NotImplemented
# 2) otherwise, follow the default behavior
return super().__getattribute__(attr_name)
# The code below could be simpler like methods in __getattribute__
def __invert__(self):
return self.__class__(~self.data, *self.index_columns)
return self.__class__(~self.data.astype(np.bool), *self.indices)
def abs(self):
"""get the abs of data except np.NaN."""
tmp_data = np.absolute(self.data)
return self.__class__(tmp_data, *self.index_columns)
def astype(self, dtype):
"""change the type of data."""
tmp_data = self.data.astype(dtype)
return self.__class__(tmp_data, *self.index_columns)
return self.__class__(tmp_data, *self.indices)
def replace(self, to_replace: dict):
assert isinstance(to_replace, dict)
@@ -180,12 +336,12 @@ class BaseData:
for num in to_replace:
if num in tmp_data:
tmp_data[tmp_data == num] = to_replace[num]
return self.__class__(tmp_data, *self.index_columns)
return self.__class__(tmp_data, *self.indices)
def apply(self, func: Callable):
"""apply a function to data."""
tmp_data = func(self.data)
return self.__class__(tmp_data, *self.index_columns)
return self.__class__(tmp_data, *self.indices)
def __len__(self):
"""the length of the data.
@@ -221,6 +377,9 @@ class BaseData:
else:
raise ValueError(f"axis must be None, 0 or 1")
def isna(self):
return self.__class__(np.isnan(self.data), *self.indices)
def count(self):
return len(self.data[~np.isnan(self.data)])
@@ -233,60 +392,37 @@ class BaseData:
return self.data
class SingleData(BaseData):
def __init__(self, data: Union[int, float, np.number, list] = [], index: Union[list, pd.Index] = []):
class SingleData(IndexData):
def __init__(self, data: Union[int, float, np.number, list, dict, pd.Series] = [], index: Union[List, pd.Index, Index] = []):
"""A data structure of index and numpy data.
It's used to replace pd.Series due to high-speed.
Parameters
----------
data : Union[int, float, np.floating, list, np.ndarray]
the dim of data must be 1.
data : Union[int, float, np.number, list, dict, pd.Series]
the input data
index : Union[list, pd.Index]
the index of data.
empty list indicates that auto filling the index to the length of data
"""
# data
if isinstance(data, (int, float, np.floating)):
self.data = np.full(len(index), fill_value=data, dtype=np.float64)
elif isinstance(data, list):
self.data = np.array(data)
elif isinstance(data, np.ndarray):
self.data = data
else:
raise ValueError(f"data must be list or np.ndarray")
# data in SingleData must be one dim
assert self.data.ndim == 1
# replace int with float
if self.data.dtype == np.signedinteger:
self.data = self.data.astype(np.float64)
# replace None with np.NaN, because pd.Series does it.
if None in self.data:
self.data[self.data == None] = np.NaN
# for special data type
if isinstance(data, dict):
assert len(index) == 0
index, data = zip(*data.items())
elif isinstance(data, pd.Series):
assert len(index) == 0
index, data = data.index, data.values
super().__init__(data, index)
assert self.ndim == 1
# index
if isinstance(index, list):
if index == [] and len(self.data) > 0:
index = list(range(len(self.data)))
self.index = index
elif isinstance(index, pd.Index):
self.index = list(index)
else:
raise ValueError(f"index must be list or pd.Index")
assert len(self.data) == len(self.index)
# if data is not empty,
self.index_map = dict(zip(self.index, range(len(self.index))))
super(SingleData, self).__init__()
def _align_index(self, other):
def _align_indices(self, other):
if self.index == other.index:
return self, other
elif set(self.index) == set(other.index):
return self, other.reindex(self.index)
else:
raise ValueError(
f"The indexes of self and other do not meet the requirements of the four arithmetic operations"
)
f"The indexes of self and other do not meet the requirements of the four arithmetic operations")
def reindex(self, index, fill_value=np.NaN):
"""reindex data and fill the missing value with np.NaN.
@@ -301,6 +437,7 @@ class SingleData(BaseData):
SingleData
reindex data
"""
# TODO: This method can be more general
if self.index == index:
return self
tmp_data = np.full(len(index), fill_value, dtype=np.float64)
@@ -310,6 +447,7 @@ class SingleData(BaseData):
return SingleData(tmp_data, index)
def add(self, other, fill_value=0):
# TODO: add and __add__ are a little confusing.
common_index = list(set(self.index) | set(other.index))
tmp_data1 = self.reindex(common_index, fill_value)
tmp_data2 = other.reindex(common_index, fill_value)
@@ -328,26 +466,15 @@ class SingleData(BaseData):
def to_series(self):
return pd.Series(self.data, index=self.index)
def __getitem__(self, index: Union["SingleData", int, str]):
if isinstance(index, int):
return self.data[index]
elif isinstance(index, str):
return self.data[self.index_map[index]]
elif isinstance(index, SingleData):
new_data = self.data[index.data]
new_index = list(np.array(self.index)[index.data])
return SingleData(new_data, new_index)
else:
raise ValueError(f"index must be SingleData, int, str")
def __repr__(self) -> str:
return str(pd.Series(self.data, index=self.index))
class MultiData(BaseData):
def __init__(
self,
data: Union[list, np.ndarray] = [[]],
index: Union[list, pd.Index] = [],
columns: Union[list, pd.Index] = [],
):
class MultiData(IndexData):
def __init__(self,
data: Union[int, float, np.number, list] = [],
index: Union[List, pd.Index, Index] = [],
columns: Union[List, pd.Index, Index] = []):
"""A data structure of index and numpy data.
It's used to replace pd.DataFrame due to high-speed.
@@ -355,73 +482,22 @@ class MultiData(BaseData):
----------
data : Union[list, np.ndarray]
the dim of data must be 2.
index : Union[list, pd.Index]
index : Union[List, pd.Index, Index]
the index of data.
columns: Union[list, pd.Index]
columns: Union[List, pd.Index, Index]
the columns of data.
"""
# data
if isinstance(data, list):
self.data = np.array(data)
elif isinstance(data, np.ndarray):
self.data = data
else:
raise ValueError(f"data must be list or np.ndarray")
# data in SingleData must be two dim
assert self.data.ndim == 2
# replace int with float
if self.data.dtype == np.signedinteger:
self.data = self.data.astype(np.float64)
# replace None with np.NaN, because pd.DataFrame does it.
if None in self.data:
self.data[self.data == None] = np.NaN
if isinstance(data, pd.DataFrame):
index, columns, data = data.index, data.columns, data.values
super().__init__(data, index, columns)
assert self.ndim == 2
# index
if isinstance(index, list):
if index == [] and self.data.shape[0] > 0:
index = list(range(self.data.shape[0]))
self.index = index
elif isinstance(index, pd.Index):
self.index = list(index)
else:
raise ValueError(f"index must be list or pd.Index")
assert self.data.shape[0] == len(self.index)
# if data is not empty,
self.index_map = dict(zip(self.index, range(len(self.index))))
# columns
if isinstance(columns, list):
if columns == [] and self.data.shape[1] > 0:
columns = list(range(self.data.shape[1]))
self.columns = columns
elif isinstance(columns, pd.Index):
self.columns = list(columns)
else:
raise ValueError(f"columns must be list or pd.Index")
assert self.data.shape[1] == len(self.columns)
# if data is not empty,
self.columns_map = dict(zip(self.columns, range(len(self.columns))))
super(MultiData, self).__init__()
def _align_index(self, other):
def _align_indices(self, other):
if self.index_columns == other.index_columns:
return self, other
else:
raise ValueError(
f"The indexes of self and other do not meet the requirements of the four arithmetic operations"
)
f"The indexes of self and other do not meet the requirements of the four arithmetic operations")
def __getitem__(self, col) -> SingleData:
if col not in self.columns:
return SingleData()
else:
return SingleData(self.data[:, self.columns_map[col]], self.index)
def loc(self, start, end, col=None):
start_id = bisect.bisect_left(self.index, start)
end_id = bisect.bisect_right(self.index, end)
if col is None:
return MultiData(self.data[start_id:end_id], self.index[start_id:end_id], self.columns)
else:
return SingleData(self.data[start_id:end_id, self.columns_map[col]], self.index[start_id:end_id])
def __repr__(self) -> str:
return str(pd.DataFrame(self.data, index=self.index, columns=self.columns))

View File

@@ -296,5 +296,5 @@ def _ts_data_valid(ts_feature, last=False):
raise TypeError(f"ts_feature should be pd.DataFrame/Series, not {type(ts_feature)}")
ts_data_last = partial(_ts_data_valid, last=False)
ts_data_first = partial(_ts_data_valid, last=True)
ts_data_last = partial(_ts_data_valid, last=True)
ts_data_first = partial(_ts_data_valid, last=False)

View File

@@ -0,0 +1,87 @@
import numpy as np
import pandas as pd
import qlib.utils.index_data as idd
import unittest
class IndexDataTest(unittest.TestCase):
def test_index_single_data(self):
# Auto broadcast for scalar
sd = idd.SingleData(0, index=["foo", "bar"])
print(sd)
# Support empty value
sd = idd.SingleData()
print(sd)
# Bad case: the input is not aligned
with self.assertRaises(ValueError):
idd.SingleData(range(10), index=["foo", "bar"])
# test indexing
sd = idd.SingleData([1, 2, 3, 4], index=["foo", "bar", "f", "g"])
print(sd)
print(sd.iloc[1]) # get second row
# Bad case: it is not in the index
with self.assertRaises(KeyError):
print(sd.loc[1])
print(sd.loc["foo"])
# Test slicing
print(sd.loc[:"bar"])
print(sd.iloc[:3])
def test_index_multi_data(self):
# Auto broadcast for scalar
sd = idd.MultiData(0, index=["foo", "bar"], columns=["f", "g"])
print(sd)
# Bad case: the input is not aligned
with self.assertRaises(ValueError):
idd.MultiData(range(10), index=["foo", "bar"], columns=["f", "g"])
# test indexing
sd = idd.MultiData(np.arange(4).reshape(2, 2), index=["foo", "bar"], columns=["f", "g"])
print(sd)
print(sd.iloc[1]) # get second row
# Bad case: it is not in the index
with self.assertRaises(KeyError):
print(sd.loc[1])
print(sd.loc["foo"])
# Test slicing
print(sd.loc[:"foo"])
print(sd.loc[:, "g":])
def test_sorting(self):
sd = idd.MultiData(np.arange(4).reshape(2, 2), index=["foo", "bar"], columns=["f", "g"])
print(sd)
sd.sort_index()
print(sd)
print(sd.loc[:"c"])
def test_corner_cases(self):
sd = idd.MultiData([[1, 2], [3, np.NaN]], index=["foo", "bar"], columns=["f", "g"])
print(sd)
self.assertTrue(np.isnan(sd.loc["bar", "g"]))
# support slicing
print(sd.loc[~sd.loc[:, "g"].isna().data.astype(np.bool)])
if __name__ == "__main__":
unittest.main()