# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

import os
import abc
import warnings
import numpy as np
import pandas as pd

from typing import Tuple, Union

from qlib.data import D
from qlib.utils import load_dataset


class DataLoader(abc.ABC):
    """
    DataLoader is designed for loading raw data from original data source.
    """

    @abc.abstractmethod
    def load(self, instruments, start_time=None, end_time=None) -> pd.DataFrame:
        """
        load the data as pd.DataFrame.

        Parameters
        ----------
        instruments : str or dict
            it can either be the market name or the config file of instruments generated by InstrumentProvider.
        start_time : str
            start of the time range.
        end_time : str
            end of the time range.

        Returns
        -------
        pd.DataFrame:
            data load from the under layer source

            Example of the data (The multi-index of the columns is optional.):

            .. code-block::

                                        feature                                                             label
                                        $close     $volume     Ref($close, 1)  Mean($close, 3)  $high-$low  LABEL0
                datetime    instrument
                2010-01-04  SH600000    81.807068  17145150.0       83.737389        83.016739    2.741058  0.0032
                            SH600004    13.313329  11800983.0       13.313329        13.317701    0.183632  0.0042
                            SH600005    37.796539  12231662.0       38.258602        37.919757    0.970325  0.0289
        """
        pass


class DLWParser(DataLoader):
    """
    (D)ata(L)oader (W)ith (P)arser for features and names

    Extracting this class so that QlibDataLoader and other dataloaders(such as QdbDataLoader) can share the fields.
    """

    def __init__(self, config: Union[list, tuple, dict]):
        """
        Parameters
        ----------
        config : Union[list, tuple, dict]
            Config will be used to describe the fields and column names

            .. code-block:: YAML

                <config> := {
                    "group_name1": <fields_info1>
                    "group_name2": <fields_info2>
                }
                or
                <config> := <fields_info>

                <fields_info> := ["expr", ...] | (["expr", ...], ["col_name", ...])
        """
        # A dict config means "several named groups"; anything else is a single <fields_info>.
        self.is_group = isinstance(config, dict)

        if self.is_group:
            self.fields = {grp: self._parse_fields_info(fields_info) for grp, fields_info in config.items()}
        else:
            self.fields = self._parse_fields_info(config)

    def _parse_fields_info(self, fields_info: Union[list, tuple]) -> Tuple[list, list]:
        """
        Normalize a <fields_info> into an ``(exprs, names)`` pair.

        Parameters
        ----------
        fields_info : Union[list, tuple]
            Either a list of expressions (which are then also used as the column names),
            or a 2-tuple ``(exprs, names)``.

        Returns
        -------
        Tuple[list, list]:
            the expressions and the matching column names

        Raises
        ------
        NotImplementedError
            if `fields_info` is neither a list nor a tuple
        """
        if isinstance(fields_info, list):
            # No explicit names given: the expressions double as column names.
            exprs = names = fields_info
        elif isinstance(fields_info, tuple):
            exprs, names = fields_info
        else:
            raise NotImplementedError("This type of input is not supported")
        return exprs, names

    @abc.abstractmethod
    def load_group_df(self, instruments, exprs: list, names: list, start_time=None, end_time=None) -> pd.DataFrame:
        """
        load the dataframe for specific group

        Parameters
        ----------
        instruments :
            the instruments
        exprs : list
            The expressions to describe the content of the data
        names : list
            The name of the data
        start_time :
            start of the time range.
        end_time :
            end of the time range.

        Returns
        -------
        pd.DataFrame:
            the queried dataframe
        """
        pass

    def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
        """
        Load the configured fields by delegating to `load_group_df`.

        With a grouped config, each group is loaded separately and the results are
        concatenated along the columns, keyed by group name (which yields multi-level
        columns). Otherwise the single group's dataframe is returned as is.
        """
        if self.is_group:
            df = pd.concat(
                {
                    grp: self.load_group_df(instruments, exprs, names, start_time, end_time)
                    for grp, (exprs, names) in self.fields.items()
                },
                axis=1,
            )
        else:
            exprs, names = self.fields
            df = self.load_group_df(instruments, exprs, names, start_time, end_time)
        return df


class QlibDataLoader(DLWParser):
    """
    DataLoader that queries the fields through qlib's data API (`D`).

    The fields can be defined by config (see DLWParser).
    """

    def __init__(self, config: Union[list, tuple, dict], filter_pipe=None):
        """
        Parameters
        ----------
        config : Union[list, tuple, dict]
            Please refer to the doc of DLWParser
        filter_pipe :
            Filter pipe for the instruments
        """
        self.filter_pipe = filter_pipe
        super().__init__(config)

    def load_group_df(self, instruments, exprs: list, names: list, start_time=None, end_time=None) -> pd.DataFrame:
        """
        Query `exprs` for `instruments` via ``D.features`` and label the columns with `names`.

        If `instruments` is None, a warning is issued and "all" is used. A string is resolved
        through ``D.instruments`` together with `filter_pipe`; any other value is passed to
        ``D.features`` unchanged, in which case `filter_pipe` is ignored (with a warning).
        """
        if instruments is None:
            warnings.warn("`instruments` is not set, will load all stocks")
            instruments = "all"
        if isinstance(instruments, str):
            instruments = D.instruments(instruments, filter_pipe=self.filter_pipe)
        elif self.filter_pipe is not None:
            warnings.warn("`filter_pipe` is not None, but it will not be used with `instruments` as list")

        df = D.features(instruments, exprs, start_time, end_time)
        df.columns = names
        # NOTE: always return <datetime, instrument>
        # (presumably D.features indexes by <instrument, datetime>, hence the swap -- confirm against D)
        df = df.swaplevel().sort_index()
        return df


class StaticDataLoader(DataLoader):
    """
    DataLoader that supports loading data from file or as provided.
    """

    def __init__(self, config: dict, join="outer"):
        """
        Parameters
        ----------
        config : dict
            {fields_group: <path or object>}
        join : str
            How to align different dataframes
        """
        self.config = config
        self.join = join
        # Loaded lazily on the first call to `load`.
        self._data = None

    def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
        """
        Return the (lazily loaded) data, optionally restricted by instruments and time range.

        Parameters
        ----------
        instruments :
            selector applied to the second index level; None keeps every instrument.
        start_time :
            start of the time range; None leaves the range open on the left.
        end_time :
            end of the time range; None leaves the range open on the right.

        Returns
        -------
        pd.DataFrame:
            the selected data. The index is expected to be <datetime, instrument>.
        """
        self._maybe_load_raw_data()
        if instruments is None:
            df = self._data
        else:
            df = self._data.loc(axis=0)[:, instruments]
        if start_time is None and end_time is None:
            return df  # NOTE: avoid copy by loc
        # pd.Timestamp(None) is NaT, which cannot serve as a slice bound, so a missing
        # bound must stay None to keep that side of the range open.
        start = None if start_time is None else pd.Timestamp(start_time)
        end = None if end_time is None else pd.Timestamp(end_time)
        return df.loc[start:end]

    def _maybe_load_raw_data(self):
        """
        Load every entry of `config` with `load_dataset` on first use and cache the result.

        The per-group dataframes are concatenated along the columns (keyed by group name)
        using `join`, then sorted by index. Subsequent calls are no-ops.
        """
        if self._data is not None:
            return
        self._data = pd.concat(
            {fields_group: load_dataset(path_or_obj) for fields_group, path_or_obj in self.config.items()},
            axis=1,
            join=self.join,
        )
        self._data.sort_index(inplace=True)