1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-29 00:51:19 +08:00

Compare commits

..

12 Commits

Author SHA1 Message Date
Young
4c057f645e Successfully run training 2024-07-10 06:25:30 +00:00
Young
a9fc3435ab Almost success to run GRU 2024-07-10 05:59:49 +00:00
Young
e2879d9b1e We choose another mode as the initial version 2024-07-10 05:42:27 +00:00
Young
a67a6134b4 We must align with previous results 2024-07-10 05:34:09 +00:00
Young
f4674ef98c Add model template; 2024-07-10 05:31:14 +00:00
Young
0f9312593d Remove some deprecated code 2024-07-09 09:11:06 +00:00
Young
4405cb784f Init model for both dataset 2024-07-08 06:11:51 +00:00
you-n-g
a7d5a9b500 Nested data loader (#1822)
* nested data loader

* Amend

* add data loder test

* fix pylint error

* fix pytest error

* fix pytest error

* delete comments

* Update qlib/contrib/data/handler.py

---------

Co-authored-by: Linlang <Lv.Linlang@hotmail.com>
2024-07-05 15:44:16 +08:00
you-n-g
5190332c7e Add some misc features. (#1816)
* Normal mod

* Black linting

* Linting
2024-06-26 18:34:00 +08:00
cyncyw
cde80206e4 Update index_data.py for datatype conversion and alignment (#1813)
* Update index_data.py for data convertion and alignment

* Update qlib/utils/index_data.py

* Update qlib/utils/index_data.py

* fix linting

---------

Co-authored-by: taozhiwang <taozhiwa@gmail.com>
Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>
2024-06-24 15:34:48 +08:00
cyncyw
a339fc11d1 add a note for code standard (#1814)
* add a note for code standard

* handle both cases

---------

Co-authored-by: taozhiwang <taozhiwa@gmail.com>
2024-06-24 15:33:45 +08:00
Linlang
33482047dc change weight data download url (#1812) 2024-06-21 13:05:53 +08:00
27 changed files with 1697 additions and 357 deletions

View File

@@ -60,4 +60,4 @@ The `[dev]` option will help you to install some related packages when developin
.. code-block:: bash
pip install -e .[dev]
pip install -e ".[dev]"

View File

@@ -0,0 +1,15 @@
# Introduction
What is GeneralPtNN
- Fix previous design that fail to support both Time-series and tabular data
- Now you can just replace the Pytorch model structure to run a NN model.
We provide an example to demonstrate the effectiveness of the current design.
- `workflow_config_gru.yaml` align with previous results [GRU(Kyunghyun Cho, et al.)](../README.md#Alpha158 dataset)
- `workflow_config_mlp.yaml` align with previous results [MLP](../README.md#Alpha158 dataset)
# TODO
We will align existing models to current design.

View File

@@ -0,0 +1,97 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
start_time: 2008-01-01
end_time: 2020-08-01
fit_start_time: 2008-01-01
fit_end_time: 2014-12-31
instruments: *market
infer_processors:
- class: FilterCol
kwargs:
fields_group: feature
col_list: ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10",
"ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5",
"RSQR20", "CORD60", "CORD10", "CORR20", "KLOW"
]
- class: RobustZScoreNorm
kwargs:
fields_group: feature
clip_outlier: true
- class: Fillna
kwargs:
fields_group: feature
learn_processors:
- class: DropnaLabel
- class: CSRankNorm
kwargs:
fields_group: label
label: ["Ref($close, -2) / Ref($close, -1) - 1"]
port_analysis_config: &port_analysis_config
strategy:
class: TopkDropoutStrategy
module_path: qlib.contrib.strategy
kwargs:
signal: <PRED>
topk: 50
n_drop: 5
backtest:
start_time: 2017-01-01
end_time: 2020-08-01
account: 100000000
benchmark: *benchmark
exchange_kwargs:
limit_threshold: 0.095
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
task:
model:
class: GeneralPTNN
module_path: qlib.contrib.model.pytorch_general_nn
kwargs:
d_feat: 20
hidden_size: 64
num_layers: 2
dropout: 0.0
n_epochs: 200
lr: 2e-4
early_stop: 10
batch_size: 800
metric: loss
loss: mse
n_jobs: 20
GPU: 0
dataset:
class: TSDatasetH
module_path: qlib.data.dataset
kwargs:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
step_len: 20
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs:
model: <MODEL>
dataset: <DATASET>
- class: SigAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
ana_long_short: False
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config

View File

@@ -0,0 +1,98 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
start_time: 2008-01-01
end_time: 2020-08-01
fit_start_time: 2008-01-01
fit_end_time: 2014-12-31
instruments: *market
infer_processors: [
{
"class" : "DropCol",
"kwargs":{"col_list": ["VWAP0"]}
},
{
"class" : "CSZFillna",
"kwargs":{"fields_group": "feature"}
}
]
learn_processors: [
{
"class" : "DropCol",
"kwargs":{"col_list": ["VWAP0"]}
},
{
"class" : "DropnaProcessor",
"kwargs":{"fields_group": "feature"}
},
"DropnaLabel",
{
"class": "CSZScoreNorm",
"kwargs": {"fields_group": "label"}
}
]
process_type: "independent"
port_analysis_config: &port_analysis_config
strategy:
class: TopkDropoutStrategy
module_path: qlib.contrib.strategy
kwargs:
signal: <PRED>
topk: 50
n_drop: 5
backtest:
start_time: 2017-01-01
end_time: 2020-08-01
account: 100000000
benchmark: *benchmark
exchange_kwargs:
limit_threshold: 0.095
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
task:
model:
class: GeneralPTNN
module_path: qlib.contrib.model.pytorch_general_nn
kwargs:
loss: mse
lr: 0.002
optimizer: adam
max_steps: 8000
batch_size: 8192
GPU: 0
weight_decay: 0.0002
pt_model_kwargs:
input_dim: 157
dataset:
class: DatasetH
module_path: qlib.data.dataset
kwargs:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs:
model: <MODEL>
dataset: <DATASET>
- class: SigAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
ana_long_short: False
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config

View File

@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
from pathlib import Path
from typing import Union
@@ -35,6 +36,10 @@ class DDGDABench(DDGDA):
if __name__ == "__main__":
GetData().qlib_data(exists_skip=True)
auto_init()
kwargs = {}
if os.environ.get("PROVIDER_URI", "") == "":
GetData().qlib_data(exists_skip=True)
else:
kwargs["provider_uri"] = os.environ["PROVIDER_URI"]
auto_init(**kwargs)
fire.Fire(DDGDABench)

View File

@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
from pathlib import Path
from typing import Union
@@ -31,6 +32,10 @@ class RollingBenchmark(Rolling):
if __name__ == "__main__":
GetData().qlib_data(exists_skip=True)
auto_init()
kwargs = {}
if os.environ.get("PROVIDER_URI", "") == "":
GetData().qlib_data(exists_skip=True)
else:
kwargs["provider_uri"] = os.environ["PROVIDER_URI"]
auto_init(**kwargs)
fire.Fire(RollingBenchmark)

View File

@@ -1,6 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from qlib.contrib.data.loader import Alpha158DL, Alpha360DL
from ...data.dataset.handler import DataHandlerLP
from ...data.dataset.processor import Processor
from ...utils import get_callable_kwargs
@@ -66,7 +67,7 @@ class Alpha360(DataHandlerLP):
"class": "QlibDataLoader",
"kwargs": {
"config": {
"feature": self.get_feature_config(),
"feature": Alpha360DL.get_feature_config(),
"label": kwargs.pop("label", self.get_label_config()),
},
"filter_pipe": filter_pipe,
@@ -88,51 +89,6 @@ class Alpha360(DataHandlerLP):
def get_label_config(self):
return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]
@staticmethod
def get_feature_config():
# NOTE:
# Alpha360 tries to provide a dataset with original price data
# the original price data includes the prices and volume in the last 60 days.
# To make it easier to learn models from this dataset, all the prices and volume
# are normalized by the latest price and volume data ( dividing by $close, $volume)
# So the latest normalized $close will be 1 (with name CLOSE0), the latest normalized $volume will be 1 (with name VOLUME0)
# If further normalization are executed (e.g. centralization), CLOSE0 and VOLUME0 will be 0.
fields = []
names = []
for i in range(59, 0, -1):
fields += ["Ref($close, %d)/$close" % i]
names += ["CLOSE%d" % i]
fields += ["$close/$close"]
names += ["CLOSE0"]
for i in range(59, 0, -1):
fields += ["Ref($open, %d)/$close" % i]
names += ["OPEN%d" % i]
fields += ["$open/$close"]
names += ["OPEN0"]
for i in range(59, 0, -1):
fields += ["Ref($high, %d)/$close" % i]
names += ["HIGH%d" % i]
fields += ["$high/$close"]
names += ["HIGH0"]
for i in range(59, 0, -1):
fields += ["Ref($low, %d)/$close" % i]
names += ["LOW%d" % i]
fields += ["$low/$close"]
names += ["LOW0"]
for i in range(59, 0, -1):
fields += ["Ref($vwap, %d)/$close" % i]
names += ["VWAP%d" % i]
fields += ["$vwap/$close"]
names += ["VWAP0"]
for i in range(59, 0, -1):
fields += ["Ref($volume, %d)/($volume+1e-12)" % i]
names += ["VOLUME%d" % i]
fields += ["$volume/($volume+1e-12)"]
names += ["VOLUME0"]
return fields, names
class Alpha360vwap(Alpha360):
def get_label_config(self):
@@ -190,242 +146,11 @@ class Alpha158(DataHandlerLP):
},
"rolling": {},
}
return self.parse_config_to_fields(conf)
return Alpha158DL.get_feature_config(conf)
def get_label_config(self):
return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]
@staticmethod
def parse_config_to_fields(config):
"""create factors from config
config = {
'kbar': {}, # whether to use some hard-code kbar features
'price': { # whether to use raw price features
'windows': [0, 1, 2, 3, 4], # use price at n days ago
'feature': ['OPEN', 'HIGH', 'LOW'] # which price field to use
},
'volume': { # whether to use raw volume features
'windows': [0, 1, 2, 3, 4], # use volume at n days ago
},
'rolling': { # whether to use rolling operator based features
'windows': [5, 10, 20, 30, 60], # rolling windows size
'include': ['ROC', 'MA', 'STD'], # rolling operator to use
#if include is None we will use default operators
'exclude': ['RANK'], # rolling operator not to use
}
}
"""
fields = []
names = []
if "kbar" in config:
fields += [
"($close-$open)/$open",
"($high-$low)/$open",
"($close-$open)/($high-$low+1e-12)",
"($high-Greater($open, $close))/$open",
"($high-Greater($open, $close))/($high-$low+1e-12)",
"(Less($open, $close)-$low)/$open",
"(Less($open, $close)-$low)/($high-$low+1e-12)",
"(2*$close-$high-$low)/$open",
"(2*$close-$high-$low)/($high-$low+1e-12)",
]
names += [
"KMID",
"KLEN",
"KMID2",
"KUP",
"KUP2",
"KLOW",
"KLOW2",
"KSFT",
"KSFT2",
]
if "price" in config:
windows = config["price"].get("windows", range(5))
feature = config["price"].get("feature", ["OPEN", "HIGH", "LOW", "CLOSE", "VWAP"])
for field in feature:
field = field.lower()
fields += ["Ref($%s, %d)/$close" % (field, d) if d != 0 else "$%s/$close" % field for d in windows]
names += [field.upper() + str(d) for d in windows]
if "volume" in config:
windows = config["volume"].get("windows", range(5))
fields += ["Ref($volume, %d)/($volume+1e-12)" % d if d != 0 else "$volume/($volume+1e-12)" for d in windows]
names += ["VOLUME" + str(d) for d in windows]
if "rolling" in config:
windows = config["rolling"].get("windows", [5, 10, 20, 30, 60])
include = config["rolling"].get("include", None)
exclude = config["rolling"].get("exclude", [])
# `exclude` in dataset config unnecessary filed
# `include` in dataset config necessary field
def use(x):
return x not in exclude and (include is None or x in include)
# Some factor ref: https://guorn.com/static/upload/file/3/134065454575605.pdf
if use("ROC"):
# https://www.investopedia.com/terms/r/rateofchange.asp
# Rate of change, the price change in the past d days, divided by latest close price to remove unit
fields += ["Ref($close, %d)/$close" % d for d in windows]
names += ["ROC%d" % d for d in windows]
if use("MA"):
# https://www.investopedia.com/ask/answers/071414/whats-difference-between-moving-average-and-weighted-moving-average.asp
# Simple Moving Average, the simple moving average in the past d days, divided by latest close price to remove unit
fields += ["Mean($close, %d)/$close" % d for d in windows]
names += ["MA%d" % d for d in windows]
if use("STD"):
# The standard diviation of close price for the past d days, divided by latest close price to remove unit
fields += ["Std($close, %d)/$close" % d for d in windows]
names += ["STD%d" % d for d in windows]
if use("BETA"):
# The rate of close price change in the past d days, divided by latest close price to remove unit
# For example, price increase 10 dollar per day in the past d days, then Slope will be 10.
fields += ["Slope($close, %d)/$close" % d for d in windows]
names += ["BETA%d" % d for d in windows]
if use("RSQR"):
# The R-sqaure value of linear regression for the past d days, represent the trend linear
fields += ["Rsquare($close, %d)" % d for d in windows]
names += ["RSQR%d" % d for d in windows]
if use("RESI"):
# The redisdual for linear regression for the past d days, represent the trend linearity for past d days.
fields += ["Resi($close, %d)/$close" % d for d in windows]
names += ["RESI%d" % d for d in windows]
if use("MAX"):
# The max price for past d days, divided by latest close price to remove unit
fields += ["Max($high, %d)/$close" % d for d in windows]
names += ["MAX%d" % d for d in windows]
if use("LOW"):
# The low price for past d days, divided by latest close price to remove unit
fields += ["Min($low, %d)/$close" % d for d in windows]
names += ["MIN%d" % d for d in windows]
if use("QTLU"):
# The 80% quantile of past d day's close price, divided by latest close price to remove unit
# Used with MIN and MAX
fields += ["Quantile($close, %d, 0.8)/$close" % d for d in windows]
names += ["QTLU%d" % d for d in windows]
if use("QTLD"):
# The 20% quantile of past d day's close price, divided by latest close price to remove unit
fields += ["Quantile($close, %d, 0.2)/$close" % d for d in windows]
names += ["QTLD%d" % d for d in windows]
if use("RANK"):
# Get the percentile of current close price in past d day's close price.
# Represent the current price level comparing to past N days, add additional information to moving average.
fields += ["Rank($close, %d)" % d for d in windows]
names += ["RANK%d" % d for d in windows]
if use("RSV"):
# Represent the price position between upper and lower resistent price for past d days.
fields += ["($close-Min($low, %d))/(Max($high, %d)-Min($low, %d)+1e-12)" % (d, d, d) for d in windows]
names += ["RSV%d" % d for d in windows]
if use("IMAX"):
# The number of days between current date and previous highest price date.
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
# The indicator measures the time between highs and the time between lows over a time period.
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
fields += ["IdxMax($high, %d)/%d" % (d, d) for d in windows]
names += ["IMAX%d" % d for d in windows]
if use("IMIN"):
# The number of days between current date and previous lowest price date.
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
# The indicator measures the time between highs and the time between lows over a time period.
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
fields += ["IdxMin($low, %d)/%d" % (d, d) for d in windows]
names += ["IMIN%d" % d for d in windows]
if use("IMXD"):
# The time period between previous lowest-price date occur after highest price date.
# Large value suggest downward momemtum.
fields += ["(IdxMax($high, %d)-IdxMin($low, %d))/%d" % (d, d, d) for d in windows]
names += ["IMXD%d" % d for d in windows]
if use("CORR"):
# The correlation between absolute close price and log scaled trading volume
fields += ["Corr($close, Log($volume+1), %d)" % d for d in windows]
names += ["CORR%d" % d for d in windows]
if use("CORD"):
# The correlation between price change ratio and volume change ratio
fields += ["Corr($close/Ref($close,1), Log($volume/Ref($volume, 1)+1), %d)" % d for d in windows]
names += ["CORD%d" % d for d in windows]
if use("CNTP"):
# The percentage of days in past d days that price go up.
fields += ["Mean($close>Ref($close, 1), %d)" % d for d in windows]
names += ["CNTP%d" % d for d in windows]
if use("CNTN"):
# The percentage of days in past d days that price go down.
fields += ["Mean($close<Ref($close, 1), %d)" % d for d in windows]
names += ["CNTN%d" % d for d in windows]
if use("CNTD"):
# The diff between past up day and past down day
fields += ["Mean($close>Ref($close, 1), %d)-Mean($close<Ref($close, 1), %d)" % (d, d) for d in windows]
names += ["CNTD%d" % d for d in windows]
if use("SUMP"):
# The total gain / the absolute total price changed
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"Sum(Greater($close-Ref($close, 1), 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
for d in windows
]
names += ["SUMP%d" % d for d in windows]
if use("SUMN"):
# The total lose / the absolute total price changed
# Can be derived from SUMP by SUMN = 1 - SUMP
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"Sum(Greater(Ref($close, 1)-$close, 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
for d in windows
]
names += ["SUMN%d" % d for d in windows]
if use("SUMD"):
# The diff ratio between total gain and total lose
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"(Sum(Greater($close-Ref($close, 1), 0), %d)-Sum(Greater(Ref($close, 1)-$close, 0), %d))"
"/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d, d)
for d in windows
]
names += ["SUMD%d" % d for d in windows]
if use("VMA"):
# Simple Volume Moving average: https://www.barchart.com/education/technical-indicators/volume_moving_average
fields += ["Mean($volume, %d)/($volume+1e-12)" % d for d in windows]
names += ["VMA%d" % d for d in windows]
if use("VSTD"):
# The standard deviation for volume in past d days.
fields += ["Std($volume, %d)/($volume+1e-12)" % d for d in windows]
names += ["VSTD%d" % d for d in windows]
if use("WVMA"):
# The volume weighted price change volatility
fields += [
"Std(Abs($close/Ref($close, 1)-1)*$volume, %d)/(Mean(Abs($close/Ref($close, 1)-1)*$volume, %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["WVMA%d" % d for d in windows]
if use("VSUMP"):
# The total volume increase / the absolute total volume changed
fields += [
"Sum(Greater($volume-Ref($volume, 1), 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["VSUMP%d" % d for d in windows]
if use("VSUMN"):
# The total volume increase / the absolute total volume changed
# Can be derived from VSUMP by VSUMN = 1 - VSUMP
fields += [
"Sum(Greater(Ref($volume, 1)-$volume, 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["VSUMN%d" % d for d in windows]
if use("VSUMD"):
# The diff ratio between total volume increase and total volume decrease
# RSI indicator for volume
fields += [
"(Sum(Greater($volume-Ref($volume, 1), 0), %d)-Sum(Greater(Ref($volume, 1)-$volume, 0), %d))"
"/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)" % (d, d, d)
for d in windows
]
names += ["VSUMD%d" % d for d in windows]
return fields, names
class Alpha158vwap(Alpha158):
def get_label_config(self):

310
qlib/contrib/data/loader.py Normal file
View File

@@ -0,0 +1,310 @@
from qlib.data.dataset.loader import QlibDataLoader
class Alpha360DL(QlibDataLoader):
"""Dataloader to get Alpha360"""
def __init__(self, config=None, **kwargs):
_config = {
"feature": self.get_feature_config(),
}
if config is not None:
_config.update(config)
super().__init__(config=_config, **kwargs)
@staticmethod
def get_feature_config():
# NOTE:
# Alpha360 tries to provide a dataset with original price data
# the original price data includes the prices and volume in the last 60 days.
# To make it easier to learn models from this dataset, all the prices and volume
# are normalized by the latest price and volume data ( dividing by $close, $volume)
# So the latest normalized $close will be 1 (with name CLOSE0), the latest normalized $volume will be 1 (with name VOLUME0)
# If further normalization are executed (e.g. centralization), CLOSE0 and VOLUME0 will be 0.
fields = []
names = []
for i in range(59, 0, -1):
fields += ["Ref($close, %d)/$close" % i]
names += ["CLOSE%d" % i]
fields += ["$close/$close"]
names += ["CLOSE0"]
for i in range(59, 0, -1):
fields += ["Ref($open, %d)/$close" % i]
names += ["OPEN%d" % i]
fields += ["$open/$close"]
names += ["OPEN0"]
for i in range(59, 0, -1):
fields += ["Ref($high, %d)/$close" % i]
names += ["HIGH%d" % i]
fields += ["$high/$close"]
names += ["HIGH0"]
for i in range(59, 0, -1):
fields += ["Ref($low, %d)/$close" % i]
names += ["LOW%d" % i]
fields += ["$low/$close"]
names += ["LOW0"]
for i in range(59, 0, -1):
fields += ["Ref($vwap, %d)/$close" % i]
names += ["VWAP%d" % i]
fields += ["$vwap/$close"]
names += ["VWAP0"]
for i in range(59, 0, -1):
fields += ["Ref($volume, %d)/($volume+1e-12)" % i]
names += ["VOLUME%d" % i]
fields += ["$volume/($volume+1e-12)"]
names += ["VOLUME0"]
return fields, names
class Alpha158DL(QlibDataLoader):
"""Dataloader to get Alpha158"""
def __init__(self, config=None, **kwargs):
_config = {
"feature": self.get_feature_config(),
}
if config is not None:
_config.update(config)
super().__init__(config=_config, **kwargs)
@staticmethod
def get_feature_config(
config={
"kbar": {},
"price": {
"windows": [0],
"feature": ["OPEN", "HIGH", "LOW", "VWAP"],
},
"rolling": {},
}
):
"""create factors from config
config = {
'kbar': {}, # whether to use some hard-code kbar features
'price': { # whether to use raw price features
'windows': [0, 1, 2, 3, 4], # use price at n days ago
'feature': ['OPEN', 'HIGH', 'LOW'] # which price field to use
},
'volume': { # whether to use raw volume features
'windows': [0, 1, 2, 3, 4], # use volume at n days ago
},
'rolling': { # whether to use rolling operator based features
'windows': [5, 10, 20, 30, 60], # rolling windows size
'include': ['ROC', 'MA', 'STD'], # rolling operator to use
#if include is None we will use default operators
'exclude': ['RANK'], # rolling operator not to use
}
}
"""
fields = []
names = []
if "kbar" in config:
fields += [
"($close-$open)/$open",
"($high-$low)/$open",
"($close-$open)/($high-$low+1e-12)",
"($high-Greater($open, $close))/$open",
"($high-Greater($open, $close))/($high-$low+1e-12)",
"(Less($open, $close)-$low)/$open",
"(Less($open, $close)-$low)/($high-$low+1e-12)",
"(2*$close-$high-$low)/$open",
"(2*$close-$high-$low)/($high-$low+1e-12)",
]
names += [
"KMID",
"KLEN",
"KMID2",
"KUP",
"KUP2",
"KLOW",
"KLOW2",
"KSFT",
"KSFT2",
]
if "price" in config:
windows = config["price"].get("windows", range(5))
feature = config["price"].get("feature", ["OPEN", "HIGH", "LOW", "CLOSE", "VWAP"])
for field in feature:
field = field.lower()
fields += ["Ref($%s, %d)/$close" % (field, d) if d != 0 else "$%s/$close" % field for d in windows]
names += [field.upper() + str(d) for d in windows]
if "volume" in config:
windows = config["volume"].get("windows", range(5))
fields += ["Ref($volume, %d)/($volume+1e-12)" % d if d != 0 else "$volume/($volume+1e-12)" for d in windows]
names += ["VOLUME" + str(d) for d in windows]
if "rolling" in config:
windows = config["rolling"].get("windows", [5, 10, 20, 30, 60])
include = config["rolling"].get("include", None)
exclude = config["rolling"].get("exclude", [])
# `exclude` in dataset config unnecessary filed
# `include` in dataset config necessary field
def use(x):
return x not in exclude and (include is None or x in include)
# Some factor ref: https://guorn.com/static/upload/file/3/134065454575605.pdf
if use("ROC"):
# https://www.investopedia.com/terms/r/rateofchange.asp
# Rate of change, the price change in the past d days, divided by latest close price to remove unit
fields += ["Ref($close, %d)/$close" % d for d in windows]
names += ["ROC%d" % d for d in windows]
if use("MA"):
# https://www.investopedia.com/ask/answers/071414/whats-difference-between-moving-average-and-weighted-moving-average.asp
# Simple Moving Average, the simple moving average in the past d days, divided by latest close price to remove unit
fields += ["Mean($close, %d)/$close" % d for d in windows]
names += ["MA%d" % d for d in windows]
if use("STD"):
# The standard diviation of close price for the past d days, divided by latest close price to remove unit
fields += ["Std($close, %d)/$close" % d for d in windows]
names += ["STD%d" % d for d in windows]
if use("BETA"):
# The rate of close price change in the past d days, divided by latest close price to remove unit
# For example, price increase 10 dollar per day in the past d days, then Slope will be 10.
fields += ["Slope($close, %d)/$close" % d for d in windows]
names += ["BETA%d" % d for d in windows]
if use("RSQR"):
# The R-sqaure value of linear regression for the past d days, represent the trend linear
fields += ["Rsquare($close, %d)" % d for d in windows]
names += ["RSQR%d" % d for d in windows]
if use("RESI"):
# The redisdual for linear regression for the past d days, represent the trend linearity for past d days.
fields += ["Resi($close, %d)/$close" % d for d in windows]
names += ["RESI%d" % d for d in windows]
if use("MAX"):
# The max price for past d days, divided by latest close price to remove unit
fields += ["Max($high, %d)/$close" % d for d in windows]
names += ["MAX%d" % d for d in windows]
if use("LOW"):
# The low price for past d days, divided by latest close price to remove unit
fields += ["Min($low, %d)/$close" % d for d in windows]
names += ["MIN%d" % d for d in windows]
if use("QTLU"):
# The 80% quantile of past d day's close price, divided by latest close price to remove unit
# Used with MIN and MAX
fields += ["Quantile($close, %d, 0.8)/$close" % d for d in windows]
names += ["QTLU%d" % d for d in windows]
if use("QTLD"):
# The 20% quantile of past d day's close price, divided by latest close price to remove unit
fields += ["Quantile($close, %d, 0.2)/$close" % d for d in windows]
names += ["QTLD%d" % d for d in windows]
if use("RANK"):
# Get the percentile of current close price in past d day's close price.
# Represent the current price level comparing to past N days, add additional information to moving average.
fields += ["Rank($close, %d)" % d for d in windows]
names += ["RANK%d" % d for d in windows]
if use("RSV"):
# Represent the price position between upper and lower resistent price for past d days.
fields += ["($close-Min($low, %d))/(Max($high, %d)-Min($low, %d)+1e-12)" % (d, d, d) for d in windows]
names += ["RSV%d" % d for d in windows]
if use("IMAX"):
# The number of days between current date and previous highest price date.
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
# The indicator measures the time between highs and the time between lows over a time period.
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
fields += ["IdxMax($high, %d)/%d" % (d, d) for d in windows]
names += ["IMAX%d" % d for d in windows]
if use("IMIN"):
# The number of days between current date and previous lowest price date.
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
# The indicator measures the time between highs and the time between lows over a time period.
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
fields += ["IdxMin($low, %d)/%d" % (d, d) for d in windows]
names += ["IMIN%d" % d for d in windows]
if use("IMXD"):
# The time period between previous lowest-price date occur after highest price date.
# Large value suggest downward momemtum.
fields += ["(IdxMax($high, %d)-IdxMin($low, %d))/%d" % (d, d, d) for d in windows]
names += ["IMXD%d" % d for d in windows]
if use("CORR"):
# The correlation between absolute close price and log scaled trading volume
fields += ["Corr($close, Log($volume+1), %d)" % d for d in windows]
names += ["CORR%d" % d for d in windows]
if use("CORD"):
# The correlation between price change ratio and volume change ratio
fields += ["Corr($close/Ref($close,1), Log($volume/Ref($volume, 1)+1), %d)" % d for d in windows]
names += ["CORD%d" % d for d in windows]
if use("CNTP"):
# The percentage of days in past d days that price go up.
fields += ["Mean($close>Ref($close, 1), %d)" % d for d in windows]
names += ["CNTP%d" % d for d in windows]
if use("CNTN"):
# The percentage of days in past d days that price go down.
fields += ["Mean($close<Ref($close, 1), %d)" % d for d in windows]
names += ["CNTN%d" % d for d in windows]
if use("CNTD"):
# The diff between past up day and past down day
fields += ["Mean($close>Ref($close, 1), %d)-Mean($close<Ref($close, 1), %d)" % (d, d) for d in windows]
names += ["CNTD%d" % d for d in windows]
if use("SUMP"):
# The total gain / the absolute total price changed
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"Sum(Greater($close-Ref($close, 1), 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
for d in windows
]
names += ["SUMP%d" % d for d in windows]
if use("SUMN"):
# The total lose / the absolute total price changed
# Can be derived from SUMP by SUMN = 1 - SUMP
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"Sum(Greater(Ref($close, 1)-$close, 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
for d in windows
]
names += ["SUMN%d" % d for d in windows]
if use("SUMD"):
# The diff ratio between total gain and total lose
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"(Sum(Greater($close-Ref($close, 1), 0), %d)-Sum(Greater(Ref($close, 1)-$close, 0), %d))"
"/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d, d)
for d in windows
]
names += ["SUMD%d" % d for d in windows]
if use("VMA"):
# Simple Volume Moving average: https://www.barchart.com/education/technical-indicators/volume_moving_average
fields += ["Mean($volume, %d)/($volume+1e-12)" % d for d in windows]
names += ["VMA%d" % d for d in windows]
if use("VSTD"):
# The standard deviation for volume in past d days.
fields += ["Std($volume, %d)/($volume+1e-12)" % d for d in windows]
names += ["VSTD%d" % d for d in windows]
if use("WVMA"):
# The volume weighted price change volatility
fields += [
"Std(Abs($close/Ref($close, 1)-1)*$volume, %d)/(Mean(Abs($close/Ref($close, 1)-1)*$volume, %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["WVMA%d" % d for d in windows]
if use("VSUMP"):
# The total volume increase / the absolute total volume changed
fields += [
"Sum(Greater($volume-Ref($volume, 1), 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["VSUMP%d" % d for d in windows]
if use("VSUMN"):
# The total volume increase / the absolute total volume changed
# Can be derived from VSUMP by VSUMN = 1 - VSUMP
fields += [
"Sum(Greater(Ref($volume, 1)-$volume, 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["VSUMN%d" % d for d in windows]
if use("VSUMD"):
# The diff ratio between total volume increase and total volume decrease
# RSI indicator for volume
fields += [
"(Sum(Greater($volume-Ref($volume, 1), 0), %d)-Sum(Greater(Ref($volume, 1)-$volume, 0), %d))"
"/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)" % (d, d, d)
for d in windows
]
names += ["VSUMD%d" % d for d in windows]
return fields, names

View File

@@ -243,7 +243,7 @@ class MetaDatasetDS(MetaTaskDataset):
trunc_days: int = None,
rolling_ext_days: int = 0,
exp_name: Union[str, InternalData],
segments: Union[Dict[Text, Tuple], float],
segments: Union[Dict[Text, Tuple], float, str],
hist_step_n: int = 10,
task_mode: str = MetaTask.PROC_MODE_FULL,
fill_method: str = "max",
@@ -271,12 +271,16 @@ class MetaDatasetDS(MetaTaskDataset):
- str: the name of the experiment to store the performance of data
- InternalData: a prepared internal data
segments: Union[Dict[Text, Tuple], float]
the segments to divide data
both left and right
if the segment is a Dict
the segments to divide data
both left and right are included
if segments is a float:
the float represents the percentage of data for training
if segments is a string:
it will try its best to put its data in training and ensure that the date `segments` is in the test set
hist_step_n: int
length of historical steps for the meta infomation
Number of steps of the data similarity information
task_mode : str
Please refer to the docs of MetaTask
"""
@@ -383,10 +387,30 @@ class MetaDatasetDS(MetaTaskDataset):
if isinstance(self.segments, float):
train_task_n = int(len(self.meta_task_l) * self.segments)
if segment == "train":
return self.meta_task_l[:train_task_n]
train_tasks = self.meta_task_l[:train_task_n]
get_module_logger("MetaDatasetDS").info(f"The first train meta task: {train_tasks[0]}")
return train_tasks
elif segment == "test":
return self.meta_task_l[train_task_n:]
test_tasks = self.meta_task_l[train_task_n:]
get_module_logger("MetaDatasetDS").info(f"The first test meta task: {test_tasks[0]}")
return test_tasks
else:
raise NotImplementedError(f"This type of input is not supported")
elif isinstance(self.segments, str):
train_tasks = []
test_tasks = []
for t in self.meta_task_l:
test_end = t.task["dataset"]["kwargs"]["segments"]["test"][1]
if test_end is None or pd.Timestamp(test_end) < pd.Timestamp(self.segments):
train_tasks.append(t)
else:
test_tasks.append(t)
get_module_logger("MetaDatasetDS").info(f"The first train meta task: {train_tasks[0]}")
get_module_logger("MetaDatasetDS").info(f"The first test meta task: {test_tasks[0]}")
if segment == "train":
return train_tasks
elif segment == "test":
return test_tasks
raise NotImplementedError(f"This type of input is not supported")
else:
raise NotImplementedError(f"This type of input is not supported")

View File

@@ -53,7 +53,12 @@ class MetaModelDS(MetaTaskModel):
max_epoch=100,
seed=43,
alpha=0.0,
loss_skip_thresh=50,
):
"""
loss_skip_size: int
The number of threshold to skip the loss calculation for each day.
"""
self.step = step
self.hist_step_n = hist_step_n
self.clip_method = clip_method
@@ -63,6 +68,7 @@ class MetaModelDS(MetaTaskModel):
self.max_epoch = max_epoch
self.fitted = False
self.alpha = alpha
self.loss_skip_thresh = loss_skip_thresh
torch.manual_seed(seed)
def run_epoch(self, phase, task_list, epoch, opt, loss_l, ignore_weight=False):
@@ -88,12 +94,14 @@ class MetaModelDS(MetaTaskModel):
criterion = nn.MSELoss()
loss = criterion(pred, meta_input["y_test"])
elif self.criterion == "ic_loss":
criterion = ICLoss()
criterion = ICLoss(self.loss_skip_thresh)
try:
loss = criterion(pred, meta_input["y_test"], meta_input["test_idx"], skip_size=50)
loss = criterion(pred, meta_input["y_test"], meta_input["test_idx"])
except ValueError as e:
get_module_logger("MetaModelDS").warning(f"Exception `{e}` when calculating IC loss")
continue
else:
raise ValueError(f"Unknown criterion: {self.criterion}")
assert not np.isnan(loss.detach().item()), "NaN loss!"

View File

@@ -10,7 +10,11 @@ from qlib.log import get_module_logger
class ICLoss(nn.Module):
def forward(self, pred, y, idx, skip_size=50):
def __init__(self, skip_size=50):
super().__init__()
self.skip_size = skip_size
def forward(self, pred, y, idx):
"""forward.
FIXME:
- Some times it will be a slightly different from the result from `pandas.corr()`
@@ -33,7 +37,7 @@ class ICLoss(nn.Module):
skip_n = 0
for start_i, end_i in zip(diff_point, diff_point[1:]):
pred_focus = pred[start_i:end_i] # TODO: just for fake
if pred_focus.shape[0] < skip_size:
if pred_focus.shape[0] < self.skip_size:
# skip some days which have very small amount of stock.
skip_n += 1
continue
@@ -50,6 +54,7 @@ class ICLoss(nn.Module):
)
ic_all += ic_day
if len(diff_point) - 1 - skip_n <= 0:
__import__("ipdb").set_trace()
raise ValueError("No enough data for calculating IC")
if skip_n > 0:
get_module_logger("ICLoss").info(

View File

@@ -63,6 +63,7 @@ class LinearModel(Model):
df_train = pd.concat([df_train, df_valid])
except KeyError:
get_module_logger("LinearModel").info("include_valid=True, but valid does not exist")
df_train = df_train.dropna()
if df_train.empty:
raise ValueError("Empty data from dataset, please check your dataset config.")
if reweighter is not None:

View File

@@ -0,0 +1,663 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
from torch.utils.data import DataLoader, RandomSampler, StackDataset
import os
import numpy as np
import pandas as pd
from typing import Callable, Optional, Text, Union
from sklearn.metrics import roc_auc_score, mean_squared_error
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import StackDataset
from qlib.data.dataset.weight import Reweighter
from .pytorch_utils import count_parameters
from ...model.base import Model
from ...data.dataset import DatasetH, TSDatasetH
from ...data.dataset.handler import DataHandlerLP
from ...utils import (
auto_filter_kwargs,
init_instance_by_config,
unpack_archive_with_buffer,
save_multiple_parts_file,
get_or_create_path,
)
from ...log import get_module_logger
from ...workflow import R
from qlib.contrib.meta.data_selection.utils import ICLoss
from torch.nn import DataParallel
class GeneralPTNN(Model):
"""General Pytorch Neural Network Model
Parameters
----------
input_dim : int
input dimension
output_dim : int
output dimension
layers : tuple
layer sizes
lr : float
learning rate
optimizer : str
optimizer name
GPU : int
the GPU ID used for training
"""
def __init__(
self,
lr=0.001,
max_steps=300,
batch_size=2000,
early_stop_rounds=50,
eval_steps=20,
optimizer="gd",
loss="mse",
GPU=0,
seed=None,
weight_decay=0.0,
data_parall=False,
scheduler: Optional[Union[Callable]] = "default", # when it is Callable, it accept one argument named optimizer
init_model=None,
eval_train_metric=False,
pt_model_uri="qlib.contrib.model.pytorch_nn.Net",
pt_model_kwargs={
"input_dim": 360,
"layers": (256,),
},
valid_key=DataHandlerLP.DK_L,
# TODO: Infer Key is a more reasonable key. But it requires more detailed processing on label processing
):
# Set logger.
self.logger = get_module_logger("DNNModelPytorch")
self.logger.info("DNN pytorch version...")
# set hyper-parameters.
self.lr = lr
self.max_steps = max_steps
self.batch_size = batch_size
self.early_stop_rounds = early_stop_rounds
self.eval_steps = eval_steps
self.optimizer = optimizer.lower()
self.loss_type = loss
if isinstance(GPU, str):
self.device = torch.device(GPU)
else:
self.device = torch.device("cuda:%d" % (GPU) if torch.cuda.is_available() and GPU >= 0 else "cpu")
self.seed = seed
self.weight_decay = weight_decay
self.data_parall = data_parall
self.eval_train_metric = eval_train_metric
self.valid_key = valid_key
self.best_step = None
self.logger.info(
"DNN parameters setting:"
f"\nlr : {lr}"
f"\nmax_steps : {max_steps}"
f"\nbatch_size : {batch_size}"
f"\nearly_stop_rounds : {early_stop_rounds}"
f"\neval_steps : {eval_steps}"
f"\noptimizer : {optimizer}"
f"\nloss_type : {loss}"
f"\nseed : {seed}"
f"\ndevice : {self.device}"
f"\nuse_GPU : {self.use_gpu}"
f"\nweight_decay : {weight_decay}"
f"\nenable data parall : {self.data_parall}"
f"\npt_model_uri: {pt_model_uri}"
f"\npt_model_kwargs: {pt_model_kwargs}"
)
if self.seed is not None:
np.random.seed(self.seed)
torch.manual_seed(self.seed)
if loss not in {"mse", "binary"}:
raise NotImplementedError("loss {} is not supported!".format(loss))
self._scorer = mean_squared_error if loss == "mse" else roc_auc_score
if init_model is None:
self.dnn_model = init_instance_by_config({"class": pt_model_uri, "kwargs": pt_model_kwargs})
if self.data_parall:
self.dnn_model = DataParallel(self.dnn_model).to(self.device)
else:
self.dnn_model = init_model
self.logger.info("model:\n{:}".format(self.dnn_model))
self.logger.info("model size: {:.4f} MB".format(count_parameters(self.dnn_model)))
if optimizer.lower() == "adam":
self.train_optimizer = optim.Adam(self.dnn_model.parameters(), lr=self.lr, weight_decay=self.weight_decay)
elif optimizer.lower() == "gd":
self.train_optimizer = optim.SGD(self.dnn_model.parameters(), lr=self.lr, weight_decay=self.weight_decay)
else:
raise NotImplementedError("optimizer {} is not supported!".format(optimizer))
if scheduler == "default":
# Reduce learning rate when loss has stopped decrease
self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.train_optimizer,
mode="min",
factor=0.5,
patience=10,
verbose=True,
threshold=0.0001,
threshold_mode="rel",
cooldown=0,
min_lr=0.00001,
eps=1e-08,
)
elif scheduler is None:
self.scheduler = None
else:
self.scheduler = scheduler(optimizer=self.train_optimizer)
self.dnn_model.to(self.device)
@property
def use_gpu(self):
return self.device != torch.device("cpu")
def _eval_valid_dl(self, valid_loader, val_index):
with torch.no_grad():
self.dnn_model.eval()
val_loss = []
val_pred = []
val_label = []
for x_batch, y_batch in valid_loader:
x_batch = x_batch.to(self.device)
y_batch = y_batch.to(self.device)
cur_loss = self.get_loss(preds, y_batch, self.loss_type)
val_loss.append(cur_loss.detach().cpu().numpy().item())
val_loss = np.mean(val_loss)
val_pred = torch.cat(val_pred, axis=0).detach().cpu().numpy()
val_label = torch.cat(val_label, axis=0).detach().cpu().numpy()
val_metric = self.get_metric(val_pred, val_label, val_index).detach().cpu().numpy().item()
return val_loss, val_metric
def fit(
self,
dataset: Union[DatasetH, TSDatasetH],
verbose=True,
save_path=None,
):
ists = isinstance(dataset, TSDatasetH) # is this time series dataset
# prepare training
train_x = dataset.prepare("train", col_set="feature", data_key=DataHandlerLP.DK_L)
train_y = dataset.prepare("train", col_set="label", data_key=DataHandlerLP.DK_L)
train_ds = StackDataset(train_x, train_y)
train_sampler = RandomSampler(train_ds)
train_loader = DataLoader(train_ds, batch_size=self.batch_size, sampler=train_sampler)
# prepare validation
valid_x = dataset.prepare("train", col_set="feature", data_key=DataHandlerLP.DK_L)
valid_y = dataset.prepare("train", col_set="label", data_key=DataHandlerLP.DK_L)
valid_ds = StackDataset(valid_x, valid_y)
valid_loader = DataLoader(valid_ds, batch_size=self.batch_size, shuffle=False)
if ists:
val_index = valid_x.data_index
else:
val_index = valid_x.index
save_path = get_or_create_path(save_path)
stop_steps = 0
train_loss = 0
best_loss = np.inf
# train
self.logger.info("training...")
for step in range(1, self.max_steps + 1):
if stop_steps >= self.early_stop_rounds:
if verbose:
self.logger.info("\tearly stop")
break
loss = AverageMeter()
self.dnn_model.train()
self.train_optimizer.zero_grad()
for x_batch, y_batch in train_loader:
x_batch = x_batch.to(self.device)
y_batch = y_batch.to(self.device)
# forward
preds = self.dnn_model(x_batch)
cur_loss = self.get_loss(preds, y_batch, self.loss_type)
cur_loss.backward()
self.train_optimizer.step()
loss.update(cur_loss.item())
R.log_metrics(train_loss=loss.avg, step=step)
# validation
train_loss += loss.val
# for every `eval_steps` steps or at the last steps, we will evaluate the model.
if step % self.eval_steps == 0 or step == self.max_steps:
stop_steps += 1
train_loss /= self.eval_steps
val_loss, val_metric = self._eval_valid_dl(valid_loader, val_index)
R.log_metrics(val_loss=val_loss, step=step)
R.log_metrics(val_metric=val_metric, step=step)
if val_loss < best_loss:
if verbose:
self.logger.info(
"\tvalid loss update from {:.6f} to {:.6f}, save checkpoint.".format(
best_loss, val_loss
)
)
best_loss = val_loss
self.best_step = step
R.log_metrics(best_step=self.best_step, step=step)
stop_steps = 0
torch.save(self.dnn_model.state_dict(), save_path)
train_loss = 0
# update learning rate
if self.scheduler is not None:
auto_filter_kwargs(self.scheduler.step, warning=False)(metrics=val_loss, epoch=step)
R.log_metrics(lr=self.get_lr(), step=step)
# restore the optimal parameters after training
self.dnn_model.load_state_dict(torch.load(save_path, map_location=self.device))
if self.use_gpu:
torch.cuda.empty_cache()
def get_lr(self):
assert len(self.train_optimizer.param_groups) == 1
return self.train_optimizer.param_groups[0]["lr"]
def get_loss(self, pred, target, loss_type, w=None):
pred, target = pred.reshape(-1), target.reshape(-1)
if w is None:
# make it ones and the same size with pred
w = torch.ones_like(pred).to(pred.device)
if loss_type == "mse":
sqr_loss = torch.mul(pred - target, pred - target)
loss = torch.mul(sqr_loss, w).mean()
return loss
elif loss_type == "binary":
loss = nn.BCEWithLogitsLoss(weight=w)
return loss(pred, target)
else:
raise NotImplementedError("loss {} is not supported!".format(loss_type))
def get_metric(self, pred, target, index):
# NOTE: the order of the index must follow <datetime, instrument> sorted order
return -ICLoss()(pred, target, index) # pylint: disable=E1130
def _nn_predict(self, data, return_cpu=True):
"""Reusing predicting NN.
Scenarios
1) test inference (data may come from CPU and expect the output data is on CPU)
2) evaluation on training (data may come from GPU)
"""
if not isinstance(data, torch.Tensor):
if isinstance(data, pd.DataFrame):
data = data.values
data = torch.Tensor(data)
data = data.to(self.device)
preds = []
self.dnn_model.eval()
with torch.no_grad():
batch_size = 8096
for i in range(0, len(data), batch_size):
x = data[i : i + batch_size]
preds.append(self.dnn_model(x.to(self.device)).detach().reshape(-1))
if return_cpu:
preds = np.concatenate([pr.cpu().numpy() for pr in preds])
else:
preds = torch.cat(preds, axis=0)
return preds
def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"):
x_test_pd = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I)
preds = self._nn_predict(x_test_pd)
return pd.Series(preds.reshape(-1), index=x_test_pd.index)
class AverageMeter:
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
from ...model.utils import ConcatDataset
class GeneralPTNN(Model):
"""
Motivation:
We want to provide a Qlib General Pytorch Model Adaptor
You can reuse it for all kinds of Pytorch models.
It should include the training and predict process
Parameters
----------
d_feat : int
input dimension for each time step
metric: str
the evaluation metric used in early stop
optimizer : str
optimizer name
GPU : str
the GPU ID(s) used for training
"""
def __init__(
self,
n_epochs=200,
lr=0.001,
metric="",
batch_size=2000,
early_stop=20,
loss="mse",
optimizer="adam",
n_jobs=10,
GPU=0,
seed=None,
pt_model_uri="qlib.contrib.model.pytorch_gru_ts.GRUModel",
pt_model_kwargs={
"d_feat":6,
"hidden_size":64,
"num_layers":2,
"dropout":0.,
},
):
# Set logger.
self.logger = get_module_logger("GeneralPTNN")
self.logger.info("GeneralPTNN pytorch version...")
# set hyper-parameters.
self.n_epochs = n_epochs
self.lr = lr
self.metric = metric
self.batch_size = batch_size
self.early_stop = early_stop
self.optimizer = optimizer.lower()
self.loss = loss
self.device = torch.device("cuda:%d" % (GPU) if torch.cuda.is_available() and GPU >= 0 else "cpu")
self.n_jobs = n_jobs
self.seed = seed
self.pt_model_uri, self.pt_model_kwargs = pt_model_uri, pt_model_kwargs
self.dnn_model = init_instance_by_config({"class": pt_model_uri, "kwargs": pt_model_kwargs})
self.logger.info(
"GeneralPTNN parameters setting:"
"\nn_epochs : {}"
"\nlr : {}"
"\nmetric : {}"
"\nbatch_size : {}"
"\nearly_stop : {}"
"\noptimizer : {}"
"\nloss_type : {}"
"\ndevice : {}"
"\nn_jobs : {}"
"\nuse_GPU : {}"
"\nseed : {}"
"\npt_model_uri: {}"
"\npt_model_kwargs: {}".format(
n_epochs,
lr,
metric,
batch_size,
early_stop,
optimizer.lower(),
loss,
self.device,
n_jobs,
self.use_gpu,
seed,
pt_model_uri,
pt_model_kwargs,
)
)
if self.seed is not None:
np.random.seed(self.seed)
torch.manual_seed(self.seed)
self.logger.info("model:\n{:}".format(self.dnn_model))
self.logger.info("model size: {:.4f} MB".format(count_parameters(self.dnn_model)))
if optimizer.lower() == "adam":
self.train_optimizer = optim.Adam(self.dnn_model.parameters(), lr=self.lr)
elif optimizer.lower() == "gd":
self.train_optimizer = optim.SGD(self.dnn_model.parameters(), lr=self.lr)
else:
raise NotImplementedError("optimizer {} is not supported!".format(optimizer))
self.fitted = False
self.dnn_model.to(self.device)
@property
def use_gpu(self):
return self.device != torch.device("cpu")
def mse(self, pred, label, weight):
loss = weight * (pred - label) ** 2
return torch.mean(loss)
def loss_fn(self, pred, label, weight=None):
mask = ~torch.isnan(label)
if weight is None:
weight = torch.ones_like(label)
if self.loss == "mse":
return self.mse(pred[mask], label[mask], weight[mask])
raise ValueError("unknown loss `%s`" % self.loss)
def metric_fn(self, pred, label):
mask = torch.isfinite(label)
if self.metric in ("", "loss"):
return -self.loss_fn(pred[mask], label[mask])
raise ValueError("unknown metric `%s`" % self.metric)
def _get_fl(self, data: torch.Tensor):
"""
get feature and label from data
- Handle the different data shape of time series and tabular data
Parameters
----------
data : torch.Tensor
input data which maybe 3 dimension or 2 dimension
- 3dim: [batch_size, time_step, feature_dim]
- 2dim: [batch_size, feature_dim]
Returns
-------
Tuple[torch.Tensor, torch.Tensor]
"""
if data.dim() == 3:
# it is a time series dataset
feature = data[:, :, 0:-1].to(self.device)
label = data[:, -1, -1].to(self.device)
elif data.dim() == 2:
# it is a tabular dataset
feature = data[:, 0:-1].to(self.device)
label = data[:, -1].to(self.device)
else:
raise ValueError("Unsupported data shape.")
return feature, label
def train_epoch(self, data_loader):
self.dnn_model.train()
for data, weight in data_loader:
feature , label = self._get_fl(data)
pred = self.dnn_model(feature.float())
loss = self.loss_fn(pred, label, weight.to(self.device))
self.train_optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_value_(self.dnn_model.parameters(), 3.0)
self.train_optimizer.step()
def test_epoch(self, data_loader):
self.dnn_model.eval()
scores = []
losses = []
for data, weight in data_loader:
feature = data[:, :, 0:-1].to(self.device)
# feature[torch.isnan(feature)] = 0
label = data[:, -1, -1].to(self.device)
with torch.no_grad():
pred = self.dnn_model(feature.float())
loss = self.loss_fn(pred, label, weight.to(self.device))
losses.append(loss.item())
score = self.metric_fn(pred, label)
scores.append(score.item())
return np.mean(losses), np.mean(scores)
def fit(
self,
dataset: Union[DatasetH, TSDatasetH],
evals_result=dict(),
save_path=None,
reweighter=None,
):
ists = isinstance(dataset, TSDatasetH) # is this time series dataset
dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
if dl_train.empty or dl_valid.empty:
raise ValueError("Empty data from dataset, please check your dataset config.")
if reweighter is None:
wl_train = np.ones(len(dl_train))
wl_valid = np.ones(len(dl_valid))
elif isinstance(reweighter, Reweighter):
wl_train = reweighter.reweight(dl_train)
wl_valid = reweighter.reweight(dl_valid)
else:
raise ValueError("Unsupported reweighter type.")
# Preprocess for data. To align to Dataset Interface for DataLoader
if ists:
dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader
dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader
else:
# If it is a tabular, we convert the dataframe to numpy to be indexable by DataLoader
dl_train = dl_train.values
dl_valid = dl_valid.values
train_loader = DataLoader(
ConcatDataset(dl_train, wl_train),
batch_size=self.batch_size,
shuffle=True,
num_workers=self.n_jobs,
drop_last=True,
)
valid_loader = DataLoader(
ConcatDataset(dl_valid, wl_valid),
batch_size=self.batch_size,
shuffle=False,
num_workers=self.n_jobs,
drop_last=True,
)
del dl_train, dl_valid, wl_train, wl_valid
save_path = get_or_create_path(save_path)
stop_steps = 0
train_loss = 0
best_score = -np.inf
best_epoch = 0
evals_result["train"] = []
evals_result["valid"] = []
# train
self.logger.info("training...")
self.fitted = True
for step in range(self.n_epochs):
self.logger.info("Epoch%d:", step)
self.logger.info("training...")
self.train_epoch(train_loader)
self.logger.info("evaluating...")
train_loss, train_score = self.test_epoch(train_loader)
val_loss, val_score = self.test_epoch(valid_loader)
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
evals_result["train"].append(train_score)
evals_result["valid"].append(val_score)
if val_score > best_score:
best_score = val_score
stop_steps = 0
best_epoch = step
best_param = copy.deepcopy(self.dnn_model.state_dict())
else:
stop_steps += 1
if stop_steps >= self.early_stop:
self.logger.info("early stop")
break
self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
self.dnn_model.load_state_dict(best_param)
torch.save(best_param, save_path)
if self.use_gpu:
torch.cuda.empty_cache()
def predict(self, dataset: Union[DatasetH, TSDatasetH]):
if not self.fitted:
raise ValueError("model is not fitted yet!")
dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I)
dl_test.config(fillna_type="ffill+bfill")
test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs)
self.dnn_model.eval()
preds = []
for data in test_loader:
feature = data[:, :, 0:-1].to(self.device)
with torch.no_grad():
pred = self.dnn_model(feature.float()).detach().cpu().numpy()
preds.append(pred)
return pd.Series(np.concatenate(preds), index=dl_test.get_index())

