1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-05 12:00:58 +08:00

Merge remote-tracking branch 'origin/main' into nested_decision_exe

This commit is contained in:
Young
2021-09-30 18:41:15 +00:00
14 changed files with 336 additions and 58 deletions

View File

@@ -12,7 +12,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [windows-latest, ubuntu-16.04, ubuntu-18.04, ubuntu-20.04]
os: [windows-latest, ubuntu-18.04, ubuntu-20.04]
python-version: [3.6, 3.7, 3.8]
steps:

View File

@@ -36,12 +36,15 @@ jobs:
run: |
python -m pip install numpy==1.19.5
python -m pip install pyqlib --ignore-installed ruamel.yaml numpy
- name: Install Lightgbm for MacOS
run: |
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Microsoft/qlib/main/.github/brew_install.sh)"
HOMEBREW_NO_AUTO_UPDATE=1 brew install lightgbm
# FIX MacOS error: Segmentation fault
# reference: https://github.com/microsoft/LightGBM/issues/4229
wget https://raw.githubusercontent.com/Homebrew/homebrew-core/fb8323f2b170bd4ae97e1bac9bf3e2983af3fdb0/Formula/libomp.rb
brew unlink libomp
brew install libomp.rb
- name: Test data downloads
run: |
python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn
@@ -56,7 +59,6 @@ jobs:
python -m pip install numpy jupyter jupyter_contrib_nbextensions
python -m pip install -U scipy scikit-learn # installing without this line will cause errors on GitHub Actions, while instsalling locally won't
python setup.py install
- name: Install test dependencies
run: |
python -m pip install --upgrade pip
@@ -68,4 +70,4 @@ jobs:
python -m pytest . --durations=0
- name: Test workflow by config (install from source)
run: |
python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml
python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml

View File

@@ -1 +1 @@
0.7.1.99
0.7.2.99

View File

@@ -241,6 +241,7 @@ Online Tool
.. automodule:: qlib.workflow.online.utils
:members:
RecordUpdater
--------------------
.. automodule:: qlib.workflow.online.update
@@ -257,4 +258,4 @@ Serializable
:members:

View File

@@ -0,0 +1,16 @@
import datetime
import pandas as pd
from qlib.data.inst_processor import InstProcessor
class Resample1minProcessor(InstProcessor):
def __init__(self, hour: int, minute: int, **kwargs):
self.hour = hour
self.minute = minute
def __call__(self, df: pd.DataFrame, *args, **kwargs):
df.index = pd.to_datetime(df.index)
df = df.loc[df.index.time == datetime.time(self.hour, self.minute)]
df.index = df.index.normalize()
return df

View File

@@ -69,4 +69,4 @@ task:
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config
config: *port_analysis_config

View File

@@ -0,0 +1,83 @@
qlib_init:
provider_uri:
day: "~/.qlib/qlib_data/cn_data"
1min: "~/.qlib/qlib_data/cn_data_1min"
region: cn
dataset_cache: null
maxtasksperchild: 1
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
start_time: 2008-01-01
# 1min closing time is 15:00:00
end_time: "2020-08-01 15:00:00"
fit_start_time: 2008-01-01
fit_end_time: 2014-12-31
instruments: *market
freq:
label: day
feature: 1min
# with label as reference
inst_processor:
feature:
- class: Resample1minProcessor
module_path: features_sample.py
kwargs:
hour: 14
minute: 56
port_analysis_config: &port_analysis_config
strategy:
class: TopkDropoutStrategy
module_path: qlib.contrib.strategy.strategy
kwargs:
topk: 50
n_drop: 5
backtest:
verbose: False
limit_threshold: 0.095
account: 100000000
benchmark: *benchmark
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
task:
model:
class: LGBModel
module_path: qlib.contrib.model.gbdt
kwargs:
loss: mse
colsample_bytree: 0.8879
learning_rate: 0.2
subsample: 0.8789
lambda_l1: 205.6999
lambda_l2: 580.9768
max_depth: 8
num_leaves: 210
num_threads: 20
dataset:
class: DatasetH
module_path: qlib.data.dataset
kwargs:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs: {}
- class: SigAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
ana_long_short: False
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config

View File

@@ -122,5 +122,5 @@ task:
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
kwargs:
config: *port_analysis_config

