From 2c8a3ded08c249f18d8d3e71670c3b6c68ac79cb Mon Sep 17 00:00:00 2001 From: "wangwenxi.handsome" Date: Thu, 22 Jul 2021 15:20:03 +0000 Subject: [PATCH] high_performance_data_structure --- qlib/backtest/exchange.py | 106 +------ qlib/backtest/high_performance_ds.py | 414 +++++++++++++++++++++++++++ qlib/backtest/report.py | 277 +++--------------- 3 files changed, 453 insertions(+), 344 deletions(-) create mode 100644 qlib/backtest/high_performance_ds.py diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index 8d02e7893..edcd7baaf 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -4,7 +4,7 @@ import random import logging -from typing import List, Tuple, Union +from typing import List, Tuple, Union, Callable, Iterable import numpy as np import pandas as pd @@ -15,6 +15,7 @@ from ..config import C, REG_CN from ..utils.resam import resam_ts_data, ts_data_last from ..log import get_module_logger from .order import Order, OrderDir, OrderHelper +from .high_performane_ds import PandasQuote class Exchange: @@ -32,6 +33,7 @@ class Exchange: close_cost=0.0025, min_cost=5, extra_quote=None, + quote_cls=PandasQuote, **kwargs, ): """__init__ @@ -143,7 +145,8 @@ class Exchange: self.get_quote_from_qlib() # init quote by quote_df - self.quote = PandasQuote(self.quote_df) + self.quote_cls = quote_cls + self.quote = self.quote_cls(self.quote_df) def get_quote_from_qlib(self): # get stock data from qlib @@ -593,102 +596,3 @@ class Exchange: # cache to avoid recreate the same instance self._order_helper = OrderHelper(self) return self._order_helper - - -class BaseQuote: - def __init__(self, quote_df: pd.DataFrame): - self.logger = get_module_logger("online operator", level=logging.INFO) - - def get_all_stock(self): - """return all stock codes - - Return - ------ - Union[list, Dict.keys(), set, tuple] - all stock codes - """ - raise NotImplementedError(f"Please implement the `get_all_stock` method") - - def get_data( - self, - stock_id: Union[str, list], - start_time: Union[pd.Timestamp, str], - end_time: Union[pd.Timestamp, str], - fields: Union[str, list] = None, - method: Union[str, "Callable"] = None, - ): - """get the specific fields of stock data during start time and end_time, - and apply method to the data. - - Example: - .. code-block:: - $close $volume - instrument datetime - SH600000 2010-01-04 86.778313 16162960.0 - 2010-01-05 87.433578 28117442.0 - 2010-01-06 85.713585 23632884.0 - 2010-01-07 83.788803 20813402.0 - 2010-01-08 84.730675 16044853.0 - - SH600655 2010-01-04 2699.567383 158193.328125 - 2010-01-08 2612.359619 77501.406250 - 2010-01-11 2712.982422 160852.390625 - 2010-01-12 2788.688232 164587.937500 - 2010-01-13 2790.604004 145460.453125 - - print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) - - $close $volume - instrument - SH600000 87.433578 28117442.0 - SH600655 2699.567383 158193.328125 - - print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) - - $close 87.433578 - $volume 28117442.0 - - print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last")) - - 87.433578 - - Parameters - ---------- - stock_id: Union[str, list] - start_time : Union[pd.Timestamp, str] - closed start time for backtest - end_time : Union[pd.Timestamp, str] - closed end time for backtest - fields : Union[str, List] - the columns of data to fetch - method : Union[str, Callable] - the method apply to data. - e.g ["None", "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last] - - Return - ---------- - Union[None, float, pd.Series, pd.DataFrame] - The resampled DataFrame/Series/value, return None when the resampled data is empty. - """ - - raise NotImplementedError(f"Please implement the `get_data` method") - - -class PandasQuote(BaseQuote): - def __init__(self, quote_df: pd.DataFrame): - super().__init__(quote_df=quote_df) - quote_dict = {} - for stock_id, stock_val in quote_df.groupby(level="instrument"): - quote_dict[stock_id] = stock_val.droplevel(level="instrument") - self.data = quote_dict - - def get_all_stock(self): - return self.data.keys() - - def get_data(self, stock_id, start_time, end_time, fields=None, method=None): - if fields is None: - return resam_ts_data(self.data[stock_id], start_time, end_time, method=method) - elif isinstance(fields, (str, list)): - return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method) - else: - raise ValueError(f"fields must be None, str or list") diff --git a/qlib/backtest/high_performance_ds.py b/qlib/backtest/high_performance_ds.py new file mode 100644 index 000000000..3e5a9d8e2 --- /dev/null +++ b/qlib/backtest/high_performance_ds.py @@ -0,0 +1,414 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +import logging +from typing import List, Tuple, Union, Callable, Iterable, Dict +from collections import OrderedDict + +import inspect +import pandas as pd + +from ..utils.resam import resam_ts_data +from ..log import get_module_logger + + +class BaseQuote: + def __init__(self, quote_df: pd.DataFrame): + self.logger = get_module_logger("online operator", level=logging.INFO) + + def get_all_stock(self) -> Iterable: + """return all stock codes + + Return + ------ + Iterable + all stock codes + """ + raise NotImplementedError(f"Please implement the `get_all_stock` method") + + def get_data( + self, + stock_id: Union[str, list], + start_time: Union[pd.Timestamp, str], + end_time: Union[pd.Timestamp, str], + fields: Union[str, list] = None, + method: Union[str, Callable] = None, + ) -> Union[None, float, pd.Series, pd.DataFrame]: + """get the specific fields of stock data during start time and end_time, + and apply method to the data. + + Example: + .. code-block:: + $close $volume + instrument datetime + SH600000 2010-01-04 86.778313 16162960.0 + 2010-01-05 87.433578 28117442.0 + 2010-01-06 85.713585 23632884.0 + 2010-01-07 83.788803 20813402.0 + 2010-01-08 84.730675 16044853.0 + + SH600655 2010-01-04 2699.567383 158193.328125 + 2010-01-08 2612.359619 77501.406250 + 2010-01-11 2712.982422 160852.390625 + 2010-01-12 2788.688232 164587.937500 + 2010-01-13 2790.604004 145460.453125 + + print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) + + $close $volume + instrument + SH600000 87.433578 28117442.0 + SH600655 2699.567383 158193.328125 + + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) + + $close 87.433578 + $volume 28117442.0 + + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last")) + + 87.433578 + + Parameters + ---------- + stock_id: Union[str, list] + start_time : Union[pd.Timestamp, str] + closed start time for backtest + end_time : Union[pd.Timestamp, str] + closed end time for backtest + fields : Union[str, List] + the columns of data to fetch + method : Union[str, Callable] + the method apply to data. + e.g ["None", "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last] + + Return + ---------- + Union[None, float, pd.Series, pd.DataFrame] + The resampled DataFrame/Series/value, return None when the resampled data is empty. + """ + + raise NotImplementedError(f"Please implement the `get_data` method") + + +class PandasQuote(BaseQuote): + def __init__(self, quote_df: pd.DataFrame): + super().__init__(quote_df=quote_df) + quote_dict = {} + for stock_id, stock_val in quote_df.groupby(level="instrument"): + quote_dict[stock_id] = stock_val.droplevel(level="instrument") + self.data = quote_dict + + def get_all_stock(self): + return self.data.keys() + + def get_data(self, stock_id, start_time, end_time, fields=None, method=None): + if fields is None: + return resam_ts_data(self.data[stock_id], start_time, end_time, method=method) + elif isinstance(fields, (str, list)): + return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method) + else: + raise ValueError(f"fields must be None, str or list") + + +class BaseSingleMetric: + """ + The data structure of the single metric. + The following methods are used for computing metrics in one indicator. + """ + + def __init__(self, metric: Union[dict, pd.Series]): + pass + + def __add__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __radd__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + return self + other + + def __sub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __rsub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __mul__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __truediv__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __eq__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __gt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __lt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + pass + + def __len__(self) -> int: + pass + + def sum(self) -> float: + pass + + def mean(self) -> float: + pass + + def count(self) -> int: + pass + + def abs(self) -> "BaseSingleMetric": + pass + + def astype(self, type: type) -> "BaseSingleMetric": + pass + + @property + def empty(self) -> bool: + """If metric is empyt, return True.""" + pass + + def add(self, other: "BaseSingleMetric", fill_value: float = None) -> "BaseSingleMetric": + """Replace np.NaN with fill_value in two metrics and add them.""" + pass + + def apply(self, map_dict: dict) -> "BaseSingleMetric": + """Replace the value of metric according to map_dict.""" + pass + + +class BaseOrderIndicator: + """ + The data structure of order indicator. + !!!NOTE: There are two ways to organize the data structure. Please choose a better way. + 1. one way is use BaseSingleMetric to represent each metric. For example, the data + structure of PandasOrderIndicator is Dict[str: PandasSingleMetric]. It uses + PandasSingleMetric based on pd.Series to represent each metric. + 2. the another way doesn't BaseSingleMetric to represent each metric. The data + structure of PandasOrderIndicator is a whole matrix. + """ + + def assign(self, col: str, metric: Union[dict, pd.Series]): + """assign one metric. + + Parameters + ---------- + col : str + the metric name of one metric. + metric : Union[dict, pd.Series] + the metric data. + """ + + pass + + def transfer(self, func: Callable, new_col: str = None) -> Union[None, BaseSingleMetric]: + """compute new metric with existing metrics. + + Parameters + ---------- + func : Callable + the func of computing new metric. + the kwargs of func will be replaced with metric data by name in this function. + e.g. + def func(pa): + return (pa > 0).astype(int).sum() / pa.count() + new_col : str, optional + New metric will be assigned in the data if new_col is not None, by default None. + + Return + ---------- + BaseSingleMetric + new metric. + """ + + pass + + def get_metric_series(self, metric: str) -> pd.Series: + """return the single metric with pd.Series format. + + Parameters + ---------- + metric : str + the metric name. + + Return + ---------- + pd.Series + the single metric. + If there is no metric name in the data, return pd.Series(). + """ + + pass + + @staticmethod + def sum_all_indicators( + indicators: list, metrics: Union[str, List[str]], fill_value: float = None + ) -> Dict[str, BaseSingleMetric]: + """sum indicators with the same metrics. + + Parameters + ---------- + indicators : List[BaseOrderIndicator] + the list of all inner indicators. + metrics : Union[str, List[str]] + all metrics needs ot be sumed. + fill_value : float, optional + fill np.NaN with value. By default None. + + Return + ---------- + Dict[str: PandasSingleMetric] + a dict of metric name and data. + """ + + pass + + +class PandasSingleMetric: + """Each SingleMetric is based on pd.Series.""" + + def __init__(self, metric: Union[dict, pd.Series]): + if isinstance(metric, dict): + self.metric = pd.Series(metric) + elif isinstance(metric, pd.Series): + self.metric = metric + else: + raise ValueError(f"metric must be dict or pd.Series") + + def __add__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric + other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric + other.metric) + else: + return NotImplemented + + def __sub__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric - other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric - other.metric) + else: + return NotImplemented + + def __rsub__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(other - self.metric) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(other.metric - self.metric) + else: + return NotImplemented + + def __mul__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric * other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric * other.metric) + else: + return NotImplemented + + def __truediv__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric / other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric / other.metric) + else: + return NotImplemented + + def __eq__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric == other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric == other.metric) + else: + return NotImplemented + + def __gt__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric < other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric < other.metric) + else: + return NotImplemented + + def __lt__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric > other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric > other.metric) + else: + return NotImplemented + + def __len__(self): + return len(self.metric) + + def sum(self): + return self.metric.sum() + + def mean(self): + return self.metric.mean() + + def count(self): + return self.metric.count() + + def abs(self): + return PandasSingleMetric(self.metric.abs()) + + def astype(self, type): + return PandasSingleMetric(self.metric.astype(type)) + + @property + def empty(self): + return self.metric.empty + + def add(self, other, fill_value=None): + return PandasSingleMetric(self.metric.add(other.metric, fill_value=fill_value)) + + def apply(self, map_dict: dict): + return PandasSingleMetric(self.metric.apply(map_dict)) + + +class PandasOrderIndicator(BaseOrderIndicator): + """ + The data structure is OrderedDict(str: PandasSingleMetric). + Each PandasSingleMetric based on pd.Series is one metric. + Str is the name of metric. + """ + + def __init__(self): + self.data: Dict[str, PandasSingleMetric] = OrderedDict() + + def assign(self, col: str, metric: Union[dict, pd.Series]): + self.data[col] = PandasSingleMetric(metric) + + def transfer(self, func: Callable, new_col: str = None) -> Union[None, PandasSingleMetric]: + func_sig = inspect.signature(func).parameters.keys() + func_kwargs = {sig: self.data[sig] for sig in func_sig} + tmp_metric = func(**func_kwargs) + if new_col is not None: + self.data[new_col] = tmp_metric + else: + return tmp_metric + + def get_metric_series(self, metric: str) -> Union[pd.Series]: + if metric in self.data: + return self.data[metric].metric + else: + return pd.Series() + + @staticmethod + def sum_all_indicators( + indicators: list, metrics: Union[str, List[str]], fill_value=None + ) -> Dict[str, PandasSingleMetric]: + metric_dict = {} + if isinstance(metrics, str): + metrics = [metrics] + for metric in metrics: + tmp_metric = PandasSingleMetric({}) + for indicator in indicators: + tmp_metric = tmp_metric.add(indicator.data[metric], fill_value) + metric_dict[metric] = tmp_metric.metric + return metric_dict diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index 1ae50f5e2..98d8b4f63 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -5,7 +5,7 @@ from collections import OrderedDict from logging import warning import pathlib -from typing import Dict, List, Tuple, Union +from typing import Dict, List, Tuple, Union, Callable import warnings import inspect @@ -18,6 +18,7 @@ from qlib.backtest.exchange import Exchange from qlib.backtest.order import BaseTradeDecision, Order, OrderDir from qlib.backtest.utils import TradeCalendarManager +from .high_performane_ds import PandasOrderIndicator from ..data import D from ..tests.config import CSI300_BENCH from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data @@ -254,10 +255,12 @@ class Indicator: """ - def __init__(self): + def __init__(self, order_indicator_cls=PandasOrderIndicator): + self.order_indicator_cls = order_indicator_cls + # order indicator is metrics for a single order for a specific step self.order_indicator_his = OrderedDict() - self.order_indicator = PandasOrderIndicator() + self.order_indicator = self.order_indicator_cls() # trade indicator is metrics for all orders for a specific step self.trade_indicator_his = OrderedDict() @@ -267,7 +270,7 @@ class Indicator: # def reset(self, trade_calendar: TradeCalendarManager): def reset(self): - self.order_indicator = PandasOrderIndicator() + self.order_indicator = self.order_indicator_cls() self.trade_indicator = OrderedDict() # self._trade_calendar = trade_calendar @@ -291,6 +294,7 @@ class Indicator: trade_value[order.stock_id] = _trade_val * order.sign trade_cost[order.stock_id] = _trade_cost trade_dir[order.stock_id] = order.direction + # The PA in the innermost layer is meanless pa[order.stock_id] = 0 self.order_indicator.assign("amount", amount) @@ -306,32 +310,33 @@ class Indicator: def _update_order_fulfill_rate(self): def func(deal_amount, amount): return deal_amount / amount + self.order_indicator.transfer(func, "ffr") def update_order_indicators(self, trade_info: list): self._update_order_trade_info(trade_info=trade_info) self._update_order_fulfill_rate() - # self._update_order_price_advantage() def _agg_order_trade_info(self, inner_order_indicators: List[Dict[str, pd.Series]]): - all_metric = ["inner_amount", "deal_amount", "trade_price", - "trade_value", "trade_cost", "trade_dir"] - metric_dict = PandasOrderIndicator.agg_all_indicators(inner_order_indicators, all_metric, fill_value=0) + all_metric = ["inner_amount", "deal_amount", "trade_price", "trade_value", "trade_cost", "trade_dir"] + metric_dict = self.order_indicator_cls.sum_all_indicators(inner_order_indicators, all_metric, fill_value=0) for metric in metric_dict: self.order_indicator.assign(metric, metric_dict[metric]) def func(trade_price, deal_amount): return trade_price / deal_amount + self.order_indicator.transfer(func, "trade_price") def func_apply(trade_dir): return trade_dir.apply(Order.parse_dir) + self.order_indicator.transfer(func_apply, "trade_dir") def _update_trade_amount(self, outer_trade_decision: BaseTradeDecision): # NOTE: these indicator is designed for order execution, so the decision: List[Order] = outer_trade_decision.get_decision() - if decision is None: + if len(decision) == 0: self.order_indicator.assign("amount", {}) else: self.order_indicator.assign("amount", {order.stock_id: order.amount_delta for order in decision}) @@ -450,11 +455,14 @@ class Indicator: def _agg_order_price_advantage(self): def if_empty_func(trade_price): return trade_price.empty + if_empty = self.order_indicator.transfer(if_empty_func) if not if_empty: + def func(trade_dir, trade_price, base_price): sign = 1 - trade_dir * 2 return sign * (trade_price / base_price - 1) + self.order_indicator.transfer(func, "pa") else: self.order_indicator.assign("pa", {}) @@ -471,33 +479,45 @@ class Indicator: self._update_trade_amount(outer_trade_decision) self._update_order_fulfill_rate() pa_config = indicator_config.get("pa_config", {}) - self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO + self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO self._agg_order_price_advantage() def _cal_trade_fulfill_rate(self, method="mean"): if method == "mean": + def func(ffr): return ffr.mean() + elif method == "amount_weighted": + def func(ffr, deal_amount): return (ffr * deal_amount.abs()).sum() / (deal_amount.abs().sum()) + elif method == "value_weighted": + def func(ffr, trade_value): return (ffr * trade_value.abs()).sum() / (trade_value.abs().sum()) + else: raise ValueError(f"method {method} is not supported!") return self.order_indicator.transfer(func) def _cal_trade_price_advantage(self, method="mean"): if method == "mean": + def func(pa): return pa.mean() + elif method == "amount_weighted": + def func(pa, deal_amount): return (pa * deal_amount.abs()).sum() / (deal_amount.abs().sum()) + elif method == "value_weighted": + def func(pa, trade_value): return (pa * trade_value.abs()).sum() / (trade_value.abs().sum()) + else: raise ValueError(f"method {method} is not supported!") return self.order_indicator.transfer(func) @@ -505,21 +525,25 @@ class Indicator: def _cal_trade_positive_rate(self): def func(pa): return (pa > 0).astype(int).sum() / pa.count() + return self.order_indicator.transfer(func) def _cal_deal_amount(self): def func(deal_amount): return deal_amount.abs().sum() + return self.order_indicator.transfer(func) def _cal_trade_value(self): def func(trade_value): return trade_value.abs().sum() + return self.order_indicator.transfer(func) def _cal_trade_order_count(self): def func(amount): return amount.count() + return self.order_indicator.transfer(func) def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}): @@ -553,236 +577,3 @@ class Indicator: def generate_trade_indicators_dataframe(self): return pd.DataFrame.from_dict(self.trade_indicator_his, orient="index") - - -class BaseOrderIndicator: - """The data structure of order indicator. - """ - - def __init__(self): - pass - - def assign(self, col: str, metric: Union[dict, pd.Series]): - """assign one metric. - - Parameters - ---------- - col : str - the metric name of one metric. - metric : Union[dict, pd.Series] - the metric data. - """ - - pass - - def transfer(self, func: "Callable", new_col: str = None): - """compute new metric with existing. - - Parameters - ---------- - func : Callable - the func of computing new metric. - the kwargs of func will be replaced with metric data by name in this function. - e.g. - def func(pa): - return (pa > 0).astype(int).sum() / pa.count() - new_col : str, optional - New metric will be assigned in the data if new_col is not None, by default None. - - Return - ---------- - SingleMetric - new metric. - """ - - pass - - def get_metric_series(self, metric: str): - """return the single metric with pd.Series format - - Parameters - ---------- - metric : str - the metric name. - - Return - ---------- - pd.Series - the single metric. - If there is no metric name in the data, return pd.Series(). - """ - - pass - - @classmethod - def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value: float = None): - """sum indicators with the same metrics. - - Parameters - ---------- - indicators : List[BaseOrderIndicator] - the list of all inner indicators. - metrics : Union[str, List[str]] - all metrics needs ot be sumed. - fill_value : float, optional - fill np.NaN with value. By default None. - - Return - ---------- - Dict[str: SingleMetric] - a dict of metric name and data. - """ - - pass - - -class PandasOrderIndicator(BaseOrderIndicator): - """The data structure is OrderedDict(str: SingleMetric). - Each SingleMetric based on pd.Series is one metric. - Str is the name of metric. - """ - - class SingleMetric: - """The data structure of the single metric. - The following methods are used for computing metrics in one indicator. - """ - - def __init__(self, metric: Union[dict, pd.Series]): - if isinstance(metric, dict): - self.metric = pd.Series(metric) - elif isinstance(metric, pd.Series): - self.metric = metric - else: - raise ValueError(f"metric must be dict or pd.Series") - - def __add__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric + other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric + other.metric) - else: - return NotImplemented - - def __radd__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(other + self.metric) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(other.metric + self.metric) - else: - return NotImplemented - - def __sub__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric - other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric - other.metric) - else: - return NotImplemented - - def __rsub__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(other - self.metric) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(other.metric - self.metric) - else: - return NotImplemented - - def __mul__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric * other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric * other.metric) - else: - return NotImplemented - - def __truediv__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric / other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric / other.metric) - else: - return NotImplemented - - def __eq__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric == other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric == other.metric) - else: - return NotImplemented - - def __gt__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric < other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric < other.metric) - else: - return NotImplemented - - def __lt__(self, other): - if isinstance(other, (int, float)): - return PandasOrderIndicator.SingleMetric(self.metric > other) - elif isinstance(other, PandasOrderIndicator.SingleMetric): - return PandasOrderIndicator.SingleMetric(self.metric > other.metric) - else: - return NotImplemented - - def __len__(self): - return len(self.metric) - - def sum(self): - return self.metric.sum() - - def mean(self): - return self.metric.mean() - - def count(self): - return self.metric.count() - - def abs(self): - return PandasOrderIndicator.SingleMetric(self.metric.abs()) - - def astype(self, type): - return PandasOrderIndicator.SingleMetric(self.metric.astype(type)) - - @property - def empty(self): - return self.metric.empty - - def add(self, other, fill_value: None): - return PandasOrderIndicator.SingleMetric(self.metric.add(other.metric, fill_value = fill_value)) - - def apply(self, map_dict: dict): - return PandasOrderIndicator.SingleMetric(self.metric.apply(map_dict)) - - def __init__(self): - self.data: Dict[str, self.SingleMetric] = OrderedDict() - - def assign(self, col: str, metric: Union[dict, pd.Series]): - self.data[col] = self.SingleMetric(metric) - - def transfer(self, func: "Callable", new_col: str = None): - func_sig = inspect.signature(func).parameters.keys() - func_kwargs = {sig: self.data[sig] for sig in func_sig} - tmp_metric = func(**func_kwargs) - if(new_col is not None): - self.data[new_col] = tmp_metric - return tmp_metric - - def get_metric_series(self, metric: str): - if(metric in self.data): - return self.data[metric].metric - else: - return pd.Series() - - @staticmethod - def agg_all_indicators(indicators: list, metrics: Union[str, List[str]], fill_value = None): - metric_dict = {} - if isinstance(metrics, str): - metrics = [metrics] - for metric in metrics: - tmp_metric = PandasOrderIndicator.SingleMetric({}) - for indicator in indicators: - tmp_metric = tmp_metric.add(indicator.data[metric], fill_value) - metric_dict[metric] = tmp_metric.metric - return metric_dict \ No newline at end of file