View File

@@ -1,25 +1,25 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import copy
from typing import Text, Union
import numpy as np
import pandas as pd
from typing import Text, Union
import copy
from ...utils import get_or_create_path
from ...log import get_module_logger
import torch
import torch.nn as nn
import torch.optim as optim
from .pytorch_utils import count_parameters
from ...model.base import Model
from qlib.workflow import R
from ...data.dataset import DatasetH
from ...data.dataset.handler import DataHandlerLP
from ...log import get_module_logger
from ...model.base import Model
from ...utils import get_or_create_path
from .pytorch_utils import count_parameters
class GRU(Model):
@@ -212,16 +212,31 @@ class GRU(Model):
evals_result=dict(),
save_path=None,
):
df_train, df_valid, df_test = dataset.prepare(
["train", "valid", "test"],
col_set=["feature", "label"],
data_key=DataHandlerLP.DK_L,
)
if df_train.empty or df_valid.empty:
raise ValueError("Empty data from dataset, please check your dataset config.")
# prepare training and validation data
dfs = {
k: dataset.prepare(
k,
col_set=["feature", "label"],
data_key=DataHandlerLP.DK_L,
)
for k in ["train", "valid"]
if k in dataset.segments
}
df_train, df_valid = dfs.get("train", pd.DataFrame()), dfs.get("valid", pd.DataFrame())
# check if training data is empty
if df_train.empty:
raise ValueError("Empty training data from dataset, please check your dataset config.")
df_train = df_train.dropna()
x_train, y_train = df_train["feature"], df_train["label"]
x_valid, y_valid = df_valid["feature"], df_valid["label"]
# check if validation data is provided
if not df_valid.empty:
df_valid = df_valid.dropna()
x_valid, y_valid = df_valid["feature"], df_valid["label"]
else:
x_valid, y_valid = None, None
save_path = get_or_create_path(save_path)
stop_steps = 0
@@ -235,32 +250,42 @@ class GRU(Model):
self.logger.info("training...")
self.fitted = True
best_param = copy.deepcopy(self.gru_model.state_dict())
for step in range(self.n_epochs):
self.logger.info("Epoch%d:", step)
self.logger.info("training...")
self.train_epoch(x_train, y_train)
self.logger.info("evaluating...")
train_loss, train_score = self.test_epoch(x_train, y_train)
val_loss, val_score = self.test_epoch(x_valid, y_valid)
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
evals_result["train"].append(train_score)
evals_result["valid"].append(val_score)
if val_score > best_score:
best_score = val_score
stop_steps = 0
best_epoch = step
best_param = copy.deepcopy(self.gru_model.state_dict())
else:
stop_steps += 1
if stop_steps >= self.early_stop:
self.logger.info("early stop")
break
# evaluate on validation data if provided
if x_valid is not None and y_valid is not None:
val_loss, val_score = self.test_epoch(x_valid, y_valid)
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
evals_result["valid"].append(val_score)
if val_score > best_score:
best_score = val_score
stop_steps = 0
best_epoch = step
best_param = copy.deepcopy(self.gru_model.state_dict())
else:
stop_steps += 1
if stop_steps >= self.early_stop:
self.logger.info("early stop")
break
self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
self.gru_model.load_state_dict(best_param)
torch.save(best_param, save_path)
# Logging
rec = R.get_recorder()
for k, v_l in evals_result.items():
for i, v in enumerate(v_l):
rec.log_metrics(step=i, **{k: v})
if self.use_gpu:
torch.cuda.empty_cache()
@@ -292,6 +317,7 @@ class GRU(Model):
class GRUModel(nn.Module):
def __init__(self, d_feat=6, hidden_size=64, num_layers=2, dropout=0.0):
super().__init__()

