1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 19:10:58 +08:00

Modify FileStorage

This commit is contained in:
zhupr
2021-04-01 12:58:34 +08:00
parent 9b8acd9a82
commit 70fc58104b
3 changed files with 262 additions and 58 deletions

View File

@@ -1,4 +1,4 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from .storage import CalendarStorage, InstrumentStorage, FeatureStorage
from .storage import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstVT, InstKT

View File

@@ -3,69 +3,193 @@
import struct
from pathlib import Path
from typing import Iterator, Iterable, Type, List, Tuple, Text, Union
from .storage import FeatureVT
from typing import Iterator, Iterable, Union, Dict, Mapping, Tuple
import numpy as np
import pandas as pd
from . import CalendarStorage, InstrumentStorage, FeatureStorage
CalVT = Type[pd.Timestamp]
# instrument value
InstVT = List[Tuple[CalVT, CalVT]]
# instrument key
InstKT = Text
from . import CalendarStorage, InstrumentStorage, FeatureStorage, CalVT, InstKT, InstVT
class FileCalendarStorage(CalendarStorage):
def __init__(self, uri: str):
super(FileCalendarStorage, self).__init__(uri=uri)
with open(uri) as f:
self._data = [pd.Timestamp(x.strip()) for x in f]
super(FileCalendarStorage, self).__init__(uri)
self._uri = Path(self._uri).expanduser().resolve()
def _read_calendar(self, skip_rows: int = 0, n_rows: int = None) -> np.ndarray:
if not self._uri.exists():
self._write_calendar(values=[])
with self._uri.open("rb") as fp:
return np.loadtxt(fp, str, skiprows=skip_rows, max_rows=n_rows, encoding="utf-8")
def _write_calendar(self, values: Iterable[CalVT], mode: str = "wb"):
with self._uri.open(mode=mode) as fp:
np.savetxt(fp, values, fmt="%s", encoding="utf-8")
def extend(self, values: Iterable[CalVT]) -> None:
self._write_calendar(values, mode="ab")
def clear(self) -> None:
self._write_calendar(values=[])
def index(self, value: CalVT) -> int:
calendar = self._read_calendar()
return int(np.argwhere(calendar == value)[0])
def insert(self, index: int, value: CalVT):
calendar = self._read_calendar()
calendar = np.insert(calendar, index, value)
self._write_calendar(values=calendar)
def remove(self, value: CalVT) -> None:
index = self.index(value)
calendar = self._read_calendar()
calendar = np.delete(calendar, index)
self._write_calendar(values=calendar)
def __setitem__(self, i: Union[int, slice], values: Union[CalVT, Iterable[CalVT]]) -> None:
calendar = self._read_calendar()
calendar[i] = values
self._write_calendar(values=calendar)
def __delitem__(self, i: Union[int, slice]) -> None:
calendar = self._read_calendar()
calendar = np.delete(calendar, i)
self._write_calendar(values=calendar)
def __getitem__(self, i: Union[int, slice]) -> Union[CalVT, Iterable[CalVT]]:
if isinstance(i, (int, slice)):
return self._data[i]
else:
raise TypeError(f"type(i) = {type(i)}")
return self._read_calendar()[i]
def __len__(self) -> int:
return len(self._data)
return len(self._read_calendar())
def __iter__(self):
with self._uri.open("r") as fp:
yield fp.readline()
class FileInstrumentStorage(InstrumentStorage):
INSTRUMENT_SEP = "\t"
INSTRUMENT_START_FIELD = "start_datetime"
INSTRUMENT_END_FIELD = "end_datetime"
SYMBOL_FIELD_NAME = "instrument"
def __init__(self, uri: str):
super(FileInstrumentStorage, self).__init__(uri=uri)
self._data = self._load_data()
self._uri = Path(self._uri).expanduser().resolve()
def _read_instrument(self) -> Dict[InstKT, InstVT]:
if not self._uri.exists():
self._write_instrument()
def _load_data(self):
_instruments = dict()
df = pd.read_csv(
self._uri,
sep="\t",
usecols=[0, 1, 2],
names=["inst", "start_datetime", "end_datetime"],
dtype={"inst": str},
parse_dates=["start_datetime", "end_datetime"],
names=[self.SYMBOL_FIELD_NAME, self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD],
dtype={self.SYMBOL_FIELD_NAME: str},
parse_dates=[self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD],
)
for row in df.itertuples(index=False):
_instruments.setdefault(row[0], []).append((row[1], row[2]))
return _instruments
def _write_instrument(self, data: Dict[InstKT, InstVT] = None) -> None:
if not data:
with self._uri.open("w") as _:
pass
return
res = []
for inst, v_list in data.items():
_df = pd.DataFrame(v_list, columns=[self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD])
_df[self.SYMBOL_FIELD_NAME] = inst
res.append(_df)
df = pd.concat(res, sort=False)
df.loc[:, [self.SYMBOL_FIELD_NAME, self.INSTRUMENT_START_FIELD, self.INSTRUMENT_END_FIELD]].to_csv(
self._uri, header=False, sep=self.INSTRUMENT_SEP, index=False
)
df.to_csv(self._uri, sep="\t", encoding="utf-8", header=False, index=False)
def clear(self) -> None:
self._write_instrument(data={})
def __setitem__(self, k: InstKT, v: InstVT) -> None:
    """Store ``v`` under instrument ``k`` via a read-modify-write of the file."""
    instruments = self._read_instrument()
    instruments[k] = v
    self._write_instrument(instruments)
