From 70fc58104bfb3a4666382b3441887591c40d3c9c Mon Sep 17 00:00:00 2001 From: zhupr Date: Thu, 1 Apr 2021 12:58:34 +0800 Subject: [PATCH] Modify FileStorage --- qlib/data/storage/__init__.py | 2 +- qlib/data/storage/file_storage.py | 197 ++++++++++++++++++++++++------ qlib/data/storage/storage.py | 121 ++++++++++++++---- 3 files changed, 262 insertions(+), 58 deletions(-) diff --git a/qlib/data/storage/__init__.py b/qlib/data/storage/__init__.py index f42504791..552e1e3e8 100644 --- a/qlib/data/storage/__init__.py +++ b/qlib/data/storage/__init__.py @@ -1,4 +1,4 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -from .storage import CalendarStorage, InstrumentStorage, FeatureStorage +from .storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstVT, InstKT diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py index aadc918c3..e2e5bd3e7 100644 --- a/qlib/data/storage/file_storage.py +++ b/qlib/data/storage/file_storage.py @@ -3,69 +3,193 @@ import struct from pathlib import Path -from typing import Iterator, Iterable, Type, List, Tuple, Text, Union - -from .storage import FeatureVT +from typing import Iterator, Iterable, Union, Dict, Mapping, Tuple import numpy as np import pandas as pd -from . import CalendarStorage, InstrumentStorage, FeatureStorage - -CalVT = Type[pd.Timestamp] -# instrument value -InstVT = List[Tuple[CalVT, CalVT]] -# instrument key -InstKT = Text +from . import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT class FileCalendarStorage(CalendarStorage): def __init__(self, uri: str): - super(FileCalendarStorage, self).__init__(uri=uri) - with open(uri) as f: - self._data = [pd.Timestamp(x.strip()) for x in f] + super(FileCalendarStorage, self).__init__(uri) + self._uri = Path(self._uri).expanduser().resolve() + + def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> np.ndarray: + if not self._uri.exists(): + self._write_calendar(values=[]) + with self._uri.open("rb") as fp: + return np.loadtxt(fp, str, skiprows=skip_rows, max_rows=n_rows, encoding="utf-8") + + def _write_calendar(self, values: Iterable[CalVT], mode: str = "wb"): + with self._uri.open(mode=mode) as fp: + np.savetxt(fp, values, fmt="%s", encoding="utf-8") + + def extend(self, values: Iterable[CalVT]) -> None: + self._write_calendar(values, mode="ab") + + def clear(self) -> None: + self._write_calendar(values=[]) + + def index(self, value: CalVT) -> int: + calendar = self._read_calendar() + return int(np.argwhere(calendar == value)[0]) + + def insert(self, index: int, value: CalVT): + calendar = self._read_calendar() + calendar = np.insert(calendar, index, value) + self._write_calendar(values=calendar) + + def remove(self, value: CalVT) -> None: + index = self.index(value) + calendar = self._read_calendar() + calendar = np.delete(calendar, index) + self._write_calendar(values=calendar) + + def __setitem__(self, i: Union[int, slice], values: Union[CalVT, Iterable[CalVT]]) -> None: + calendar = self._read_calendar() + calendar[i] = values + self._write_calendar(values=calendar) + + def __delitem__(self, i: Union[int, slice]) -> None: + calendar = self._read_calendar() + calendar = np.delete(calendar, i) + self._write_calendar(values=calendar) def __getitem__(self, i: Union[int, slice]) -> Union[CalVT, Iterable[CalVT]]: - if isinstance(i, (int, slice)): - return self._data[i] - else: - raise TypeError(f"type(i) = {type(i)}") + return self._read_calendar()[i] def __len__(self) -> int: - return len(self._data) + return len(self._read_calendar()) + + def __iter__(self): + with self._uri.open("r") as fp: + yield fp.readline() class FileInstrumentStorage(InstrumentStorage): + INSTRUMENT_SEP = "\t" + INSTRUMENT_START_FIELD = "start_datetime" + INSTRUMENT_END_FIELD = "end_datetime" + SYMBOL_FIELD_NAME = "instrument" + def __init__(self, uri: str): super(FileInstrumentStorage, self).__init__(uri=uri) - self._data = self._load_data() + self._uri = Path(self._uri).expanduser().resolve() + + def _read_instrument(self) -> Dict[InstKT, InstVT]: + if not self._uri.exists(): + self._write_instrument() - def _load_data(self): _instruments = dict() df = pd.read_csv( self._uri, sep="\t", usecols=[0, 1, 2], - names=["inst", "start_datetime", "end_datetime"], - dtype={"inst": str}, - parse_dates=["start_datetime", "end_datetime"], + names=[self.SYMBOL_FIELD_NAME, self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD], + dtype={self.SYMBOL_FIELD_NAME: str}, + parse_dates=[self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD], ) for row in df.itertuples(index=False): _instruments.setdefault(row[0], []).append((row[1], row[2])) return _instruments + def _write_instrument(self, data: Dict[InstKT, InstVT] = None) -> None: + if not data: + with self._uri.open("w") as _: + pass + return + + res = [] + for inst, v_list in data.items(): + _df = pd.DataFrame(v_list, columns=[self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD]) + _df[self.SYMBOL_FIELD_NAME] = inst + res.append(_df) + + df = pd.concat(res, sort=False) + df.loc[:, [self.SYMBOL_FIELD_NAME, self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD]].to_csv( + self._uri, header=False, sep=self.INSTRUMENT_SEP, index=False + ) + df.to_csv(self._uri, sep="\t", encoding="utf-8", header=False, index=False) + + def clear(self) -> None: + self._write_instrument(data={}) + + def __setitem__(self, k: InstKT, v: InstVT) -> None: + inst = self._read_instrument() + inst[k] = v + self._write_instrument(inst) + + def __delitem__(self, k: InstKT) -> None: + inst = self._read_instrument() + del inst[k] + self._write_instrument(inst) + def __getitem__(self, k: InstKT) -> InstVT: - return self._data[k] + return self._read_instrument()[k] def __len__(self) -> int: - return len(self._data) + inst = self._read_instrument() + return len(inst) def __iter__(self) -> Iterator[InstKT]: - return self._data.__iter__() + for _inst in self._read_instrument().keys(): + yield _inst + + def update(self, *args, **kwargs) -> None: + + if len(args) > 1: + raise TypeError(f"update expected at most 1 arguments, got {len(args)}") + inst = self._read_instrument() + if args: + other = args[0] + if isinstance(other, Mapping): + for key in other: + inst[key] = other[key] + elif hasattr(other, "keys"): + for key in other.keys(): + inst[key] = other[key] + else: + for key, value in other: + inst[key] = value + for key, value in kwargs.items(): + inst[key] = value + + self._write_instrument(inst) class FileFeatureStorage(FeatureStorage): - def __getitem__(self, i: Union[int, slice]) -> Union[FeatureVT, Iterable[FeatureVT]]: + def __init__(self, uri: str): + super(FileFeatureStorage, self).__init__(uri=uri) + self._uri = Path(self._uri) + + def clear(self): + with self._uri.open("wb") as _: + pass + + def extend(self, series: pd.Series) -> None: + extend_start_index = self[0][0] + len(self) if self._uri.exists() else series.index[0] + series = series.reindex(pd.RangeIndex(extend_start_index, series.index[-1] + 1)) + with self._uri.open("ab") as fp: + np.array(series.values).astype(" None: + origin_series = self[:] + series = series.append(origin_series.loc[origin_series.index > series.index[-1]]) + series = series.reindex(pd.RangeIndex(series.index[0], series.index[-1])) + with self._uri.open("wb") as fp: + np.array(series.values).astype(" Union[Tuple[int, float], pd.Series]: + if not self._uri.exists(): + if isinstance(i, int): + return None, None + elif isinstance(i, slice): + return pd.Series() + else: + raise TypeError(f"type(i) = {type(i)}") + with open(self._uri, "rb") as fp: ref_start_index = int(np.frombuffer(fp.read(4), dtype=" end_index: - return [] + return pd.Series() fp.seek(4 * (si - ref_start_index) + 4) # read n bytes count = end_index - si + 1 data = np.frombuffer(fp.read(4 * count), dtype=" int: - return Path(self._uri).stat().st_size // 4 - 1 + return self._uri.stat().st_size // 4 - 1 if self._uri.exists() else 0 def __iter__(self): + if not self._uri.exists(): + return with open(self._uri, "rb") as fp: ref_start_index = int(np.frombuffer(fp.read(4), dtype=" None: + def extend(self, iterable: Iterable[CalVT]) -> None: + raise NotImplementedError("Subclass of CalendarStorage must implement `extend` method") + + def clear(self) -> None: + raise NotImplementedError("Subclass of CalendarStorage must implement `clear` method") + + def index(self, value: CalVT) -> int: + raise NotImplementedError("Subclass of CalendarStorage must implement `index` method") + + def insert(self, index: int, value: CalVT) -> None: raise NotImplementedError("Subclass of CalendarStorage must implement `insert` method") + def remove(self, value: CalVT) -> None: + raise NotImplementedError("Subclass of CalendarStorage must implement `remove` method") + @overload - def __setitem__(self, i: int, o: CalVT) -> None: + def __setitem__(self, i: int, value: CalVT) -> None: """x.__setitem__(i, o) <==> x[i] = o""" ... @overload - def __setitem__(self, s: slice, o: Iterable[CalVT]) -> None: + def __setitem__(self, s: slice, value: Iterable[CalVT]) -> None: """x.__setitem__(s, o) <==> x[s] = o""" ... - def __setitem__(self, i, o) -> None: + def __setitem__(self, i, value) -> None: raise NotImplementedError( "Subclass of CalendarStorage must implement `__setitem__(i: int, o: CalVT)`/`__setitem__(s: slice, o: Iterable[CalVT])` method" ) @@ -74,10 +83,21 @@ class CalendarStorage(MutableSequence): raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method") -class InstrumentStorage(MutableMapping): +class InstrumentStorage: def __init__(self, uri: str): self._uri = uri + def clear(self) -> None: + raise NotImplementedError("Subclass of InstrumentStorage must implement `clear` method") + + def update(self, *args, **kwargs) -> None: + """D.update([E, ]**F) -> None. Update D from mapping/iterable E and F. + If E present and has a .keys() method, does: for k in E: D[k] = E[k] + If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v + In either case, this is followed by: for k, v in F.items(): D[k] = v + """ + raise NotImplementedError("Subclass of InstrumentStorage must implement `update` method") + def __setitem__(self, k: InstKT, v: InstVT) -> None: """ Set self[key] to value. """ raise NotImplementedError("Subclass of InstrumentStorage must implement `__setitem__` method") @@ -99,37 +119,92 @@ class InstrumentStorage(MutableMapping): raise NotImplementedError("Subclass of InstrumentStorage must implement `__iter__` method") -class FeatureStorage(Sequence): +class FeatureStorage: def __init__(self, uri: str): self._uri = uri - def append(self, obj: FeatureVT) -> None: - """ Append object to the end of the FeatureStorage. """ - raise NotImplementedError("Subclass of FeatureStorage must implement `append` method") - def clear(self): """ Remove all items from FeatureStorage. """ raise NotImplementedError("Subclass of FeatureStorage must implement `clear` method") - def extend(self, iterable: Iterable[FeatureVT]): - """ Extend list by appending elements from the iterable. """ + def extend(self, series: pd.Series): + """Extend feature by appending elements from the series. + + Examples: + + feature: + 3 4 + 4 5 + 5 6 + + >>> self.extend(pd.Series({7: 8, 9:10})) + + feature: + 3 4 + 4 5 + 5 6 + 6 np.nan + 7 8 + 9 10 + + """ raise NotImplementedError("Subclass of FeatureStorage must implement `extend` method") + def rebase(self, series: pd.Series): + """Rebase feature header from the series. + + Examples: + + feature: + 3 4 + 4 5 + 5 6 + + >>> self.rebase(pd.Series({1: 2})) + + feature: + 1 2 + 2 np.nan + 3 4 + 4 5 + 5 6 + + >>> self.rebase(pd.Series({5: 6, 7: 8, 9: 10})) + + feature: + 5 6 + 7 8 + 9 10 + + >>> self.rebase(pd.Series({11: 12, 12: 13,})) + + feature: + 11 12 + 12 13 + + """ + raise NotImplementedError("Subclass of FeatureStorage must implement `rebase` method") + @overload - def __getitem__(self, s: slice) -> Iterable[FeatureVT]: - """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]""" + def __getitem__(self, s: slice) -> pd.Series: + """x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step] == pd.Series(values, index=pd.RangeIndex(start, len(values))""" ... @overload - def __getitem__(self, i: int) -> float: + def __getitem__(self, i: int) -> Tuple[int, float]: """x.__getitem__(y) <==> x[y]""" ... - def __getitem__(self, i) -> float: + def __getitem__(self, i) -> Union[Tuple[int, float], pd.Series]: """x.__getitem__(y) <==> x[y]""" raise NotImplementedError( "Subclass of FeatureStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method" ) def __len__(self) -> int: + """len(feature) <==> feature.__len__() """ raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method") + + def __iter__(self) -> Iterable[Tuple[int, float]]: + """iter(feature)""" + raise NotImplementedError("Subclass of FeatureStorage must implement `__iter__` method")