View File

@@ -1,5 +1,17 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Here we have a comprehensive set of analysis classes.
Here is an example.
.. code-block:: python
from qlib.contrib.report.data.ana import FeaMeanStd
fa = FeaMeanStd(ret_df)
fa.plot_all(wspace=0.3, sub_figsize=(12, 3), col_n=5)
"""
import pandas as pd
import numpy as np
from qlib.contrib.report.data.base import FeaAnalyser
@@ -152,6 +164,7 @@ class FeaSkewTurt(NumFeaAnalyser):
self._kurt[col].plot(ax=right_ax, label="kurt", color="green")
right_ax.set_xlabel("")
right_ax.set_ylabel("kurt")
right_ax.grid(None) # set the grid to None to avoid two layer of grid
h1, l1 = ax.get_legend_handles_labels()
h2, l2 = right_ax.get_legend_handles_labels()
@@ -171,12 +184,15 @@ class FeaMeanStd(NumFeaAnalyser):
ax.set_xlabel("")
ax.set_ylabel("mean")
ax.legend()
ax.tick_params(axis="x", rotation=90)
right_ax = ax.twinx()
self._std[col].plot(ax=right_ax, label="std", color="green")
right_ax.set_xlabel("")
right_ax.set_ylabel("std")
right_ax.tick_params(axis="x", rotation=90)
right_ax.grid(None) # set the grid to None to avoid two layer of grid
h1, l1 = ax.get_legend_handles_labels()
h2, l2 = right_ax.get_legend_handles_labels()

View File

@@ -14,6 +14,24 @@ from qlib.contrib.report.utils import sub_fig_generator
class FeaAnalyser:
def __init__(self, dataset: pd.DataFrame):
"""
Parameters
----------
dataset : pd.DataFrame
We often have multiple columns for dataset. Each column corresponds to one sub figure.
There will be a datatime column in the index levels.
Aggretation will be used for more summarized metrics overtime.
Here is an example of data:
.. code-block::
return
datetime instrument
2007-02-06 equity_tpx 0.010087
equity_spx 0.000786
"""
self._dataset = dataset
with TimeInspector.logt("calc_stat_values"):
self.calc_stat_values()

View File

@@ -4,7 +4,7 @@ import matplotlib.pyplot as plt
import pandas as pd
def sub_fig_generator(sub_fs=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None, sharex=False, sharey=False):
def sub_fig_generator(sub_figsize=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None, sharex=False, sharey=False):
"""sub_fig_generator.
it will return a generator, each row contains <col_n> sub graph
@@ -13,7 +13,7 @@ def sub_fig_generator(sub_fs=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None
Parameters
----------
sub_fs :
sub_figsize :
the figure size of each subgraph in <col_n> * <row_n> subgraphs
col_n :
the number of subgraph in each row; It will generating a new graph after generating <col_n> of subgraphs.
@@ -33,7 +33,7 @@ def sub_fig_generator(sub_fs=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None
while True:
fig, axes = plt.subplots(
row_n, col_n, figsize=(sub_fs[0] * col_n, sub_fs[1] * row_n), sharex=sharex, sharey=sharey
row_n, col_n, figsize=(sub_figsize[0] * col_n, sub_figsize[1] * row_n), sharex=sharex, sharey=sharey
)
plt.subplots_adjust(wspace=wspace, hspace=hspace)
axes = axes.reshape(row_n, col_n)

View File

@@ -73,8 +73,8 @@ class Rolling:
The horizon of the prediction target.
This is used to override the prediction horizon of the file.
h_path : Optional[str]
the dumped data handler;
It may come from other data source. It will override the data handler in the config.
It is other data source that is dumped as a handler. It will override the data handler section in the config.
If it is not given, it will create a customized cache for the handler when `enable_handler_cache=True`
test_end : Optional[str]
the test end for the data. It is typically used together with the handler
You can do the same thing with task_ext_conf in a more complicated way
@@ -119,7 +119,7 @@ class Rolling:
with self.conf_path.open("r") as f:
return yaml.safe_load(f)
def _replace_hanler_with_cache(self, task: dict):
def _replace_handler_with_cache(self, task: dict):
"""
Due to the data processing part in original rolling is slow. So we have to
This class tries to add more feature
@@ -159,13 +159,20 @@ class Rolling:
# - get horizon automatically from the expression!!!!
raise NotImplementedError(f"This type of input is not supported")
else:
self.logger.info("The prediction horizon is overrided")
task["dataset"]["kwargs"]["handler"]["kwargs"]["label"] = [
"Ref($close, -{}) / Ref($close, -1) - 1".format(self.horizon + 1)
]
if enable_handler_cache and self.h_path is not None:
self.logger.info("Fail to override the horizon due to data handler cache")
else:
self.logger.info("The prediction horizon is overrided")
if isinstance(task["dataset"]["kwargs"]["handler"], dict):
task["dataset"]["kwargs"]["handler"]["kwargs"]["label"] = [
"Ref($close, -{}) / Ref($close, -1) - 1".format(self.horizon + 1)
]
else:
self.logger.warning("Try to automatically configure the lablel but failed.")
if enable_handler_cache:
task = self._replace_hanler_with_cache(task)
if self.h_path is not None or enable_handler_cache:
# if we already have provided data source or we want to create one
task = self._replace_handler_with_cache(task)
task = self._update_start_end_time(task)
if self.task_ext_conf is not None:
@@ -173,6 +180,16 @@ class Rolling:
self.logger.info(task)
return task
def run_basic_task(self):
"""
Run the basic task without rolling.
This is for fast testing for model tunning.
"""
task = self.basic_task()
print(task)
trainer = TrainerR(experiment_name=self.exp_name)
trainer([task])
def get_task_list(self) -> List[dict]:
"""return a batch of tasks for rolling."""
task = self.basic_task()

View File

@@ -80,6 +80,11 @@ class DDGDA(Rolling):
sim_task_model: UTIL_MODEL_TYPE = "gbdt",
meta_1st_train_end: Optional[str] = None,
alpha: float = 0.01,
loss_skip_thresh: int = 50,
fea_imp_n: Optional[int] = 30,
meta_data_proc: Optional[str] = "V01",
segments: Union[float, str] = 0.62,
hist_step_n: int = 30,
working_dir: Optional[Union[str, Path]] = None,
**kwargs,
):
@@ -94,6 +99,15 @@ class DDGDA(Rolling):
alpha: float
Setting the L2 regularization for ridge
The `alpha` is only passed to MetaModelDS (it is not passed to sim_task_model currently..)
loss_skip_thresh: int
The thresh to skip the loss calculation for each day. If the number of item is less than it, it will skip the loss on that day.
meta_data_proc : Optional[str]
How we process the meta dataset for learning meta model.
segments : Union[float, str]
if segments is a float:
The ratio of training data in the meta task dataset
if segments is a string:
it will try its best to put its data in training and ensure that the date `segments` is in the test set
"""
# NOTE:
# the horizon must match the meaning in the base task template
@@ -104,14 +118,22 @@ class DDGDA(Rolling):
super().__init__(**kwargs)
self.working_dir = self.conf_path.parent if working_dir is None else Path(working_dir)
self.proxy_hd = self.working_dir / "handler_proxy.pkl"
self.fea_imp_n = fea_imp_n
self.meta_data_proc = meta_data_proc
self.loss_skip_thresh = loss_skip_thresh
self.segments = segments
self.hist_step_n = hist_step_n
def _adjust_task(self, task: dict, astype: UTIL_MODEL_TYPE):
"""
some task are use for special purpose.
Base on the original task, we need to do some extra things.
For example:
- GBDT for calculating feature importance
- Linear or GBDT for calculating similarity
- Datset (well processed) that aligned to Linear that for meta learning
So we may need to change the dataset and model for the special purpose and other settings remains the same.
"""
# NOTE: here is just for aligning with previous implementation
# It is not necessary for the current implementation
@@ -119,12 +141,16 @@ class DDGDA(Rolling):
if astype == "gbdt":
task["model"] = LGBM_MODEL
if isinstance(handler, dict):
# We don't need preprocessing when using GBDT model
for k in ["infer_processors", "learn_processors"]:
if k in handler.setdefault("kwargs", {}):
handler["kwargs"].pop(k)
elif astype == "linear":
task["model"] = LINEAR_MODEL
handler["kwargs"].update(PROC_ARGS)
if isinstance(handler, dict):
handler["kwargs"].update(PROC_ARGS)
else:
self.logger.warning("The handler can't be adjusted.")
else:
raise ValueError(f"astype not supported: {astype}")
return task
@@ -155,12 +181,15 @@ class DDGDA(Rolling):
The meta model will be trained upon the proxy forecasting model.
This dataset is for the proxy forecasting model.
"""
topk = 30
fi = self._get_feature_importance()
col_selected = fi.nlargest(topk)
# NOTE: adjusting to `self.sim_task_model` just for aligning with previous implementation.
# In previous version. The data for proxy model is using sim_task_model's way for processing
task = self._adjust_task(self.basic_task(enable_handler_cache=False), self.sim_task_model)
task = replace_task_handler_with_cache(task, self.working_dir)
# if self.meta_data_proc is not None:
# else:
# # Otherwise, we don't need futher processing
# task = self.basic_task()
dataset = init_instance_by_config(task["dataset"])
prep_ds = dataset.prepare(slice(None), col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
@@ -168,12 +197,18 @@ class DDGDA(Rolling):
feature_df = prep_ds["feature"]
label_df = prep_ds["label"]
feature_selected = feature_df.loc[:, col_selected.index]
if self.fea_imp_n is not None:
fi = self._get_feature_importance()
col_selected = fi.nlargest(self.fea_imp_n)
feature_selected = feature_df.loc[:, col_selected.index]
else:
feature_selected = feature_df
feature_selected = feature_selected.groupby("datetime", group_keys=False).apply(
lambda df: (df - df.mean()).div(df.std())
)
feature_selected = feature_selected.fillna(0.0)
if self.meta_data_proc == "V01":
feature_selected = feature_selected.groupby("datetime", group_keys=False).apply(
lambda df: (df - df.mean()).div(df.std())
)
feature_selected = feature_selected.fillna(0.0)
df_all = {
"label": label_df.reindex(feature_selected.index),
@@ -223,7 +258,10 @@ class DDGDA(Rolling):
# 1) leverage the simplified proxy forecasting model to train meta model.
# - Only the dataset part is important, in current version of meta model will integrate the
# the train_start for training meta model does not necessarily align with final rolling
# NOTE:
# - The train_start for training meta model does not necessarily align with final rolling
# But please select a right time to make sure the finnal rolling tasks are not leaked in the training data.
# - The test_start is automatically aligned to the next day of test_end. Validation is ignored.
train_start = "2008-01-01" if self.train_start is None else self.train_start
train_end = "2010-12-31" if self.meta_1st_train_end is None else self.meta_1st_train_end
test_start = (pd.Timestamp(train_end) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
@@ -249,9 +287,9 @@ class DDGDA(Rolling):
kwargs = dict(
task_tpl=proxy_forecast_model_task,
step=self.step,
segments=0.62, # keep test period consistent with the dataset yaml
segments=self.segments, # keep test period consistent with the dataset yaml
trunc_days=1 + self.horizon,
hist_step_n=30,
hist_step_n=self.hist_step_n,
fill_method=fill_method,
rolling_ext_days=0,
)
@@ -268,7 +306,13 @@ class DDGDA(Rolling):
with R.start(experiment_name=self.meta_exp_name):
R.log_params(**kwargs)
mm = MetaModelDS(
step=self.step, hist_step_n=kwargs["hist_step_n"], lr=0.001, max_epoch=30, seed=43, alpha=self.alpha
step=self.step,
hist_step_n=kwargs["hist_step_n"],
lr=0.001,
max_epoch=30,
seed=43,
alpha=self.alpha,
loss_skip_thresh=self.loss_skip_thresh,
)
mm.fit(md)
R.save_objects(model=mm)

View File

@@ -7,7 +7,7 @@ from pathlib import Path
import warnings
import pandas as pd
from typing import Tuple, Union, List
from typing import Tuple, Union, List, Dict
from qlib.data import D
from qlib.utils import load_dataset, init_instance_by_config, time_to_slc_point
@@ -247,10 +247,14 @@ class StaticDataLoader(DataLoader, Serializable):
def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
self._maybe_load_raw_data()
# 1) Filter by instruments
if instruments is None:
df = self._data
else:
df = self._data.loc(axis=0)[:, instruments]
# 2) Filter by Datetime
if start_time is None and end_time is None:
return df # NOTE: avoid copy by loc
# pd.Timestamp(None) == NaT, use NaT as index can not fetch correct thing, so do not change None.
@@ -275,6 +279,55 @@ class StaticDataLoader(DataLoader, Serializable):
self._data = self._config
class NestedDataLoader(DataLoader):
"""
We have multiple DataLoader, we can use this class to combine them.
"""
def __init__(self, dataloader_l: List[Dict], join="left") -> None:
"""
Parameters
----------
dataloader_l : list[dict]
A list of dataloader, for exmaple
.. code-block:: python
nd = NestedDataLoader(
dataloader_l=[
{
"class": "qlib.contrib.data.loader.Alpha158DL",
}, {
"class": "qlib.contrib.data.loader.Alpha360DL",
"kwargs": {
"config": {
"label": ( ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])
}
}
}
]
)
join :
it will pass to pd.concat when merging it.
"""
super().__init__()
self.data_loader_l = [
(dl if isinstance(dl, DataLoader) else init_instance_by_config(dl)) for dl in dataloader_l
]
self.join = join
def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
df_full = None
for dl in self.data_loader_l:
df_current = dl.load(instruments, start_time, end_time)
if df_full is None:
df_full = df_current
else:
df_full = pd.merge(df_full, df_current, left_index=True, right_index=True, how=self.join)
return df_full.sort_index(axis=1)
class DataLoaderDH(DataLoader):
"""DataLoaderDH
DataLoader based on (D)ata (H)andler

View File

@@ -51,3 +51,6 @@ class MetaTask:
Return the **processed** meta_info
"""
return self.meta_info
def __repr__(self):
return f"MetaTask(task={self.task}, meta_info={self.meta_info})"

View File

@@ -108,6 +108,12 @@ class Index:
self.index_map = self.idx_list = np.arange(idx_list)
self._is_sorted = True
else:
# Check if all elements in idx_list are of the same type
if not all(isinstance(x, type(idx_list[0])) for x in idx_list):
raise TypeError("All elements in idx_list must be of the same type")
# Check if all elements in idx_list are of the same datetime64 precision
if isinstance(idx_list[0], np.datetime64) and not all(x.dtype == idx_list[0].dtype for x in idx_list):
raise TypeError("All elements in idx_list must be of the same datetime64 precision")
self.idx_list = np.array(idx_list)
# NOTE: only the first appearance is indexed
self.index_map = dict(zip(self.idx_list, range(len(self))))
@@ -131,7 +137,12 @@ class Index:
if self.idx_list.dtype.type is np.datetime64:
if isinstance(item, pd.Timestamp):
# This happens often when creating index based on pandas.DatetimeIndex and query with pd.Timestamp
return item.to_numpy()
return item.to_numpy().astype(self.idx_list.dtype)
elif isinstance(item, np.datetime64):
# This happens often when creating index based on np.datetime64 and query with another precision
return item.astype(self.idx_list.dtype)
# NOTE: It is hard to consider every case at first.
# We just try to cover part of cases to make it more user-friendly
return item
def index(self, item) -> int:

View File

@@ -161,7 +161,13 @@ def init_instance_by_config(
# path like 'file:///<path to pickle file>/obj.pkl'
pr = urlparse(config)
if pr.scheme == "file":
pr_path = os.path.join(pr.netloc, pr.path) if bool(pr.path) else pr.netloc
# To enable relative path like file://data/a/b/c.pkl. pr.netloc will be data
path = pr.path
if pr.netloc != "":
path = path.lstrip("/")
pr_path = os.path.join(pr.netloc, path) if bool(pr.path) else pr.netloc
with open(os.path.normpath(pr_path), "rb") as f:
return pickle.load(f)
else:

View File

@@ -1,18 +1,20 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import logging
import sys
import os
from pathlib import Path
import sys
import fire
from jinja2 import Template, meta
import ruamel.yaml as yaml
import qlib
import fire
import ruamel.yaml as yaml
from qlib.config import C
from qlib.model.trainer import task_train
from qlib.utils.data import update_config
from qlib.log import get_module_logger
from qlib.model.trainer import task_train
from qlib.utils import set_log_with_config
from qlib.utils.data import update_config
set_log_with_config(C.logging_config)
logger = get_module_logger("qrun", logging.INFO)
@@ -47,6 +49,39 @@ def sys_config(config, config_path):
sys.path.append(str(Path(config_path).parent.resolve().absolute() / p))
def render_template(config_path: str) -> str:
"""
render the template based on the environment
Parameters
----------
config_path : str
configuration path
Returns
-------
str
the rendered content
"""
with open(config_path, "r") as f:
config = f.read()
# Set up the Jinja2 environment
template = Template(config)
# Parse the template to find undeclared variables
env = template.environment
parsed_content = env.parse(config)
variables = meta.find_undeclared_variables(parsed_content)
# Get context from os.environ according to the variables
context = {var: os.getenv(var, "") for var in variables if var in os.environ}
logger.info(f"Render the template with the context: {context}")
# Render the template with the context
rendered_content = template.render(context)
return rendered_content
# workflow handler function
def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"):
"""
@@ -67,8 +102,9 @@ def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"):
market: csi300
"""
with open(config_path) as fp:
config = yaml.safe_load(fp)
# Render the template
rendered_yaml = render_template(config_path)
config = yaml.safe_load(rendered_yaml)
base_config_path = config.get("BASE_CONFIG_PATH", None)
if base_config_path:

View File

@@ -0,0 +1,50 @@
# TODO:
# dump alpha 360 to dataframe and merge it with Alpha158
import sys
import unittest
import qlib
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent))
from qlib.data.dataset.loader import NestedDataLoader
from qlib.contrib.data.loader import Alpha158DL, Alpha360DL
class TestDataLoader(unittest.TestCase):
def test_nested_data_loader(self):
qlib.init()
nd = NestedDataLoader(
dataloader_l=[
{
"class": "qlib.contrib.data.loader.Alpha158DL",
},
{
"class": "qlib.contrib.data.loader.Alpha360DL",
"kwargs": {"config": {"label": (["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])}},
},
]
)
# Of course you can use StaticDataLoader
dataset = nd.load()
assert dataset is not None
columns = dataset.columns.tolist()
columns_list = [tup[1] for tup in columns]
for col in Alpha158DL.get_feature_config()[1]:
assert col in columns_list
for col in Alpha360DL.get_feature_config()[1]:
assert col in columns_list
assert "LABEL0" in columns_list
# Then you can use it wth DataHandler;
if __name__ == "__main__":
unittest.main()

View File

@@ -94,6 +94,24 @@ class IndexDataTest(unittest.TestCase):
print(sd)
self.assertTrue(sd.iloc[0] == 2)
# test different precisions of time data
timeindex = [
np.datetime64("2024-06-22T00:00:00.000000000"),
np.datetime64("2024-06-21T00:00:00.000000000"),
np.datetime64("2024-06-20T00:00:00.000000000"),
]
sd = idd.SingleData([1, 2, 3], index=timeindex)
self.assertTrue(
sd.index.index(np.datetime64("2024-06-21T00:00:00.000000000"))
== sd.index.index(np.datetime64("2024-06-21T00:00:00"))
)
self.assertTrue(sd.index.index(pd.Timestamp("2024-06-21 00:00")) == 1)
# Bad case: the input is not aligned
timeindex[1] = (np.datetime64("2024-06-21T00:00:00.00"),)
with self.assertRaises(TypeError):
sd = idd.SingleData([1, 2, 3], index=timeindex)
def test_ops(self):
sd1 = idd.SingleData([1, 2, 3, 4], index=["foo", "bar", "f", "g"])
sd2 = idd.SingleData([1, 2, 3, 4], index=["foo", "bar", "f", "g"])

View File

@@ -0,0 +1,86 @@
import unittest
from qlib.contrib.model.pytorch_general_nn import GeneralPTNN
from qlib.data.dataset import DatasetH, TSDatasetH
from qlib.data.dataset.handler import DataHandlerLP
from qlib.tests import TestAutoData
class TestNN(TestAutoData):
def test_both_dataset(self):
data_handler_config = {
"start_time": "2008-01-01",
"end_time": "2020-08-01",
"instruments": "csi300",
"data_loader": {
"class": "QlibDataLoader", # Assuming QlibDataLoader is a string reference to the class
"kwargs": {
"config": {
"feature": [
["$high", "$close", "$low"],
["H", "C", "L"]
],
"label": [
["Ref($close, -2)/Ref($close, -1) - 1"],
["LABEL0"]
]
},
"freq": "day"
}
},
# TODO: processors
"learn_processors": [
{
"class": "DropnaLabel",
},
{
"class": "CSZScoreNorm",
"kwargs": {
"fields_group": "label"
}
}
]
}
segments = {
"train": ["2008-01-01", "2014-12-31"],
"valid": ["2015-01-01", "2016-12-31"],
"test": ["2017-01-01", "2020-08-01"]
}
data_handler = DataHandlerLP(**data_handler_config)
# time-series dataset
tsds = TSDatasetH(handler=data_handler, segments=segments)
# tabular dataset
tbds = DatasetH(handler=data_handler, segments=segments)
model_l = [
GeneralPTNN(
n_epochs=2,
pt_model_uri="qlib.contrib.model.pytorch_gru_ts.GRUModel",
pt_model_kwargs={
"d_feat":3,
"hidden_size":8,
"num_layers":1,
"dropout":0.,
},
),
GeneralPTNN(
n_epochs=2,
pt_model_uri="qlib.contrib.model.pytorch_nn.Net", # it is a MLP
pt_model_kwargs={
"input_dim":3,
},
),
]
for ds, model in reversed(list(zip((tsds, tbds), model_l))):
model.fit(ds) # It works
model.predict(ds) # It works
break
if __name__ == "__main__":
unittest.main()