def __delitem__(self, k: InstKT) -> None:
    """Delete instrument ``k`` via a read-modify-write of the file.

    Raises ``KeyError`` when ``k`` is not stored.
    """
    instruments = self._read_instrument()
    instruments.pop(k)
    self._write_instrument(instruments)
def __getitem__(self, k: InstKT) -> InstVT:
return self._data[k]
return self._read_instrument()[k]
def __len__(self) -> int:
return len(self._data)
inst = self._read_instrument()
return len(inst)
def __iter__(self) -> Iterator[InstKT]:
return self._data.__iter__()
for _inst in self._read_instrument().keys():
yield _inst
def update(self, *args, **kwargs) -> None:
if len(args) > 1:
raise TypeError(f"update expected at most 1 arguments, got {len(args)}")
inst = self._read_instrument()
if args:
other = args[0]
if isinstance(other, Mapping):
for key in other:
inst[key] = other[key]
elif hasattr(other, "keys"):
for key in other.keys():
inst[key] = other[key]
else:
for key, value in other:
inst[key] = value
for key, value in kwargs.items():
inst[key] = value
self._write_instrument(inst)
class FileFeatureStorage(FeatureStorage):
def __getitem__(self, i: Union[int, slice]) -> Union[FeatureVT, Iterable[FeatureVT]]:
def __init__(self, uri: str):
    """Create a feature storage backed by the binary file at ``uri``."""
    super(FileFeatureStorage, self).__init__(uri=uri)
    # Normalise the path the same way the calendar and instrument storages do,
    # so "~" and relative locations resolve consistently across all three.
    self._uri = Path(self._uri).expanduser().resolve()