View File

@@ -122,5 +122,5 @@ task:
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
kwargs:
config: *port_analysis_config

View File

@@ -10,16 +10,12 @@ import abc
import copy
import queue
import bisect
from typing import List
import numpy as np
import pandas as pd
from multiprocessing import Pool
from typing import Iterable, Union
from typing import List, Union
import numpy as np
import pandas as pd
# For supporting multiprocessing in outter code, joblib is used
from joblib import delayed
@@ -30,9 +26,9 @@ from .ops import Operators
from .inst_processor import InstProcessor
from ..log import get_module_logger
from .cache import DiskDatasetCache
from ..utils.time import Freq
from ..utils.resam import resam_calendar
from .cache import DiskDatasetCache, DiskExpressionCache
from ..utils import (
Wrapper,
init_instance_by_config,
@@ -278,7 +274,6 @@ class InstrumentProvider(abc.ABC, ProviderBackendMixin):
"""
if isinstance(market, list):
return market
from .filter import SeriesDFilter
if filter_pipe is None:

View File

@@ -1,6 +1,5 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Updater is a module to update artifacts such as predictions when the stock data is updating.
"""
@@ -10,11 +9,12 @@ from abc import ABCMeta, abstractmethod
import pandas as pd
from qlib import get_module_logger
from qlib.data import D
from qlib.data.dataset import DatasetH
from qlib.data.dataset import Dataset, DatasetH
from qlib.data.dataset.handler import DataHandlerLP
from qlib.model import Model
from qlib.utils import get_date_by_shift
from qlib.workflow.recorder import Recorder
from qlib.workflow.record_temp import SignalRecord
class RMDLoader:
@@ -72,12 +72,25 @@ class RecordUpdater(metaclass=ABCMeta):
...
class PredUpdater(RecordUpdater):
class DSBasedUpdater(RecordUpdater, metaclass=ABCMeta):
"""
Update the prediction in the Recorder
Dataset-Based Updater
- Provding updating feature for Updating data based on Qlib Dataset
Assumption
- Based on Qlib dataset
- The data to be updated is a multi-level index pd.DataFrame. For example label , prediction.
LABEL0
datetime instrument
2021-05-10 SH600000 0.006965
SH600004 0.003407
... ...
2021-05-28 SZ300498 0.015748
SZ300676 -0.001321
"""
def __init__(self, record: Recorder, to_date=None, hist_ref: int = 0, freq="day"):
def __init__(self, record: Recorder, to_date=None, hist_ref: int = 0, freq="day", fname="pred.pkl"):
"""
Init PredUpdater.
@@ -100,15 +113,27 @@ class PredUpdater(RecordUpdater):
self.to_date = to_date
self.hist_ref = hist_ref
self.freq = freq
self.fname = fname
self.rmdl = RMDLoader(rec=record)
latest_date = D.calendar(freq=freq)[-1]
if to_date == None:
to_date = D.calendar(freq=freq)[-1]
self.to_date = pd.Timestamp(to_date)
to_date = latest_date
to_date = pd.Timestamp(to_date)
if to_date >= latest_date:
self.logger.warning(
f"The given `to_date`({to_date}) is later than `latest_date`({latest_date}). So `to_date` is clipped to `latest_date`."
)
to_date = latest_date
self.to_date = to_date
# FIXME: it will raise error when running routine with delay trainer
# should we use another predicition updater for delay trainer?
self.old_pred = record.load_object("pred.pkl")
self.last_end = self.old_pred.index.get_level_values("datetime").max()
# should we use another prediction updater for delay trainer?
self.old_data: pd.DataFrame = record.load_object(fname)
# dropna is for being compatible to some data with future information(e.g. label)
# The recent label data should be updated together
self.last_end = self.old_data.dropna().index.get_level_values("datetime").max()
def prepare_data(self) -> DatasetH:
"""
@@ -127,7 +152,7 @@ class PredUpdater(RecordUpdater):
def update(self, dataset: DatasetH = None):
"""
Update the prediction in a recorder.
Update the data in a recorder.
Args:
DatasetH: the instance of DatasetH. None for reprepare.
@@ -139,7 +164,7 @@ class PredUpdater(RecordUpdater):
if self.last_end >= self.to_date:
self.logger.info(
f"The prediction in {self.record.info['id']} are latest ({self.last_end}). No need to update to {self.to_date}."
f"The data in {self.record.info['id']} are latest ({self.last_end}). No need to update to {self.to_date}."
)
return
@@ -148,14 +173,49 @@ class PredUpdater(RecordUpdater):
# For reusing the dataset
dataset = self.prepare_data()
self.record.save_objects(**{self.fname: self.get_update_data(dataset)})
@abstractmethod
def get_update_data(self, dataset: Dataset) -> pd.DataFrame:
"""
return the updated data based on the given dataset
The difference between `get_update_data` and `update`
- `update_date` only include some data specific feature
- `update` include some general routine steps(e.g. prepare dataset, checking)
"""
...
class PredUpdater(DSBasedUpdater):
"""
Update the prediction in the Recorder
"""
def get_update_data(self, dataset: Dataset) -> pd.DataFrame:
# Load model
model = self.rmdl.get_model()
new_pred: pd.Series = model.predict(dataset)
cb_pred = pd.concat([self.old_pred, new_pred.to_frame("score")], axis=0)
cb_pred = pd.concat([self.old_data, new_pred.to_frame("score")], axis=0)
cb_pred = cb_pred.sort_index()
self.record.save_objects(**{"pred.pkl": cb_pred})
self.logger.info(f"Finish updating new {new_pred.shape[0]} predictions in {self.record.info['id']}.")
return cb_pred
class LabelUpdater(DSBasedUpdater):
"""
Update the label in the recorder
Assumption
- The label is generated from record_temp.SignalRecord.
"""
def __init__(self, record: Recorder, to_date=None, **kwargs):
super().__init__(record, to_date=to_date, fname="label.pkl", **kwargs)
def get_update_data(self, dataset: Dataset) -> pd.DataFrame:
new_label = SignalRecord.generate_label(dataset)
cb_data = pd.concat([self.old_data, new_label], axis=0)
cb_data = cb_data[~cb_data.index.duplicated(keep="last")].sort_index()
return cb_data

View File

@@ -125,6 +125,30 @@ class SignalRecord(RecordTemp):
self.model = model
self.dataset = dataset
@staticmethod
def generate_label(dataset):
# NOTE:
# Python doesn't provide the downcasting mechanism.
# We use the trick here to downcast the class
orig_cls = dataset.__class__
dataset.__class__ = DatasetH
params = dict(segments="test", col_set="label", data_key=DataHandlerLP.DK_R)
try:
# Assume the backend handler is DataHandlerLP
raw_label = dataset.prepare(**params)
except TypeError:
# The argument number is not right
del params["data_key"]
# The backend handler should be DataHandler
raw_label = dataset.prepare(**params)
except AttributeError:
# The data handler is initialize with `drop_raw=True`...
# So raw_label is not available
raw_label = None
dataset.__class__ = orig_cls
return raw_label
def generate(self, **kwargs):
# generate prediciton
pred = self.model.predict(self.dataset)
@@ -140,28 +164,8 @@ class SignalRecord(RecordTemp):
pprint(pred.head(5))
if isinstance(self.dataset, DatasetH):
# NOTE:
# Python doesn't provide the downcasting mechanism.
# We use the trick here to downcast the class
orig_cls = self.dataset.__class__
self.dataset.__class__ = DatasetH
params = dict(segments="test", col_set="label", data_key=DataHandlerLP.DK_R)
try:
# Assume the backend handler is DataHandlerLP
raw_label = self.dataset.prepare(**params)
except TypeError:
# The argument number is not right
del params["data_key"]
# The backend handler should be DataHandler
raw_label = self.dataset.prepare(**params)
except AttributeError:
# The data handler is initialize with `drop_raw=True`...
# So raw_label is not available
raw_label = None
raw_label = self.generate_label(self.dataset)
self.recorder.save_objects(**{"label.pkl": raw_label})
self.dataset.__class__ = orig_cls
@staticmethod
def list():

View File

@@ -0,0 +1,117 @@
import copy
import unittest
import fire
import pandas as pd
import qlib
from qlib.config import REG_CN
from qlib.data import D
from qlib.model.trainer import task_train
from qlib.tests import TestAutoData
from qlib.tests.config import CSI300_GBDT_TASK
from qlib.workflow.online.utils import OnlineToolR
from qlib.workflow.online.update import LabelUpdater
class TestRolling(TestAutoData):
_setup_kwargs = dict(expression_cache=None, dataset_cache=None)
def test_update_pred(self):
"""
This test is for testing if it will raise error if the `to_date` is out of the boundary.
"""
task = copy.deepcopy(CSI300_GBDT_TASK)
task["record"] = {
"class": "SignalRecord",
"module_path": "qlib.workflow.record_temp",
}
exp_name = "online_srv_test"
cal = D.calendar()
latest_date = cal[-1]
train_start = latest_date - pd.Timedelta(days=61)
train_end = latest_date - pd.Timedelta(days=41)
task["dataset"]["kwargs"]["segments"] = {
"train": (train_start, train_end),
"valid": (latest_date - pd.Timedelta(days=40), latest_date - pd.Timedelta(days=21)),
"test": (latest_date - pd.Timedelta(days=20), latest_date),
}
task["dataset"]["kwargs"]["handler"]["kwargs"] = {
"start_time": train_start,
"end_time": latest_date,
"fit_start_time": train_start,
"fit_end_time": train_end,
"instruments": "csi300",
}
rec = task_train(task, exp_name)
pred = rec.load_object("pred.pkl")
online_tool = OnlineToolR(exp_name)
online_tool.reset_online_tag(rec) # set to online model
online_tool.update_online_pred(to_date=latest_date + pd.Timedelta(days=10))
def test_update_label(self):
task = copy.deepcopy(CSI300_GBDT_TASK)
task["record"] = {
"class": "SignalRecord",
"module_path": "qlib.workflow.record_temp",
}
exp_name = "online_srv_test"
cal = D.calendar()
shift = 10
latest_date = cal[-1 - shift]
train_start = latest_date - pd.Timedelta(days=61)
train_end = latest_date - pd.Timedelta(days=41)
task["dataset"]["kwargs"]["segments"] = {
"train": (train_start, train_end),
"valid": (latest_date - pd.Timedelta(days=40), latest_date - pd.Timedelta(days=21)),
"test": (latest_date - pd.Timedelta(days=20), latest_date),
}
task["dataset"]["kwargs"]["handler"]["kwargs"] = {
"start_time": train_start,
"end_time": latest_date,
"fit_start_time": train_start,
"fit_end_time": train_end,
"instruments": "csi300",
}
rec = task_train(task, exp_name)
pred = rec.load_object("pred.pkl")
online_tool = OnlineToolR(exp_name)
online_tool.reset_online_tag(rec) # set to online model
online_tool.update_online_pred()
new_pred = rec.load_object("pred.pkl")
label = rec.load_object("label.pkl")
label_date = label.dropna().index.get_level_values("datetime").max()
pred_date = new_pred.dropna().index.get_level_values("datetime").max()
# The prediction is updated, but the label is not updated.
self.assertTrue(label_date < pred_date)
# Update label now
lu = LabelUpdater(rec)
lu.update()
new_label = rec.load_object("label.pkl")
new_label_date = new_label.index.get_level_values("datetime").max()
self.assertTrue(new_label_date == pred_date) # make sure the label is updated now
if __name__ == "__main__":
unittest.main()

View File

@@ -149,15 +149,15 @@ class TestStorage(TestAutoData):
"""
feature = FeatureStorage(instrument="SH600004", field="close", freq="day", provider_uri=self.provider_uri)
feature = FeatureStorage(instrument="SZ300677", field="close", freq="day", provider_uri=self.provider_uri)
with self.assertRaises(IndexError):
print(feature[0])
assert isinstance(
feature[815][1], (float, np.float32)
feature[3049][1], (float, np.float32)
), f"{feature.__class__.__name__}.__getitem__(i: int) error"
assert len(feature[815:818]) == 3, f"{feature.__class__.__name__}.__getitem__(s: slice) error"
print(f"feature[815: 818]: \n{feature[815: 818]}")
assert len(feature[3049:3052]) == 3, f"{feature.__class__.__name__}.__getitem__(s: slice) error"
print(f"feature[3049: 3052]: \n{feature[3049: 3052]}")
print(f"feature[:].tail(): \n{feature[:].tail()}")