1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

Use the region in qlib.config for FileCalendarStorage. (#1049)

* Use the region in qlib.config for FileCalendarStorage.

* Fix black formatting.

* Make region an optional parameter.
This commit is contained in:
Chia-hung Tai
2022-04-20 19:20:43 +08:00
committed by GitHub
parent a973e4fb66
commit fe0f9427f2
3 changed files with 83 additions and 31 deletions

View File

@@ -79,6 +79,7 @@ class FileCalendarStorage(FileStorageMixin, CalendarStorage):
self.future = future
self._provider_uri = None if provider_uri is None else C.DataPathManager.format_provider_uri(provider_uri)
self.enable_read_cache = True # TODO: make it configurable
self.region = C["region"]
@property
def file_name(self) -> str:
@@ -130,7 +131,9 @@ class FileCalendarStorage(FileStorageMixin, CalendarStorage):
else:
_calendar = self._read_calendar()
if Freq(self._freq_file) != Freq(self.freq):
_calendar = resam_calendar(np.array(list(map(pd.Timestamp, _calendar))), self._freq_file, self.freq)
_calendar = resam_calendar(
np.array(list(map(pd.Timestamp, _calendar))), self._freq_file, self.freq, self.region
)
return _calendar
def _get_storage_freq(self) -> List[str]:

View File

@@ -6,9 +6,12 @@ from typing import Union, Callable
from . import lazy_sort_index
from .time import Freq, cal_sam_minute
from ..config import C
def resam_calendar(calendar_raw: np.ndarray, freq_raw: Union[str, Freq], freq_sam: Union[str, Freq]) -> np.ndarray:
def resam_calendar(
calendar_raw: np.ndarray, freq_raw: Union[str, Freq], freq_sam: Union[str, Freq], region: str = None
) -> np.ndarray:
"""
Resample the calendar with frequency freq_raw into the calendar with frequency freq_sam
Assumption:
@@ -22,12 +25,16 @@ def resam_calendar(calendar_raw: np.ndarray, freq_raw: Union[str, Freq], freq_sa
Frequency of the raw calendar
freq_sam : str
Sample frequency
region: str
Region, for example, "cn", "us"
Returns
-------
np.ndarray
The calendar with frequency freq_sam
"""
if region is None:
region = C["region"]
freq_raw = Freq(freq_raw)
freq_sam = Freq(freq_sam)
if not len(calendar_raw):
@@ -40,7 +47,7 @@ def resam_calendar(calendar_raw: np.ndarray, freq_raw: Union[str, Freq], freq_sa
else:
if freq_raw.count > freq_sam.count:
raise ValueError("raw freq must be higher than sampling freq")
_calendar_minute = np.unique(list(map(lambda x: cal_sam_minute(x, freq_sam.count), calendar_raw)))
_calendar_minute = np.unique(list(map(lambda x: cal_sam_minute(x, freq_sam.count, region), calendar_raw)))
return _calendar_minute
# Otherwise, convert the raw calendar into a day calendar and divide the whole calendar evenly into several bars

View File

@@ -4,7 +4,7 @@
Time related utils are compiled in this script
"""
import bisect
from datetime import datetime, time, date
from datetime import datetime, time, date, timedelta
from typing import List, Optional, Tuple, Union
import functools
import re
@@ -12,10 +12,24 @@ import re
import pandas as pd
from qlib.config import C
from qlib.constant import REG_CN, REG_TW, REG_US
CN_TIME = [
datetime.strptime("9:30", "%H:%M"),
datetime.strptime("11:30", "%H:%M"),
datetime.strptime("13:00", "%H:%M"),
datetime.strptime("15:00", "%H:%M"),
]
US_TIME = [datetime.strptime("9:30", "%H:%M"), datetime.strptime("16:00", "%H:%M")]
TW_TIME = [
datetime.strptime("9:00", "%H:%M"),
datetime.strptime("13:30", "%H:%M"),
]
@functools.lru_cache(maxsize=240)
def get_min_cal(shift: int = 0) -> List[time]:
def get_min_cal(shift: int = 0, region: str = REG_CN) -> List[time]:
"""
get the minute level calendar in day period
@@ -24,6 +38,8 @@ def get_min_cal(shift: int = 0) -> List[time]:
shift : int
the shift direction would be like pandas shift.
series.shift(1) will replace the value at `i`-th with the one at `i-1`-th
region: str
Region, for example, "cn", "us"
Returns
-------
@@ -31,14 +47,30 @@ def get_min_cal(shift: int = 0) -> List[time]:
"""
cal = []
for ts in list(pd.date_range("9:30", "11:29", freq="1min") - pd.Timedelta(minutes=shift)) + list(
pd.date_range("13:00", "14:59", freq="1min") - pd.Timedelta(minutes=shift)
):
cal.append(ts.time())
if region == REG_CN:
for ts in list(
pd.date_range(CN_TIME[0], CN_TIME[1] - timedelta(minutes=1), freq="1min") - pd.Timedelta(minutes=shift)
) + list(
pd.date_range(CN_TIME[2], CN_TIME[3] - timedelta(minutes=1), freq="1min") - pd.Timedelta(minutes=shift)
):
cal.append(ts.time())
elif region == REG_TW:
for ts in list(
pd.date_range(TW_TIME[0], TW_TIME[1] - timedelta(minutes=1), freq="1min") - pd.Timedelta(minutes=shift)
):
cal.append(ts.time())
elif region == REG_US:
for ts in list(
pd.date_range(US_TIME[0], US_TIME[1] - timedelta(minutes=1), freq="1min") - pd.Timedelta(minutes=shift)
):
cal.append(ts.time())
else:
raise ValueError(f"{region} is not supported")
return cal
def is_single_value(start_time, end_time, freq, region="cn"):
def is_single_value(start_time, end_time, freq, region: str = REG_CN):
"""Is there only one piece of data for stock market.
Parameters
@@ -48,12 +80,14 @@ def is_single_value(start_time, end_time, freq, region="cn"):
end_time : Union[pd.Timestamp, str]
closed end time for data.
freq :
region: str
Region, for example, "cn", "us"
Returns
-------
bool
True means one piece of data to obtain.
"""
if region == "cn":
if region == REG_CN:
if end_time - start_time < freq:
return True
if start_time.hour == 11 and start_time.minute == 29 and start_time.second == 0:
@@ -61,6 +95,18 @@ def is_single_value(start_time, end_time, freq, region="cn"):
if start_time.hour == 14 and start_time.minute == 59 and start_time.second == 0:
return True
return False
elif region == REG_TW:
if end_time - start_time < freq:
return True
if start_time.hour == 13 and start_time.minute >= 25 and start_time.second == 0:
return True
return False
elif region == REG_US:
if end_time - start_time < freq:
return True
if start_time.hour == 15 and start_time.minute == 59 and start_time.second == 0:
return True
return False
else:
raise NotImplementedError(f"please implement the is_single_value func for {region}")
@@ -72,8 +118,6 @@ class Freq:
NORM_FREQ_MINUTE = "min" # using min instead of minute for align with Qlib's data filename
SUPPORT_CAL_LIST = [NORM_FREQ_MINUTE, NORM_FREQ_DAY] # FIXME: this list should from data
MIN_CAL = get_min_cal()
def __init__(self, freq: Union[str, "Freq"]) -> None:
if isinstance(freq, str):
self.count, self.base = self.parse(freq)
@@ -211,36 +255,32 @@ class Freq:
return min_freq[1] if min_freq else None
CN_TIME = [
datetime.strptime("9:30", "%H:%M"),
datetime.strptime("11:30", "%H:%M"),
datetime.strptime("13:00", "%H:%M"),
datetime.strptime("15:00", "%H:%M"),
]
US_TIME = [datetime.strptime("9:30", "%H:%M"), datetime.strptime("16:00", "%H:%M")]
def time_to_day_index(time_obj: Union[str, datetime], region: str = "cn"):
def time_to_day_index(time_obj: Union[str, datetime], region: str = REG_CN):
if isinstance(time_obj, str):
time_obj = datetime.strptime(time_obj, "%H:%M")
if region == "cn":
if region == REG_CN:
if CN_TIME[0] <= time_obj < CN_TIME[1]:
return int((time_obj - CN_TIME[0]).total_seconds() / 60)
elif CN_TIME[2] <= time_obj < CN_TIME[3]:
return int((time_obj - CN_TIME[2]).total_seconds() / 60) + 120
else:
raise ValueError(f"{time_obj} is not the opening time of the {region} stock market")
elif region == "us":
elif region == REG_US:
if US_TIME[0] <= time_obj < US_TIME[1]:
return int((time_obj - US_TIME[0]).total_seconds() / 60)
else:
raise ValueError(f"{time_obj} is not the opening time of the {region} stock market")
elif region == REG_TW:
if TW_TIME[0] <= time_obj < TW_TIME[1]:
return int((time_obj - TW_TIME[0]).total_seconds() / 60)
else:
raise ValueError(f"{time_obj} is not the opening time of the {region} stock market")
else:
raise ValueError(f"{region} is not supported")
def get_day_min_idx_range(start: str, end: str, freq: str) -> Tuple[int, int]:
def get_day_min_idx_range(start: str, end: str, freq: str, region: str) -> Tuple[int, int]:
"""
get the min-bar index in a day for a time range (both left and right are closed) given a fixed frequency
Parameters
@@ -260,7 +300,7 @@ def get_day_min_idx_range(start: str, end: str, freq: str) -> Tuple[int, int]:
start = pd.Timestamp(start).time()
end = pd.Timestamp(end).time()
freq = Freq(freq)
in_day_cal = Freq.MIN_CAL[:: freq.count]
in_day_cal = get_min_cal(region=region)[:: freq.count]
left_idx = bisect.bisect_left(in_day_cal, start)
right_idx = bisect.bisect_right(in_day_cal, end) - 1
return left_idx, right_idx
@@ -280,7 +320,7 @@ def concat_date_time(date_obj: date, time_obj: time) -> pd.Timestamp:
)
def cal_sam_minute(x: pd.Timestamp, sam_minutes: int) -> pd.Timestamp:
def cal_sam_minute(x: pd.Timestamp, sam_minutes: int, region: str = REG_CN) -> pd.Timestamp:
"""
align the minute-level data to a down sampled calendar
@@ -292,13 +332,15 @@ def cal_sam_minute(x: pd.Timestamp, sam_minutes: int) -> pd.Timestamp:
datetime to be aligned
sam_minutes : int
align to `sam_minutes` minute-level calendar
region: str
Region, for example, "cn", "us"
Returns
-------
pd.Timestamp:
the datetime after aligned
"""
cal = get_min_cal(C.min_data_shift)[::sam_minutes]
cal = get_min_cal(C.min_data_shift, region)[::sam_minutes]
idx = bisect.bisect_right(cal, x.time()) - 1
_date, new_time = x.date(), cal[idx]
return concat_date_time(_date, new_time)
@@ -332,4 +374,4 @@ def epsilon_change(date_time: pd.Timestamp, direction: str = "backward") -> pd.T
if __name__ == "__main__":
print(get_day_min_idx_range("8:30", "14:59", "10min"))
print(get_day_min_idx_range("8:30", "14:59", "10min", REG_CN))