From d39c8de800aa846eb313530b63c43bc066b12d74 Mon Sep 17 00:00:00 2001 From: Young Date: Tue, 31 Aug 2021 02:33:44 +0000 Subject: [PATCH] draft design --- qlib/backtest/__init__.py | 2 +- qlib/backtest/exchange.py | 2 +- qlib/backtest/high_performance_ds.py | 75 ++-- qlib/backtest/report.py | 22 +- qlib/data/ops.py | 2 +- qlib/utils/index_data.py | 588 +++++++++++++++------------ qlib/utils/resam.py | 4 +- tests/misc/test_index_data.py | 87 ++++ 8 files changed, 467 insertions(+), 315 deletions(-) create mode 100644 tests/misc/test_index_data.py diff --git a/qlib/backtest/__init__.py b/qlib/backtest/__init__.py index b4a46614e..d4a19eb25 100644 --- a/qlib/backtest/__init__.py +++ b/qlib/backtest/__init__.py @@ -20,7 +20,7 @@ from ..utils import init_instance_by_config from ..log import get_module_logger from ..config import C -# make import more user-friendly by enable `from qlib.backtest import STH` +# make import more user-friendly by adding `from qlib.backtest import STH` logger = get_module_logger("backtest caller") diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 347f33790..c55513d8b 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -424,7 +424,7 @@ class Exchange: else: raise NotImplementedError(f"This type of input is not supported") deal_price = self.quote.get_data(stock_id, start_time, end_time, field=pstr, method=method) - if method is not None and (np.isclose(deal_price, 0.0) or np.isnan(deal_price)): + if method is not None and (deal_price is None or np.isclose(deal_price, 0.0) or np.isnan(deal_price)): self.logger.warning(f"(stock_id:{stock_id}, trade_time:{(start_time, end_time)}, {pstr}): {deal_price}!!!") self.logger.warning(f"setting deal_price to close price") deal_price = self.get_close(stock_id, start_time, end_time, method) diff --git a/qlib/backtest/high_performance_ds.py b/qlib/backtest/high_performance_ds.py index d125fadc8..b94c6a279 100644 --- a/qlib/backtest/high_performance_ds.py +++ b/qlib/backtest/high_performance_ds.py @@ -15,6 +15,7 @@ from ..utils.index_data import IndexData, SingleData from ..utils.resam import resam_ts_data, ts_data_last from ..log import get_module_logger from ..utils.time import is_single_value +import qlib.utils.index_data as idd class BaseQuote: @@ -61,7 +62,9 @@ class BaseQuote: this function is used for three case: - 1. method is not None. It returns int/float/bool. + 1. method is not None. It returns int/float/bool/None. + - It will return None in one case, the method return None + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-06", field="$close", method="last")) 85.713585 @@ -87,8 +90,9 @@ class BaseQuote: Return ---------- Union[None, int, float, bool, IndexData] - None means there is no stock data from data source. - please refer to Example as following. + it will return None in following cases + - There is no stock data which meet the query criterion from data source. + - The `method` returns None """ raise NotImplementedError(f"Please implement the `get_data` method") @@ -112,7 +116,7 @@ class PandasQuote(BaseQuote): elif isinstance(stock_data, (bool, np.bool_, int, float, np.number)): return stock_data elif isinstance(stock_data, pd.Series): - return IndexData.Series(stock_data) + return idd.SingleData(stock_data) else: raise ValueError(f"stock data from resam_ts_data must be a number, pd.Series or pd.DataFrame") @@ -130,7 +134,8 @@ class CN1minNumpyQuote(BaseQuote): super().__init__(quote_df=quote_df) quote_dict = {} for stock_id, stock_val in quote_df.groupby(level="instrument"): - quote_dict[stock_id] = IndexData.DataFrame(stock_val.droplevel(level="instrument")) + quote_dict[stock_id] = idd.MultiData(stock_val.droplevel(level="instrument")) + quote_dict[stock_id].sort_index() # To support more flexible slicing, we must sort data first self.data = quote_dict self.freq = pd.Timedelta(minutes=1) @@ -145,32 +150,22 @@ class CN1minNumpyQuote(BaseQuote): # single data # If it don't consider the classification of single data, it will consume a lot of time. - if is_single_value(start_time, end_time, self.freq): - now_index_map = self.data[stock_id].index_map - now_columns_map = self.data[stock_id].columns_map - if start_time not in now_index_map: + if is_single_value(start_time, end_time, self.freq) and method is not None: + # this is a very special case. + # skip aggregating function to speed-up the query calculation + try: + self.data[stock_id].loc[start_time, field] + except KeyError: return None - else: - return self.data[stock_id].values[now_index_map[start_time], now_columns_map[field]] - - # multi data else: - if method is None: - stock_data = self.data[stock_id].loc(start_time, end_time, field) - if stock_data.empty: - return None - else: - return stock_data - else: - stock_data = self.data[stock_id].loc(start_time, end_time, field) - if stock_data.empty: - return None - elif len(stock_data) == 1: - return stock_data[0] - else: - return self._agg_data(stock_data.values, method) + data = self.data[stock_id].loc[start_time:end_time, field] + if data.empty: + return None + if method is not None: + data = self._agg_data(data, method) + return data - def _agg_data(self, data, method): + def _agg_data(self, data: IndexData, method): """Agg data by specific method.""" if method == "sum": return np.nansum(data) @@ -183,11 +178,11 @@ class CN1minNumpyQuote(BaseQuote): elif method == "any": return data.any() elif method == ts_data_last: - valid_data = data[data != np.NaN] + valid_data = data.loc[~data.isna().data.astype(bool)] if len(valid_data) == 0: return None else: - return valid_data[0] + return valid_data.iloc[-1] else: raise ValueError(f"{method} is not supported") @@ -259,9 +254,6 @@ class BaseSingleMetric: def abs(self) -> "BaseSingleMetric": raise NotImplementedError(f"Please implement the `abs` method") - def astype(self, dtype: type) -> "BaseSingleMetric": - raise NotImplementedError(f"Please implement the `astype` method") - @property def empty(self) -> bool: """If metric is empty, return True.""" @@ -332,7 +324,7 @@ class BaseOrderIndicator: the kwargs of func will be replaced with metric data by name in this function. e.g. def func(pa): - return (pa > 0).astype(int).sum() / pa.count() + return (pa > 0).sum() / pa.count() new_col : str, optional New metric will be assigned in the data if new_col is not None, by default None. @@ -513,9 +505,6 @@ class PandasSingleMetric(SingleMetric): def abs(self): return self.__class__(self.metric.abs()) - def astype(self, dtype): - return self.__class__(self.metric.astype(dtype)) - @property def empty(self): return self.metric.empty @@ -552,9 +541,9 @@ class PandasOrderIndicator(BaseOrderIndicator): def get_index_data(self, metric): if metric in self.data: - return IndexData.Series(self.data[metric].metric) + return idd.SingleData(self.data[metric].metric) else: - return IndexData.Series() + return idd.SingleData() def get_metric_series(self, metric: str) -> Union[pd.Series]: if metric in self.data: @@ -579,7 +568,7 @@ class PandasOrderIndicator(BaseOrderIndicator): class NumpyOrderIndicator(BaseOrderIndicator): """ The data structure is OrderedDict(str: SingleData). - Each IndexData.Series is one metric. + Each idd.SingleData is one metric. Str is the name of metric. """ @@ -587,13 +576,13 @@ class NumpyOrderIndicator(BaseOrderIndicator): self.data: Dict[str, SingleData] = OrderedDict() def assign(self, col: str, metric: dict): - self.data[col] = IndexData.Series(metric) + self.data[col] = idd.SingleData(metric) def get_index_data(self, metric): if metric in self.data: return self.data[metric] else: - return IndexData.Series() + return idd.SingleData() def get_metric_series(self, metric: str) -> Union[pd.Series]: return self.data[metric].to_series() @@ -609,7 +598,7 @@ class NumpyOrderIndicator(BaseOrderIndicator): if isinstance(metrics, str): metrics = [metrics] for metric in metrics: - tmp_metric = IndexData.Series() + tmp_metric = IndexData.SingleData() for indicator in indicators: tmp_metric = tmp_metric.add(indicator.data[metric], fill_value) order_indicator.data[metric] = tmp_metric diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index e29921da6..0dfc92582 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -12,10 +12,10 @@ import pandas as pd from qlib.backtest.exchange import Exchange from qlib.backtest.order import BaseTradeDecision, Order, OrderDir from .high_performance_ds import PandasOrderIndicator, NumpyOrderIndicator, SingleMetric -from ..utils.index_data import IndexData, SingleData from ..tests.config import CSI300_BENCH from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data from .order import IdxTradeRange +import qlib.utils.index_data as idd class Report: @@ -386,8 +386,8 @@ class Indicator: return None, None if isinstance(price_s, (int, float, np.number)): - price_s = IndexData.Series(price_s, [trade_start_time]) - elif isinstance(price_s, SingleData): + price_s = idd.SingleData(price_s, [trade_start_time]) + elif isinstance(price_s, idd.SingleData): pass else: raise NotImplementedError(f"This type of input is not supported") @@ -401,10 +401,10 @@ class Indicator: if agg == "vwap": volume_s = trade_exchange.get_volume(inst, trade_start_time, trade_end_time, method=None) if isinstance(volume_s, (int, float, np.number)): - volume_s = IndexData.Series(volume_s, [trade_start_time]) + volume_s = idd.SingleData(volume_s, [trade_start_time]) volume_s = volume_s.reindex(price_s.index) elif agg == "twap": - volume_s = IndexData.Series(1, price_s.index) + volume_s = idd.SingleData(1, price_s.index) else: raise NotImplementedError(f"This type of input is not supported") @@ -414,7 +414,7 @@ class Indicator: def _agg_base_price( self, - inner_order_indicators: List[Dict[str, Union[SingleMetric, SingleData]]], + inner_order_indicators: List[Dict[str, Union[SingleMetric, idd.SingleData]]], decision_list: List[Tuple[BaseTradeDecision, pd.Timestamp, pd.Timestamp]], trade_exchange: Exchange, pa_config: dict = {}, @@ -467,12 +467,12 @@ class Indicator: else: bp_new[inst], bv_new[inst] = pr, v - bp_new = IndexData.Series(bp_new) - bv_new = IndexData.Series(bv_new) + bp_new = idd.SingleData(bp_new) + bv_new = idd.SingleData(bv_new) bp_all.append(bp_new) bv_all.append(bv_new) - bp_all = IndexData.concat(bp_all, axis=1) - bv_all = IndexData.concat(bv_all, axis=1) + bp_all = idd.concat(bp_all, axis=1) + bv_all = idd.concat(bv_all, axis=1) base_volume = bv_all.sum(axis=1) self.order_indicator.assign("base_volume", base_volume.to_dict()) @@ -550,7 +550,7 @@ class Indicator: def _cal_trade_positive_rate(self): def func(pa): - return (pa > 0).astype(int).sum() / pa.count() + return (pa > 0).sum() / pa.count() return self.order_indicator.transfer(func) diff --git a/qlib/data/ops.py b/qlib/data/ops.py index e044533d3..bd8832b49 100644 --- a/qlib/data/ops.py +++ b/qlib/data/ops.py @@ -1405,7 +1405,7 @@ class Corr(PairRolling): super(Corr, self).__init__(feature_left, feature_right, N, "corr") def _load_internal(self, instrument, start_index, end_index, freq): - res = super(Corr, self)._load_internal(instrument, start_index, end_index, freq) + res: pd.Series = super(Corr, self)._load_internal(instrument, start_index, end_index, freq) # NOTE: Load uses MemCache, so calling load again will not cause performance degradation series_left = self.feature_left.load(instrument, start_index, end_index, freq) diff --git a/qlib/utils/index_data.py b/qlib/utils/index_data.py index 8a92cfcf5..78cd32b50 100644 --- a/qlib/utils/index_data.py +++ b/qlib/utils/index_data.py @@ -1,178 +1,334 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +""" +Motivation of index_data +- Pandas has a lot of user-friendly interfaces. However, integrating too much features in a single tool bring to much overhead and makes it much slower than numpy. + Some users just want a simple numpy dataframe with indices and don't want such a complicated tools. + Such users are the target of `index_data` +`index_data` try to behave like pandas (some API will be different because we try to be simpler and more intuitive) but don't compromize the performance. It provides the basic numpy data and simple indexing feature. If users call APIs which may compromize the performance, index_data will raise Errors. +""" -from typing import Union, Callable +from typing import Tuple, Union, Callable, List import bisect import numpy as np import pandas as pd -class IndexData: - """This is a simplified version of pandas which is faster based on numpy.""" +def concat(data_list: Union["SingleData"], axis=0) -> "MultiData": + """concat all SingleData by index. + TODO: now just for SingleData. - @staticmethod - def Series( - data: Union[dict, pd.Series, int, float, np.floating, list, np.ndarray] = [], index: Union[list, pd.Index] = [] - ): - if isinstance(data, dict): - return SingleData(list(data.values()), list(data.keys())) - elif isinstance(data, pd.Series): - return SingleData(data.values, data.index) + Parameters + ---------- + index_data_list : List[SingleData] + the list of all SingleData to concat. + + Returns + ------- + MultiData + the MultiData with ndim == 2 + """ + if axis == 0: + raise NotImplementedError(f"please implement this func when axis == 0") + elif axis == 1: + # get all index and row + all_index = set() + for index_data in data_list: + all_index = all_index | set(index_data.index) + all_index = list(all_index) + all_index.sort() + all_index_map = dict(zip(all_index, range(len(all_index)))) + + # concat all + tmp_data = np.full((len(all_index), len(data_list)), np.NaN) + for data_id, index_data in enumerate(data_list): + assert isinstance(index_data, SingleData) + now_data_map = [all_index_map[index] for index in index_data.index] + tmp_data[now_data_map, data_id] = index_data.data + return MultiData(tmp_data, all_index) + else: + raise ValueError(f"axis must be 0 or 1") + + +class Index: + """ + This is for indexing(rows or columns) + + Read-only operations has higher priorities than others. + So this class is designed in a **read-only** way to shared data for queries. + Modifications will results in new Index. + + NOTE: the indexing has following flaws + - duplicated index value is not well supported (only the first appearance will be considered) + - The order of the index is not considered!!!! So the slicing will not behave like pandas when indexings are ordered + """ + def __init__(self, idx_list: Union[List, pd.Index, "Index", int]): + self.idx_list: np.ndarray = None # using array type for index list will make things easier + if isinstance(idx_list, Index): + # Fast read-only copy + self.idx_list = idx_list.idx_list + self.index_map = idx_list.index_map + self._is_sorted = idx_list._is_sorted + elif isinstance(idx_list, int): + self.index_map = self.idx_list = np.arange(idx_list) + self._is_sorted = True else: - return SingleData(data, index) + self.idx_list = np.array(idx_list) + # NOTE: only the first appearance is indexed + self.index_map = dict(zip(self.idx_list, range(len(self)))) + self._is_sorted = False - @staticmethod - def DataFrame( - data: Union[pd.DataFrame, list, np.ndarray] = [[]], - index: Union[list, pd.Index] = [], - columns: Union[list, pd.Index] = [], - ): - if isinstance(data, pd.DataFrame): - return MultiData(data.values, data.index, data.columns) - else: - return MultiData(data, index, columns) + def __getitem__(self, i: int): + return self.idx_list[i] - @staticmethod - def concat(data_list: Union["SingleData"], axis=0) -> "MultiData": - """concat all SingleData by index. - TODO: now just for SingleData. + def index(self, item) -> int: + """ + Given the index value, get the integer index - Parameters - ---------- - index_data_list : List[SingleData] - the list of all SingleData to concat. + """ + return self.index_map[item] + + def __eq__(self, other: "Index"): + # NOTE: np.nan is not supported in the index + return (self.idx_list == other.idx_list).all() + + def __len__(self): + return len(self.idx_list) + + def is_sorted(self): + return self._is_sorted + + def sort(self) -> Tuple["Index", np.ndarray]: + """ + sort the index Returns ------- - MultiData - the MultiData with ndim == 2 + Tuple["Index", np.ndarray]: + the sorted Index and the changed index """ - if axis == 0: - raise NotImplementedError(f"please implement this func when axis == 0") - elif axis == 1: - # get all index and row - all_index = set() - for index_data in data_list: - all_index = all_index | set(index_data.index) - all_index = list(all_index) - all_index.sort() - all_index_map = dict(zip(all_index, range(len(all_index)))) + sorted_idx = np.argsort(self.idx_list) + idx = Index(self.idx_list[sorted_idx]) + idx._is_sorted = True + return idx, sorted_idx - # concat all - tmp_data = np.full((len(all_index), len(data_list)), np.NaN) - for data_id, index_data in enumerate(data_list): - assert isinstance(index_data, SingleData) - now_data_map = [all_index_map[index] for index in index_data.index] - tmp_data[now_data_map, data_id] = index_data.data - return MultiData(tmp_data, all_index) + + +class LocIndexer: + """ + `Indexer` will behave like the `LocIndexer` in Pandas + + Read-only operations has higher priorities than others. + So this class is designed in a read-only way to shared data for queries. + Modifications will results in new Index. + """ + def __init__(self, index_data: "IndexData", indices: List[Index], int_loc: bool = False): + self._indices: List[Index] = indices + self._bind_id = index_data # bind index data + self._int_loc = int_loc + assert self._bind_id.data.ndim == len(self._indices) + + @staticmethod + def proc_idx_l(indices: List[Union[List, pd.Index, Index]], data_shape: Tuple = None) -> List[Index]: + """ process the indices from user and output a list of `Index` """ + res = [] + for i, idx in enumerate(indices): + res.append(Index(data_shape[i] if len(idx) == 0 else idx)) + return res + + def _slc_convert(self, index: Index, indexing: slice) -> slice: + """ + convert value-based indexing to integer-based indexing. + + Parameters + ---------- + index : Index + index data. + indexing : slice + value based indexing data with slice type for indexing. + + Returns + ------- + slice: + the integer based slicing + """ + if index.is_sorted(): + int_start = None if indexing.start is None else bisect.bisect_left(index, indexing.start) + int_stop = None if indexing.stop is None else bisect.bisect_right(index, indexing.stop) else: - raise ValueError(f"axis must be 0 or 1") + int_start = None if indexing.start is None else index.index(indexing.start) + int_stop = None if indexing.stop is None else index.index(indexing.stop) + 1 + return slice(int_start, int_stop) + + def __getitem__(self, indexing): + """ + + Parameters + ---------- + indexing : + query for data + + Raises + ------ + KeyError: + If the non-slice index is queried but does not exist, `KeyError` is raised. + """ + # 1) convert slices to int loc + if not isinstance(indexing, tuple): + # NOTE: tuple is not supported for indexing + indexing = (indexing, ) + + # TODO: create a subclass for single value query + assert len(indexing) <= len(self._indices) + + int_indexing = [] + for dim, index in enumerate(self._indices): + if dim < len(indexing): + _indexing = indexing[dim] + if not self._int_loc: # type converting is only necessary when it is not `iloc` + if isinstance(_indexing, slice): + _indexing = self._slc_convert(index, _indexing) + elif isinstance(_indexing, (IndexData, np.ndarray)): + if isinstance(_indexing, IndexData): + _indexing = _indexing.data + assert _indexing.ndim == 1 + if _indexing.dtype != np.bool: + _indexing = np.array(list(index.index(i) for i in _indexing)) + else: + _indexing = index.index(_indexing) + else: + _indexing = slice(None) + int_indexing.append(_indexing) + + # 2) select data and index + new_data = self._bind_id.data[tuple(int_indexing)] + new_indices = [idx[indexing] for idx, indexing in zip(self._indices, int_indexing)] + + # 3) squash dimensions + new_indices = [idx for idx in new_indices if isinstance(idx, np.ndarray) and idx.ndim > 0] # squash the zero dim indexing + + if new_data.ndim == 0: + return new_data + else: + if new_data.ndim == 1: + cls = SingleData + elif new_data.ndim == 2: + cls = MultiData + else: + raise ValueError("Not supported") + return cls(new_data, *new_indices) -class BaseData: - """Base data structure of SingleData and MultiData.""" +class IndexData: + """ + Base data structure of SingleData and MultiData. - def __init__(self): - self.index_columns = self._get_index_columns() + NOTE: + - For performance issue, only **np.floating** is supported in the underlayer data !!! + - Boolean based on np.floating is also supported. Here are some examples - def _get_index_columns(self): - index_columns = [] - if hasattr(self, "index"): - index_columns.append(self.index) - if hasattr(self, "columns"): - index_columns.append(self.columns) - return index_columns + .. code-block:: python - def _align_index(self, other): + np.array([ np.nan]).any() -> True + np.array([ np.nan]).all() -> True + np.array([1. , 0.]).any() -> True + np.array([1. , 0.]).all() -> False + """ + + loc_idx_cls = LocIndexer + def __init__(self, data: np.ndarray, *indices: Union[List, pd.Index, Index]): + + self.data = data + self.indices = indices + + # get the expected data shape + # - The index has higher priority + self.data = np.array(data) + + expected_dim = max(self.data.ndim, len(indices)) + + data_shape = [] + for i in range(expected_dim): + idx_l = indices[i] if len(indices) > i else [] + if len(idx_l) == 0: + data_shape.append(self.data.shape[i]) + else: + data_shape.append(len(idx_l)) + data_shape = tuple(data_shape) + + # broadcast the data to expected shape + self.data = np.broadcast_to(self.data, data_shape) + + self.data = self.data.astype(np.float64) + # Please notice following cases when converting the type + # - np.array([None, 1]).astype(np.float64) -> array([nan, 1.]) + + # create index from user's index data. + self.indices: List[Index] = self.loc_idx_cls.proc_idx_l(indices, data_shape) + + for dim in range(expected_dim): + assert self.data.shape[dim] == len(self.indices[dim]) + + self.ndim = expected_dim + + # indexing related methods + @property + def loc(self): + return self.loc_idx_cls(index_data=self, indices=self.indices) + + @property + def iloc(self): + return self.loc_idx_cls(index_data=self, indices=self.indices, int_loc=True) + + @property + def index(self): + return self.indices[0] + + @property + def columns(self): + return self.indices[1] + + def _align_indices(self, other): """Align index before performing the four arithmetic operations.""" - raise NotImplementedError(f"please implement _align_index func") + raise NotImplementedError(f"please implement _align_indices func") - def __add__(self, other): - if isinstance(other, (int, float, np.number)): - return self.__class__(self.data + other, *self.index_columns) - elif isinstance(other, self.__class__): - tmp_data1, tmp_data2 = self._align_index(other) - return self.__class__(tmp_data1.data + tmp_data2.data, *tmp_data1.index_columns) - else: - return NotImplemented + def sort_index(self, axis=0, inplace=True): + assert inplace, "Only support sorting inplace now" + self.indices[axis], sorted_idx = self.indices[axis].sort() + self.data = np.take(self.data, sorted_idx, axis=axis) - def __sub__(self, other): - if isinstance(other, (int, float, np.number)): - return self.__class__(self.data - other, *self.index_columns) - elif isinstance(other, self.__class__): - tmp_data1, tmp_data2 = self._align_index(other) - return self.__class__(tmp_data1.data - tmp_data2.data, *tmp_data1.index_columns) - else: - return NotImplemented + # calculation related methods + def __getattribute__(self, attr_name: str): + # 1) use a unified operation for the basic operation - def __rsub__(self, other): - if isinstance(other, (int, float, np.number)): - return self.__class__(other - self.data, *self.index_columns) - elif isinstance(other, self.__class__): - tmp_data1, tmp_data2 = self._align_index(other) - return self.__class__(tmp_data2.data - tmp_data1.data, *tmp_data1.index_columns) - else: - return NotImplemented + def _basic_binary_ops(other): + self_data_method = getattr(self.data, attr_name) - def __mul__(self, other): - if isinstance(other, (int, float, np.number)): - return self.__class__(self.data * other, *self.index_columns) - elif isinstance(other, self.__class__): - tmp_data1, tmp_data2 = self._align_index(other) - return self.__class__(tmp_data1.data * tmp_data2.data, *tmp_data1.index_columns) - else: - return NotImplemented + if isinstance(other, (int, float, np.number)): + return self.__class__(self_data_method(other)) + elif isinstance(other, self.__class__): + # TODO: bad interface + tmp_data1, tmp_data2 = self._align_indices(other) + return self.__class__(self_data_method(tmp_data2.data), *self.indices) + else: + return NotImplemented - def __truediv__(self, other): - if isinstance(other, (int, float, np.number)): - return self.__class__(self.data / other, *self.index_columns) - elif isinstance(other, self.__class__): - tmp_data1, tmp_data2 = self._align_index(other) - return self.__class__(tmp_data1.data / tmp_data2.data, *tmp_data1.index_columns) - else: - return NotImplemented + if attr_name in {"__add__", "__sub__", "__rsub__", "__mul__", "__truediv__", "__eq__", "__gt__", "__lt__"}: + return _basic_binary_ops - def __eq__(self, other): - if isinstance(other, (int, float, np.number)): - return self.__class__(self.data == other, *self.index_columns) - elif isinstance(other, self.__class__): - tmp_data1, tmp_data2 = self._align_index(other) - return self.__class__(tmp_data1.data == tmp_data2.data, *tmp_data1.index_columns) - else: - return NotImplemented - - def __gt__(self, other): - if isinstance(other, (int, float, np.number)): - return self.__class__(self.data > other, *self.index_columns) - elif isinstance(other, self.__class__): - tmp_data1, tmp_data2 = self._align_index(other) - return self.__class__(tmp_data1.data > tmp_data2.data, *tmp_data1.index_columns) - else: - return NotImplemented - - def __lt__(self, other): - if isinstance(other, (int, float, np.number)): - return self.__class__(self.data < other, *self.index_columns) - elif isinstance(other, self.__class__): - tmp_data1, tmp_data2 = self._align_index(other) - return self.__class__(tmp_data1.data < tmp_data2.data, *tmp_data1.index_columns) - else: - return NotImplemented + # 2) otherwise, follow the default behavior + return super().__getattribute__(attr_name) + # The code below could be simpler like methods in __getattribute__ def __invert__(self): - return self.__class__(~self.data, *self.index_columns) + return self.__class__(~self.data.astype(np.bool), *self.indices) def abs(self): """get the abs of data except np.NaN.""" tmp_data = np.absolute(self.data) - return self.__class__(tmp_data, *self.index_columns) - - def astype(self, dtype): - """change the type of data.""" - tmp_data = self.data.astype(dtype) - return self.__class__(tmp_data, *self.index_columns) + return self.__class__(tmp_data, *self.indices) def replace(self, to_replace: dict): assert isinstance(to_replace, dict) @@ -180,12 +336,12 @@ class BaseData: for num in to_replace: if num in tmp_data: tmp_data[tmp_data == num] = to_replace[num] - return self.__class__(tmp_data, *self.index_columns) + return self.__class__(tmp_data, *self.indices) def apply(self, func: Callable): """apply a function to data.""" tmp_data = func(self.data) - return self.__class__(tmp_data, *self.index_columns) + return self.__class__(tmp_data, *self.indices) def __len__(self): """the length of the data. @@ -221,6 +377,9 @@ class BaseData: else: raise ValueError(f"axis must be None, 0 or 1") + def isna(self): + return self.__class__(np.isnan(self.data), *self.indices) + def count(self): return len(self.data[~np.isnan(self.data)]) @@ -233,60 +392,37 @@ class BaseData: return self.data -class SingleData(BaseData): - def __init__(self, data: Union[int, float, np.number, list] = [], index: Union[list, pd.Index] = []): +class SingleData(IndexData): + def __init__(self, data: Union[int, float, np.number, list, dict, pd.Series] = [], index: Union[List, pd.Index, Index] = []): """A data structure of index and numpy data. It's used to replace pd.Series due to high-speed. Parameters ---------- - data : Union[int, float, np.floating, list, np.ndarray] - the dim of data must be 1. + data : Union[int, float, np.number, list, dict, pd.Series] + the input data index : Union[list, pd.Index] the index of data. + empty list indicates that auto filling the index to the length of data """ - # data - if isinstance(data, (int, float, np.floating)): - self.data = np.full(len(index), fill_value=data, dtype=np.float64) - elif isinstance(data, list): - self.data = np.array(data) - elif isinstance(data, np.ndarray): - self.data = data - else: - raise ValueError(f"data must be list or np.ndarray") - # data in SingleData must be one dim - assert self.data.ndim == 1 - # replace int with float - if self.data.dtype == np.signedinteger: - self.data = self.data.astype(np.float64) - # replace None with np.NaN, because pd.Series does it. - if None in self.data: - self.data[self.data == None] = np.NaN + # for special data type + if isinstance(data, dict): + assert len(index) == 0 + index, data = zip(*data.items()) + elif isinstance(data, pd.Series): + assert len(index) == 0 + index, data = data.index, data.values + super().__init__(data, index) + assert self.ndim == 1 - # index - if isinstance(index, list): - if index == [] and len(self.data) > 0: - index = list(range(len(self.data))) - self.index = index - elif isinstance(index, pd.Index): - self.index = list(index) - else: - raise ValueError(f"index must be list or pd.Index") - assert len(self.data) == len(self.index) - # if data is not empty, - self.index_map = dict(zip(self.index, range(len(self.index)))) - - super(SingleData, self).__init__() - - def _align_index(self, other): + def _align_indices(self, other): if self.index == other.index: return self, other elif set(self.index) == set(other.index): return self, other.reindex(self.index) else: raise ValueError( - f"The indexes of self and other do not meet the requirements of the four arithmetic operations" - ) + f"The indexes of self and other do not meet the requirements of the four arithmetic operations") def reindex(self, index, fill_value=np.NaN): """reindex data and fill the missing value with np.NaN. @@ -301,6 +437,7 @@ class SingleData(BaseData): SingleData reindex data """ + # TODO: This method can be more general if self.index == index: return self tmp_data = np.full(len(index), fill_value, dtype=np.float64) @@ -310,6 +447,7 @@ class SingleData(BaseData): return SingleData(tmp_data, index) def add(self, other, fill_value=0): + # TODO: add and __add__ are a little confusing. common_index = list(set(self.index) | set(other.index)) tmp_data1 = self.reindex(common_index, fill_value) tmp_data2 = other.reindex(common_index, fill_value) @@ -328,26 +466,15 @@ class SingleData(BaseData): def to_series(self): return pd.Series(self.data, index=self.index) - def __getitem__(self, index: Union["SingleData", int, str]): - if isinstance(index, int): - return self.data[index] - elif isinstance(index, str): - return self.data[self.index_map[index]] - elif isinstance(index, SingleData): - new_data = self.data[index.data] - new_index = list(np.array(self.index)[index.data]) - return SingleData(new_data, new_index) - else: - raise ValueError(f"index must be SingleData, int, str") + def __repr__(self) -> str: + return str(pd.Series(self.data, index=self.index)) -class MultiData(BaseData): - def __init__( - self, - data: Union[list, np.ndarray] = [[]], - index: Union[list, pd.Index] = [], - columns: Union[list, pd.Index] = [], - ): +class MultiData(IndexData): + def __init__(self, + data: Union[int, float, np.number, list] = [], + index: Union[List, pd.Index, Index] = [], + columns: Union[List, pd.Index, Index] = []): """A data structure of index and numpy data. It's used to replace pd.DataFrame due to high-speed. @@ -355,73 +482,22 @@ class MultiData(BaseData): ---------- data : Union[list, np.ndarray] the dim of data must be 2. - index : Union[list, pd.Index] + index : Union[List, pd.Index, Index] the index of data. - columns: Union[list, pd.Index] + columns: Union[List, pd.Index, Index] the columns of data. """ - # data - if isinstance(data, list): - self.data = np.array(data) - elif isinstance(data, np.ndarray): - self.data = data - else: - raise ValueError(f"data must be list or np.ndarray") - # data in SingleData must be two dim - assert self.data.ndim == 2 - # replace int with float - if self.data.dtype == np.signedinteger: - self.data = self.data.astype(np.float64) - # replace None with np.NaN, because pd.DataFrame does it. - if None in self.data: - self.data[self.data == None] = np.NaN + if isinstance(data, pd.DataFrame): + index, columns, data = data.index, data.columns, data.values + super().__init__(data, index, columns) + assert self.ndim == 2 - # index - if isinstance(index, list): - if index == [] and self.data.shape[0] > 0: - index = list(range(self.data.shape[0])) - self.index = index - elif isinstance(index, pd.Index): - self.index = list(index) - else: - raise ValueError(f"index must be list or pd.Index") - assert self.data.shape[0] == len(self.index) - # if data is not empty, - self.index_map = dict(zip(self.index, range(len(self.index)))) - - # columns - if isinstance(columns, list): - if columns == [] and self.data.shape[1] > 0: - columns = list(range(self.data.shape[1])) - self.columns = columns - elif isinstance(columns, pd.Index): - self.columns = list(columns) - else: - raise ValueError(f"columns must be list or pd.Index") - assert self.data.shape[1] == len(self.columns) - # if data is not empty, - self.columns_map = dict(zip(self.columns, range(len(self.columns)))) - - super(MultiData, self).__init__() - - def _align_index(self, other): + def _align_indices(self, other): if self.index_columns == other.index_columns: return self, other else: raise ValueError( - f"The indexes of self and other do not meet the requirements of the four arithmetic operations" - ) + f"The indexes of self and other do not meet the requirements of the four arithmetic operations") - def __getitem__(self, col) -> SingleData: - if col not in self.columns: - return SingleData() - else: - return SingleData(self.data[:, self.columns_map[col]], self.index) - - def loc(self, start, end, col=None): - start_id = bisect.bisect_left(self.index, start) - end_id = bisect.bisect_right(self.index, end) - if col is None: - return MultiData(self.data[start_id:end_id], self.index[start_id:end_id], self.columns) - else: - return SingleData(self.data[start_id:end_id, self.columns_map[col]], self.index[start_id:end_id]) + def __repr__(self) -> str: + return str(pd.DataFrame(self.data, index=self.index, columns=self.columns)) diff --git a/qlib/utils/resam.py b/qlib/utils/resam.py index 9e9590e30..d67ed6421 100644 --- a/qlib/utils/resam.py +++ b/qlib/utils/resam.py @@ -296,5 +296,5 @@ def _ts_data_valid(ts_feature, last=False): raise TypeError(f"ts_feature should be pd.DataFrame/Series, not {type(ts_feature)}") -ts_data_last = partial(_ts_data_valid, last=False) -ts_data_first = partial(_ts_data_valid, last=True) +ts_data_last = partial(_ts_data_valid, last=True) +ts_data_first = partial(_ts_data_valid, last=False) diff --git a/tests/misc/test_index_data.py b/tests/misc/test_index_data.py new file mode 100644 index 000000000..af5b31132 --- /dev/null +++ b/tests/misc/test_index_data.py @@ -0,0 +1,87 @@ +import numpy as np +import pandas as pd + +import qlib.utils.index_data as idd + +import unittest + + +class IndexDataTest(unittest.TestCase): + def test_index_single_data(self): + # Auto broadcast for scalar + sd = idd.SingleData(0, index=["foo", "bar"]) + print(sd) + + # Support empty value + sd = idd.SingleData() + print(sd) + + # Bad case: the input is not aligned + with self.assertRaises(ValueError): + idd.SingleData(range(10), index=["foo", "bar"]) + + # test indexing + sd = idd.SingleData([1, 2, 3, 4], index=["foo", "bar", "f", "g"]) + print(sd) + print(sd.iloc[1]) # get second row + + # Bad case: it is not in the index + with self.assertRaises(KeyError): + print(sd.loc[1]) + + print(sd.loc["foo"]) + + # Test slicing + print(sd.loc[:"bar"]) + + print(sd.iloc[:3]) + + def test_index_multi_data(self): + # Auto broadcast for scalar + sd = idd.MultiData(0, index=["foo", "bar"], columns=["f", "g"]) + print(sd) + + # Bad case: the input is not aligned + with self.assertRaises(ValueError): + idd.MultiData(range(10), index=["foo", "bar"], columns=["f", "g"]) + + # test indexing + sd = idd.MultiData(np.arange(4).reshape(2, 2), index=["foo", "bar"], columns=["f", "g"]) + print(sd) + print(sd.iloc[1]) # get second row + + # Bad case: it is not in the index + with self.assertRaises(KeyError): + print(sd.loc[1]) + + print(sd.loc["foo"]) + + # Test slicing + + print(sd.loc[:"foo"]) + + print(sd.loc[:, "g":]) + + def test_sorting(self): + sd = idd.MultiData(np.arange(4).reshape(2, 2), index=["foo", "bar"], columns=["f", "g"]) + print(sd) + sd.sort_index() + + print(sd) + print(sd.loc[:"c"]) + + def test_corner_cases(self): + sd = idd.MultiData([[1, 2], [3, np.NaN]], index=["foo", "bar"], columns=["f", "g"]) + print(sd) + + self.assertTrue(np.isnan(sd.loc["bar", "g"])) + + + # support slicing + print(sd.loc[~sd.loc[:, "g"].isna().data.astype(np.bool)]) + + + + +if __name__ == "__main__": + unittest.main()