1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

Add future calendar collector (#795)

* fix Windows mount

* add future_calendar_collector

* update docs

Co-authored-by: Young <afe.young@gmail.com>
Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>
This commit is contained in:
Pengrong Zhu
2022-01-16 10:14:27 +08:00
committed by GitHub
parent a62e2ec4de
commit 2aee9e0145
4 changed files with 147 additions and 12 deletions

View File

@@ -63,7 +63,7 @@ def init(default_conf="client", **kwargs):
else:
logger.warning(f"auto_path is False, please make sure {mount_path} is mounted")
elif uri_type == C.NFS_URI:
_mount_nfs_uri(provider_uri, mount_path, C["auto_mount"])
_mount_nfs_uri(provider_uri, C.dpm.get_data_uri(_freq), C["auto_mount"])
else:
raise NotImplementedError(f"This type of URI is not supported")
@@ -96,7 +96,7 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False):
sys_type = platform.system()
if "win" in sys_type.lower():
# system: window
exec_result = os.popen("mount -o anon %s %s" % (provider_uri, mount_path + ":"))
exec_result = os.popen(f"mount -o anon {provider_uri} {mount_path}")
result = exec_result.read()
if "85" in result:
LOG.warning(f"{provider_uri} on Windows:{mount_path} is already mounted")

View File

@@ -269,11 +269,19 @@ class QlibConfig(Config):
self._registered = False
class DataPathManager:
def __init__(
self,
provider_uri: Union[str, Path, dict],
mount_path: Union[str, Path, dict],
):
"""
Motivation:
- get the right path (e.g. data uri) for accessing data based on given information(e.g. provider_uri, mount_path and frequency)
- some helper functions to process uri.
"""
def __init__(self, provider_uri: Union[str, Path, dict], mount_path: Union[str, Path, dict]):
"""
The relation of `provider_uri` and `mount_path`
- `mount_path` is used only if provider_uri is an NFS path
- otherwise, provider_uri will be used for accessing data
"""
self.provider_uri = provider_uri
self.mount_path = mount_path
@@ -304,6 +312,9 @@ class QlibConfig(Config):
return QlibConfig.LOCAL_URI
def get_data_uri(self, freq: Optional[Union[str, Freq]] = None) -> Path:
"""
please refer DataPathManager's __init__ and class doc
"""
if freq is not None:
freq = str(freq) # converting Freq to string
if freq is None or freq not in self.provider_uri:
@@ -314,7 +325,8 @@ class QlibConfig(Config):
elif self.get_uri_type(_provider_uri) == QlibConfig.NFS_URI:
if "win" in platform.system().lower():
# windows, mount_path is the drive
return Path(f"{self.mount_path[freq]}:\\")
_path = str(self.mount_path[freq])
return Path(f"{_path}:\\") if ":" not in _path else Path(_path)
return Path(self.mount_path[freq])
else:
raise NotImplementedError(f"This type of uri is not supported")
@@ -351,9 +363,7 @@ class QlibConfig(Config):
for _freq in _provider_uri.keys():
# mount_path
_mount_path[_freq] = (
_mount_path[_freq]
if _mount_path[_freq] is None
else str(Path(_mount_path[_freq]).expanduser().resolve())
_mount_path[_freq] if _mount_path[_freq] is None else str(Path(_mount_path[_freq]).expanduser())
)
self["provider_uri"] = _provider_uri
self["mount_path"] = _mount_path

View File

@@ -34,7 +34,11 @@ class FileStorageMixin:
@property
def dpm(self):
return C.dpm if getattr(self, "_provider_uri", None) is None else C.DataPathManager(self._provider_uri, None)
return (
C.dpm
if getattr(self, "_provider_uri", None) is None
else C.DataPathManager(self._provider_uri, C.mount_path)
)
@property
def support_freq(self) -> List[str]:

View File

