diff --git a/qlib/__init__.py b/qlib/__init__.py
index 30d49383c..13d8b3590 100644
--- a/qlib/__init__.py
+++ b/qlib/__init__.py
@@ -63,7 +63,7 @@ def init(default_conf="client", **kwargs):
                 else:
                     logger.warning(f"auto_path is False, please make sure {mount_path} is mounted")
         elif uri_type == C.NFS_URI:
-            _mount_nfs_uri(provider_uri, mount_path, C["auto_mount"])
+            _mount_nfs_uri(provider_uri, C.dpm.get_data_uri(_freq), C["auto_mount"])
         else:
             raise NotImplementedError(f"This type of URI is not supported")
 
@@ -96,7 +96,7 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False):
         sys_type = platform.system()
         if "win" in sys_type.lower():
             # system: window
-            exec_result = os.popen("mount -o anon %s %s" % (provider_uri, mount_path + ":"))
+            exec_result = os.popen(f"mount -o anon {provider_uri} {mount_path}")
             result = exec_result.read()
             if "85" in result:
                 LOG.warning(f"{provider_uri} on Windows:{mount_path} is already mounted")
diff --git a/qlib/config.py b/qlib/config.py
index d9a2dd07f..54af9b954 100644
--- a/qlib/config.py
+++ b/qlib/config.py
@@ -269,11 +269,19 @@ class QlibConfig(Config):
         self._registered = False
 
     class DataPathManager:
-        def __init__(
-            self,
-            provider_uri: Union[str, Path, dict],
-            mount_path: Union[str, Path, dict],
-        ):
+        """
+        Motivation:
+        - get the right path (e.g. data uri) for accessing data based on given information(e.g. provider_uri, mount_path and frequency)
+        - some helper functions to process uri.
+        """
+
+        def __init__(self, provider_uri: Union[str, Path, dict], mount_path: Union[str, Path, dict]):
+
+            """
+            The relation of `provider_uri` and `mount_path`
+            - `mount_path` is used only if provider_uri is an NFS path
+            - otherwise, provider_uri will be used for accessing data
+            """
             self.provider_uri = provider_uri
             self.mount_path = mount_path
 
@@ -304,6 +312,9 @@ class QlibConfig(Config):
                 return QlibConfig.LOCAL_URI
 
         def get_data_uri(self, freq: Optional[Union[str, Freq]] = None) -> Path:
+            """
+            please refer to DataPathManager's __init__ and class doc
+            """
             if freq is not None:
                 freq = str(freq)  # converting Freq to string
             if freq is None or freq not in self.provider_uri:
@@ -314,7 +325,8 @@ class QlibConfig(Config):
             elif self.get_uri_type(_provider_uri) == QlibConfig.NFS_URI:
                 if "win" in platform.system().lower():
                     # windows, mount_path is the drive
-                    return Path(f"{self.mount_path[freq]}:\\")
+                    _path = str(self.mount_path[freq])
+                    return Path(f"{_path}:\\") if ":" not in _path else Path(_path)
                 return Path(self.mount_path[freq])
             else:
                 raise NotImplementedError(f"This type of uri is not supported")
@@ -351,9 +363,7 @@ class QlibConfig(Config):
         for _freq in _provider_uri.keys():
             # mount_path
             _mount_path[_freq] = (
-                _mount_path[_freq]
-                if _mount_path[_freq] is None
-                else str(Path(_mount_path[_freq]).expanduser().resolve())
+                _mount_path[_freq] if _mount_path[_freq] is None else str(Path(_mount_path[_freq]).expanduser())
             )
         self["provider_uri"] = _provider_uri
         self["mount_path"] = _mount_path
diff --git a/qlib/data/storage/file_storage.py b/qlib/data/storage/file_storage.py
index 4058b85c2..f0aaa4650 100644
--- a/qlib/data/storage/file_storage.py
+++ b/qlib/data/storage/file_storage.py
@@ -34,7 +34,11 @@ class FileStorageMixin:
 
     @property
     def dpm(self):
-        return C.dpm if getattr(self, "_provider_uri", None) is None else C.DataPathManager(self._provider_uri, None)
+        return (
+            C.dpm
+            if getattr(self, "_provider_uri", None) is None
+            else C.DataPathManager(self._provider_uri, C.mount_path)
+        )
 
     @property
     def support_freq(self) -> List[str]:
diff --git a/scripts/data_collector/future_calendar_collector.py b/scripts/data_collector/future_calendar_collector.py
new file mode 100644
index 000000000..e5b118926
--- /dev/null
+++ b/scripts/data_collector/future_calendar_collector.py
@@ -0,0 +1,135 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+
+import abc
+from pathlib import Path
+from typing import Union, Iterable, List
+
+import fire
+import numpy as np
+import pandas as pd
+
+# pip install baostock
+import baostock as bs
+from loguru import logger
+
+
+class CollectorFutureCalendar(abc.ABC):
+    """Merge the existing day calendar with future trading dates and save the result as `day_future.txt`"""
+
+    calendar_format = "%Y-%m-%d"
+
+    def __init__(self, qlib_dir: Union[str, Path], start_date: str = None, end_date: str = None):
+        """
+
+        Parameters
+        ----------
+        qlib_dir:
+            qlib data directory
+        start_date
+            start date, by default the latest date of the existing calendar
+        end_date
+            end date, by default two years(365 * 2 days) after the latest date of the existing calendar
+        """
+        self.qlib_dir = Path(qlib_dir).expanduser().absolute()
+        self.calendar_path = self.qlib_dir.joinpath("calendars/day.txt")
+        self.future_path = self.qlib_dir.joinpath("calendars/day_future.txt")
+        self._calendar_list = self.calendar_list
+        _latest_date = self._calendar_list[-1]
+        self.start_date = _latest_date if start_date is None else pd.Timestamp(start_date)
+        self.end_date = _latest_date + pd.Timedelta(days=365 * 2) if end_date is None else pd.Timestamp(end_date)
+
+    @property
+    def calendar_list(self) -> List[pd.Timestamp]:
+        """Load the existing calendar from `calendars/day.txt`; raise ValueError if the file does not exist"""
+        if not self.calendar_path.exists():
+            raise ValueError(f"calendar does not exist: {self.calendar_path}")
+        calendar_df = pd.read_csv(self.calendar_path, header=None)
+        calendar_df.columns = ["date"]
+        calendar_df["date"] = pd.to_datetime(calendar_df["date"])
+        return calendar_df["date"].to_list()
+
+    def _format_datetime(self, datetime_d: Union[str, pd.Timestamp]) -> str:
+        """Format a date as `calendar_format`(YYYY-MM-DD)"""
+        datetime_d = pd.Timestamp(datetime_d)
+        return datetime_d.strftime(self.calendar_format)
+
+    def write_calendar(self, calendar: Iterable):
+        """Write the sorted, de-duplicated union of the existing calendar and `calendar` to `day_future.txt`"""
+        # use a set union so that any iterable(not only list) is accepted,
+        # and reuse the calendar loaded in __init__ instead of reading the file again
+        _dates = sorted(set(self._calendar_list).union(calendar))
+        calendars_list = [self._format_datetime(_d) for _d in _dates]
+        np.savetxt(self.future_path, calendars_list, fmt="%s", encoding="utf-8")
+
+    @abc.abstractmethod
+    def collector(self) -> Iterable[pd.Timestamp]:
+        """Collect the trading dates between `start_date` and `end_date`
+
+        Returns
+        -------
+            trading dates
+        """
+        raise NotImplementedError(f"Please implement the `collector` method")
+
+
+class CollectorFutureCalendarCN(CollectorFutureCalendar):
+    def collector(self) -> Iterable[pd.Timestamp]:
+        """Query the trading dates of the CN market from baostock"""
+        lg = bs.login()
+        if lg.error_code != "0":
+            raise ValueError(f"login respond error_msg: {lg.error_msg}")
+        try:
+            rs = bs.query_trade_dates(
+                start_date=self._format_datetime(self.start_date), end_date=self._format_datetime(self.end_date)
+            )
+            if rs.error_code != "0":
+                raise ValueError(f"query_trade_dates respond error_msg: {rs.error_msg}")
+            data_list = []
+            while rs.error_code == "0" and rs.next():
+                data_list.append(rs.get_row_data())
+        finally:
+            # always release the baostock session, even if the query fails
+            bs.logout()
+        calendar = pd.DataFrame(data_list, columns=rs.fields)
+        calendar["is_trading_day"] = calendar["is_trading_day"].astype(int)
+        return pd.to_datetime(calendar[calendar["is_trading_day"] == 1]["calendar_date"]).to_list()
+
+
+class CollectorFutureCalendarUS(CollectorFutureCalendar):
+    def collector(self) -> Iterable[pd.Timestamp]:
+        # TODO: US future calendar
+        raise ValueError("Us calendar is not supported")
+
+
+def run(qlib_dir: Union[str, Path], region: str = "cn", start_date: str = None, end_date: str = None):
+    """Collect future calendar(day)
+
+    Parameters
+    ----------
+    qlib_dir:
+        qlib data directory
+    region:
+        cn/CN or us/US
+    start_date
+        start date
+    end_date
+        end date
+
+    Examples
+    -------
+        # get cn future calendar
+        $ python future_calendar_collector.py --qlib_dir ~/.qlib/qlib_data/cn_data --region cn
+    """
+    logger.info(f"collector future calendar: region={region}")
+    # look the collector class up in this module directly: importing the module by its bare name
+    # only works when the script directory is on sys.path, and executes the module a second time
+    _class = globals().get(f"CollectorFutureCalendar{region.upper()}")
+    if _class is None:
+        raise ValueError(f"region is not supported: {region}")
+    collector = _class(qlib_dir=qlib_dir, start_date=start_date, end_date=end_date)
+    collector.write_calendar(collector.collector())
+
+
+if __name__ == "__main__":
+    fire.Fire(run)