diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index af386f6ca..3f77a7273 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [windows-latest, ubuntu-16.04, ubuntu-18.04, ubuntu-20.04] + os: [windows-latest, ubuntu-18.04, ubuntu-20.04] python-version: [3.6, 3.7, 3.8] steps: diff --git a/.github/workflows/test_macos.yml b/.github/workflows/test_macos.yml index b6003f668..0db80610a 100644 --- a/.github/workflows/test_macos.yml +++ b/.github/workflows/test_macos.yml @@ -36,12 +36,15 @@ jobs: run: | python -m pip install numpy==1.19.5 python -m pip install pyqlib --ignore-installed ruamel.yaml numpy - - name: Install Lightgbm for MacOS run: | /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Microsoft/qlib/main/.github/brew_install.sh)" HOMEBREW_NO_AUTO_UPDATE=1 brew install lightgbm - + # FIX MacOS error: Segmentation fault + # reference: https://github.com/microsoft/LightGBM/issues/4229 + wget https://raw.githubusercontent.com/Homebrew/homebrew-core/fb8323f2b170bd4ae97e1bac9bf3e2983af3fdb0/Formula/libomp.rb + brew unlink libomp + brew install libomp.rb - name: Test data downloads run: | python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn @@ -56,7 +59,6 @@ jobs: python -m pip install numpy jupyter jupyter_contrib_nbextensions python -m pip install -U scipy scikit-learn # installing without this line will cause errors on GitHub Actions, while instsalling locally won't python setup.py install - - name: Install test dependencies run: | python -m pip install --upgrade pip @@ -68,4 +70,4 @@ jobs: python -m pytest . --durations=0 - name: Test workflow by config (install from source) run: | - python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml \ No newline at end of file + python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml diff --git a/VERSION.txt b/VERSION.txt index 79e6b01ac..e89cd218b 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1 +1 @@ -0.7.1.99 +0.7.2.99 diff --git a/docs/reference/api.rst b/docs/reference/api.rst index 5e6e50b0b..bf1f00f28 100644 --- a/docs/reference/api.rst +++ b/docs/reference/api.rst @@ -241,6 +241,7 @@ Online Tool .. automodule:: qlib.workflow.online.utils :members: + RecordUpdater -------------------- .. automodule:: qlib.workflow.online.update @@ -257,4 +258,4 @@ Serializable :members: - \ No newline at end of file + diff --git a/examples/benchmarks/LightGBM/features_sample.py b/examples/benchmarks/LightGBM/features_sample.py new file mode 100644 index 000000000..0b996bd1f --- /dev/null +++ b/examples/benchmarks/LightGBM/features_sample.py @@ -0,0 +1,16 @@ +import datetime +import pandas as pd + +from qlib.data.inst_processor import InstProcessor + + +class Resample1minProcessor(InstProcessor): + def __init__(self, hour: int, minute: int, **kwargs): + self.hour = hour + self.minute = minute + + def __call__(self, df: pd.DataFrame, *args, **kwargs): + df.index = pd.to_datetime(df.index) + df = df.loc[df.index.time == datetime.time(self.hour, self.minute)] + df.index = df.index.normalize() + return df diff --git a/examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml b/examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml index 8bee2bd38..2bb21d41d 100644 --- a/examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml +++ b/examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml @@ -69,4 +69,4 @@ task: - class: PortAnaRecord module_path: qlib.workflow.record_temp kwargs: - config: *port_analysis_config \ No newline at end of file + config: *port_analysis_config diff --git a/examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158_multi_freq.yaml b/examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158_multi_freq.yaml new file mode 100644 index 000000000..46b5c0f80 --- /dev/null +++ b/examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158_multi_freq.yaml @@ -0,0 +1,83 @@ +qlib_init: + provider_uri: + day: "~/.qlib/qlib_data/cn_data" + 1min: "~/.qlib/qlib_data/cn_data_1min" + region: cn + dataset_cache: null + maxtasksperchild: 1 +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + # 1min closing time is 15:00:00 + end_time: "2020-08-01 15:00:00" + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + freq: + label: day + feature: 1min + # with label as reference + inst_processor: + feature: + - class: Resample1minProcessor + module_path: features_sample.py + kwargs: + hour: 14 + minute: 56 + +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: LGBModel + module_path: qlib.contrib.model.gbdt + kwargs: + loss: mse + colsample_bytree: 0.8879 + learning_rate: 0.2 + subsample: 0.8789 + lambda_l1: 205.6999 + lambda_l2: 580.9768 + max_depth: 8 + num_leaves: 210 + num_threads: 20 + dataset: + class: DatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha158 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config \ No newline at end of file diff --git a/examples/benchmarks/TRA/workflow_config_tra_Alpha158_full.yaml b/examples/benchmarks/TRA/workflow_config_tra_Alpha158_full.yaml index ab8febc2f..107f58906 100644 --- a/examples/benchmarks/TRA/workflow_config_tra_Alpha158_full.yaml +++ b/examples/benchmarks/TRA/workflow_config_tra_Alpha158_full.yaml @@ -122,5 +122,5 @@ task: ann_scaler: 252 - class: PortAnaRecord module_path: qlib.workflow.record_temp - kwargs: + kwargs: config: *port_analysis_config diff --git a/examples/benchmarks/TRA/workflow_config_tra_Alpha360.yaml b/examples/benchmarks/TRA/workflow_config_tra_Alpha360.yaml index 64e3c91cb..191aef53d 100644 --- a/examples/benchmarks/TRA/workflow_config_tra_Alpha360.yaml +++ b/examples/benchmarks/TRA/workflow_config_tra_Alpha360.yaml @@ -122,5 +122,5 @@ task: ann_scaler: 252 - class: PortAnaRecord module_path: qlib.workflow.record_temp - kwargs: + kwargs: config: *port_analysis_config diff --git a/qlib/data/data.py b/qlib/data/data.py index 7fbc48f71..115d38170 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -10,16 +10,12 @@ import abc import copy import queue import bisect -from typing import List import numpy as np import pandas as pd from multiprocessing import Pool from typing import Iterable, Union from typing import List, Union -import numpy as np -import pandas as pd - # For supporting multiprocessing in outter code, joblib is used from joblib import delayed @@ -30,9 +26,9 @@ from .ops import Operators from .inst_processor import InstProcessor from ..log import get_module_logger -from .cache import DiskDatasetCache from ..utils.time import Freq from ..utils.resam import resam_calendar +from .cache import DiskDatasetCache, DiskExpressionCache from ..utils import ( Wrapper, init_instance_by_config, @@ -278,7 +274,6 @@ class InstrumentProvider(abc.ABC, ProviderBackendMixin): """ if isinstance(market, list): return market - from .filter import SeriesDFilter if filter_pipe is None: diff --git a/qlib/workflow/online/update.py b/qlib/workflow/online/update.py index f2135a27a..f349a45b3 100644 --- a/qlib/workflow/online/update.py +++ b/qlib/workflow/online/update.py @@ -1,6 +1,5 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. - """ Updater is a module to update artifacts such as predictions when the stock data is updating. """ @@ -10,11 +9,12 @@ from abc import ABCMeta, abstractmethod import pandas as pd from qlib import get_module_logger from qlib.data import D -from qlib.data.dataset import DatasetH +from qlib.data.dataset import Dataset, DatasetH from qlib.data.dataset.handler import DataHandlerLP from qlib.model import Model from qlib.utils import get_date_by_shift from qlib.workflow.recorder import Recorder +from qlib.workflow.record_temp import SignalRecord class RMDLoader: @@ -72,12 +72,25 @@ class RecordUpdater(metaclass=ABCMeta): ... -class PredUpdater(RecordUpdater): +class DSBasedUpdater(RecordUpdater, metaclass=ABCMeta): """ - Update the prediction in the Recorder + Dataset-Based Updater + - Provding updating feature for Updating data based on Qlib Dataset + + Assumption + - Based on Qlib dataset + - The data to be updated is a multi-level index pd.DataFrame. For example label , prediction. + + LABEL0 + datetime instrument + 2021-05-10 SH600000 0.006965 + SH600004 0.003407 + ... ... + 2021-05-28 SZ300498 0.015748 + SZ300676 -0.001321 """ - def __init__(self, record: Recorder, to_date=None, hist_ref: int = 0, freq="day"): + def __init__(self, record: Recorder, to_date=None, hist_ref: int = 0, freq="day", fname="pred.pkl"): """ Init PredUpdater. @@ -100,15 +113,27 @@ class PredUpdater(RecordUpdater): self.to_date = to_date self.hist_ref = hist_ref self.freq = freq + self.fname = fname self.rmdl = RMDLoader(rec=record) + latest_date = D.calendar(freq=freq)[-1] if to_date == None: - to_date = D.calendar(freq=freq)[-1] - self.to_date = pd.Timestamp(to_date) + to_date = latest_date + to_date = pd.Timestamp(to_date) + + if to_date >= latest_date: + self.logger.warning( + f"The given `to_date`({to_date}) is later than `latest_date`({latest_date}). So `to_date` is clipped to `latest_date`." + ) + to_date = latest_date + self.to_date = to_date # FIXME: it will raise error when running routine with delay trainer - # should we use another predicition updater for delay trainer? - self.old_pred = record.load_object("pred.pkl") - self.last_end = self.old_pred.index.get_level_values("datetime").max() + # should we use another prediction updater for delay trainer? + self.old_data: pd.DataFrame = record.load_object(fname) + + # dropna is for being compatible to some data with future information(e.g. label) + # The recent label data should be updated together + self.last_end = self.old_data.dropna().index.get_level_values("datetime").max() def prepare_data(self) -> DatasetH: """ @@ -127,7 +152,7 @@ class PredUpdater(RecordUpdater): def update(self, dataset: DatasetH = None): """ - Update the prediction in a recorder. + Update the data in a recorder. Args: DatasetH: the instance of DatasetH. None for reprepare. @@ -139,7 +164,7 @@ class PredUpdater(RecordUpdater): if self.last_end >= self.to_date: self.logger.info( - f"The prediction in {self.record.info['id']} are latest ({self.last_end}). No need to update to {self.to_date}." + f"The data in {self.record.info['id']} are latest ({self.last_end}). No need to update to {self.to_date}." ) return @@ -148,14 +173,49 @@ class PredUpdater(RecordUpdater): # For reusing the dataset dataset = self.prepare_data() + self.record.save_objects(**{self.fname: self.get_update_data(dataset)}) + + @abstractmethod + def get_update_data(self, dataset: Dataset) -> pd.DataFrame: + """ + return the updated data based on the given dataset + + The difference between `get_update_data` and `update` + - `update_date` only include some data specific feature + - `update` include some general routine steps(e.g. prepare dataset, checking) + """ + ... + + +class PredUpdater(DSBasedUpdater): + """ + Update the prediction in the Recorder + """ + + def get_update_data(self, dataset: Dataset) -> pd.DataFrame: # Load model model = self.rmdl.get_model() - new_pred: pd.Series = model.predict(dataset) - cb_pred = pd.concat([self.old_pred, new_pred.to_frame("score")], axis=0) + cb_pred = pd.concat([self.old_data, new_pred.to_frame("score")], axis=0) cb_pred = cb_pred.sort_index() - - self.record.save_objects(**{"pred.pkl": cb_pred}) - self.logger.info(f"Finish updating new {new_pred.shape[0]} predictions in {self.record.info['id']}.") + return cb_pred + + +class LabelUpdater(DSBasedUpdater): + """ + Update the label in the recorder + + Assumption + - The label is generated from record_temp.SignalRecord. + """ + + def __init__(self, record: Recorder, to_date=None, **kwargs): + super().__init__(record, to_date=to_date, fname="label.pkl", **kwargs) + + def get_update_data(self, dataset: Dataset) -> pd.DataFrame: + new_label = SignalRecord.generate_label(dataset) + cb_data = pd.concat([self.old_data, new_label], axis=0) + cb_data = cb_data[~cb_data.index.duplicated(keep="last")].sort_index() + return cb_data diff --git a/qlib/workflow/record_temp.py b/qlib/workflow/record_temp.py index 605b70cf9..bae14d642 100644 --- a/qlib/workflow/record_temp.py +++ b/qlib/workflow/record_temp.py @@ -125,6 +125,30 @@ class SignalRecord(RecordTemp): self.model = model self.dataset = dataset + @staticmethod + def generate_label(dataset): + # NOTE: + # Python doesn't provide the downcasting mechanism. + # We use the trick here to downcast the class + orig_cls = dataset.__class__ + dataset.__class__ = DatasetH + + params = dict(segments="test", col_set="label", data_key=DataHandlerLP.DK_R) + try: + # Assume the backend handler is DataHandlerLP + raw_label = dataset.prepare(**params) + except TypeError: + # The argument number is not right + del params["data_key"] + # The backend handler should be DataHandler + raw_label = dataset.prepare(**params) + except AttributeError: + # The data handler is initialize with `drop_raw=True`... + # So raw_label is not available + raw_label = None + dataset.__class__ = orig_cls + return raw_label + def generate(self, **kwargs): # generate prediciton pred = self.model.predict(self.dataset) @@ -140,28 +164,8 @@ class SignalRecord(RecordTemp): pprint(pred.head(5)) if isinstance(self.dataset, DatasetH): - # NOTE: - # Python doesn't provide the downcasting mechanism. - # We use the trick here to downcast the class - orig_cls = self.dataset.__class__ - self.dataset.__class__ = DatasetH - - params = dict(segments="test", col_set="label", data_key=DataHandlerLP.DK_R) - try: - # Assume the backend handler is DataHandlerLP - raw_label = self.dataset.prepare(**params) - except TypeError: - # The argument number is not right - del params["data_key"] - # The backend handler should be DataHandler - raw_label = self.dataset.prepare(**params) - except AttributeError: - # The data handler is initialize with `drop_raw=True`... - # So raw_label is not available - raw_label = None - + raw_label = self.generate_label(self.dataset) self.recorder.save_objects(**{"label.pkl": raw_label}) - self.dataset.__class__ = orig_cls @staticmethod def list(): diff --git a/tests/rolling_tests/test_update_pred.py b/tests/rolling_tests/test_update_pred.py new file mode 100644 index 000000000..7b900d0b4 --- /dev/null +++ b/tests/rolling_tests/test_update_pred.py @@ -0,0 +1,117 @@ +import copy +import unittest + +import fire +import pandas as pd + +import qlib +from qlib.config import REG_CN +from qlib.data import D +from qlib.model.trainer import task_train +from qlib.tests import TestAutoData +from qlib.tests.config import CSI300_GBDT_TASK +from qlib.workflow.online.utils import OnlineToolR +from qlib.workflow.online.update import LabelUpdater + + +class TestRolling(TestAutoData): + _setup_kwargs = dict(expression_cache=None, dataset_cache=None) + + def test_update_pred(self): + """ + This test is for testing if it will raise error if the `to_date` is out of the boundary. + """ + task = copy.deepcopy(CSI300_GBDT_TASK) + + task["record"] = { + "class": "SignalRecord", + "module_path": "qlib.workflow.record_temp", + } + + exp_name = "online_srv_test" + + cal = D.calendar() + latest_date = cal[-1] + + train_start = latest_date - pd.Timedelta(days=61) + train_end = latest_date - pd.Timedelta(days=41) + task["dataset"]["kwargs"]["segments"] = { + "train": (train_start, train_end), + "valid": (latest_date - pd.Timedelta(days=40), latest_date - pd.Timedelta(days=21)), + "test": (latest_date - pd.Timedelta(days=20), latest_date), + } + + task["dataset"]["kwargs"]["handler"]["kwargs"] = { + "start_time": train_start, + "end_time": latest_date, + "fit_start_time": train_start, + "fit_end_time": train_end, + "instruments": "csi300", + } + + rec = task_train(task, exp_name) + + pred = rec.load_object("pred.pkl") + + online_tool = OnlineToolR(exp_name) + online_tool.reset_online_tag(rec) # set to online model + + online_tool.update_online_pred(to_date=latest_date + pd.Timedelta(days=10)) + + def test_update_label(self): + + task = copy.deepcopy(CSI300_GBDT_TASK) + + task["record"] = { + "class": "SignalRecord", + "module_path": "qlib.workflow.record_temp", + } + + exp_name = "online_srv_test" + + cal = D.calendar() + shift = 10 + latest_date = cal[-1 - shift] + + train_start = latest_date - pd.Timedelta(days=61) + train_end = latest_date - pd.Timedelta(days=41) + task["dataset"]["kwargs"]["segments"] = { + "train": (train_start, train_end), + "valid": (latest_date - pd.Timedelta(days=40), latest_date - pd.Timedelta(days=21)), + "test": (latest_date - pd.Timedelta(days=20), latest_date), + } + + task["dataset"]["kwargs"]["handler"]["kwargs"] = { + "start_time": train_start, + "end_time": latest_date, + "fit_start_time": train_start, + "fit_end_time": train_end, + "instruments": "csi300", + } + + rec = task_train(task, exp_name) + + pred = rec.load_object("pred.pkl") + + online_tool = OnlineToolR(exp_name) + online_tool.reset_online_tag(rec) # set to online model + online_tool.update_online_pred() + + new_pred = rec.load_object("pred.pkl") + label = rec.load_object("label.pkl") + label_date = label.dropna().index.get_level_values("datetime").max() + pred_date = new_pred.dropna().index.get_level_values("datetime").max() + + # The prediction is updated, but the label is not updated. + self.assertTrue(label_date < pred_date) + + # Update label now + lu = LabelUpdater(rec) + lu.update() + new_label = rec.load_object("label.pkl") + new_label_date = new_label.index.get_level_values("datetime").max() + self.assertTrue(new_label_date == pred_date) # make sure the label is updated now + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/storage_tests/test_storage.py b/tests/storage_tests/test_storage.py index e2ab28af4..95502875b 100644 --- a/tests/storage_tests/test_storage.py +++ b/tests/storage_tests/test_storage.py @@ -149,15 +149,15 @@ class TestStorage(TestAutoData): """ - feature = FeatureStorage(instrument="SH600004", field="close", freq="day", provider_uri=self.provider_uri) + feature = FeatureStorage(instrument="SZ300677", field="close", freq="day", provider_uri=self.provider_uri) with self.assertRaises(IndexError): print(feature[0]) assert isinstance( - feature[815][1], (float, np.float32) + feature[3049][1], (float, np.float32) ), f"{feature.__class__.__name__}.__getitem__(i: int) error" - assert len(feature[815:818]) == 3, f"{feature.__class__.__name__}.__getitem__(s: slice) error" - print(f"feature[815: 818]: \n{feature[815: 818]}") + assert len(feature[3049:3052]) == 3, f"{feature.__class__.__name__}.__getitem__(s: slice) error" + print(f"feature[3049: 3052]: \n{feature[3049: 3052]}") print(f"feature[:].tail(): \n{feature[:].tail()}")