def clear(self):
with self._uri.open("wb") as _:
pass
def extend(self, series: pd.Series) -> None:
extend_start_index = self[0][0] + len(self) if self._uri.exists() else series.index[0]
series = series.reindex(pd.RangeIndex(extend_start_index, series.index[-1] + 1))
with self._uri.open("ab") as fp:
np.array(series.values).astype("<f").tofile(fp)
def rebase(self, series: pd.Series) -> None:
origin_series = self[:]
series = series.append(origin_series.loc[origin_series.index > series.index[-1]])
series = series.reindex(pd.RangeIndex(series.index[0], series.index[-1]))
with self._uri.open("wb") as fp:
np.array(series.values).astype("<f").tofile(fp)
def __getitem__(self, i: Union[int, slice]) -> Union[Tuple[int, float], pd.Series]:
if not self._uri.exists():
if isinstance(i, int):
return None, None
elif isinstance(i, slice):
return pd.Series()
else:
raise TypeError(f"type(i) = {type(i)}")
with open(self._uri, "rb") as fp:
ref_start_index = int(np.frombuffer(fp.read(4), dtype="<f")[0])
@@ -79,23 +203,28 @@ class FileFeatureStorage(FeatureStorage):
end_index = i.stop - 1
si = max(ref_start_index, start_index)
if si > end_index:
return []
return pd.Series()
fp.seek(4 * (si - ref_start_index) + 4)
# read n bytes
count = end_index - si + 1
data = np.frombuffer(fp.read(4 * count), dtype="<f")
return list(zip(range(si, si + len(data)), data))
return pd.Series(data, index=pd.RangeIndex(si, si + len(data)))
else:
raise TypeError(f"type(i) = {type(i)}")
def __len__(self) -> int:
return Path(self._uri).stat().st_size // 4 - 1
return self._uri.stat().st_size // 4 - 1 if self._uri.exists() else 0
def __iter__(self):
if not self._uri.exists():
return
with open(self._uri, "rb") as fp:
ref_start_index = int(np.frombuffer(fp.read(4), dtype="<f")[0])
fp.seek(4)
# read n bytes
data = np.frombuffer(fp.read(), dtype="<f")
for v in zip(range(ref_start_index, ref_start_index + len(data)), data):
yield v
while True:
v = fp.read(4)
if v:
yield ref_start_index, struct.unpack("f", v)
ref_start_index += 1
else:
break

View File

