mirror of
https://github.com/microsoft/qlib.git
synced 2026-06-29 00:51:19 +08:00
Compare commits
1 Commits
ptnn4both
...
fix_get_we
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3d8aca7723 |
@@ -60,4 +60,4 @@ The `[dev]` option will help you to install some related packages when developin
|
||||
|
||||
.. code-block:: bash
|
||||
|
||||
pip install -e ".[dev]"
|
||||
pip install -e .[dev]
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
|
||||
|
||||
# Introduction
|
||||
|
||||
What is GeneralPtNN
|
||||
- Fixes the previous design, which failed to support both time-series and tabular data
|
||||
- Now you can run an NN model simply by replacing the PyTorch model structure.
|
||||
|
||||
We provide an example to demonstrate the effectiveness of the current design.
|
||||
- `workflow_config_gru.yaml` aligns with the previous results of [GRU(Kyunghyun Cho, et al.)](../README.md#Alpha158 dataset)
|
||||
- `workflow_config_mlp.yaml` aligns with the previous results of [MLP](../README.md#Alpha158 dataset)
|
||||
|
||||
# TODO
|
||||
|
||||
We will align the existing models with the current design.
|
||||
@@ -1,97 +0,0 @@
|
||||
qlib_init:
|
||||
provider_uri: "~/.qlib/qlib_data/cn_data"
|
||||
region: cn
|
||||
market: &market csi300
|
||||
benchmark: &benchmark SH000300
|
||||
data_handler_config: &data_handler_config
|
||||
start_time: 2008-01-01
|
||||
end_time: 2020-08-01
|
||||
fit_start_time: 2008-01-01
|
||||
fit_end_time: 2014-12-31
|
||||
instruments: *market
|
||||
infer_processors:
|
||||
- class: FilterCol
|
||||
kwargs:
|
||||
fields_group: feature
|
||||
col_list: ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10",
|
||||
"ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5",
|
||||
"RSQR20", "CORD60", "CORD10", "CORR20", "KLOW"
|
||||
]
|
||||
- class: RobustZScoreNorm
|
||||
kwargs:
|
||||
fields_group: feature
|
||||
clip_outlier: true
|
||||
- class: Fillna
|
||||
kwargs:
|
||||
fields_group: feature
|
||||
learn_processors:
|
||||
- class: DropnaLabel
|
||||
- class: CSRankNorm
|
||||
kwargs:
|
||||
fields_group: label
|
||||
label: ["Ref($close, -2) / Ref($close, -1) - 1"]
|
||||
|
||||
port_analysis_config: &port_analysis_config
|
||||
strategy:
|
||||
class: TopkDropoutStrategy
|
||||
module_path: qlib.contrib.strategy
|
||||
kwargs:
|
||||
signal: <PRED>
|
||||
topk: 50
|
||||
n_drop: 5
|
||||
backtest:
|
||||
start_time: 2017-01-01
|
||||
end_time: 2020-08-01
|
||||
account: 100000000
|
||||
benchmark: *benchmark
|
||||
exchange_kwargs:
|
||||
limit_threshold: 0.095
|
||||
deal_price: close
|
||||
open_cost: 0.0005
|
||||
close_cost: 0.0015
|
||||
min_cost: 5
|
||||
task:
|
||||
model:
|
||||
class: GeneralPTNN
|
||||
module_path: qlib.contrib.model.pytorch_general_nn
|
||||
kwargs:
|
||||
d_feat: 20
|
||||
hidden_size: 64
|
||||
num_layers: 2
|
||||
dropout: 0.0
|
||||
n_epochs: 200
|
||||
lr: 2e-4
|
||||
early_stop: 10
|
||||
batch_size: 800
|
||||
metric: loss
|
||||
loss: mse
|
||||
n_jobs: 20
|
||||
GPU: 0
|
||||
dataset:
|
||||
class: TSDatasetH
|
||||
module_path: qlib.data.dataset
|
||||
kwargs:
|
||||
handler:
|
||||
class: Alpha158
|
||||
module_path: qlib.contrib.data.handler
|
||||
kwargs: *data_handler_config
|
||||
segments:
|
||||
train: [2008-01-01, 2014-12-31]
|
||||
valid: [2015-01-01, 2016-12-31]
|
||||
test: [2017-01-01, 2020-08-01]
|
||||
step_len: 20
|
||||
record:
|
||||
- class: SignalRecord
|
||||
module_path: qlib.workflow.record_temp
|
||||
kwargs:
|
||||
model: <MODEL>
|
||||
dataset: <DATASET>
|
||||
- class: SigAnaRecord
|
||||
module_path: qlib.workflow.record_temp
|
||||
kwargs:
|
||||
ana_long_short: False
|
||||
ann_scaler: 252
|
||||
- class: PortAnaRecord
|
||||
module_path: qlib.workflow.record_temp
|
||||
kwargs:
|
||||
config: *port_analysis_config
|
||||
@@ -1,98 +0,0 @@
|
||||
qlib_init:
|
||||
provider_uri: "~/.qlib/qlib_data/cn_data"
|
||||
region: cn
|
||||
market: &market csi300
|
||||
benchmark: &benchmark SH000300
|
||||
data_handler_config: &data_handler_config
|
||||
start_time: 2008-01-01
|
||||
end_time: 2020-08-01
|
||||
fit_start_time: 2008-01-01
|
||||
fit_end_time: 2014-12-31
|
||||
instruments: *market
|
||||
infer_processors: [
|
||||
{
|
||||
"class" : "DropCol",
|
||||
"kwargs":{"col_list": ["VWAP0"]}
|
||||
},
|
||||
{
|
||||
"class" : "CSZFillna",
|
||||
"kwargs":{"fields_group": "feature"}
|
||||
}
|
||||
]
|
||||
learn_processors: [
|
||||
{
|
||||
"class" : "DropCol",
|
||||
"kwargs":{"col_list": ["VWAP0"]}
|
||||
},
|
||||
{
|
||||
"class" : "DropnaProcessor",
|
||||
"kwargs":{"fields_group": "feature"}
|
||||
},
|
||||
"DropnaLabel",
|
||||
{
|
||||
"class": "CSZScoreNorm",
|
||||
"kwargs": {"fields_group": "label"}
|
||||
}
|
||||
]
|
||||
process_type: "independent"
|
||||
|
||||
port_analysis_config: &port_analysis_config
|
||||
strategy:
|
||||
class: TopkDropoutStrategy
|
||||
module_path: qlib.contrib.strategy
|
||||
kwargs:
|
||||
signal: <PRED>
|
||||
topk: 50
|
||||
n_drop: 5
|
||||
backtest:
|
||||
start_time: 2017-01-01
|
||||
end_time: 2020-08-01
|
||||
account: 100000000
|
||||
benchmark: *benchmark
|
||||
exchange_kwargs:
|
||||
limit_threshold: 0.095
|
||||
deal_price: close
|
||||
open_cost: 0.0005
|
||||
close_cost: 0.0015
|
||||
min_cost: 5
|
||||
task:
|
||||
model:
|
||||
class: GeneralPTNN
|
||||
module_path: qlib.contrib.model.pytorch_general_nn
|
||||
kwargs:
|
||||
loss: mse
|
||||
lr: 0.002
|
||||
optimizer: adam
|
||||
max_steps: 8000
|
||||
batch_size: 8192
|
||||
GPU: 0
|
||||
weight_decay: 0.0002
|
||||
pt_model_kwargs:
|
||||
input_dim: 157
|
||||
dataset:
|
||||
class: DatasetH
|
||||
module_path: qlib.data.dataset
|
||||
kwargs:
|
||||
handler:
|
||||
class: Alpha158
|
||||
module_path: qlib.contrib.data.handler
|
||||
kwargs: *data_handler_config
|
||||
segments:
|
||||
train: [2008-01-01, 2014-12-31]
|
||||
valid: [2015-01-01, 2016-12-31]
|
||||
test: [2017-01-01, 2020-08-01]
|
||||
record:
|
||||
- class: SignalRecord
|
||||
module_path: qlib.workflow.record_temp
|
||||
kwargs:
|
||||
model: <MODEL>
|
||||
dataset: <DATASET>
|
||||
- class: SigAnaRecord
|
||||
module_path: qlib.workflow.record_temp
|
||||
kwargs:
|
||||
ana_long_short: False
|
||||
ann_scaler: 252
|
||||
- class: PortAnaRecord
|
||||
module_path: qlib.workflow.record_temp
|
||||
kwargs:
|
||||
config: *port_analysis_config
|
||||
@@ -1,6 +1,5 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
@@ -36,10 +35,6 @@ class DDGDABench(DDGDA):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
kwargs = {}
|
||||
if os.environ.get("PROVIDER_URI", "") == "":
|
||||
GetData().qlib_data(exists_skip=True)
|
||||
else:
|
||||
kwargs["provider_uri"] = os.environ["PROVIDER_URI"]
|
||||
auto_init(**kwargs)
|
||||
GetData().qlib_data(exists_skip=True)
|
||||
auto_init()
|
||||
fire.Fire(DDGDABench)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Union
|
||||
|
||||
@@ -32,10 +31,6 @@ class RollingBenchmark(Rolling):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
kwargs = {}
|
||||
if os.environ.get("PROVIDER_URI", "") == "":
|
||||
GetData().qlib_data(exists_skip=True)
|
||||
else:
|
||||
kwargs["provider_uri"] = os.environ["PROVIDER_URI"]
|
||||
auto_init(**kwargs)
|
||||
GetData().qlib_data(exists_skip=True)
|
||||
auto_init()
|
||||
fire.Fire(RollingBenchmark)
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
from qlib.contrib.data.loader import Alpha158DL, Alpha360DL
|
||||
from ...data.dataset.handler import DataHandlerLP
|
||||
from ...data.dataset.processor import Processor
|
||||
from ...utils import get_callable_kwargs
|
||||
@@ -67,7 +66,7 @@ class Alpha360(DataHandlerLP):
|
||||
"class": "QlibDataLoader",
|
||||
"kwargs": {
|
||||
"config": {
|
||||
"feature": Alpha360DL.get_feature_config(),
|
||||
"feature": self.get_feature_config(),
|
||||
"label": kwargs.pop("label", self.get_label_config()),
|
||||
},
|
||||
"filter_pipe": filter_pipe,
|
||||
@@ -89,6 +88,51 @@ class Alpha360(DataHandlerLP):
|
||||
def get_label_config(self):
|
||||
return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]
|
||||
|
||||
@staticmethod
|
||||
def get_feature_config():
|
||||
# NOTE:
|
||||
# Alpha360 tries to provide a dataset with original price data
|
||||
# the original price data includes the prices and volume in the last 60 days.
|
||||
# To make it easier to learn models from this dataset, all the prices and volume
|
||||
# are normalized by the latest price and volume data ( dividing by $close, $volume)
|
||||
# So the latest normalized $close will be 1 (with name CLOSE0), the latest normalized $volume will be 1 (with name VOLUME0)
|
||||
# If further normalization is applied (e.g. centralization), CLOSE0 and VOLUME0 will be 0.
|
||||
fields = []
|
||||
names = []
|
||||
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($close, %d)/$close" % i]
|
||||
names += ["CLOSE%d" % i]
|
||||
fields += ["$close/$close"]
|
||||
names += ["CLOSE0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($open, %d)/$close" % i]
|
||||
names += ["OPEN%d" % i]
|
||||
fields += ["$open/$close"]
|
||||
names += ["OPEN0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($high, %d)/$close" % i]
|
||||
names += ["HIGH%d" % i]
|
||||
fields += ["$high/$close"]
|
||||
names += ["HIGH0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($low, %d)/$close" % i]
|
||||
names += ["LOW%d" % i]
|
||||
fields += ["$low/$close"]
|
||||
names += ["LOW0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($vwap, %d)/$close" % i]
|
||||
names += ["VWAP%d" % i]
|
||||
fields += ["$vwap/$close"]
|
||||
names += ["VWAP0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($volume, %d)/($volume+1e-12)" % i]
|
||||
names += ["VOLUME%d" % i]
|
||||
fields += ["$volume/($volume+1e-12)"]
|
||||
names += ["VOLUME0"]
|
||||
|
||||
return fields, names
|
||||
|
||||
|
||||
class Alpha360vwap(Alpha360):
|
||||
def get_label_config(self):
|
||||
@@ -146,11 +190,242 @@ class Alpha158(DataHandlerLP):
|
||||
},
|
||||
"rolling": {},
|
||||
}
|
||||
return Alpha158DL.get_feature_config(conf)
|
||||
return self.parse_config_to_fields(conf)
|
||||
|
||||
def get_label_config(self):
|
||||
return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]
|
||||
|
||||
@staticmethod
|
||||
def parse_config_to_fields(config):
|
||||
"""create factors from config
|
||||
|
||||
config = {
|
||||
'kbar': {}, # whether to use some hard-code kbar features
|
||||
'price': { # whether to use raw price features
|
||||
'windows': [0, 1, 2, 3, 4], # use price at n days ago
|
||||
'feature': ['OPEN', 'HIGH', 'LOW'] # which price field to use
|
||||
},
|
||||
'volume': { # whether to use raw volume features
|
||||
'windows': [0, 1, 2, 3, 4], # use volume at n days ago
|
||||
},
|
||||
'rolling': { # whether to use rolling operator based features
|
||||
'windows': [5, 10, 20, 30, 60], # rolling windows size
|
||||
'include': ['ROC', 'MA', 'STD'], # rolling operator to use
|
||||
#if include is None we will use default operators
|
||||
'exclude': ['RANK'], # rolling operator not to use
|
||||
}
|
||||
}
|
||||
"""
|
||||
fields = []
|
||||
names = []
|
||||
if "kbar" in config:
|
||||
fields += [
|
||||
"($close-$open)/$open",
|
||||
"($high-$low)/$open",
|
||||
"($close-$open)/($high-$low+1e-12)",
|
||||
"($high-Greater($open, $close))/$open",
|
||||
"($high-Greater($open, $close))/($high-$low+1e-12)",
|
||||
"(Less($open, $close)-$low)/$open",
|
||||
"(Less($open, $close)-$low)/($high-$low+1e-12)",
|
||||
"(2*$close-$high-$low)/$open",
|
||||
"(2*$close-$high-$low)/($high-$low+1e-12)",
|
||||
]
|
||||
names += [
|
||||
"KMID",
|
||||
"KLEN",
|
||||
"KMID2",
|
||||
"KUP",
|
||||
"KUP2",
|
||||
"KLOW",
|
||||
"KLOW2",
|
||||
"KSFT",
|
||||
"KSFT2",
|
||||
]
|
||||
if "price" in config:
|
||||
windows = config["price"].get("windows", range(5))
|
||||
feature = config["price"].get("feature", ["OPEN", "HIGH", "LOW", "CLOSE", "VWAP"])
|
||||
for field in feature:
|
||||
field = field.lower()
|
||||
fields += ["Ref($%s, %d)/$close" % (field, d) if d != 0 else "$%s/$close" % field for d in windows]
|
||||
names += [field.upper() + str(d) for d in windows]
|
||||
if "volume" in config:
|
||||
windows = config["volume"].get("windows", range(5))
|
||||
fields += ["Ref($volume, %d)/($volume+1e-12)" % d if d != 0 else "$volume/($volume+1e-12)" for d in windows]
|
||||
names += ["VOLUME" + str(d) for d in windows]
|
||||
if "rolling" in config:
|
||||
windows = config["rolling"].get("windows", [5, 10, 20, 30, 60])
|
||||
include = config["rolling"].get("include", None)
|
||||
exclude = config["rolling"].get("exclude", [])
|
||||
# `exclude` in dataset config: unnecessary fields
|
||||
# `include` in dataset config necessary field
|
||||
|
||||
def use(x):
|
||||
return x not in exclude and (include is None or x in include)
|
||||
|
||||
# Some factor ref: https://guorn.com/static/upload/file/3/134065454575605.pdf
|
||||
if use("ROC"):
|
||||
# https://www.investopedia.com/terms/r/rateofchange.asp
|
||||
# Rate of change, the price change in the past d days, divided by latest close price to remove unit
|
||||
fields += ["Ref($close, %d)/$close" % d for d in windows]
|
||||
names += ["ROC%d" % d for d in windows]
|
||||
if use("MA"):
|
||||
# https://www.investopedia.com/ask/answers/071414/whats-difference-between-moving-average-and-weighted-moving-average.asp
|
||||
# Simple Moving Average, the simple moving average in the past d days, divided by latest close price to remove unit
|
||||
fields += ["Mean($close, %d)/$close" % d for d in windows]
|
||||
names += ["MA%d" % d for d in windows]
|
||||
if use("STD"):
|
||||
# The standard deviation of close price for the past d days, divided by latest close price to remove unit
|
||||
fields += ["Std($close, %d)/$close" % d for d in windows]
|
||||
names += ["STD%d" % d for d in windows]
|
||||
if use("BETA"):
|
||||
# The rate of close price change in the past d days, divided by latest close price to remove unit
|
||||
# For example, price increase 10 dollar per day in the past d days, then Slope will be 10.
|
||||
fields += ["Slope($close, %d)/$close" % d for d in windows]
|
||||
names += ["BETA%d" % d for d in windows]
|
||||
if use("RSQR"):
|
||||
# The R-square value of linear regression for the past d days, representing how linear the trend is
|
||||
fields += ["Rsquare($close, %d)" % d for d in windows]
|
||||
names += ["RSQR%d" % d for d in windows]
|
||||
if use("RESI"):
|
||||
# The residual of linear regression for the past d days, representing the trend linearity for the past d days.
|
||||
fields += ["Resi($close, %d)/$close" % d for d in windows]
|
||||
names += ["RESI%d" % d for d in windows]
|
||||
if use("MAX"):
|
||||
# The max price for past d days, divided by latest close price to remove unit
|
||||
fields += ["Max($high, %d)/$close" % d for d in windows]
|
||||
names += ["MAX%d" % d for d in windows]
|
||||
if use("LOW"):
|
||||
# The low price for past d days, divided by latest close price to remove unit
|
||||
fields += ["Min($low, %d)/$close" % d for d in windows]
|
||||
names += ["MIN%d" % d for d in windows]
|
||||
if use("QTLU"):
|
||||
# The 80% quantile of past d day's close price, divided by latest close price to remove unit
|
||||
# Used with MIN and MAX
|
||||
fields += ["Quantile($close, %d, 0.8)/$close" % d for d in windows]
|
||||
names += ["QTLU%d" % d for d in windows]
|
||||
if use("QTLD"):
|
||||
# The 20% quantile of past d day's close price, divided by latest close price to remove unit
|
||||
fields += ["Quantile($close, %d, 0.2)/$close" % d for d in windows]
|
||||
names += ["QTLD%d" % d for d in windows]
|
||||
if use("RANK"):
|
||||
# Get the percentile of current close price in past d day's close price.
|
||||
# Represent the current price level comparing to past N days, add additional information to moving average.
|
||||
fields += ["Rank($close, %d)" % d for d in windows]
|
||||
names += ["RANK%d" % d for d in windows]
|
||||
if use("RSV"):
|
||||
# Represents the price position between the upper and lower resistance prices for the past d days.
|
||||
fields += ["($close-Min($low, %d))/(Max($high, %d)-Min($low, %d)+1e-12)" % (d, d, d) for d in windows]
|
||||
names += ["RSV%d" % d for d in windows]
|
||||
if use("IMAX"):
|
||||
# The number of days between current date and previous highest price date.
|
||||
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
|
||||
# The indicator measures the time between highs and the time between lows over a time period.
|
||||
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
|
||||
fields += ["IdxMax($high, %d)/%d" % (d, d) for d in windows]
|
||||
names += ["IMAX%d" % d for d in windows]
|
||||
if use("IMIN"):
|
||||
# The number of days between current date and previous lowest price date.
|
||||
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
|
||||
# The indicator measures the time between highs and the time between lows over a time period.
|
||||
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
|
||||
fields += ["IdxMin($low, %d)/%d" % (d, d) for d in windows]
|
||||
names += ["IMIN%d" % d for d in windows]
|
||||
if use("IMXD"):
|
||||
# The time period by which the previous lowest-price date occurs after the previous highest-price date.
|
||||
# A large value suggests downward momentum.
|
||||
fields += ["(IdxMax($high, %d)-IdxMin($low, %d))/%d" % (d, d, d) for d in windows]
|
||||
names += ["IMXD%d" % d for d in windows]
|
||||
if use("CORR"):
|
||||
# The correlation between absolute close price and log scaled trading volume
|
||||
fields += ["Corr($close, Log($volume+1), %d)" % d for d in windows]
|
||||
names += ["CORR%d" % d for d in windows]
|
||||
if use("CORD"):
|
||||
# The correlation between price change ratio and volume change ratio
|
||||
fields += ["Corr($close/Ref($close,1), Log($volume/Ref($volume, 1)+1), %d)" % d for d in windows]
|
||||
names += ["CORD%d" % d for d in windows]
|
||||
if use("CNTP"):
|
||||
# The percentage of days in past d days that price go up.
|
||||
fields += ["Mean($close>Ref($close, 1), %d)" % d for d in windows]
|
||||
names += ["CNTP%d" % d for d in windows]
|
||||
if use("CNTN"):
|
||||
# The percentage of days in past d days that price go down.
|
||||
fields += ["Mean($close<Ref($close, 1), %d)" % d for d in windows]
|
||||
names += ["CNTN%d" % d for d in windows]
|
||||
if use("CNTD"):
|
||||
# The diff between past up day and past down day
|
||||
fields += ["Mean($close>Ref($close, 1), %d)-Mean($close<Ref($close, 1), %d)" % (d, d) for d in windows]
|
||||
names += ["CNTD%d" % d for d in windows]
|
||||
if use("SUMP"):
|
||||
# The total gain / the absolute total price changed
|
||||
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
|
||||
fields += [
|
||||
"Sum(Greater($close-Ref($close, 1), 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["SUMP%d" % d for d in windows]
|
||||
if use("SUMN"):
|
||||
# The total loss / the absolute total price changed
|
||||
# Can be derived from SUMP by SUMN = 1 - SUMP
|
||||
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
|
||||
fields += [
|
||||
"Sum(Greater(Ref($close, 1)-$close, 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["SUMN%d" % d for d in windows]
|
||||
if use("SUMD"):
|
||||
# The diff ratio between total gain and total loss
|
||||
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
|
||||
fields += [
|
||||
"(Sum(Greater($close-Ref($close, 1), 0), %d)-Sum(Greater(Ref($close, 1)-$close, 0), %d))"
|
||||
"/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["SUMD%d" % d for d in windows]
|
||||
if use("VMA"):
|
||||
# Simple Volume Moving average: https://www.barchart.com/education/technical-indicators/volume_moving_average
|
||||
fields += ["Mean($volume, %d)/($volume+1e-12)" % d for d in windows]
|
||||
names += ["VMA%d" % d for d in windows]
|
||||
if use("VSTD"):
|
||||
# The standard deviation for volume in past d days.
|
||||
fields += ["Std($volume, %d)/($volume+1e-12)" % d for d in windows]
|
||||
names += ["VSTD%d" % d for d in windows]
|
||||
if use("WVMA"):
|
||||
# The volume weighted price change volatility
|
||||
fields += [
|
||||
"Std(Abs($close/Ref($close, 1)-1)*$volume, %d)/(Mean(Abs($close/Ref($close, 1)-1)*$volume, %d)+1e-12)"
|
||||
% (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["WVMA%d" % d for d in windows]
|
||||
if use("VSUMP"):
|
||||
# The total volume increase / the absolute total volume changed
|
||||
fields += [
|
||||
"Sum(Greater($volume-Ref($volume, 1), 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
|
||||
% (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["VSUMP%d" % d for d in windows]
|
||||
if use("VSUMN"):
|
||||
# The total volume decrease / the absolute total volume changed
|
||||
# Can be derived from VSUMP by VSUMN = 1 - VSUMP
|
||||
fields += [
|
||||
"Sum(Greater(Ref($volume, 1)-$volume, 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
|
||||
% (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["VSUMN%d" % d for d in windows]
|
||||
if use("VSUMD"):
|
||||
# The diff ratio between total volume increase and total volume decrease
|
||||
# RSI indicator for volume
|
||||
fields += [
|
||||
"(Sum(Greater($volume-Ref($volume, 1), 0), %d)-Sum(Greater(Ref($volume, 1)-$volume, 0), %d))"
|
||||
"/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)" % (d, d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["VSUMD%d" % d for d in windows]
|
||||
|
||||
return fields, names
|
||||
|
||||
|
||||
class Alpha158vwap(Alpha158):
|
||||
def get_label_config(self):
|
||||
|
||||
@@ -1,310 +0,0 @@
|
||||
from qlib.data.dataset.loader import QlibDataLoader
|
||||
|
||||
|
||||
class Alpha360DL(QlibDataLoader):
|
||||
"""Dataloader to get Alpha360"""
|
||||
|
||||
def __init__(self, config=None, **kwargs):
|
||||
_config = {
|
||||
"feature": self.get_feature_config(),
|
||||
}
|
||||
if config is not None:
|
||||
_config.update(config)
|
||||
super().__init__(config=_config, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def get_feature_config():
|
||||
# NOTE:
|
||||
# Alpha360 tries to provide a dataset with original price data
|
||||
# the original price data includes the prices and volume in the last 60 days.
|
||||
# To make it easier to learn models from this dataset, all the prices and volume
|
||||
# are normalized by the latest price and volume data ( dividing by $close, $volume)
|
||||
# So the latest normalized $close will be 1 (with name CLOSE0), the latest normalized $volume will be 1 (with name VOLUME0)
|
||||
# If further normalization is applied (e.g. centralization), CLOSE0 and VOLUME0 will be 0.
|
||||
fields = []
|
||||
names = []
|
||||
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($close, %d)/$close" % i]
|
||||
names += ["CLOSE%d" % i]
|
||||
fields += ["$close/$close"]
|
||||
names += ["CLOSE0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($open, %d)/$close" % i]
|
||||
names += ["OPEN%d" % i]
|
||||
fields += ["$open/$close"]
|
||||
names += ["OPEN0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($high, %d)/$close" % i]
|
||||
names += ["HIGH%d" % i]
|
||||
fields += ["$high/$close"]
|
||||
names += ["HIGH0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($low, %d)/$close" % i]
|
||||
names += ["LOW%d" % i]
|
||||
fields += ["$low/$close"]
|
||||
names += ["LOW0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($vwap, %d)/$close" % i]
|
||||
names += ["VWAP%d" % i]
|
||||
fields += ["$vwap/$close"]
|
||||
names += ["VWAP0"]
|
||||
for i in range(59, 0, -1):
|
||||
fields += ["Ref($volume, %d)/($volume+1e-12)" % i]
|
||||
names += ["VOLUME%d" % i]
|
||||
fields += ["$volume/($volume+1e-12)"]
|
||||
names += ["VOLUME0"]
|
||||
|
||||
return fields, names
|
||||
|
||||
|
||||
class Alpha158DL(QlibDataLoader):
|
||||
"""Dataloader to get Alpha158"""
|
||||
|
||||
def __init__(self, config=None, **kwargs):
|
||||
_config = {
|
||||
"feature": self.get_feature_config(),
|
||||
}
|
||||
if config is not None:
|
||||
_config.update(config)
|
||||
super().__init__(config=_config, **kwargs)
|
||||
|
||||
@staticmethod
|
||||
def get_feature_config(
|
||||
config={
|
||||
"kbar": {},
|
||||
"price": {
|
||||
"windows": [0],
|
||||
"feature": ["OPEN", "HIGH", "LOW", "VWAP"],
|
||||
},
|
||||
"rolling": {},
|
||||
}
|
||||
):
|
||||
"""create factors from config
|
||||
|
||||
config = {
|
||||
'kbar': {}, # whether to use some hard-code kbar features
|
||||
'price': { # whether to use raw price features
|
||||
'windows': [0, 1, 2, 3, 4], # use price at n days ago
|
||||
'feature': ['OPEN', 'HIGH', 'LOW'] # which price field to use
|
||||
},
|
||||
'volume': { # whether to use raw volume features
|
||||
'windows': [0, 1, 2, 3, 4], # use volume at n days ago
|
||||
},
|
||||
'rolling': { # whether to use rolling operator based features
|
||||
'windows': [5, 10, 20, 30, 60], # rolling windows size
|
||||
'include': ['ROC', 'MA', 'STD'], # rolling operator to use
|
||||
#if include is None we will use default operators
|
||||
'exclude': ['RANK'], # rolling operator not to use
|
||||
}
|
||||
}
|
||||
"""
|
||||
fields = []
|
||||
names = []
|
||||
if "kbar" in config:
|
||||
fields += [
|
||||
"($close-$open)/$open",
|
||||
"($high-$low)/$open",
|
||||
"($close-$open)/($high-$low+1e-12)",
|
||||
"($high-Greater($open, $close))/$open",
|
||||
"($high-Greater($open, $close))/($high-$low+1e-12)",
|
||||
"(Less($open, $close)-$low)/$open",
|
||||
"(Less($open, $close)-$low)/($high-$low+1e-12)",
|
||||
"(2*$close-$high-$low)/$open",
|
||||
"(2*$close-$high-$low)/($high-$low+1e-12)",
|
||||
]
|
||||
names += [
|
||||
"KMID",
|
||||
"KLEN",
|
||||
"KMID2",
|
||||
"KUP",
|
||||
"KUP2",
|
||||
"KLOW",
|
||||
"KLOW2",
|
||||
"KSFT",
|
||||
"KSFT2",
|
||||
]
|
||||
if "price" in config:
|
||||
windows = config["price"].get("windows", range(5))
|
||||
feature = config["price"].get("feature", ["OPEN", "HIGH", "LOW", "CLOSE", "VWAP"])
|
||||
for field in feature:
|
||||
field = field.lower()
|
||||
fields += ["Ref($%s, %d)/$close" % (field, d) if d != 0 else "$%s/$close" % field for d in windows]
|
||||
names += [field.upper() + str(d) for d in windows]
|
||||
if "volume" in config:
|
||||
windows = config["volume"].get("windows", range(5))
|
||||
fields += ["Ref($volume, %d)/($volume+1e-12)" % d if d != 0 else "$volume/($volume+1e-12)" for d in windows]
|
||||
names += ["VOLUME" + str(d) for d in windows]
|
||||
if "rolling" in config:
|
||||
windows = config["rolling"].get("windows", [5, 10, 20, 30, 60])
|
||||
include = config["rolling"].get("include", None)
|
||||
exclude = config["rolling"].get("exclude", [])
|
||||
# `exclude` in dataset config: unnecessary fields
|
||||
# `include` in dataset config necessary field
|
||||
|
||||
def use(x):
|
||||
return x not in exclude and (include is None or x in include)
|
||||
|
||||
# Some factor ref: https://guorn.com/static/upload/file/3/134065454575605.pdf
|
||||
if use("ROC"):
|
||||
# https://www.investopedia.com/terms/r/rateofchange.asp
|
||||
# Rate of change, the price change in the past d days, divided by latest close price to remove unit
|
||||
fields += ["Ref($close, %d)/$close" % d for d in windows]
|
||||
names += ["ROC%d" % d for d in windows]
|
||||
if use("MA"):
|
||||
# https://www.investopedia.com/ask/answers/071414/whats-difference-between-moving-average-and-weighted-moving-average.asp
|
||||
# Simple Moving Average, the simple moving average in the past d days, divided by latest close price to remove unit
|
||||
fields += ["Mean($close, %d)/$close" % d for d in windows]
|
||||
names += ["MA%d" % d for d in windows]
|
||||
if use("STD"):
|
||||
# The standard deviation of close price for the past d days, divided by latest close price to remove unit
|
||||
fields += ["Std($close, %d)/$close" % d for d in windows]
|
||||
names += ["STD%d" % d for d in windows]
|
||||
if use("BETA"):
|
||||
# The rate of close price change in the past d days, divided by latest close price to remove unit
|
||||
# For example, price increase 10 dollar per day in the past d days, then Slope will be 10.
|
||||
fields += ["Slope($close, %d)/$close" % d for d in windows]
|
||||
names += ["BETA%d" % d for d in windows]
|
||||
if use("RSQR"):
|
||||
# The R-square value of linear regression for the past d days, represents how linear the trend is
|
||||
fields += ["Rsquare($close, %d)" % d for d in windows]
|
||||
names += ["RSQR%d" % d for d in windows]
|
||||
if use("RESI"):
|
||||
# The residual of linear regression for the past d days, represents the trend linearity for past d days.
|
||||
fields += ["Resi($close, %d)/$close" % d for d in windows]
|
||||
names += ["RESI%d" % d for d in windows]
|
||||
if use("MAX"):
|
||||
# The max price for past d days, divided by latest close price to remove unit
|
||||
fields += ["Max($high, %d)/$close" % d for d in windows]
|
||||
names += ["MAX%d" % d for d in windows]
|
||||
if use("LOW"):
|
||||
# The low price for past d days, divided by latest close price to remove unit
|
||||
fields += ["Min($low, %d)/$close" % d for d in windows]
|
||||
names += ["MIN%d" % d for d in windows]
|
||||
if use("QTLU"):
|
||||
# The 80% quantile of past d day's close price, divided by latest close price to remove unit
|
||||
# Used with MIN and MAX
|
||||
fields += ["Quantile($close, %d, 0.8)/$close" % d for d in windows]
|
||||
names += ["QTLU%d" % d for d in windows]
|
||||
if use("QTLD"):
|
||||
# The 20% quantile of past d day's close price, divided by latest close price to remove unit
|
||||
fields += ["Quantile($close, %d, 0.2)/$close" % d for d in windows]
|
||||
names += ["QTLD%d" % d for d in windows]
|
||||
if use("RANK"):
|
||||
# Get the percentile of current close price in past d day's close price.
|
||||
# Represent the current price level comparing to past N days, add additional information to moving average.
|
||||
fields += ["Rank($close, %d)" % d for d in windows]
|
||||
names += ["RANK%d" % d for d in windows]
|
||||
if use("RSV"):
|
||||
# Represent the price position between upper and lower resistance price for past d days.
|
||||
fields += ["($close-Min($low, %d))/(Max($high, %d)-Min($low, %d)+1e-12)" % (d, d, d) for d in windows]
|
||||
names += ["RSV%d" % d for d in windows]
|
||||
if use("IMAX"):
|
||||
# The number of days between current date and previous highest price date.
|
||||
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
|
||||
# The indicator measures the time between highs and the time between lows over a time period.
|
||||
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
|
||||
fields += ["IdxMax($high, %d)/%d" % (d, d) for d in windows]
|
||||
names += ["IMAX%d" % d for d in windows]
|
||||
if use("IMIN"):
|
||||
# The number of days between current date and previous lowest price date.
|
||||
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
|
||||
# The indicator measures the time between highs and the time between lows over a time period.
|
||||
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
|
||||
fields += ["IdxMin($low, %d)/%d" % (d, d) for d in windows]
|
||||
names += ["IMIN%d" % d for d in windows]
|
||||
if use("IMXD"):
|
||||
# The time period between previous lowest-price date occur after highest price date.
|
||||
# Large value suggests downward momentum.
|
||||
fields += ["(IdxMax($high, %d)-IdxMin($low, %d))/%d" % (d, d, d) for d in windows]
|
||||
names += ["IMXD%d" % d for d in windows]
|
||||
if use("CORR"):
|
||||
# The correlation between absolute close price and log scaled trading volume
|
||||
fields += ["Corr($close, Log($volume+1), %d)" % d for d in windows]
|
||||
names += ["CORR%d" % d for d in windows]
|
||||
if use("CORD"):
|
||||
# The correlation between price change ratio and volume change ratio
|
||||
fields += ["Corr($close/Ref($close,1), Log($volume/Ref($volume, 1)+1), %d)" % d for d in windows]
|
||||
names += ["CORD%d" % d for d in windows]
|
||||
if use("CNTP"):
|
||||
# The percentage of days in past d days that price go up.
|
||||
fields += ["Mean($close>Ref($close, 1), %d)" % d for d in windows]
|
||||
names += ["CNTP%d" % d for d in windows]
|
||||
if use("CNTN"):
|
||||
# The percentage of days in past d days that price go down.
|
||||
fields += ["Mean($close<Ref($close, 1), %d)" % d for d in windows]
|
||||
names += ["CNTN%d" % d for d in windows]
|
||||
if use("CNTD"):
|
||||
# The diff between past up day and past down day
|
||||
fields += ["Mean($close>Ref($close, 1), %d)-Mean($close<Ref($close, 1), %d)" % (d, d) for d in windows]
|
||||
names += ["CNTD%d" % d for d in windows]
|
||||
if use("SUMP"):
|
||||
# The total gain / the absolute total price changed
|
||||
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
|
||||
fields += [
|
||||
"Sum(Greater($close-Ref($close, 1), 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["SUMP%d" % d for d in windows]
|
||||
if use("SUMN"):
|
||||
# The total loss / the absolute total price changed
|
||||
# Can be derived from SUMP by SUMN = 1 - SUMP
|
||||
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
|
||||
fields += [
|
||||
"Sum(Greater(Ref($close, 1)-$close, 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["SUMN%d" % d for d in windows]
|
||||
if use("SUMD"):
|
||||
# The diff ratio between total gain and total lose
|
||||
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
|
||||
fields += [
|
||||
"(Sum(Greater($close-Ref($close, 1), 0), %d)-Sum(Greater(Ref($close, 1)-$close, 0), %d))"
|
||||
"/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["SUMD%d" % d for d in windows]
|
||||
if use("VMA"):
|
||||
# Simple Volume Moving average: https://www.barchart.com/education/technical-indicators/volume_moving_average
|
||||
fields += ["Mean($volume, %d)/($volume+1e-12)" % d for d in windows]
|
||||
names += ["VMA%d" % d for d in windows]
|
||||
if use("VSTD"):
|
||||
# The standard deviation for volume in past d days.
|
||||
fields += ["Std($volume, %d)/($volume+1e-12)" % d for d in windows]
|
||||
names += ["VSTD%d" % d for d in windows]
|
||||
if use("WVMA"):
|
||||
# The volume weighted price change volatility
|
||||
fields += [
|
||||
"Std(Abs($close/Ref($close, 1)-1)*$volume, %d)/(Mean(Abs($close/Ref($close, 1)-1)*$volume, %d)+1e-12)"
|
||||
% (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["WVMA%d" % d for d in windows]
|
||||
if use("VSUMP"):
|
||||
# The total volume increase / the absolute total volume changed
|
||||
fields += [
|
||||
"Sum(Greater($volume-Ref($volume, 1), 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
|
||||
% (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["VSUMP%d" % d for d in windows]
|
||||
if use("VSUMN"):
|
||||
# The total volume decrease / the absolute total volume changed
|
||||
# Can be derived from VSUMP by VSUMN = 1 - VSUMP
|
||||
fields += [
|
||||
"Sum(Greater(Ref($volume, 1)-$volume, 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
|
||||
% (d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["VSUMN%d" % d for d in windows]
|
||||
if use("VSUMD"):
|
||||
# The diff ratio between total volume increase and total volume decrease
|
||||
# RSI indicator for volume
|
||||
fields += [
|
||||
"(Sum(Greater($volume-Ref($volume, 1), 0), %d)-Sum(Greater(Ref($volume, 1)-$volume, 0), %d))"
|
||||
"/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)" % (d, d, d)
|
||||
for d in windows
|
||||
]
|
||||
names += ["VSUMD%d" % d for d in windows]
|
||||
|
||||
return fields, names
|
||||
@@ -243,7 +243,7 @@ class MetaDatasetDS(MetaTaskDataset):
|
||||
trunc_days: int = None,
|
||||
rolling_ext_days: int = 0,
|
||||
exp_name: Union[str, InternalData],
|
||||
segments: Union[Dict[Text, Tuple], float, str],
|
||||
segments: Union[Dict[Text, Tuple], float],
|
||||
hist_step_n: int = 10,
|
||||
task_mode: str = MetaTask.PROC_MODE_FULL,
|
||||
fill_method: str = "max",
|
||||
@@ -271,16 +271,12 @@ class MetaDatasetDS(MetaTaskDataset):
|
||||
- str: the name of the experiment to store the performance of data
|
||||
- InternalData: a prepared internal data
|
||||
segments: Union[Dict[Text, Tuple], float]
|
||||
if the segment is a Dict
|
||||
the segments to divide data
|
||||
both left and right are included
|
||||
the segments to divide data
|
||||
both left and right
|
||||
if segments is a float:
|
||||
the float represents the percentage of data for training
|
||||
if segments is a string:
|
||||
it will try its best to put its data in training and ensure that the date `segments` is in the test set
|
||||
hist_step_n: int
|
||||
length of historical steps for the meta infomation
|
||||
Number of steps of the data similarity information
|
||||
task_mode : str
|
||||
Please refer to the docs of MetaTask
|
||||
"""
|
||||
@@ -387,30 +383,10 @@ class MetaDatasetDS(MetaTaskDataset):
|
||||
if isinstance(self.segments, float):
|
||||
train_task_n = int(len(self.meta_task_l) * self.segments)
|
||||
if segment == "train":
|
||||
train_tasks = self.meta_task_l[:train_task_n]
|
||||
get_module_logger("MetaDatasetDS").info(f"The first train meta task: {train_tasks[0]}")
|
||||
return train_tasks
|
||||
return self.meta_task_l[:train_task_n]
|
||||
elif segment == "test":
|
||||
test_tasks = self.meta_task_l[train_task_n:]
|
||||
get_module_logger("MetaDatasetDS").info(f"The first test meta task: {test_tasks[0]}")
|
||||
return test_tasks
|
||||
return self.meta_task_l[train_task_n:]
|
||||
else:
|
||||
raise NotImplementedError(f"This type of input is not supported")
|
||||
elif isinstance(self.segments, str):
|
||||
train_tasks = []
|
||||
test_tasks = []
|
||||
for t in self.meta_task_l:
|
||||
test_end = t.task["dataset"]["kwargs"]["segments"]["test"][1]
|
||||
if test_end is None or pd.Timestamp(test_end) < pd.Timestamp(self.segments):
|
||||
train_tasks.append(t)
|
||||
else:
|
||||
test_tasks.append(t)
|
||||
get_module_logger("MetaDatasetDS").info(f"The first train meta task: {train_tasks[0]}")
|
||||
get_module_logger("MetaDatasetDS").info(f"The first test meta task: {test_tasks[0]}")
|
||||
if segment == "train":
|
||||
return train_tasks
|
||||
elif segment == "test":
|
||||
return test_tasks
|
||||
raise NotImplementedError(f"This type of input is not supported")
|
||||
else:
|
||||
raise NotImplementedError(f"This type of input is not supported")
|
||||
|
||||
@@ -53,12 +53,7 @@ class MetaModelDS(MetaTaskModel):
|
||||
max_epoch=100,
|
||||
seed=43,
|
||||
alpha=0.0,
|
||||
loss_skip_thresh=50,
|
||||
):
|
||||
"""
|
||||
loss_skip_size: int
|
||||
The number of threshold to skip the loss calculation for each day.
|
||||
"""
|
||||
self.step = step
|
||||
self.hist_step_n = hist_step_n
|
||||
self.clip_method = clip_method
|
||||
@@ -68,7 +63,6 @@ class MetaModelDS(MetaTaskModel):
|
||||
self.max_epoch = max_epoch
|
||||
self.fitted = False
|
||||
self.alpha = alpha
|
||||
self.loss_skip_thresh = loss_skip_thresh
|
||||
torch.manual_seed(seed)
|
||||
|
||||
def run_epoch(self, phase, task_list, epoch, opt, loss_l, ignore_weight=False):
|
||||
@@ -94,14 +88,12 @@ class MetaModelDS(MetaTaskModel):
|
||||
criterion = nn.MSELoss()
|
||||
loss = criterion(pred, meta_input["y_test"])
|
||||
elif self.criterion == "ic_loss":
|
||||
criterion = ICLoss(self.loss_skip_thresh)
|
||||
criterion = ICLoss()
|
||||
try:
|
||||
loss = criterion(pred, meta_input["y_test"], meta_input["test_idx"])
|
||||
loss = criterion(pred, meta_input["y_test"], meta_input["test_idx"], skip_size=50)
|
||||
except ValueError as e:
|
||||
get_module_logger("MetaModelDS").warning(f"Exception `{e}` when calculating IC loss")
|
||||
continue
|
||||
else:
|
||||
raise ValueError(f"Unknown criterion: {self.criterion}")
|
||||
|
||||
assert not np.isnan(loss.detach().item()), "NaN loss!"
|
||||
|
||||
|
||||
@@ -10,11 +10,7 @@ from qlib.log import get_module_logger
|
||||
|
||||
|
||||
class ICLoss(nn.Module):
|
||||
def __init__(self, skip_size=50):
|
||||
super().__init__()
|
||||
self.skip_size = skip_size
|
||||
|
||||
def forward(self, pred, y, idx):
|
||||
def forward(self, pred, y, idx, skip_size=50):
|
||||
"""forward.
|
||||
FIXME:
|
||||
- Some times it will be a slightly different from the result from `pandas.corr()`
|
||||
@@ -37,7 +33,7 @@ class ICLoss(nn.Module):
|
||||
skip_n = 0
|
||||
for start_i, end_i in zip(diff_point, diff_point[1:]):
|
||||
pred_focus = pred[start_i:end_i] # TODO: just for fake
|
||||
if pred_focus.shape[0] < self.skip_size:
|
||||
if pred_focus.shape[0] < skip_size:
|
||||
# skip some days which have very small amount of stock.
|
||||
skip_n += 1
|
||||
continue
|
||||
@@ -54,7 +50,6 @@ class ICLoss(nn.Module):
|
||||
)
|
||||
ic_all += ic_day
|
||||
if len(diff_point) - 1 - skip_n <= 0:
|
||||
__import__("ipdb").set_trace()
|
||||
raise ValueError("No enough data for calculating IC")
|
||||
if skip_n > 0:
|
||||
get_module_logger("ICLoss").info(
|
||||
|
||||
@@ -63,7 +63,6 @@ class LinearModel(Model):
|
||||
df_train = pd.concat([df_train, df_valid])
|
||||
except KeyError:
|
||||
get_module_logger("LinearModel").info("include_valid=True, but valid does not exist")
|
||||
df_train = df_train.dropna()
|
||||
if df_train.empty:
|
||||
raise ValueError("Empty data from dataset, please check your dataset config.")
|
||||
if reweighter is not None:
|
||||
|
||||
@@ -1,663 +0,0 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
from torch.utils.data import DataLoader, RandomSampler, StackDataset
|
||||
|
||||
|
||||
import os
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Callable, Optional, Text, Union
|
||||
from sklearn.metrics import roc_auc_score, mean_squared_error
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.optim as optim
|
||||
from torch.utils.data import StackDataset
|
||||
|
||||
from qlib.data.dataset.weight import Reweighter
|
||||
|
||||
from .pytorch_utils import count_parameters
|
||||
from ...model.base import Model
|
||||
from ...data.dataset import DatasetH, TSDatasetH
|
||||
from ...data.dataset.handler import DataHandlerLP
|
||||
from ...utils import (
|
||||
auto_filter_kwargs,
|
||||
init_instance_by_config,
|
||||
unpack_archive_with_buffer,
|
||||
save_multiple_parts_file,
|
||||
get_or_create_path,
|
||||
)
|
||||
from ...log import get_module_logger
|
||||
from ...workflow import R
|
||||
from qlib.contrib.meta.data_selection.utils import ICLoss
|
||||
from torch.nn import DataParallel
|
||||
|
||||
|
||||
class GeneralPTNN(Model):
|
||||
"""General Pytorch Neural Network Model
|
||||
Parameters
|
||||
----------
|
||||
input_dim : int
|
||||
input dimension
|
||||
output_dim : int
|
||||
output dimension
|
||||
layers : tuple
|
||||
layer sizes
|
||||
lr : float
|
||||
learning rate
|
||||
optimizer : str
|
||||
optimizer name
|
||||
GPU : int
|
||||
the GPU ID used for training
|
||||
"""
|
||||
|
||||
def __init__(
    self,
    lr=0.001,
    max_steps=300,
    batch_size=2000,
    early_stop_rounds=50,
    eval_steps=20,
    optimizer="gd",
    loss="mse",
    GPU=0,
    seed=None,
    weight_decay=0.0,
    data_parall=False,
    scheduler: Optional[Union[str, Callable]] = "default",  # when it is Callable, it accept one argument named optimizer
    init_model=None,
    eval_train_metric=False,
    pt_model_uri="qlib.contrib.model.pytorch_nn.Net",
    pt_model_kwargs=None,
    valid_key=DataHandlerLP.DK_L,
    # TODO: Infer Key is a more reasonable key. But it requires more detailed processing on label processing
):
    """Store the hyper-parameters and build the network, optimizer and scheduler.

    Parameters
    ----------
    lr : float
        learning rate handed to the optimizer
    max_steps, batch_size, early_stop_rounds, eval_steps : int
        training-loop settings, stored as-is on the instance
    optimizer : str
        ``"adam"`` or ``"gd"`` (case-insensitive); anything else raises
    loss : str
        ``"mse"`` or ``"binary"``; anything else raises
    GPU : int or str
        a string is passed straight to ``torch.device``; an int selects
        ``cuda:<GPU>`` when CUDA is available and ``GPU >= 0``, otherwise CPU
    seed : int, optional
        when given, seeds numpy and torch before the network is created
    weight_decay : float
        weight decay handed to the optimizer
    data_parall : bool
        wrap a freshly created network in ``DataParallel``
    scheduler : "default", None or Callable
        ``"default"`` builds a ``ReduceLROnPlateau``; ``None`` disables
        scheduling; a callable is invoked as ``scheduler(optimizer=...)``
    init_model : optional
        an already-built network used instead of ``pt_model_uri``
    eval_train_metric : bool
        stored on the instance
    pt_model_uri : str
        class path used to instantiate the network
    pt_model_kwargs : dict, optional
        keyword arguments of the network; ``None`` means
        ``{"input_dim": 360, "layers": (256,)}``
    valid_key :
        stored on the instance as ``self.valid_key``

    Raises
    ------
    NotImplementedError
        for an unsupported ``loss`` or ``optimizer``
    """
    # A dict literal in the signature would be shared by every instance;
    # build the default per call instead.
    if pt_model_kwargs is None:
        pt_model_kwargs = {
            "input_dim": 360,
            "layers": (256,),
        }

    # Set logger.
    self.logger = get_module_logger("DNNModelPytorch")
    self.logger.info("DNN pytorch version...")

    # set hyper-parameters.
    self.lr = lr
    self.max_steps = max_steps
    self.batch_size = batch_size
    self.early_stop_rounds = early_stop_rounds
    self.eval_steps = eval_steps
    self.optimizer = optimizer.lower()
    self.loss_type = loss
    if isinstance(GPU, str):
        self.device = torch.device(GPU)
    else:
        self.device = torch.device("cuda:%d" % (GPU) if torch.cuda.is_available() and GPU >= 0 else "cpu")
    self.seed = seed
    self.weight_decay = weight_decay
    self.data_parall = data_parall
    self.eval_train_metric = eval_train_metric
    self.valid_key = valid_key

    self.best_step = None

    self.logger.info(
        "DNN parameters setting:"
        f"\nlr : {lr}"
        f"\nmax_steps : {max_steps}"
        f"\nbatch_size : {batch_size}"
        f"\nearly_stop_rounds : {early_stop_rounds}"
        f"\neval_steps : {eval_steps}"
        f"\noptimizer : {optimizer}"
        f"\nloss_type : {loss}"
        f"\nseed : {seed}"
        f"\ndevice : {self.device}"
        f"\nuse_GPU : {self.use_gpu}"
        f"\nweight_decay : {weight_decay}"
        f"\nenable data parall : {self.data_parall}"
        f"\npt_model_uri: {pt_model_uri}"
        f"\npt_model_kwargs: {pt_model_kwargs}"
    )

    if self.seed is not None:
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)

    if loss not in {"mse", "binary"}:
        raise NotImplementedError("loss {} is not supported!".format(loss))
    self._scorer = mean_squared_error if loss == "mse" else roc_auc_score

    if init_model is None:
        self.dnn_model = init_instance_by_config({"class": pt_model_uri, "kwargs": pt_model_kwargs})

        if self.data_parall:
            self.dnn_model = DataParallel(self.dnn_model).to(self.device)
    else:
        self.dnn_model = init_model

    self.logger.info("model:\n{:}".format(self.dnn_model))
    self.logger.info("model size: {:.4f} MB".format(count_parameters(self.dnn_model)))

    if optimizer.lower() == "adam":
        self.train_optimizer = optim.Adam(self.dnn_model.parameters(), lr=self.lr, weight_decay=self.weight_decay)
    elif optimizer.lower() == "gd":
        self.train_optimizer = optim.SGD(self.dnn_model.parameters(), lr=self.lr, weight_decay=self.weight_decay)
    else:
        raise NotImplementedError("optimizer {} is not supported!".format(optimizer))

    if scheduler == "default":
        # Reduce learning rate when loss has stopped decrease
        self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            self.train_optimizer,
            mode="min",
            factor=0.5,
            patience=10,
            verbose=True,
            threshold=0.0001,
            threshold_mode="rel",
            cooldown=0,
            min_lr=0.00001,
            eps=1e-08,
        )
    elif scheduler is None:
        self.scheduler = None
    else:
        self.scheduler = scheduler(optimizer=self.train_optimizer)

    self.dnn_model.to(self.device)
|
||||
|
||||
@property
def use_gpu(self):
    """Whether ``self.device`` is anything other than the plain CPU device."""
    cpu_device = torch.device("cpu")
    return self.device != cpu_device
|
||||
|
||||
|
||||
def _eval_valid_dl(self, valid_loader, val_index):
|
||||
with torch.no_grad():
|
||||
self.dnn_model.eval()
|
||||
val_loss = []
|
||||
val_pred = []
|
||||
val_label = []
|
||||
for x_batch, y_batch in valid_loader:
|
||||
x_batch = x_batch.to(self.device)
|
||||
y_batch = y_batch.to(self.device)
|
||||
cur_loss = self.get_loss(preds, y_batch, self.loss_type)
|
||||
val_loss.append(cur_loss.detach().cpu().numpy().item())
|
||||
val_loss = np.mean(val_loss)
|
||||
val_pred = torch.cat(val_pred, axis=0).detach().cpu().numpy()
|
||||
val_label = torch.cat(val_label, axis=0).detach().cpu().numpy()
|
||||
val_metric = self.get_metric(val_pred, val_label, val_index).detach().cpu().numpy().item()
|
||||
return val_loss, val_metric
|
||||
|
||||
def fit(
    self,
    dataset: Union[DatasetH, TSDatasetH],
    verbose=True,
    save_path=None,
):
    """Train the network with periodic validation and early stopping.

    Every step iterates once over the shuffled training loader. Every
    ``self.eval_steps`` steps (and at the last step) the model is evaluated on
    the validation loader; the best state dict (lowest validation loss) is
    saved to ``save_path`` and reloaded when training ends.

    Parameters
    ----------
    dataset : DatasetH or TSDatasetH
        source of the "train" and "valid" segments
    verbose : bool
        log checkpoint updates and early stopping
    save_path : optional
        checkpoint location, resolved through ``get_or_create_path``
    """
    ists = isinstance(dataset, TSDatasetH)  # is this time series dataset

    # prepare training
    train_x = dataset.prepare("train", col_set="feature", data_key=DataHandlerLP.DK_L)
    train_y = dataset.prepare("train", col_set="label", data_key=DataHandlerLP.DK_L)
    train_ds = StackDataset(train_x, train_y)
    train_sampler = RandomSampler(train_ds)
    train_loader = DataLoader(train_ds, batch_size=self.batch_size, sampler=train_sampler)

    # prepare validation
    # The validation set was built from the "train" segment, which made early
    # stopping and checkpoint selection track the training loss. Use the
    # "valid" segment with the configured `valid_key`.
    valid_x = dataset.prepare("valid", col_set="feature", data_key=self.valid_key)
    valid_y = dataset.prepare("valid", col_set="label", data_key=self.valid_key)
    valid_ds = StackDataset(valid_x, valid_y)
    valid_loader = DataLoader(valid_ds, batch_size=self.batch_size, shuffle=False)
    if ists:
        val_index = valid_x.data_index
    else:
        val_index = valid_x.index

    save_path = get_or_create_path(save_path)
    stop_steps = 0
    best_loss = np.inf
    # train
    self.logger.info("training...")

    for step in range(1, self.max_steps + 1):
        if stop_steps >= self.early_stop_rounds:
            if verbose:
                self.logger.info("\tearly stop")
            break
        loss = AverageMeter()
        self.dnn_model.train()

        for x_batch, y_batch in train_loader:
            x_batch = x_batch.to(self.device)
            y_batch = y_batch.to(self.device)

            # forward
            preds = self.dnn_model(x_batch)
            cur_loss = self.get_loss(preds, y_batch, self.loss_type)
            # Gradients must be cleared for every batch; clearing them once
            # per step let them accumulate across all batches of the step.
            self.train_optimizer.zero_grad()
            cur_loss.backward()
            self.train_optimizer.step()
            loss.update(cur_loss.item())
        R.log_metrics(train_loss=loss.avg, step=step)

        # validation
        # for every `eval_steps` steps or at the last steps, we will evaluate the model.
        if step % self.eval_steps == 0 or step == self.max_steps:
            stop_steps += 1

            val_loss, val_metric = self._eval_valid_dl(valid_loader, val_index)
            R.log_metrics(val_loss=val_loss, step=step)
            R.log_metrics(val_metric=val_metric, step=step)

            if val_loss < best_loss:
                if verbose:
                    self.logger.info(
                        "\tvalid loss update from {:.6f} to {:.6f}, save checkpoint.".format(
                            best_loss, val_loss
                        )
                    )
                best_loss = val_loss
                self.best_step = step
                R.log_metrics(best_step=self.best_step, step=step)
                stop_steps = 0
                torch.save(self.dnn_model.state_dict(), save_path)
            # update learning rate
            if self.scheduler is not None:
                auto_filter_kwargs(self.scheduler.step, warning=False)(metrics=val_loss, epoch=step)
            R.log_metrics(lr=self.get_lr(), step=step)

    # restore the optimal parameters after training
    self.dnn_model.load_state_dict(torch.load(save_path, map_location=self.device))
    if self.use_gpu:
        torch.cuda.empty_cache()
|
||||
|
||||
def get_lr(self):
    """Return the learning rate of the optimizer's single parameter group."""
    groups = self.train_optimizer.param_groups
    assert len(groups) == 1
    return groups[0]["lr"]
|
||||
|
||||
def get_loss(self, pred, target, loss_type, w=None):
    """Compute the weighted loss between flattened predictions and targets.

    ``loss_type`` is ``"mse"`` (mean of weighted squared errors) or
    ``"binary"`` (``BCEWithLogitsLoss`` with per-element weights); any other
    value raises ``NotImplementedError``. When ``w`` is omitted every element
    gets weight one.
    """
    flat_pred = pred.reshape(-1)
    flat_target = target.reshape(-1)
    # default to uniform weights shaped like the flattened prediction
    weights = torch.ones_like(flat_pred).to(flat_pred.device) if w is None else w

    if loss_type not in ("mse", "binary"):
        raise NotImplementedError("loss {} is not supported!".format(loss_type))
    if loss_type == "binary":
        bce = nn.BCEWithLogitsLoss(weight=weights)
        return bce(flat_pred, flat_target)
    err = flat_pred - flat_target
    return torch.mul(torch.mul(err, err), weights).mean()
|
||||
|
||||
def get_metric(self, pred, target, index):
    """Return the negated ``ICLoss`` of ``pred`` against ``target``."""
    # NOTE: the order of the index must follow <datetime, instrument> sorted order
    ic_loss = ICLoss()
    return -ic_loss(pred, target, index)  # pylint: disable=E1130
|
||||
|
||||
def _nn_predict(self, data, return_cpu=True):
|
||||
"""Reusing predicting NN.
|
||||
Scenarios
|
||||
1) test inference (data may come from CPU and expect the output data is on CPU)
|
||||
2) evaluation on training (data may come from GPU)
|
||||
"""
|
||||
if not isinstance(data, torch.Tensor):
|
||||
if isinstance(data, pd.DataFrame):
|
||||
data = data.values
|
||||
data = torch.Tensor(data)
|
||||
data = data.to(self.device)
|
||||
preds = []
|
||||
self.dnn_model.eval()
|
||||
with torch.no_grad():
|
||||
batch_size = 8096
|
||||
for i in range(0, len(data), batch_size):
|
||||
x = data[i : i + batch_size]
|
||||
preds.append(self.dnn_model(x.to(self.device)).detach().reshape(-1))
|
||||
if return_cpu:
|
||||
preds = np.concatenate([pr.cpu().numpy() for pr in preds])
|
||||
else:
|
||||
preds = torch.cat(preds, axis=0)
|
||||
return preds
|
||||
|
||||
def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"):
    """Predict on the feature columns of ``segment`` and return a Series on the same index."""
    features = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I)
    scores = self._nn_predict(features)
    flat_scores = scores.reshape(-1)
    return pd.Series(flat_scores, index=features.index)
|
||||
|
||||
|
||||
class AverageMeter:
    """Keep the most recent value and a running weighted average."""

    def __init__(self):
        self.reset()

    def reset(self):
        """Zero the latest value, the average, the weighted sum and the count."""
        self.val = self.avg = self.sum = self.count = 0

    def update(self, val, n=1):
        """Record ``val`` as observed ``n`` times and refresh the average."""
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count
|
||||
|
||||
|
||||
from ...model.utils import ConcatDataset
|
||||
|
||||
class GeneralPTNN(Model):
|
||||
"""
|
||||
Motivation:
|
||||
We want to provide a Qlib General Pytorch Model Adaptor
|
||||
You can reuse it for all kinds of Pytorch models.
|
||||
It should include the training and predict process
|
||||
|
||||
Parameters
|
||||
----------
|
||||
d_feat : int
|
||||
input dimension for each time step
|
||||
metric: str
|
||||
the evaluation metric used in early stop
|
||||
optimizer : str
|
||||
optimizer name
|
||||
GPU : str
|
||||
the GPU ID(s) used for training
|
||||
"""
|
||||
|
||||
def __init__(
    self,
    n_epochs=200,
    lr=0.001,
    metric="",
    batch_size=2000,
    early_stop=20,
    loss="mse",
    optimizer="adam",
    n_jobs=10,
    GPU=0,
    seed=None,
    pt_model_uri="qlib.contrib.model.pytorch_gru_ts.GRUModel",
    pt_model_kwargs=None,
):
    """Store the hyper-parameters and build the network and its optimizer.

    Parameters
    ----------
    n_epochs, batch_size, early_stop, n_jobs : int
        training settings, stored as-is on the instance
    lr : float
        learning rate handed to the optimizer
    metric : str
        stored as ``self.metric``
    loss : str
        stored as ``self.loss``
    optimizer : str
        ``"adam"`` or ``"gd"`` (case-insensitive); anything else raises
    GPU : int or str
        a string is passed straight to ``torch.device``; an int selects
        ``cuda:<GPU>`` when CUDA is available and ``GPU >= 0``, otherwise CPU
    seed : int, optional
        when given, seeds numpy and torch before the network is created
    pt_model_uri : str
        class path used to instantiate the network
    pt_model_kwargs : dict, optional
        keyword arguments of the network; ``None`` means
        ``{"d_feat": 6, "hidden_size": 64, "num_layers": 2, "dropout": 0.0}``

    Raises
    ------
    NotImplementedError
        for an unsupported ``optimizer``
    """
    # A dict literal in the signature would be shared by every instance;
    # build the default per call instead.
    if pt_model_kwargs is None:
        pt_model_kwargs = {
            "d_feat": 6,
            "hidden_size": 64,
            "num_layers": 2,
            "dropout": 0.0,
        }

    # Set logger.
    self.logger = get_module_logger("GeneralPTNN")
    self.logger.info("GeneralPTNN pytorch version...")

    # set hyper-parameters.
    self.n_epochs = n_epochs
    self.lr = lr
    self.metric = metric
    self.batch_size = batch_size
    self.early_stop = early_stop
    self.optimizer = optimizer.lower()
    self.loss = loss
    # The docstring advertises a string GPU id; `GPU >= 0` would raise a
    # TypeError for it, so accept device strings directly.
    if isinstance(GPU, str):
        self.device = torch.device(GPU)
    else:
        self.device = torch.device("cuda:%d" % (GPU) if torch.cuda.is_available() and GPU >= 0 else "cpu")
    self.n_jobs = n_jobs
    self.seed = seed

    # Seed before the network is instantiated so that the initial weights are
    # reproducible as well (seeding afterwards only affected later sampling).
    if self.seed is not None:
        np.random.seed(self.seed)
        torch.manual_seed(self.seed)

    self.pt_model_uri, self.pt_model_kwargs = pt_model_uri, pt_model_kwargs
    self.dnn_model = init_instance_by_config({"class": pt_model_uri, "kwargs": pt_model_kwargs})

    self.logger.info(
        "GeneralPTNN parameters setting:"
        "\nn_epochs : {}"
        "\nlr : {}"
        "\nmetric : {}"
        "\nbatch_size : {}"
        "\nearly_stop : {}"
        "\noptimizer : {}"
        "\nloss_type : {}"
        "\ndevice : {}"
        "\nn_jobs : {}"
        "\nuse_GPU : {}"
        "\nseed : {}"
        "\npt_model_uri: {}"
        "\npt_model_kwargs: {}".format(
            n_epochs,
            lr,
            metric,
            batch_size,
            early_stop,
            optimizer.lower(),
            loss,
            self.device,
            n_jobs,
            self.use_gpu,
            seed,
            pt_model_uri,
            pt_model_kwargs,
        )
    )

    self.logger.info("model:\n{:}".format(self.dnn_model))
    self.logger.info("model size: {:.4f} MB".format(count_parameters(self.dnn_model)))

    if optimizer.lower() == "adam":
        self.train_optimizer = optim.Adam(self.dnn_model.parameters(), lr=self.lr)
    elif optimizer.lower() == "gd":
        self.train_optimizer = optim.SGD(self.dnn_model.parameters(), lr=self.lr)
    else:
        raise NotImplementedError("optimizer {} is not supported!".format(optimizer))

    self.fitted = False
    self.dnn_model.to(self.device)
|
||||
|
||||
@property
def use_gpu(self):
    """True when the configured device differs from ``torch.device("cpu")``."""
    on_cpu = self.device == torch.device("cpu")
    return not on_cpu
|
||||
|
||||
def mse(self, pred, label, weight):
    """Return the mean of the element-wise weighted squared error."""
    squared_error = (pred - label) ** 2
    return torch.mean(weight * squared_error)
|
||||
|
||||
def loss_fn(self, pred, label, weight=None):
    """Compute the configured loss over the entries whose label is not NaN.

    Missing ``weight`` means uniform weights. Only ``self.loss == "mse"`` is
    supported; any other value raises ``ValueError``.
    """
    if weight is None:
        weight = torch.ones_like(label)
    valid = ~torch.isnan(label)

    if self.loss != "mse":
        raise ValueError("unknown loss `%s`" % self.loss)
    return self.mse(pred[valid], label[valid], weight[valid])
|
||||
|
||||
def metric_fn(self, pred, label):
    """Score predictions on the entries with a finite label.

    For ``self.metric`` equal to ``""`` or ``"loss"`` the score is the negated
    loss; any other metric name raises ``ValueError``.
    """
    finite = torch.isfinite(label)

    if self.metric not in ("", "loss"):
        raise ValueError("unknown metric `%s`" % self.metric)
    return -self.loss_fn(pred[finite], label[finite])
|
||||
|
||||
|
||||
def _get_fl(self, data: torch.Tensor):
|
||||
"""
|
||||
get feature and label from data
|
||||
- Handle the different data shape of time series and tabular data
|
||||
|
||||
Parameters
|
||||
----------
|
||||
data : torch.Tensor
|
||||
input data which maybe 3 dimension or 2 dimension
|
||||
- 3dim: [batch_size, time_step, feature_dim]
|
||||
- 2dim: [batch_size, feature_dim]
|
||||
|
||||
Returns
|
||||
-------
|
||||
Tuple[torch.Tensor, torch.Tensor]
|
||||
"""
|
||||
if data.dim() == 3:
|
||||
# it is a time series dataset
|
||||
feature = data[:, :, 0:-1].to(self.device)
|
||||
label = data[:, -1, -1].to(self.device)
|
||||
elif data.dim() == 2:
|
||||
# it is a tabular dataset
|
||||
feature = data[:, 0:-1].to(self.device)
|
||||
label = data[:, -1].to(self.device)
|
||||
else:
|
||||
raise ValueError("Unsupported data shape.")
|
||||
return feature, label
|
||||
|
||||
def train_epoch(self, data_loader):
    """
    Run one optimisation pass over ``data_loader``.

    Each item of the loader is a ``(data, weight)`` pair; gradients are clipped
    to the range [-3, 3] before every optimizer step.
    """
    self.dnn_model.train()

    for batch, batch_weight in data_loader:
        feature, label = self._get_fl(batch)
        loss = self.loss_fn(self.dnn_model(feature.float()), label, batch_weight.to(self.device))

        self.train_optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_value_(self.dnn_model.parameters(), 3.0)
        self.train_optimizer.step()
|
||||
|
||||
def test_epoch(self, data_loader):
|
||||
self.dnn_model.eval()
|
||||
|
||||
scores = []
|
||||
losses = []
|
||||
|
||||
for data, weight in data_loader:
|
||||
feature = data[:, :, 0:-1].to(self.device)
|
||||
# feature[torch.isnan(feature)] = 0
|
||||
label = data[:, -1, -1].to(self.device)
|
||||
|
||||
with torch.no_grad():
|
||||
pred = self.dnn_model(feature.float())
|
||||
loss = self.loss_fn(pred, label, weight.to(self.device))
|
||||
losses.append(loss.item())
|
||||
|
||||
score = self.metric_fn(pred, label)
|
||||
scores.append(score.item())
|
||||
|
||||
return np.mean(losses), np.mean(scores)
|
||||
|
||||
def fit(
    self,
    dataset: Union[DatasetH, TSDatasetH],
    evals_result=None,
    save_path=None,
    reweighter=None,
):
    """
    Train the network and keep the parameters of the best validation epoch.

    Parameters
    ----------
    dataset : Union[DatasetH, TSDatasetH]
        dataset providing the "train" and "valid" segments; both tabular and
        time-series datasets are accepted
    evals_result : dict, optional
        filled in place with the per-epoch "train" and "valid" scores;
        a fresh dict is used when it is not given
    save_path : optional
        location where the best parameters are saved (resolved by `get_or_create_path`)
    reweighter : Reweighter, optional
        produces the sample weights; every sample weighs 1 when it is not given

    Raises
    ------
    ValueError
        if the train or valid data is empty, or the reweighter type is not supported
    """
    # Avoid sharing one mutable default dict between calls
    if evals_result is None:
        evals_result = {}

    ists = isinstance(dataset, TSDatasetH)  # is this time series dataset

    dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
    dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
    if dl_train.empty or dl_valid.empty:
        raise ValueError("Empty data from dataset, please check your dataset config.")

    if reweighter is None:
        wl_train = np.ones(len(dl_train))
        wl_valid = np.ones(len(dl_valid))
    elif isinstance(reweighter, Reweighter):
        wl_train = reweighter.reweight(dl_train)
        wl_valid = reweighter.reweight(dl_valid)
    else:
        raise ValueError("Unsupported reweighter type.")

    # Preprocess for data. To align to Dataset Interface for DataLoader
    if ists:
        dl_train.config(fillna_type="ffill+bfill")  # process nan brought by dataloader
        dl_valid.config(fillna_type="ffill+bfill")  # process nan brought by dataloader
    else:
        # If it is a tabular, we convert the dataframe to numpy to be indexable by DataLoader
        dl_train = dl_train.values
        dl_valid = dl_valid.values

    train_loader = DataLoader(
        ConcatDataset(dl_train, wl_train),
        batch_size=self.batch_size,
        shuffle=True,
        num_workers=self.n_jobs,
        drop_last=True,
    )
    valid_loader = DataLoader(
        ConcatDataset(dl_valid, wl_valid),
        batch_size=self.batch_size,
        shuffle=False,
        num_workers=self.n_jobs,
        drop_last=True,
    )
    del dl_train, dl_valid, wl_train, wl_valid

    save_path = get_or_create_path(save_path)

    stop_steps = 0
    best_score = -np.inf
    best_epoch = 0
    evals_result["train"] = []
    evals_result["valid"] = []

    # train
    self.logger.info("training...")
    self.fitted = True

    # Start from the current parameters so that `best_param` is always defined,
    # even when no epoch improves on `best_score` (e.g. NaN scores or zero epochs).
    best_param = copy.deepcopy(self.dnn_model.state_dict())

    for step in range(self.n_epochs):
        self.logger.info("Epoch%d:", step)
        self.logger.info("training...")
        self.train_epoch(train_loader)
        self.logger.info("evaluating...")
        _, train_score = self.test_epoch(train_loader)
        _, val_score = self.test_epoch(valid_loader)
        self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
        evals_result["train"].append(train_score)
        evals_result["valid"].append(val_score)

        if val_score > best_score:
            best_score = val_score
            stop_steps = 0
            best_epoch = step
            best_param = copy.deepcopy(self.dnn_model.state_dict())
        else:
            stop_steps += 1
            if stop_steps >= self.early_stop:
                self.logger.info("early stop")
                break

    self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
    self.dnn_model.load_state_dict(best_param)
    torch.save(best_param, save_path)

    if self.use_gpu:
        torch.cuda.empty_cache()
|
||||
|
||||
def predict(self, dataset: Union[DatasetH, TSDatasetH]):
    """
    Predict on the "test" segment of ``dataset``.

    Parameters
    ----------
    dataset : Union[DatasetH, TSDatasetH]
        a tabular or a time-series dataset

    Returns
    -------
    pd.Series
        the predictions indexed like the prepared test data

    Raises
    ------
    ValueError
        if the model has not been fitted
    """
    if not self.fitted:
        raise ValueError("model is not fitted yet!")

    dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I)

    if isinstance(dataset, TSDatasetH):
        dl_test.config(fillna_type="ffill+bfill")  # process nan brought by dataloader
        index = dl_test.get_index()
    else:
        # If it is a tabular, we convert the dataframe to numpy to be indexable by DataLoader
        # (same preprocessing as in `fit`); the index must be kept before the conversion.
        index = dl_test.index
        dl_test = dl_test.values

    test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs)
    self.dnn_model.eval()
    preds = []

    for data in test_loader:
        # The label column is dropped by the shared feature/label split
        feature, _ = self._get_fl(data)

        with torch.no_grad():
            pred = self.dnn_model(feature.float()).detach().cpu().numpy()

        preds.append(pred)

    preds_concat = np.concatenate(preds)
    if preds_concat.ndim != 1:
        # e.g. a model emitting [batch_size, 1]; pd.Series needs 1-dimensional data
        preds_concat = preds_concat.reshape(-1)

    return pd.Series(preds_concat, index=index)
|
||||
@@ -1,25 +1,25 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
import copy
|
||||
from typing import Text, Union
|
||||
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from typing import Text, Union
|
||||
import copy
|
||||
from ...utils import get_or_create_path
|
||||
from ...log import get_module_logger
|
||||
|
||||
import torch
|
||||
import torch.nn as nn
|
||||
import torch.optim as optim
|
||||
|
||||
from qlib.workflow import R
|
||||
|
||||
from .pytorch_utils import count_parameters
|
||||
from ...model.base import Model
|
||||
from ...data.dataset import DatasetH
|
||||
from ...data.dataset.handler import DataHandlerLP
|
||||
from ...log import get_module_logger
|
||||
from ...model.base import Model
|
||||
from ...utils import get_or_create_path
|
||||
from .pytorch_utils import count_parameters
|
||||
|
||||
|
||||
class GRU(Model):
|
||||
@@ -212,31 +212,16 @@ class GRU(Model):
|
||||
evals_result=dict(),
|
||||
save_path=None,
|
||||
):
|
||||
# prepare training and validation data
|
||||
dfs = {
|
||||
k: dataset.prepare(
|
||||
k,
|
||||
col_set=["feature", "label"],
|
||||
data_key=DataHandlerLP.DK_L,
|
||||
)
|
||||
for k in ["train", "valid"]
|
||||
if k in dataset.segments
|
||||
}
|
||||
df_train, df_valid = dfs.get("train", pd.DataFrame()), dfs.get("valid", pd.DataFrame())
|
||||
df_train, df_valid, df_test = dataset.prepare(
|
||||
["train", "valid", "test"],
|
||||
col_set=["feature", "label"],
|
||||
data_key=DataHandlerLP.DK_L,
|
||||
)
|
||||
if df_train.empty or df_valid.empty:
|
||||
raise ValueError("Empty data from dataset, please check your dataset config.")
|
||||
|
||||
# check if training data is empty
|
||||
if df_train.empty:
|
||||
raise ValueError("Empty training data from dataset, please check your dataset config.")
|
||||
|
||||
df_train = df_train.dropna()
|
||||
x_train, y_train = df_train["feature"], df_train["label"]
|
||||
|
||||
# check if validation data is provided
|
||||
if not df_valid.empty:
|
||||
df_valid = df_valid.dropna()
|
||||
x_valid, y_valid = df_valid["feature"], df_valid["label"]
|
||||
else:
|
||||
x_valid, y_valid = None, None
|
||||
x_valid, y_valid = df_valid["feature"], df_valid["label"]
|
||||
|
||||
save_path = get_or_create_path(save_path)
|
||||
stop_steps = 0
|
||||
@@ -250,42 +235,32 @@ class GRU(Model):
|
||||
self.logger.info("training...")
|
||||
self.fitted = True
|
||||
|
||||
best_param = copy.deepcopy(self.gru_model.state_dict())
|
||||
for step in range(self.n_epochs):
|
||||
self.logger.info("Epoch%d:", step)
|
||||
self.logger.info("training...")
|
||||
self.train_epoch(x_train, y_train)
|
||||
self.logger.info("evaluating...")
|
||||
train_loss, train_score = self.test_epoch(x_train, y_train)
|
||||
val_loss, val_score = self.test_epoch(x_valid, y_valid)
|
||||
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
|
||||
evals_result["train"].append(train_score)
|
||||
evals_result["valid"].append(val_score)
|
||||
|
||||
# evaluate on validation data if provided
|
||||
if x_valid is not None and y_valid is not None:
|
||||
val_loss, val_score = self.test_epoch(x_valid, y_valid)
|
||||
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
|
||||
evals_result["valid"].append(val_score)
|
||||
|
||||
if val_score > best_score:
|
||||
best_score = val_score
|
||||
stop_steps = 0
|
||||
best_epoch = step
|
||||
best_param = copy.deepcopy(self.gru_model.state_dict())
|
||||
else:
|
||||
stop_steps += 1
|
||||
if stop_steps >= self.early_stop:
|
||||
self.logger.info("early stop")
|
||||
break
|
||||
if val_score > best_score:
|
||||
best_score = val_score
|
||||
stop_steps = 0
|
||||
best_epoch = step
|
||||
best_param = copy.deepcopy(self.gru_model.state_dict())
|
||||
else:
|
||||
stop_steps += 1
|
||||
if stop_steps >= self.early_stop:
|
||||
self.logger.info("early stop")
|
||||
break
|
||||
|
||||
self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
|
||||
self.gru_model.load_state_dict(best_param)
|
||||
torch.save(best_param, save_path)
|
||||
|
||||
# Logging
|
||||
rec = R.get_recorder()
|
||||
for k, v_l in evals_result.items():
|
||||
for i, v in enumerate(v_l):
|
||||
rec.log_metrics(step=i, **{k: v})
|
||||
|
||||
if self.use_gpu:
|
||||
torch.cuda.empty_cache()
|
||||
|
||||
@@ -317,7 +292,6 @@ class GRU(Model):
|
||||
|
||||
|
||||
class GRUModel(nn.Module):
|
||||
|
||||
def __init__(self, d_feat=6, hidden_size=64, num_layers=2, dropout=0.0):
|
||||
super().__init__()
|
||||
|
||||
|
||||
@@ -1,17 +1,5 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
"""
|
||||
Here we have a comprehensive set of analysis classes.
|
||||
|
||||
Here is an example.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from qlib.contrib.report.data.ana import FeaMeanStd
|
||||
fa = FeaMeanStd(ret_df)
|
||||
fa.plot_all(wspace=0.3, sub_figsize=(12, 3), col_n=5)
|
||||
|
||||
"""
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
from qlib.contrib.report.data.base import FeaAnalyser
|
||||
@@ -164,7 +152,6 @@ class FeaSkewTurt(NumFeaAnalyser):
|
||||
self._kurt[col].plot(ax=right_ax, label="kurt", color="green")
|
||||
right_ax.set_xlabel("")
|
||||
right_ax.set_ylabel("kurt")
|
||||
right_ax.grid(None) # set the grid to None to avoid two layer of grid
|
||||
|
||||
h1, l1 = ax.get_legend_handles_labels()
|
||||
h2, l2 = right_ax.get_legend_handles_labels()
|
||||
@@ -184,15 +171,12 @@ class FeaMeanStd(NumFeaAnalyser):
|
||||
ax.set_xlabel("")
|
||||
ax.set_ylabel("mean")
|
||||
ax.legend()
|
||||
ax.tick_params(axis="x", rotation=90)
|
||||
|
||||
right_ax = ax.twinx()
|
||||
|
||||
self._std[col].plot(ax=right_ax, label="std", color="green")
|
||||
right_ax.set_xlabel("")
|
||||
right_ax.set_ylabel("std")
|
||||
right_ax.tick_params(axis="x", rotation=90)
|
||||
right_ax.grid(None) # set the grid to None to avoid two layer of grid
|
||||
|
||||
h1, l1 = ax.get_legend_handles_labels()
|
||||
h2, l2 = right_ax.get_legend_handles_labels()
|
||||
|
||||
@@ -14,24 +14,6 @@ from qlib.contrib.report.utils import sub_fig_generator
|
||||
|
||||
class FeaAnalyser:
|
||||
def __init__(self, dataset: pd.DataFrame):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
----------
|
||||
dataset : pd.DataFrame
|
||||
|
||||
We often have multiple columns for dataset. Each column corresponds to one sub figure.
|
||||
There will be a datetime column in the index levels.
|
||||
Aggregation will be used for more summarized metrics over time.
|
||||
Here is an example of data:
|
||||
|
||||
.. code-block::
|
||||
|
||||
return
|
||||
datetime instrument
|
||||
2007-02-06 equity_tpx 0.010087
|
||||
equity_spx 0.000786
|
||||
"""
|
||||
self._dataset = dataset
|
||||
with TimeInspector.logt("calc_stat_values"):
|
||||
self.calc_stat_values()
|
||||
|
||||
@@ -4,7 +4,7 @@ import matplotlib.pyplot as plt
|
||||
import pandas as pd
|
||||
|
||||
|
||||
def sub_fig_generator(sub_figsize=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None, sharex=False, sharey=False):
|
||||
def sub_fig_generator(sub_fs=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None, sharex=False, sharey=False):
|
||||
"""sub_fig_generator.
|
||||
it will return a generator, each row contains <col_n> sub graph
|
||||
|
||||
@@ -13,7 +13,7 @@ def sub_fig_generator(sub_figsize=(3, 3), col_n=10, row_n=1, wspace=None, hspace
|
||||
|
||||
Parameters
|
||||
----------
|
||||
sub_figsize :
|
||||
sub_fs :
|
||||
the figure size of each subgraph in <col_n> * <row_n> subgraphs
|
||||
col_n :
|
||||
the number of subgraph in each row; It will generating a new graph after generating <col_n> of subgraphs.
|
||||
@@ -33,7 +33,7 @@ def sub_fig_generator(sub_figsize=(3, 3), col_n=10, row_n=1, wspace=None, hspace
|
||||
|
||||
while True:
|
||||
fig, axes = plt.subplots(
|
||||
row_n, col_n, figsize=(sub_figsize[0] * col_n, sub_figsize[1] * row_n), sharex=sharex, sharey=sharey
|
||||
row_n, col_n, figsize=(sub_fs[0] * col_n, sub_fs[1] * row_n), sharex=sharex, sharey=sharey
|
||||
)
|
||||
plt.subplots_adjust(wspace=wspace, hspace=hspace)
|
||||
axes = axes.reshape(row_n, col_n)
|
||||
|
||||
@@ -73,8 +73,8 @@ class Rolling:
|
||||
The horizon of the prediction target.
|
||||
This is used to override the prediction horizon of the file.
|
||||
h_path : Optional[str]
|
||||
It is other data source that is dumped as a handler. It will override the data handler section in the config.
|
||||
If it is not given, it will create a customized cache for the handler when `enable_handler_cache=True`
|
||||
the dumped data handler;
|
||||
It may come from other data source. It will override the data handler in the config.
|
||||
test_end : Optional[str]
|
||||
the test end for the data. It is typically used together with the handler
|
||||
You can do the same thing with task_ext_conf in a more complicated way
|
||||
@@ -119,7 +119,7 @@ class Rolling:
|
||||
with self.conf_path.open("r") as f:
|
||||
return yaml.safe_load(f)
|
||||
|
||||
def _replace_handler_with_cache(self, task: dict):
|
||||
def _replace_hanler_with_cache(self, task: dict):
|
||||
"""
|
||||
Due to the data processing part in original rolling is slow. So we have to
|
||||
This class tries to add more feature
|
||||
@@ -159,20 +159,13 @@ class Rolling:
|
||||
# - get horizon automatically from the expression!!!!
|
||||
raise NotImplementedError(f"This type of input is not supported")
|
||||
else:
|
||||
if enable_handler_cache and self.h_path is not None:
|
||||
self.logger.info("Fail to override the horizon due to data handler cache")
|
||||
else:
|
||||
self.logger.info("The prediction horizon is overrided")
|
||||
if isinstance(task["dataset"]["kwargs"]["handler"], dict):
|
||||
task["dataset"]["kwargs"]["handler"]["kwargs"]["label"] = [
|
||||
"Ref($close, -{}) / Ref($close, -1) - 1".format(self.horizon + 1)
|
||||
]
|
||||
else:
|
||||
self.logger.warning("Try to automatically configure the lablel but failed.")
|
||||
self.logger.info("The prediction horizon is overrided")
|
||||
task["dataset"]["kwargs"]["handler"]["kwargs"]["label"] = [
|
||||
"Ref($close, -{}) / Ref($close, -1) - 1".format(self.horizon + 1)
|
||||
]
|
||||
|
||||
if self.h_path is not None or enable_handler_cache:
|
||||
# if we already have provided data source or we want to create one
|
||||
task = self._replace_handler_with_cache(task)
|
||||
if enable_handler_cache:
|
||||
task = self._replace_hanler_with_cache(task)
|
||||
task = self._update_start_end_time(task)
|
||||
|
||||
if self.task_ext_conf is not None:
|
||||
@@ -180,16 +173,6 @@ class Rolling:
|
||||
self.logger.info(task)
|
||||
return task
|
||||
|
||||
def run_basic_task(self):
    """
    Train the basic task once, without rolling.

    Handy for quickly testing a setup while tuning the model.
    """
    basic = self.basic_task()
    print(basic)
    TrainerR(experiment_name=self.exp_name)([basic])
|
||||
|
||||
def get_task_list(self) -> List[dict]:
|
||||
"""return a batch of tasks for rolling."""
|
||||
task = self.basic_task()
|
||||
|
||||
@@ -80,11 +80,6 @@ class DDGDA(Rolling):
|
||||
sim_task_model: UTIL_MODEL_TYPE = "gbdt",
|
||||
meta_1st_train_end: Optional[str] = None,
|
||||
alpha: float = 0.01,
|
||||
loss_skip_thresh: int = 50,
|
||||
fea_imp_n: Optional[int] = 30,
|
||||
meta_data_proc: Optional[str] = "V01",
|
||||
segments: Union[float, str] = 0.62,
|
||||
hist_step_n: int = 30,
|
||||
working_dir: Optional[Union[str, Path]] = None,
|
||||
**kwargs,
|
||||
):
|
||||
@@ -99,15 +94,6 @@ class DDGDA(Rolling):
|
||||
alpha: float
|
||||
Setting the L2 regularization for ridge
|
||||
The `alpha` is only passed to MetaModelDS (it is not passed to sim_task_model currently..)
|
||||
loss_skip_thresh: int
|
||||
The thresh to skip the loss calculation for each day. If the number of item is less than it, it will skip the loss on that day.
|
||||
meta_data_proc : Optional[str]
|
||||
How we process the meta dataset for learning meta model.
|
||||
segments : Union[float, str]
|
||||
if segments is a float:
|
||||
The ratio of training data in the meta task dataset
|
||||
if segments is a string:
|
||||
it will try its best to put its data in training and ensure that the date `segments` is in the test set
|
||||
"""
|
||||
# NOTE:
|
||||
# the horizon must match the meaning in the base task template
|
||||
@@ -118,22 +104,14 @@ class DDGDA(Rolling):
|
||||
super().__init__(**kwargs)
|
||||
self.working_dir = self.conf_path.parent if working_dir is None else Path(working_dir)
|
||||
self.proxy_hd = self.working_dir / "handler_proxy.pkl"
|
||||
self.fea_imp_n = fea_imp_n
|
||||
self.meta_data_proc = meta_data_proc
|
||||
self.loss_skip_thresh = loss_skip_thresh
|
||||
self.segments = segments
|
||||
self.hist_step_n = hist_step_n
|
||||
|
||||
def _adjust_task(self, task: dict, astype: UTIL_MODEL_TYPE):
|
||||
"""
|
||||
Base on the original task, we need to do some extra things.
|
||||
|
||||
some task are use for special purpose.
|
||||
For example:
|
||||
- GBDT for calculating feature importance
|
||||
- Linear or GBDT for calculating similarity
|
||||
- Dataset (well processed) that is aligned to Linear for meta learning
|
||||
|
||||
So we may need to change the dataset and model for the special purpose and other settings remains the same.
|
||||
"""
|
||||
# NOTE: here is just for aligning with previous implementation
|
||||
# It is not necessary for the current implementation
|
||||
@@ -141,16 +119,12 @@ class DDGDA(Rolling):
|
||||
if astype == "gbdt":
|
||||
task["model"] = LGBM_MODEL
|
||||
if isinstance(handler, dict):
|
||||
# We don't need preprocessing when using GBDT model
|
||||
for k in ["infer_processors", "learn_processors"]:
|
||||
if k in handler.setdefault("kwargs", {}):
|
||||
handler["kwargs"].pop(k)
|
||||
elif astype == "linear":
|
||||
task["model"] = LINEAR_MODEL
|
||||
if isinstance(handler, dict):
|
||||
handler["kwargs"].update(PROC_ARGS)
|
||||
else:
|
||||
self.logger.warning("The handler can't be adjusted.")
|
||||
handler["kwargs"].update(PROC_ARGS)
|
||||
else:
|
||||
raise ValueError(f"astype not supported: {astype}")
|
||||
return task
|
||||
@@ -181,15 +155,12 @@ class DDGDA(Rolling):
|
||||
The meta model will be trained upon the proxy forecasting model.
|
||||
This dataset is for the proxy forecasting model.
|
||||
"""
|
||||
|
||||
topk = 30
|
||||
fi = self._get_feature_importance()
|
||||
col_selected = fi.nlargest(topk)
|
||||
# NOTE: adjusting to `self.sim_task_model` just for aligning with previous implementation.
|
||||
# In previous version. The data for proxy model is using sim_task_model's way for processing
|
||||
task = self._adjust_task(self.basic_task(enable_handler_cache=False), self.sim_task_model)
|
||||
task = replace_task_handler_with_cache(task, self.working_dir)
|
||||
# if self.meta_data_proc is not None:
|
||||
# else:
|
||||
# # Otherwise, we don't need futher processing
|
||||
# task = self.basic_task()
|
||||
|
||||
dataset = init_instance_by_config(task["dataset"])
|
||||
prep_ds = dataset.prepare(slice(None), col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
|
||||
@@ -197,18 +168,12 @@ class DDGDA(Rolling):
|
||||
feature_df = prep_ds["feature"]
|
||||
label_df = prep_ds["label"]
|
||||
|
||||
if self.fea_imp_n is not None:
|
||||
fi = self._get_feature_importance()
|
||||
col_selected = fi.nlargest(self.fea_imp_n)
|
||||
feature_selected = feature_df.loc[:, col_selected.index]
|
||||
else:
|
||||
feature_selected = feature_df
|
||||
feature_selected = feature_df.loc[:, col_selected.index]
|
||||
|
||||
if self.meta_data_proc == "V01":
|
||||
feature_selected = feature_selected.groupby("datetime", group_keys=False).apply(
|
||||
lambda df: (df - df.mean()).div(df.std())
|
||||
)
|
||||
feature_selected = feature_selected.fillna(0.0)
|
||||
feature_selected = feature_selected.groupby("datetime", group_keys=False).apply(
|
||||
lambda df: (df - df.mean()).div(df.std())
|
||||
)
|
||||
feature_selected = feature_selected.fillna(0.0)
|
||||
|
||||
df_all = {
|
||||
"label": label_df.reindex(feature_selected.index),
|
||||
@@ -258,10 +223,7 @@ class DDGDA(Rolling):
|
||||
# 1) leverage the simplified proxy forecasting model to train meta model.
|
||||
# - Only the dataset part is important, in current version of meta model will integrate the
|
||||
|
||||
# NOTE:
|
||||
# - The train_start for training meta model does not necessarily align with final rolling
|
||||
# But please select the right time to make sure the final rolling tasks are not leaked into the training data.
|
||||
# - The test_start is automatically aligned to the next day of test_end. Validation is ignored.
|
||||
# the train_start for training meta model does not necessarily align with final rolling
|
||||
train_start = "2008-01-01" if self.train_start is None else self.train_start
|
||||
train_end = "2010-12-31" if self.meta_1st_train_end is None else self.meta_1st_train_end
|
||||
test_start = (pd.Timestamp(train_end) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
|
||||
@@ -287,9 +249,9 @@ class DDGDA(Rolling):
|
||||
kwargs = dict(
|
||||
task_tpl=proxy_forecast_model_task,
|
||||
step=self.step,
|
||||
segments=self.segments, # keep test period consistent with the dataset yaml
|
||||
segments=0.62, # keep test period consistent with the dataset yaml
|
||||
trunc_days=1 + self.horizon,
|
||||
hist_step_n=self.hist_step_n,
|
||||
hist_step_n=30,
|
||||
fill_method=fill_method,
|
||||
rolling_ext_days=0,
|
||||
)
|
||||
@@ -306,13 +268,7 @@ class DDGDA(Rolling):
|
||||
with R.start(experiment_name=self.meta_exp_name):
|
||||
R.log_params(**kwargs)
|
||||
mm = MetaModelDS(
|
||||
step=self.step,
|
||||
hist_step_n=kwargs["hist_step_n"],
|
||||
lr=0.001,
|
||||
max_epoch=30,
|
||||
seed=43,
|
||||
alpha=self.alpha,
|
||||
loss_skip_thresh=self.loss_skip_thresh,
|
||||
step=self.step, hist_step_n=kwargs["hist_step_n"], lr=0.001, max_epoch=30, seed=43, alpha=self.alpha
|
||||
)
|
||||
mm.fit(md)
|
||||
R.save_objects(model=mm)
|
||||
|
||||
@@ -7,7 +7,7 @@ from pathlib import Path
|
||||
import warnings
|
||||
import pandas as pd
|
||||
|
||||
from typing import Tuple, Union, List, Dict
|
||||
from typing import Tuple, Union, List
|
||||
|
||||
from qlib.data import D
|
||||
from qlib.utils import load_dataset, init_instance_by_config, time_to_slc_point
|
||||
@@ -247,14 +247,10 @@ class StaticDataLoader(DataLoader, Serializable):
|
||||
|
||||
def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
|
||||
self._maybe_load_raw_data()
|
||||
|
||||
# 1) Filter by instruments
|
||||
if instruments is None:
|
||||
df = self._data
|
||||
else:
|
||||
df = self._data.loc(axis=0)[:, instruments]
|
||||
|
||||
# 2) Filter by Datetime
|
||||
if start_time is None and end_time is None:
|
||||
return df # NOTE: avoid copy by loc
|
||||
# pd.Timestamp(None) == NaT, use NaT as index can not fetch correct thing, so do not change None.
|
||||
@@ -279,55 +275,6 @@ class StaticDataLoader(DataLoader, Serializable):
|
||||
self._data = self._config
|
||||
|
||||
|
||||
class NestedDataLoader(DataLoader):
    """
    We have multiple DataLoader, we can use this class to combine them.

    The frames loaded by the individual loaders are merged on their index.
    """

    def __init__(self, dataloader_l: List[Dict], join="left") -> None:
        """

        Parameters
        ----------
        dataloader_l : list[dict]
            A list of dataloaders (either DataLoader instances or configs), for example

            .. code-block:: python

                nd = NestedDataLoader(
                    dataloader_l=[
                        {
                            "class": "qlib.contrib.data.loader.Alpha158DL",
                        }, {
                            "class": "qlib.contrib.data.loader.Alpha360DL",
                            "kwargs": {
                                "config": {
                                    "label": ( ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])
                                }
                            }
                        }
                    ]
                )
        join :
            it is passed as `how` to `pd.merge` when merging the loaded frames.
        """
        super().__init__()
        # Entries that are not DataLoader instances are treated as configs and instantiated
        self.data_loader_l = [
            (dl if isinstance(dl, DataLoader) else init_instance_by_config(dl)) for dl in dataloader_l
        ]
        self.join = join

    def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
        """
        Load from every data loader and merge the results on the index.

        Returns
        -------
        pd.DataFrame
            the merged frame with its columns sorted

        Raises
        ------
        ValueError
            if no data loader is configured
        """
        if not self.data_loader_l:
            # Fail with a clear message instead of an AttributeError on None below
            raise ValueError("NestedDataLoader needs at least one data loader to load data.")

        df_full = None
        for dl in self.data_loader_l:
            df_current = dl.load(instruments, start_time, end_time)
            if df_full is None:
                df_full = df_current
            else:
                df_full = pd.merge(df_full, df_current, left_index=True, right_index=True, how=self.join)
        return df_full.sort_index(axis=1)
|
||||
|
||||
|
||||
class DataLoaderDH(DataLoader):
|
||||
"""DataLoaderDH
|
||||
DataLoader based on (D)ata (H)andler
|
||||
|
||||
@@ -51,6 +51,3 @@ class MetaTask:
|
||||
Return the **processed** meta_info
|
||||
"""
|
||||
return self.meta_info
|
||||
|
||||
def __repr__(self):
|
||||
return f"MetaTask(task={self.task}, meta_info={self.meta_info})"
|
||||
|
||||
@@ -108,12 +108,6 @@ class Index:
|
||||
self.index_map = self.idx_list = np.arange(idx_list)
|
||||
self._is_sorted = True
|
||||
else:
|
||||
# Check if all elements in idx_list are of the same type
|
||||
if not all(isinstance(x, type(idx_list[0])) for x in idx_list):
|
||||
raise TypeError("All elements in idx_list must be of the same type")
|
||||
# Check if all elements in idx_list are of the same datetime64 precision
|
||||
if isinstance(idx_list[0], np.datetime64) and not all(x.dtype == idx_list[0].dtype for x in idx_list):
|
||||
raise TypeError("All elements in idx_list must be of the same datetime64 precision")
|
||||
self.idx_list = np.array(idx_list)
|
||||
# NOTE: only the first appearance is indexed
|
||||
self.index_map = dict(zip(self.idx_list, range(len(self))))
|
||||
@@ -137,12 +131,7 @@ class Index:
|
||||
if self.idx_list.dtype.type is np.datetime64:
|
||||
if isinstance(item, pd.Timestamp):
|
||||
# This happens often when creating index based on pandas.DatetimeIndex and query with pd.Timestamp
|
||||
return item.to_numpy().astype(self.idx_list.dtype)
|
||||
elif isinstance(item, np.datetime64):
|
||||
# This happens often when creating index based on np.datetime64 and query with another precision
|
||||
return item.astype(self.idx_list.dtype)
|
||||
# NOTE: It is hard to consider every case at first.
|
||||
# We just try to cover part of cases to make it more user-friendly
|
||||
return item.to_numpy()
|
||||
return item
|
||||
|
||||
def index(self, item) -> int:
|
||||
|
||||
@@ -161,13 +161,7 @@ def init_instance_by_config(
|
||||
# path like 'file:///<path to pickle file>/obj.pkl'
|
||||
pr = urlparse(config)
|
||||
if pr.scheme == "file":
|
||||
|
||||
# To enable relative path like file://data/a/b/c.pkl. pr.netloc will be data
|
||||
path = pr.path
|
||||
if pr.netloc != "":
|
||||
path = path.lstrip("/")
|
||||
|
||||
pr_path = os.path.join(pr.netloc, path) if bool(pr.path) else pr.netloc
|
||||
pr_path = os.path.join(pr.netloc, pr.path) if bool(pr.path) else pr.netloc
|
||||
with open(os.path.normpath(pr_path), "rb") as f:
|
||||
return pickle.load(f)
|
||||
else:
|
||||
|
||||
@@ -1,20 +1,18 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
import logging
|
||||
import sys
|
||||
import os
|
||||
from pathlib import Path
|
||||
import sys
|
||||
|
||||
import fire
|
||||
from jinja2 import Template, meta
|
||||
import ruamel.yaml as yaml
|
||||
|
||||
import qlib
|
||||
import fire
|
||||
import ruamel.yaml as yaml
|
||||
from qlib.config import C
|
||||
from qlib.log import get_module_logger
|
||||
from qlib.model.trainer import task_train
|
||||
from qlib.utils import set_log_with_config
|
||||
from qlib.utils.data import update_config
|
||||
from qlib.log import get_module_logger
|
||||
from qlib.utils import set_log_with_config
|
||||
|
||||
set_log_with_config(C.logging_config)
|
||||
logger = get_module_logger("qrun", logging.INFO)
|
||||
@@ -49,39 +47,6 @@ def sys_config(config, config_path):
|
||||
sys.path.append(str(Path(config_path).parent.resolve().absolute() / p))
|
||||
|
||||
|
||||
def render_template(config_path: str) -> str:
    """
    Render the configuration file as a template filled from the environment.

    Parameters
    ----------
    config_path : str
        configuration path

    Returns
    -------
    str
        the rendered content
    """
    with open(config_path, "r") as fp:
        raw_config = fp.read()

    template = Template(raw_config)

    # Variables the template refers to without declaring them itself
    undeclared = meta.find_undeclared_variables(template.environment.parse(raw_config))

    # Only the variables that are present in the environment are substituted
    context = {name: os.environ[name] for name in undeclared if name in os.environ}
    logger.info(f"Render the template with the context: {context}")

    return template.render(context)
|
||||
|
||||
|
||||
# workflow handler function
|
||||
def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"):
|
||||
"""
|
||||
@@ -102,9 +67,8 @@ def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"):
|
||||
market: csi300
|
||||
|
||||
"""
|
||||
# Render the template
|
||||
rendered_yaml = render_template(config_path)
|
||||
config = yaml.safe_load(rendered_yaml)
|
||||
with open(config_path) as fp:
|
||||
config = yaml.safe_load(fp)
|
||||
|
||||
base_config_path = config.get("BASE_CONFIG_PATH", None)
|
||||
if base_config_path:
|
||||
|
||||
@@ -1,50 +0,0 @@
|
||||
# TODO:
|
||||
# dump alpha 360 to dataframe and merge it with Alpha158
|
||||
|
||||
import sys
|
||||
import unittest
|
||||
import qlib
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.append(str(Path(__file__).resolve().parent))
|
||||
from qlib.data.dataset.loader import NestedDataLoader
|
||||
from qlib.contrib.data.loader import Alpha158DL, Alpha360DL
|
||||
|
||||
|
||||
class TestDataLoader(unittest.TestCase):
    def test_nested_data_loader(self):
        """The nested loader exposes the columns of every loader it wraps."""
        qlib.init()
        nd = NestedDataLoader(
            dataloader_l=[
                {
                    "class": "qlib.contrib.data.loader.Alpha158DL",
                },
                {
                    "class": "qlib.contrib.data.loader.Alpha360DL",
                    "kwargs": {"config": {"label": (["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])}},
                },
            ]
        )
        # Of course you can use StaticDataLoader

        dataset = nd.load()

        # unittest assertions are not stripped under `python -O` and report the failing value
        self.assertIsNotNone(dataset)

        # Each column is indexed as a tuple here; only its second element is checked
        columns = {tup[1] for tup in dataset.columns.tolist()}

        for col in Alpha158DL.get_feature_config()[1]:
            self.assertIn(col, columns)

        for col in Alpha360DL.get_feature_config()[1]:
            self.assertIn(col, columns)

        self.assertIn("LABEL0", columns)

        # Then you can use it with DataHandler;
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -94,24 +94,6 @@ class IndexDataTest(unittest.TestCase):
|
||||
print(sd)
|
||||
self.assertTrue(sd.iloc[0] == 2)
|
||||
|
||||
# test different precisions of time data
|
||||
timeindex = [
|
||||
np.datetime64("2024-06-22T00:00:00.000000000"),
|
||||
np.datetime64("2024-06-21T00:00:00.000000000"),
|
||||
np.datetime64("2024-06-20T00:00:00.000000000"),
|
||||
]
|
||||
sd = idd.SingleData([1, 2, 3], index=timeindex)
|
||||
self.assertTrue(
|
||||
sd.index.index(np.datetime64("2024-06-21T00:00:00.000000000"))
|
||||
== sd.index.index(np.datetime64("2024-06-21T00:00:00"))
|
||||
)
|
||||
self.assertTrue(sd.index.index(pd.Timestamp("2024-06-21 00:00")) == 1)
|
||||
|
||||
# Bad case: the input is not aligned
|
||||
timeindex[1] = (np.datetime64("2024-06-21T00:00:00.00"),)
|
||||
with self.assertRaises(TypeError):
|
||||
sd = idd.SingleData([1, 2, 3], index=timeindex)
|
||||
|
||||
def test_ops(self):
|
||||
sd1 = idd.SingleData([1, 2, 3, 4], index=["foo", "bar", "f", "g"])
|
||||
sd2 = idd.SingleData([1, 2, 3, 4], index=["foo", "bar", "f", "g"])
|
||||
|
||||
@@ -1,86 +0,0 @@
|
||||
|
||||
import unittest
|
||||
|
||||
from qlib.contrib.model.pytorch_general_nn import GeneralPTNN
|
||||
from qlib.data.dataset import DatasetH, TSDatasetH
|
||||
from qlib.data.dataset.handler import DataHandlerLP
|
||||
from qlib.tests import TestAutoData
|
||||
|
||||
|
||||
class TestNN(TestAutoData):
|
||||
|
||||
def test_both_dataset(self):
|
||||
data_handler_config = {
|
||||
"start_time": "2008-01-01",
|
||||
"end_time": "2020-08-01",
|
||||
"instruments": "csi300",
|
||||
"data_loader": {
|
||||
"class": "QlibDataLoader", # Assuming QlibDataLoader is a string reference to the class
|
||||
"kwargs": {
|
||||
"config": {
|
||||
"feature": [
|
||||
["$high", "$close", "$low"],
|
||||
["H", "C", "L"]
|
||||
],
|
||||
"label": [
|
||||
["Ref($close, -2)/Ref($close, -1) - 1"],
|
||||
["LABEL0"]
|
||||
]
|
||||
},
|
||||
"freq": "day"
|
||||
}
|
||||
},
|
||||
# TODO: processors
|
||||
"learn_processors": [
|
||||
{
|
||||
"class": "DropnaLabel",
|
||||
},
|
||||
{
|
||||
"class": "CSZScoreNorm",
|
||||
"kwargs": {
|
||||
"fields_group": "label"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
segments = {
|
||||
"train": ["2008-01-01", "2014-12-31"],
|
||||
"valid": ["2015-01-01", "2016-12-31"],
|
||||
"test": ["2017-01-01", "2020-08-01"]
|
||||
}
|
||||
data_handler = DataHandlerLP(**data_handler_config)
|
||||
|
||||
# time-series dataset
|
||||
tsds = TSDatasetH(handler=data_handler, segments=segments)
|
||||
|
||||
# tabular dataset
|
||||
tbds = DatasetH(handler=data_handler, segments=segments)
|
||||
|
||||
model_l = [
|
||||
GeneralPTNN(
|
||||
n_epochs=2,
|
||||
pt_model_uri="qlib.contrib.model.pytorch_gru_ts.GRUModel",
|
||||
pt_model_kwargs={
|
||||
"d_feat":3,
|
||||
"hidden_size":8,
|
||||
"num_layers":1,
|
||||
"dropout":0.,
|
||||
},
|
||||
),
|
||||
GeneralPTNN(
|
||||
n_epochs=2,
|
||||
pt_model_uri="qlib.contrib.model.pytorch_nn.Net", # it is a MLP
|
||||
pt_model_kwargs={
|
||||
"input_dim":3,
|
||||
},
|
||||
),
|
||||
]
|
||||
|
||||
for ds, model in reversed(list(zip((tsds, tbds), model_l))):
|
||||
model.fit(ds) # It works
|
||||
model.predict(ds) # It works
|
||||
break
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user