@@ -0,0 +1,121 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import abc
import importlib
from pathlib import Path
from typing import Union, Iterable, List
import fire
import numpy as np
import pandas as pd
# pip install baostock
import baostock as bs
from loguru import logger
class CollectorFutureCalendar:
calendar_format = "%Y-%m-%d"
def __init__(self, qlib_dir: Union[str, Path], start_date: str = None, end_date: str = None):
"""
Parameters
----------
qlib_dir:
qlib data directory
start_date
start date
end_date
end date
"""
self.qlib_dir = Path(qlib_dir).expanduser().absolute()
self.calendar_path = self.qlib_dir.joinpath("calendars/day.txt")
self.future_path = self.qlib_dir.joinpath("calendars/day_future.txt")
self._calendar_list = self.calendar_list
_latest_date = self._calendar_list[-1]
self.start_date = _latest_date if start_date is None else pd.Timestamp(start_date)
self.end_date = _latest_date + pd.Timedelta(days=365 * 2) if end_date is None else pd.Timestamp(end_date)
@property
def calendar_list(self) -> List[pd.Timestamp]:
# load old calendar
if not self.calendar_path.exists():
raise ValueError(f"calendar does not exist: {self.calendar_path}")
calendar_df = pd.read_csv(self.calendar_path, header=None)
calendar_df.columns = ["date"]
calendar_df["date"] = pd.to_datetime(calendar_df["date"])
return calendar_df["date"].to_list()
def _format_datetime(self, datetime_d: [str, pd.Timestamp]):
datetime_d = pd.Timestamp(datetime_d)
return datetime_d.strftime(self.calendar_format)
def write_calendar(self, calendar: Iterable):
calendars_list = list(map(lambda x: self._format_datetime(x), sorted(set(self.calendar_list + calendar))))
np.savetxt(self.future_path, calendars_list, fmt="%s", encoding="utf-8")
@abc.abstractmethod
def collector(self) -> Iterable[pd.Timestamp]:
"""
Returns
-------
"""
raise NotImplementedError(f"Please implement the `collector` method")
class CollectorFutureCalendarCN(CollectorFutureCalendar):
def collector(self) -> Iterable[pd.Timestamp]:
lg = bs.login()
if lg.error_code != "0":
raise ValueError(f"login respond error_msg: {lg.error_msg}")
rs = bs.query_trade_dates(
start_date=self._format_datetime(self.start_date), end_date=self._format_datetime(self.end_date)
)
if rs.error_code != "0":
raise ValueError(f"query_trade_dates respond error_msg: {rs.error_msg}")
data_list = []
while (rs.error_code == "0") & rs.next():
data_list.append(rs.get_row_data())
calendar = pd.DataFrame(data_list, columns=rs.fields)
calendar["is_trading_day"] = calendar["is_trading_day"].astype(int)
return pd.to_datetime(calendar[calendar["is_trading_day"] == 1]["calendar_date"]).to_list()
class CollectorFutureCalendarUS(CollectorFutureCalendar):
def collector(self) -> Iterable[pd.Timestamp]:
# TODO: US future calendar
raise ValueError("Us calendar is not supported")
def run(qlib_dir: Union[str, Path], region: str = "cn", start_date: str = None, end_date: str = None):
"""Collect future calendar(day)
Parameters
----------
qlib_dir:
qlib data directory
region:
cn/CN or us/US
start_date
start date
end_date
end date
Examples
-------
# get cn future calendar
$ python future_calendar_collector.py --qlib_data_1d_dir <user data dir> --region cn
"""
logger.info(f"collector future calendar: region={region}")
_cur_module = importlib.import_module("future_calendar_collector")
_class = getattr(_cur_module, f"CollectorFutureCalendar{region.upper()}")
collector = _class(qlib_dir=qlib_dir, start_date=start_date, end_date=end_date)
collector.write_calendar(collector.collector())
if __name__ == "__main__":
fire.Fire(run)