@@ -1,12 +1,12 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from collections.abc import MutableSequence, MutableMapping, Sequence
from typing import Iterable, overload, TypeVar, Tuple, List, Text, Iterator
from typing import Iterable, overload, Tuple, List, Text, Iterator, Union
import pandas as pd
# calendar value type
CalVT = TypeVar("CalVT")
CalVT = str
# instrument value
InstVT = List[Tuple[CalVT, CalVT]]
@@ -14,27 +14,36 @@ InstVT = List[Tuple[CalVT, CalVT]]
InstKT = Text
FeatureVT = Tuple[int, float]
class CalendarStorage(MutableSequence):
class CalendarStorage:
def __init__(self, uri: str):
self._uri = uri
def insert(self, index: int, o: CalVT) -> None:
def extend(self, iterable: Iterable[CalVT]) -> None:
raise NotImplementedError("Subclass of CalendarStorage must implement `extend` method")
def clear(self) -> None:
raise NotImplementedError("Subclass of CalendarStorage must implement `clear` method")
def index(self, value: CalVT) -> int:
raise NotImplementedError("Subclass of CalendarStorage must implement `index` method")
def insert(self, index: int, value: CalVT) -> None:
raise NotImplementedError("Subclass of CalendarStorage must implement `insert` method")
def remove(self, value: CalVT) -> None:
raise NotImplementedError("Subclass of CalendarStorage must implement `remove` method")
@overload
def __setitem__(self, i: int, o: CalVT) -> None:
def __setitem__(self, i: int, value: CalVT) -> None:
"""x.__setitem__(i, o) <==> x[i] = o"""
...
@overload
def __setitem__(self, s: slice, o: Iterable[CalVT]) -> None:
def __setitem__(self, s: slice, value: Iterable[CalVT]) -> None:
"""x.__setitem__(s, o) <==> x[s] = o"""
...
def __setitem__(self, i, o) -> None:
def __setitem__(self, i, value) -> None:
raise NotImplementedError(
"Subclass of CalendarStorage must implement `__setitem__(i: int, o: CalVT)`/`__setitem__(s: slice, o: Iterable[CalVT])` method"
)
@@ -74,10 +83,21 @@ class CalendarStorage(MutableSequence):
raise NotImplementedError("Subclass of CalendarStorage must implement `__len__` method")
class InstrumentStorage(MutableMapping):
class InstrumentStorage:
def __init__(self, uri: str):
self._uri = uri
def clear(self) -> None:
raise NotImplementedError("Subclass of InstrumentStorage must implement `clear` method")
def update(self, *args, **kwargs) -> None:
"""D.update([E, ]**F) -> None. Update D from mapping/iterable E and F.
If E present and has a .keys() method, does: for k in E: D[k] = E[k]
If E present and lacks .keys() method, does: for (k, v) in E: D[k] = v
In either case, this is followed by: for k, v in F.items(): D[k] = v
"""
raise NotImplementedError("Subclass of InstrumentStorage must implement `update` method")
def __setitem__(self, k: InstKT, v: InstVT) -> None:
""" Set self[key] to value. """
raise NotImplementedError("Subclass of InstrumentStorage must implement `__setitem__` method")
@@ -99,37 +119,92 @@ class InstrumentStorage(MutableMapping):
raise NotImplementedError("Subclass of InstrumentStorage must implement `__iter__` method")
class FeatureStorage(Sequence):
class FeatureStorage:
def __init__(self, uri: str):
self._uri = uri
def append(self, obj: FeatureVT) -> None:
""" Append object to the end of the FeatureStorage. """
raise NotImplementedError("Subclass of FeatureStorage must implement `append` method")
def clear(self):
""" Remove all items from FeatureStorage. """
raise NotImplementedError("Subclass of FeatureStorage must implement `clear` method")
def extend(self, iterable: Iterable[FeatureVT]):
""" Extend list by appending elements from the iterable. """
def extend(self, series: pd.Series):
"""Extend feature by appending elements from the series.
Examples:
feature:
3 4
4 5
5 6
>>> self.extend(pd.Series({7: 8, 9:10}))
feature:
3 4
4 5
5 6
6 np.nan
7 8
9 10
"""
raise NotImplementedError("Subclass of FeatureStorage must implement `extend` method")
def rebase(self, series: pd.Series):
"""Rebase feature header from the series.
Examples:
feature:
3 4
4 5
5 6
>>> self.rebase(pd.Series({1: 2}))
feature:
1 2
2 np.nan
3 4
4 5
5 6
>>> self.rebase(pd.Series({5: 6, 7: 8, 9: 10}))
feature:
5 6
7 8
9 10
>>> self.rebase(pd.Series({11: 12, 12: 13,}))
feature:
11 12
12 13
"""
raise NotImplementedError("Subclass of FeatureStorage must implement `rebase` method")
@overload
def __getitem__(self, s: slice) -> Iterable[FeatureVT]:
"""x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step]"""
def __getitem__(self, s: slice) -> pd.Series:
"""x.__getitem__(slice(start: int, stop: int, step: int)) <==> x[start:stop:step] == pd.Series(values, index=pd.RangeIndex(start, len(values))"""
...
@overload
def __getitem__(self, i: int) -> float:
def __getitem__(self, i: int) -> Tuple[int, float]:
"""x.__getitem__(y) <==> x[y]"""
...
def __getitem__(self, i) -> float:
def __getitem__(self, i) -> Union[Tuple[int, float], pd.Series]:
"""x.__getitem__(y) <==> x[y]"""
raise NotImplementedError(
"Subclass of FeatureStorage must implement `__getitem__(i: int)`/`__getitem__(s: slice)` method"
)
def __len__(self) -> int:
"""len(feature) <==> feature.__len__() """
raise NotImplementedError("Subclass of FeatureStorage must implement `__len__` method")
def __iter__(self) -> Iterable[Tuple[int, float]]:
"""iter(feature)"""
raise NotImplementedError("Subclass of FeatureStorage must implement `__iter__` method")