1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 14:01:28 +08:00

Compare commits

..

28 Commits

Author SHA1 Message Date
Young
4c057f645e Successfully run training 2024-07-10 06:25:30 +00:00
Young
a9fc3435ab Almost success to run GRU 2024-07-10 05:59:49 +00:00
Young
e2879d9b1e We choose another mode as the initial version 2024-07-10 05:42:27 +00:00
Young
a67a6134b4 We must align with previous results 2024-07-10 05:34:09 +00:00
Young
f4674ef98c Add model template; 2024-07-10 05:31:14 +00:00
Young
0f9312593d Remove some deprecated code 2024-07-09 09:11:06 +00:00
Young
4405cb784f Init model for both dataset 2024-07-08 06:11:51 +00:00
you-n-g
a7d5a9b500 Nested data loader (#1822)
* nested data loader

* Amend

* add data loder test

* fix pylint error

* fix pytest error

* fix pytest error

* delete comments

* Update qlib/contrib/data/handler.py

---------

Co-authored-by: Linlang <Lv.Linlang@hotmail.com>
2024-07-05 15:44:16 +08:00
you-n-g
5190332c7e Add some misc features. (#1816)
* Normal mod

* Black linting

* Linting
2024-06-26 18:34:00 +08:00
cyncyw
cde80206e4 Update index_data.py for datatype conversion and alignment (#1813)
* Update index_data.py for data convertion and alignment

* Update qlib/utils/index_data.py

* Update qlib/utils/index_data.py

* fix linting

---------

Co-authored-by: taozhiwang <taozhiwa@gmail.com>
Co-authored-by: you-n-g <you-n-g@users.noreply.github.com>
2024-06-24 15:34:48 +08:00
cyncyw
a339fc11d1 add a note for code standard (#1814)
* add a note for code standard

* handle both cases

---------

Co-authored-by: taozhiwang <taozhiwa@gmail.com>
2024-06-24 15:33:45 +08:00
Linlang
33482047dc change weight data download url (#1812) 2024-06-21 13:05:53 +08:00
Fivele-Li
47bd13295b Fix Yahoo daily data format inconsistent (#1517)
* Fix FutureWarning: Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead

* align index format while end date contains current day data

* fix black

* fix black

* optimize code

* optimize code

* optimize code

* fix ci error

* check ci error

* fix ci error

* check ci error

* check ci error

* check ci error

* check ci error

* check ci error

* check ci error

* fix ci error

* fix ci error

* fix ci error

* fix ci error

* fix ci error

---------

Co-authored-by: Cadenza-Li <362237642@qq.com>
Co-authored-by: Linlang <Lv.Linlang@hotmail.com>
2024-06-21 11:22:23 +08:00
陈屹华
ebc0ca893e Fix TSDataSampler Slicing Bug #1716 (#1803)
* Fix TSDataSampler Slicing Bug #1716

* Fix TSDataSampler Slicing Bug #1716

* Fix TSDataSampler Slicing Bug #1716

* Fix TSDataSampler Slicing Bug with simplyer implmentation#1716
 with Simplified Implementation

* Refactor: Fix CI errors by addressing pylint formatting issues

* Refactor: Remove extraneous whitespace for improved code formatting with Black
2024-06-21 09:25:23 +08:00
Lee Yuntong
3a348aec9f Fix typo (#1811)
Co-authored-by: LeeYuntong <nukuihayu@outlook.com>
2024-06-20 18:12:07 +08:00
Lee Yuntong
37b908792b Fix typo (#1809)
Co-authored-by: LeeYuntong <nukuihayu@outlook.com>
2024-06-19 17:31:57 +08:00
raikiriww
73ec0f4003 Add "mse" metric option to ALSTM.metric_fn (#1810) 2024-06-19 17:31:47 +08:00
Linlang
155c17f8ff fix logo display error (#1804) 2024-06-06 13:39:49 +08:00
Yang
41b94059aa fix panic during normalizing the invalid data (#1698)
* fix panic during normalizing the invalid data

* fix yaml load

* change error to warning

* change error code

* optimize code

---------

Co-authored-by: Linlang <Lv.Linlang@hotmail.com>
2024-06-02 06:54:39 +08:00
block-gpt
7db83d84b7 Update utils.py for typo (#1751)
Fix typo

Co-authored-by: Linlang <Lv.Linlang@hotmail.com>
2024-06-01 19:33:23 +08:00
Hao Zhao
35e0fdd1c0 fix the bug that the HS_SYMBOLS_URL is 404 (#1758)
* fix the bug that the HS_SYMBOLS_URL is 404

* fix bug

* format with black

* fix pylint error

* change error code

* fix ci error

* fix ci error

* optimize code

* optimize code

* add comments

---------

Co-authored-by: Linlang <Lv.Linlang@hotmail.com>
2024-06-01 08:07:34 +08:00
you-n-g
598017f634 Update Dev in README.md (#1800) 2024-05-29 17:44:18 +08:00
igeni
907c888c23 changed concat of strings to f-strings and redundant type conversion was removed (#1767)
Co-authored-by: Linlang <Lv.Linlang@hotmail.com>
2024-05-28 12:13:12 +08:00
Linlang
02fe6b6974 bump verison 2024-05-24 16:38:48 +08:00
Linlang
b892b21045 update version 2024-05-24 15:14:49 +08:00
Linlang
155f80323c fix get data error (#1793)
* fix get data error

* fix get v0 data error

* optimize get_data code

* fix pylint error

* add comments
2024-05-24 12:59:50 +08:00
you-n-g
63021018d6 Update README.md's dataset 2024-05-21 08:15:18 +08:00
Linlang
f79a0eeaff fix docs (#1788)
Co-authored-by: Linlang Lv (iSoftStone Information) <v-lvlinlang@microsoft.com>
2024-05-21 04:23:55 +08:00
50 changed files with 1886 additions and 413 deletions

View File

@@ -45,6 +45,9 @@ jobs:
- name: Qlib installation test
run: |
# 2024-05-30 scs has released a new version: 3.2.4.post2,
# This will cause the CI to fail, so we have limited the version of scs for now.
python -m pip install "scs<=3.2.4"
python -m pip install pyqlib
- name: Install Lightgbm for MacOS
@@ -65,5 +68,8 @@ jobs:
cd qlib
- name: Test workflow by config
# On macos-11 system, it will lead to "Segmentation fault: 11" error,
# which may be caused by the excessive memory overhead of macos-11 system, so we disable macos-11 temporarily here.
if: ${{ matrix.os != 'macos-11' }}
run: |
qrun examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml

View File

@@ -72,8 +72,10 @@ jobs:
black . -l 120 --check --diff
- name: Make html with sphinx
# Since read the docs builds on ubuntu 22.04, we only need to test that the build passes on ubuntu 22.04.
if: ${{ matrix.os == 'ubuntu-22.04' }}
run: |
cd docs
cd docs
sphinx-build -W --keep-going -b html . _build
cd ..
@@ -159,11 +161,16 @@ jobs:
# Run after data downloads
- name: Check Qlib ipynb with nbconvert
# Running the nbconvert check on a macos-11 system results in a "Kernel died" error, so we've temporarily disabled macos-11 here.
if: ${{ matrix.os != 'macos-11' }}
run: |
# add more ipynb files in future
jupyter nbconvert --to notebook --execute examples/workflow_by_code.ipynb
- name: Test workflow by config (install from source)
# On macos-11 system, it will lead to "Segmentation fault: 11" error,
# which may be caused by the excessive memory overhead of macos-11 system, so we disable macos-11 temporarily here.
if: ${{ matrix.os != 'macos-11' }}
run: |
python -m pip install numba
python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml

View File

@@ -40,7 +40,7 @@ Recent released features
Features released before 2021 are not listed here.
<p align="center">
<img src="http://fintech.msra.cn/images_v070/logo/1.png" />
<img src="docs/_static/img/logo/1.png" />
</p>
Qlib is an open-source, AI-oriented quantitative investment platform that aims to realize the potential, empower research, and create value using AI technologies in quantitative investment, from exploring ideas to implementing productions. Qlib supports diverse machine learning modeling paradigms, including supervised learning, market dynamics modeling, and reinforcement learning.
@@ -166,7 +166,7 @@ Also, users can install the latest dev version ``Qlib`` by the source code accor
* Clone the repository and install ``Qlib`` as follows.
```bash
git clone https://github.com/microsoft/qlib.git && cd qlib
pip install .
pip install . # `pip install -e .[dev]` is recommended for development. check details in docs/developer/code_standard_and_dev_guide.rst
```
**Note**: You can install Qlib with `python setup.py install` as well. But it is not the recommended approach. It will skip `pip` and cause obscure problems. For example, **only** the command ``pip install .`` **can** overwrite the stable version installed by ``pip install pyqlib``, while the command ``python setup.py install`` **can't**.
@@ -175,6 +175,20 @@ Also, users can install the latest dev version ``Qlib`` by the source code accor
**Tips for Mac**: If you are using Mac with M1, you might encounter issues in building the wheel for LightGBM, which is due to missing dependencies from OpenMP. To solve the problem, install openmp first with ``brew install libomp`` and then run ``pip install .`` to build it successfully.
## Data Preparation
❗ Due to more restrict data security policy. The offical dataset is disabled temporarily. You can try [this data source](https://github.com/chenditc/investment_data/releases) contributed by the community.
Here is an example to download the data updated on 20220720.
```bash
wget https://github.com/chenditc/investment_data/releases/download/20220720/qlib_bin.tar.gz
mkdir -p ~/.qlib/qlib_data/cn_data
tar -zxvf qlib_bin.tar.gz -C ~/.qlib/qlib_data/cn_data --strip-components=2
rm -f qlib_bin.tar.gz
```
The official dataset below will resume in short future.
----
Load and prepare data by running the following code:
### Get with module

View File

@@ -86,7 +86,7 @@ Example
},
}
# model initiaiton
# model initialization
model = init_instance_by_config(task["model"])
dataset = init_instance_by_config(task["dataset"])

View File

@@ -60,4 +60,4 @@ The `[dev]` option will help you to install some related packages when developin
.. code-block:: bash
pip install -e .[dev]
pip install -e ".[dev]"

View File

@@ -0,0 +1,15 @@
# Introduction
What is GeneralPtNN
- Fix previous design that fail to support both Time-series and tabular data
- Now you can just replace the Pytorch model structure to run a NN model.
We provide an example to demonstrate the effectiveness of the current design.
- `workflow_config_gru.yaml` align with previous results [GRU(Kyunghyun Cho, et al.)](../README.md#Alpha158 dataset)
- `workflow_config_mlp.yaml` align with previous results [MLP](../README.md#Alpha158 dataset)
# TODO
We will align existing models to current design.

View File

@@ -0,0 +1,97 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
start_time: 2008-01-01
end_time: 2020-08-01
fit_start_time: 2008-01-01
fit_end_time: 2014-12-31
instruments: *market
infer_processors:
- class: FilterCol
kwargs:
fields_group: feature
col_list: ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10",
"ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5",
"RSQR20", "CORD60", "CORD10", "CORR20", "KLOW"
]
- class: RobustZScoreNorm
kwargs:
fields_group: feature
clip_outlier: true
- class: Fillna
kwargs:
fields_group: feature
learn_processors:
- class: DropnaLabel
- class: CSRankNorm
kwargs:
fields_group: label
label: ["Ref($close, -2) / Ref($close, -1) - 1"]
port_analysis_config: &port_analysis_config
strategy:
class: TopkDropoutStrategy
module_path: qlib.contrib.strategy
kwargs:
signal: <PRED>
topk: 50
n_drop: 5
backtest:
start_time: 2017-01-01
end_time: 2020-08-01
account: 100000000
benchmark: *benchmark
exchange_kwargs:
limit_threshold: 0.095
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
task:
model:
class: GeneralPTNN
module_path: qlib.contrib.model.pytorch_general_nn
kwargs:
d_feat: 20
hidden_size: 64
num_layers: 2
dropout: 0.0
n_epochs: 200
lr: 2e-4
early_stop: 10
batch_size: 800
metric: loss
loss: mse
n_jobs: 20
GPU: 0
dataset:
class: TSDatasetH
module_path: qlib.data.dataset
kwargs:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
step_len: 20
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs:
model: <MODEL>
dataset: <DATASET>
- class: SigAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
ana_long_short: False
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config

View File

@@ -0,0 +1,98 @@
qlib_init:
provider_uri: "~/.qlib/qlib_data/cn_data"
region: cn
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
start_time: 2008-01-01
end_time: 2020-08-01
fit_start_time: 2008-01-01
fit_end_time: 2014-12-31
instruments: *market
infer_processors: [
{
"class" : "DropCol",
"kwargs":{"col_list": ["VWAP0"]}
},
{
"class" : "CSZFillna",
"kwargs":{"fields_group": "feature"}
}
]
learn_processors: [
{
"class" : "DropCol",
"kwargs":{"col_list": ["VWAP0"]}
},
{
"class" : "DropnaProcessor",
"kwargs":{"fields_group": "feature"}
},
"DropnaLabel",
{
"class": "CSZScoreNorm",
"kwargs": {"fields_group": "label"}
}
]
process_type: "independent"
port_analysis_config: &port_analysis_config
strategy:
class: TopkDropoutStrategy
module_path: qlib.contrib.strategy
kwargs:
signal: <PRED>
topk: 50
n_drop: 5
backtest:
start_time: 2017-01-01
end_time: 2020-08-01
account: 100000000
benchmark: *benchmark
exchange_kwargs:
limit_threshold: 0.095
deal_price: close
open_cost: 0.0005
close_cost: 0.0015
min_cost: 5
task:
model:
class: GeneralPTNN
module_path: qlib.contrib.model.pytorch_general_nn
kwargs:
loss: mse
lr: 0.002
optimizer: adam
max_steps: 8000
batch_size: 8192
GPU: 0
weight_decay: 0.0002
pt_model_kwargs:
input_dim: 157
dataset:
class: DatasetH
module_path: qlib.data.dataset
kwargs:
handler:
class: Alpha158
module_path: qlib.contrib.data.handler
kwargs: *data_handler_config
segments:
train: [2008-01-01, 2014-12-31]
valid: [2015-01-01, 2016-12-31]
test: [2017-01-01, 2020-08-01]
record:
- class: SignalRecord
module_path: qlib.workflow.record_temp
kwargs:
model: <MODEL>
dataset: <DATASET>
- class: SigAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
ana_long_short: False
ann_scaler: 252
- class: PortAnaRecord
module_path: qlib.workflow.record_temp
kwargs:
config: *port_analysis_config

View File

@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
from pathlib import Path
from typing import Union
@@ -35,6 +36,10 @@ class DDGDABench(DDGDA):
if __name__ == "__main__":
GetData().qlib_data(exists_skip=True)
auto_init()
kwargs = {}
if os.environ.get("PROVIDER_URI", "") == "":
GetData().qlib_data(exists_skip=True)
else:
kwargs["provider_uri"] = os.environ["PROVIDER_URI"]
auto_init(**kwargs)
fire.Fire(DDGDABench)

View File

@@ -1,5 +1,6 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import os
from pathlib import Path
from typing import Union
@@ -31,6 +32,10 @@ class RollingBenchmark(Rolling):
if __name__ == "__main__":
GetData().qlib_data(exists_skip=True)
auto_init()
kwargs = {}
if os.environ.get("PROVIDER_URI", "") == "":
GetData().qlib_data(exists_skip=True)
else:
kwargs["provider_uri"] = os.environ["PROVIDER_URI"]
auto_init(**kwargs)
fire.Fire(RollingBenchmark)

View File

@@ -16,7 +16,7 @@ Current version of script with default value tries to connect localhost **via de
Run following command to install necessary libraries
```
pip install pytest coverage
pip install pytest coverage gdown
pip install arctic # NOTE: pip may fail to resolve the right package dependency !!! Please make sure the dependency are satisfied.
```
@@ -27,7 +27,8 @@ pip install arctic # NOTE: pip may fail to resolve the right package dependency
2. Please follow following steps to download example data
```bash
cd examples/orderbook_data/
python ../../scripts/get_data.py download_data --target_dir . --file_name highfreq_orderbook_example_data.zip
gdown https://drive.google.com/uc?id=15nZF7tFT_eKVZAcMFL1qPS4jGyJflH7e # Proxies may be necessary here.
python ../../scripts/get_data.py _unzip --file_path highfreq_orderbook_example_data.zip --target_dir .
```
3. Please import the example data to your mongo db

View File

@@ -20,7 +20,7 @@ We use China stock market data for our example.
1. Prepare CSI300 weight:
```bash
wget http://fintech.msra.cn/stock_data/downloads/csi300_weight.zip
wget https://github.com/SunsetWolf/qlib_dataset/releases/download/v0/csi300_weight.zip
unzip -d ~/.qlib/qlib_data/cn_data csi300_weight.zip
rm -f csi300_weight.zip
```

View File

@@ -161,7 +161,7 @@
" },\n",
"}\n",
"\n",
"# model initiaiton\n",
"# model initialization\n",
"model = init_instance_by_config(task[\"model\"])\n",
"dataset = init_instance_by_config(task[\"dataset\"])\n",
"\n",

View File

@@ -2,7 +2,7 @@
# Licensed under the MIT License.
from pathlib import Path
__version__ = "0.9.4.99"
__version__ = "0.9.5.99"
__version__bak = __version__ # This version is backup for QlibConfig.reset_qlib_version
import os
from typing import Union

View File

@@ -1,6 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from qlib.contrib.data.loader import Alpha158DL, Alpha360DL
from ...data.dataset.handler import DataHandlerLP
from ...data.dataset.processor import Processor
from ...utils import get_callable_kwargs
@@ -66,7 +67,7 @@ class Alpha360(DataHandlerLP):
"class": "QlibDataLoader",
"kwargs": {
"config": {
"feature": self.get_feature_config(),
"feature": Alpha360DL.get_feature_config(),
"label": kwargs.pop("label", self.get_label_config()),
},
"filter_pipe": filter_pipe,
@@ -88,51 +89,6 @@ class Alpha360(DataHandlerLP):
def get_label_config(self):
return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]
@staticmethod
def get_feature_config():
# NOTE:
# Alpha360 tries to provide a dataset with original price data
# the original price data includes the prices and volume in the last 60 days.
# To make it easier to learn models from this dataset, all the prices and volume
# are normalized by the latest price and volume data ( dividing by $close, $volume)
# So the latest normalized $close will be 1 (with name CLOSE0), the latest normalized $volume will be 1 (with name VOLUME0)
# If further normalization are executed (e.g. centralization), CLOSE0 and VOLUME0 will be 0.
fields = []
names = []
for i in range(59, 0, -1):
fields += ["Ref($close, %d)/$close" % i]
names += ["CLOSE%d" % i]
fields += ["$close/$close"]
names += ["CLOSE0"]
for i in range(59, 0, -1):
fields += ["Ref($open, %d)/$close" % i]
names += ["OPEN%d" % i]
fields += ["$open/$close"]
names += ["OPEN0"]
for i in range(59, 0, -1):
fields += ["Ref($high, %d)/$close" % i]
names += ["HIGH%d" % i]
fields += ["$high/$close"]
names += ["HIGH0"]
for i in range(59, 0, -1):
fields += ["Ref($low, %d)/$close" % i]
names += ["LOW%d" % i]
fields += ["$low/$close"]
names += ["LOW0"]
for i in range(59, 0, -1):
fields += ["Ref($vwap, %d)/$close" % i]
names += ["VWAP%d" % i]
fields += ["$vwap/$close"]
names += ["VWAP0"]
for i in range(59, 0, -1):
fields += ["Ref($volume, %d)/($volume+1e-12)" % i]
names += ["VOLUME%d" % i]
fields += ["$volume/($volume+1e-12)"]
names += ["VOLUME0"]
return fields, names
class Alpha360vwap(Alpha360):
def get_label_config(self):
@@ -190,242 +146,11 @@ class Alpha158(DataHandlerLP):
},
"rolling": {},
}
return self.parse_config_to_fields(conf)
return Alpha158DL.get_feature_config(conf)
def get_label_config(self):
return ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"]
@staticmethod
def parse_config_to_fields(config):
"""create factors from config
config = {
'kbar': {}, # whether to use some hard-code kbar features
'price': { # whether to use raw price features
'windows': [0, 1, 2, 3, 4], # use price at n days ago
'feature': ['OPEN', 'HIGH', 'LOW'] # which price field to use
},
'volume': { # whether to use raw volume features
'windows': [0, 1, 2, 3, 4], # use volume at n days ago
},
'rolling': { # whether to use rolling operator based features
'windows': [5, 10, 20, 30, 60], # rolling windows size
'include': ['ROC', 'MA', 'STD'], # rolling operator to use
#if include is None we will use default operators
'exclude': ['RANK'], # rolling operator not to use
}
}
"""
fields = []
names = []
if "kbar" in config:
fields += [
"($close-$open)/$open",
"($high-$low)/$open",
"($close-$open)/($high-$low+1e-12)",
"($high-Greater($open, $close))/$open",
"($high-Greater($open, $close))/($high-$low+1e-12)",
"(Less($open, $close)-$low)/$open",
"(Less($open, $close)-$low)/($high-$low+1e-12)",
"(2*$close-$high-$low)/$open",
"(2*$close-$high-$low)/($high-$low+1e-12)",
]
names += [
"KMID",
"KLEN",
"KMID2",
"KUP",
"KUP2",
"KLOW",
"KLOW2",
"KSFT",
"KSFT2",
]
if "price" in config:
windows = config["price"].get("windows", range(5))
feature = config["price"].get("feature", ["OPEN", "HIGH", "LOW", "CLOSE", "VWAP"])
for field in feature:
field = field.lower()
fields += ["Ref($%s, %d)/$close" % (field, d) if d != 0 else "$%s/$close" % field for d in windows]
names += [field.upper() + str(d) for d in windows]
if "volume" in config:
windows = config["volume"].get("windows", range(5))
fields += ["Ref($volume, %d)/($volume+1e-12)" % d if d != 0 else "$volume/($volume+1e-12)" for d in windows]
names += ["VOLUME" + str(d) for d in windows]
if "rolling" in config:
windows = config["rolling"].get("windows", [5, 10, 20, 30, 60])
include = config["rolling"].get("include", None)
exclude = config["rolling"].get("exclude", [])
# `exclude` in dataset config unnecessary filed
# `include` in dataset config necessary field
def use(x):
return x not in exclude and (include is None or x in include)
# Some factor ref: https://guorn.com/static/upload/file/3/134065454575605.pdf
if use("ROC"):
# https://www.investopedia.com/terms/r/rateofchange.asp
# Rate of change, the price change in the past d days, divided by latest close price to remove unit
fields += ["Ref($close, %d)/$close" % d for d in windows]
names += ["ROC%d" % d for d in windows]
if use("MA"):
# https://www.investopedia.com/ask/answers/071414/whats-difference-between-moving-average-and-weighted-moving-average.asp
# Simple Moving Average, the simple moving average in the past d days, divided by latest close price to remove unit
fields += ["Mean($close, %d)/$close" % d for d in windows]
names += ["MA%d" % d for d in windows]
if use("STD"):
# The standard diviation of close price for the past d days, divided by latest close price to remove unit
fields += ["Std($close, %d)/$close" % d for d in windows]
names += ["STD%d" % d for d in windows]
if use("BETA"):
# The rate of close price change in the past d days, divided by latest close price to remove unit
# For example, price increase 10 dollar per day in the past d days, then Slope will be 10.
fields += ["Slope($close, %d)/$close" % d for d in windows]
names += ["BETA%d" % d for d in windows]
if use("RSQR"):
# The R-sqaure value of linear regression for the past d days, represent the trend linear
fields += ["Rsquare($close, %d)" % d for d in windows]
names += ["RSQR%d" % d for d in windows]
if use("RESI"):
# The redisdual for linear regression for the past d days, represent the trend linearity for past d days.
fields += ["Resi($close, %d)/$close" % d for d in windows]
names += ["RESI%d" % d for d in windows]
if use("MAX"):
# The max price for past d days, divided by latest close price to remove unit
fields += ["Max($high, %d)/$close" % d for d in windows]
names += ["MAX%d" % d for d in windows]
if use("LOW"):
# The low price for past d days, divided by latest close price to remove unit
fields += ["Min($low, %d)/$close" % d for d in windows]
names += ["MIN%d" % d for d in windows]
if use("QTLU"):
# The 80% quantile of past d day's close price, divided by latest close price to remove unit
# Used with MIN and MAX
fields += ["Quantile($close, %d, 0.8)/$close" % d for d in windows]
names += ["QTLU%d" % d for d in windows]
if use("QTLD"):
# The 20% quantile of past d day's close price, divided by latest close price to remove unit
fields += ["Quantile($close, %d, 0.2)/$close" % d for d in windows]
names += ["QTLD%d" % d for d in windows]
if use("RANK"):
# Get the percentile of current close price in past d day's close price.
# Represent the current price level comparing to past N days, add additional information to moving average.
fields += ["Rank($close, %d)" % d for d in windows]
names += ["RANK%d" % d for d in windows]
if use("RSV"):
# Represent the price position between upper and lower resistent price for past d days.
fields += ["($close-Min($low, %d))/(Max($high, %d)-Min($low, %d)+1e-12)" % (d, d, d) for d in windows]
names += ["RSV%d" % d for d in windows]
if use("IMAX"):
# The number of days between current date and previous highest price date.
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
# The indicator measures the time between highs and the time between lows over a time period.
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
fields += ["IdxMax($high, %d)/%d" % (d, d) for d in windows]
names += ["IMAX%d" % d for d in windows]
if use("IMIN"):
# The number of days between current date and previous lowest price date.
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
# The indicator measures the time between highs and the time between lows over a time period.
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
fields += ["IdxMin($low, %d)/%d" % (d, d) for d in windows]
names += ["IMIN%d" % d for d in windows]
if use("IMXD"):
# The time period between previous lowest-price date occur after highest price date.
# Large value suggest downward momemtum.
fields += ["(IdxMax($high, %d)-IdxMin($low, %d))/%d" % (d, d, d) for d in windows]
names += ["IMXD%d" % d for d in windows]
if use("CORR"):
# The correlation between absolute close price and log scaled trading volume
fields += ["Corr($close, Log($volume+1), %d)" % d for d in windows]
names += ["CORR%d" % d for d in windows]
if use("CORD"):
# The correlation between price change ratio and volume change ratio
fields += ["Corr($close/Ref($close,1), Log($volume/Ref($volume, 1)+1), %d)" % d for d in windows]
names += ["CORD%d" % d for d in windows]
if use("CNTP"):
# The percentage of days in past d days that price go up.
fields += ["Mean($close>Ref($close, 1), %d)" % d for d in windows]
names += ["CNTP%d" % d for d in windows]
if use("CNTN"):
# The percentage of days in past d days that price go down.
fields += ["Mean($close<Ref($close, 1), %d)" % d for d in windows]
names += ["CNTN%d" % d for d in windows]
if use("CNTD"):
# The diff between past up day and past down day
fields += ["Mean($close>Ref($close, 1), %d)-Mean($close<Ref($close, 1), %d)" % (d, d) for d in windows]
names += ["CNTD%d" % d for d in windows]
if use("SUMP"):
# The total gain / the absolute total price changed
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"Sum(Greater($close-Ref($close, 1), 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
for d in windows
]
names += ["SUMP%d" % d for d in windows]
if use("SUMN"):
# The total lose / the absolute total price changed
# Can be derived from SUMP by SUMN = 1 - SUMP
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"Sum(Greater(Ref($close, 1)-$close, 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
for d in windows
]
names += ["SUMN%d" % d for d in windows]
if use("SUMD"):
# The diff ratio between total gain and total lose
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"(Sum(Greater($close-Ref($close, 1), 0), %d)-Sum(Greater(Ref($close, 1)-$close, 0), %d))"
"/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d, d)
for d in windows
]
names += ["SUMD%d" % d for d in windows]
if use("VMA"):
# Simple Volume Moving average: https://www.barchart.com/education/technical-indicators/volume_moving_average
fields += ["Mean($volume, %d)/($volume+1e-12)" % d for d in windows]
names += ["VMA%d" % d for d in windows]
if use("VSTD"):
# The standard deviation for volume in past d days.
fields += ["Std($volume, %d)/($volume+1e-12)" % d for d in windows]
names += ["VSTD%d" % d for d in windows]
if use("WVMA"):
# The volume weighted price change volatility
fields += [
"Std(Abs($close/Ref($close, 1)-1)*$volume, %d)/(Mean(Abs($close/Ref($close, 1)-1)*$volume, %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["WVMA%d" % d for d in windows]
if use("VSUMP"):
# The total volume increase / the absolute total volume changed
fields += [
"Sum(Greater($volume-Ref($volume, 1), 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["VSUMP%d" % d for d in windows]
if use("VSUMN"):
# The total volume increase / the absolute total volume changed
# Can be derived from VSUMP by VSUMN = 1 - VSUMP
fields += [
"Sum(Greater(Ref($volume, 1)-$volume, 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["VSUMN%d" % d for d in windows]
if use("VSUMD"):
# The diff ratio between total volume increase and total volume decrease
# RSI indicator for volume
fields += [
"(Sum(Greater($volume-Ref($volume, 1), 0), %d)-Sum(Greater(Ref($volume, 1)-$volume, 0), %d))"
"/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)" % (d, d, d)
for d in windows
]
names += ["VSUMD%d" % d for d in windows]
return fields, names
class Alpha158vwap(Alpha158):
def get_label_config(self):

310
qlib/contrib/data/loader.py Normal file
View File

@@ -0,0 +1,310 @@
from qlib.data.dataset.loader import QlibDataLoader
class Alpha360DL(QlibDataLoader):
"""Dataloader to get Alpha360"""
def __init__(self, config=None, **kwargs):
_config = {
"feature": self.get_feature_config(),
}
if config is not None:
_config.update(config)
super().__init__(config=_config, **kwargs)
@staticmethod
def get_feature_config():
# NOTE:
# Alpha360 tries to provide a dataset with original price data
# the original price data includes the prices and volume in the last 60 days.
# To make it easier to learn models from this dataset, all the prices and volume
# are normalized by the latest price and volume data ( dividing by $close, $volume)
# So the latest normalized $close will be 1 (with name CLOSE0), the latest normalized $volume will be 1 (with name VOLUME0)
# If further normalization are executed (e.g. centralization), CLOSE0 and VOLUME0 will be 0.
fields = []
names = []
for i in range(59, 0, -1):
fields += ["Ref($close, %d)/$close" % i]
names += ["CLOSE%d" % i]
fields += ["$close/$close"]
names += ["CLOSE0"]
for i in range(59, 0, -1):
fields += ["Ref($open, %d)/$close" % i]
names += ["OPEN%d" % i]
fields += ["$open/$close"]
names += ["OPEN0"]
for i in range(59, 0, -1):
fields += ["Ref($high, %d)/$close" % i]
names += ["HIGH%d" % i]
fields += ["$high/$close"]
names += ["HIGH0"]
for i in range(59, 0, -1):
fields += ["Ref($low, %d)/$close" % i]
names += ["LOW%d" % i]
fields += ["$low/$close"]
names += ["LOW0"]
for i in range(59, 0, -1):
fields += ["Ref($vwap, %d)/$close" % i]
names += ["VWAP%d" % i]
fields += ["$vwap/$close"]
names += ["VWAP0"]
for i in range(59, 0, -1):
fields += ["Ref($volume, %d)/($volume+1e-12)" % i]
names += ["VOLUME%d" % i]
fields += ["$volume/($volume+1e-12)"]
names += ["VOLUME0"]
return fields, names
class Alpha158DL(QlibDataLoader):
"""Dataloader to get Alpha158"""
def __init__(self, config=None, **kwargs):
_config = {
"feature": self.get_feature_config(),
}
if config is not None:
_config.update(config)
super().__init__(config=_config, **kwargs)
@staticmethod
def get_feature_config(
config={
"kbar": {},
"price": {
"windows": [0],
"feature": ["OPEN", "HIGH", "LOW", "VWAP"],
},
"rolling": {},
}
):
"""create factors from config
config = {
'kbar': {}, # whether to use some hard-code kbar features
'price': { # whether to use raw price features
'windows': [0, 1, 2, 3, 4], # use price at n days ago
'feature': ['OPEN', 'HIGH', 'LOW'] # which price field to use
},
'volume': { # whether to use raw volume features
'windows': [0, 1, 2, 3, 4], # use volume at n days ago
},
'rolling': { # whether to use rolling operator based features
'windows': [5, 10, 20, 30, 60], # rolling windows size
'include': ['ROC', 'MA', 'STD'], # rolling operator to use
#if include is None we will use default operators
'exclude': ['RANK'], # rolling operator not to use
}
}
"""
fields = []
names = []
if "kbar" in config:
fields += [
"($close-$open)/$open",
"($high-$low)/$open",
"($close-$open)/($high-$low+1e-12)",
"($high-Greater($open, $close))/$open",
"($high-Greater($open, $close))/($high-$low+1e-12)",
"(Less($open, $close)-$low)/$open",
"(Less($open, $close)-$low)/($high-$low+1e-12)",
"(2*$close-$high-$low)/$open",
"(2*$close-$high-$low)/($high-$low+1e-12)",
]
names += [
"KMID",
"KLEN",
"KMID2",
"KUP",
"KUP2",
"KLOW",
"KLOW2",
"KSFT",
"KSFT2",
]
if "price" in config:
windows = config["price"].get("windows", range(5))
feature = config["price"].get("feature", ["OPEN", "HIGH", "LOW", "CLOSE", "VWAP"])
for field in feature:
field = field.lower()
fields += ["Ref($%s, %d)/$close" % (field, d) if d != 0 else "$%s/$close" % field for d in windows]
names += [field.upper() + str(d) for d in windows]
if "volume" in config:
windows = config["volume"].get("windows", range(5))
fields += ["Ref($volume, %d)/($volume+1e-12)" % d if d != 0 else "$volume/($volume+1e-12)" for d in windows]
names += ["VOLUME" + str(d) for d in windows]
if "rolling" in config:
windows = config["rolling"].get("windows", [5, 10, 20, 30, 60])
include = config["rolling"].get("include", None)
exclude = config["rolling"].get("exclude", [])
# `exclude` in dataset config unnecessary filed
# `include` in dataset config necessary field
def use(x):
return x not in exclude and (include is None or x in include)
# Some factor ref: https://guorn.com/static/upload/file/3/134065454575605.pdf
if use("ROC"):
# https://www.investopedia.com/terms/r/rateofchange.asp
# Rate of change, the price change in the past d days, divided by latest close price to remove unit
fields += ["Ref($close, %d)/$close" % d for d in windows]
names += ["ROC%d" % d for d in windows]
if use("MA"):
# https://www.investopedia.com/ask/answers/071414/whats-difference-between-moving-average-and-weighted-moving-average.asp
# Simple Moving Average, the simple moving average in the past d days, divided by latest close price to remove unit
fields += ["Mean($close, %d)/$close" % d for d in windows]
names += ["MA%d" % d for d in windows]
if use("STD"):
# The standard diviation of close price for the past d days, divided by latest close price to remove unit
fields += ["Std($close, %d)/$close" % d for d in windows]
names += ["STD%d" % d for d in windows]
if use("BETA"):
# The rate of close price change in the past d days, divided by latest close price to remove unit
# For example, price increase 10 dollar per day in the past d days, then Slope will be 10.
fields += ["Slope($close, %d)/$close" % d for d in windows]
names += ["BETA%d" % d for d in windows]
if use("RSQR"):
# The R-sqaure value of linear regression for the past d days, represent the trend linear
fields += ["Rsquare($close, %d)" % d for d in windows]
names += ["RSQR%d" % d for d in windows]
if use("RESI"):
# The redisdual for linear regression for the past d days, represent the trend linearity for past d days.
fields += ["Resi($close, %d)/$close" % d for d in windows]
names += ["RESI%d" % d for d in windows]
if use("MAX"):
# The max price for past d days, divided by latest close price to remove unit
fields += ["Max($high, %d)/$close" % d for d in windows]
names += ["MAX%d" % d for d in windows]
if use("LOW"):
# The low price for past d days, divided by latest close price to remove unit
fields += ["Min($low, %d)/$close" % d for d in windows]
names += ["MIN%d" % d for d in windows]
if use("QTLU"):
# The 80% quantile of past d day's close price, divided by latest close price to remove unit
# Used with MIN and MAX
fields += ["Quantile($close, %d, 0.8)/$close" % d for d in windows]
names += ["QTLU%d" % d for d in windows]
if use("QTLD"):
# The 20% quantile of past d day's close price, divided by latest close price to remove unit
fields += ["Quantile($close, %d, 0.2)/$close" % d for d in windows]
names += ["QTLD%d" % d for d in windows]
if use("RANK"):
# Get the percentile of current close price in past d day's close price.
# Represent the current price level comparing to past N days, add additional information to moving average.
fields += ["Rank($close, %d)" % d for d in windows]
names += ["RANK%d" % d for d in windows]
if use("RSV"):
# Represent the price position between upper and lower resistent price for past d days.
fields += ["($close-Min($low, %d))/(Max($high, %d)-Min($low, %d)+1e-12)" % (d, d, d) for d in windows]
names += ["RSV%d" % d for d in windows]
if use("IMAX"):
# The number of days between current date and previous highest price date.
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
# The indicator measures the time between highs and the time between lows over a time period.
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
fields += ["IdxMax($high, %d)/%d" % (d, d) for d in windows]
names += ["IMAX%d" % d for d in windows]
if use("IMIN"):
# The number of days between current date and previous lowest price date.
# Part of Aroon Indicator https://www.investopedia.com/terms/a/aroon.asp
# The indicator measures the time between highs and the time between lows over a time period.
# The idea is that strong uptrends will regularly see new highs, and strong downtrends will regularly see new lows.
fields += ["IdxMin($low, %d)/%d" % (d, d) for d in windows]
names += ["IMIN%d" % d for d in windows]
if use("IMXD"):
# The time period between previous lowest-price date occur after highest price date.
# Large value suggest downward momemtum.
fields += ["(IdxMax($high, %d)-IdxMin($low, %d))/%d" % (d, d, d) for d in windows]
names += ["IMXD%d" % d for d in windows]
if use("CORR"):
# The correlation between absolute close price and log scaled trading volume
fields += ["Corr($close, Log($volume+1), %d)" % d for d in windows]
names += ["CORR%d" % d for d in windows]
if use("CORD"):
# The correlation between price change ratio and volume change ratio
fields += ["Corr($close/Ref($close,1), Log($volume/Ref($volume, 1)+1), %d)" % d for d in windows]
names += ["CORD%d" % d for d in windows]
if use("CNTP"):
# The percentage of days in past d days that price go up.
fields += ["Mean($close>Ref($close, 1), %d)" % d for d in windows]
names += ["CNTP%d" % d for d in windows]
if use("CNTN"):
# The percentage of days in past d days that price go down.
fields += ["Mean($close<Ref($close, 1), %d)" % d for d in windows]
names += ["CNTN%d" % d for d in windows]
if use("CNTD"):
# The diff between past up day and past down day
fields += ["Mean($close>Ref($close, 1), %d)-Mean($close<Ref($close, 1), %d)" % (d, d) for d in windows]
names += ["CNTD%d" % d for d in windows]
if use("SUMP"):
# The total gain / the absolute total price changed
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"Sum(Greater($close-Ref($close, 1), 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
for d in windows
]
names += ["SUMP%d" % d for d in windows]
if use("SUMN"):
# The total lose / the absolute total price changed
# Can be derived from SUMP by SUMN = 1 - SUMP
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"Sum(Greater(Ref($close, 1)-$close, 0), %d)/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d)
for d in windows
]
names += ["SUMN%d" % d for d in windows]
if use("SUMD"):
# The diff ratio between total gain and total lose
# Similar to RSI indicator. https://www.investopedia.com/terms/r/rsi.asp
fields += [
"(Sum(Greater($close-Ref($close, 1), 0), %d)-Sum(Greater(Ref($close, 1)-$close, 0), %d))"
"/(Sum(Abs($close-Ref($close, 1)), %d)+1e-12)" % (d, d, d)
for d in windows
]
names += ["SUMD%d" % d for d in windows]
if use("VMA"):
# Simple Volume Moving average: https://www.barchart.com/education/technical-indicators/volume_moving_average
fields += ["Mean($volume, %d)/($volume+1e-12)" % d for d in windows]
names += ["VMA%d" % d for d in windows]
if use("VSTD"):
# The standard deviation for volume in past d days.
fields += ["Std($volume, %d)/($volume+1e-12)" % d for d in windows]
names += ["VSTD%d" % d for d in windows]
if use("WVMA"):
# The volume weighted price change volatility
fields += [
"Std(Abs($close/Ref($close, 1)-1)*$volume, %d)/(Mean(Abs($close/Ref($close, 1)-1)*$volume, %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["WVMA%d" % d for d in windows]
if use("VSUMP"):
# The total volume increase / the absolute total volume changed
fields += [
"Sum(Greater($volume-Ref($volume, 1), 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["VSUMP%d" % d for d in windows]
if use("VSUMN"):
# The total volume increase / the absolute total volume changed
# Can be derived from VSUMP by VSUMN = 1 - VSUMP
fields += [
"Sum(Greater(Ref($volume, 1)-$volume, 0), %d)/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)"
% (d, d)
for d in windows
]
names += ["VSUMN%d" % d for d in windows]
if use("VSUMD"):
# The diff ratio between total volume increase and total volume decrease
# RSI indicator for volume
fields += [
"(Sum(Greater($volume-Ref($volume, 1), 0), %d)-Sum(Greater(Ref($volume, 1)-$volume, 0), %d))"
"/(Sum(Abs($volume-Ref($volume, 1)), %d)+1e-12)" % (d, d, d)
for d in windows
]
names += ["VSUMD%d" % d for d in windows]
return fields, names

View File

@@ -243,7 +243,7 @@ class MetaDatasetDS(MetaTaskDataset):
trunc_days: int = None,
rolling_ext_days: int = 0,
exp_name: Union[str, InternalData],
segments: Union[Dict[Text, Tuple], float],
segments: Union[Dict[Text, Tuple], float, str],
hist_step_n: int = 10,
task_mode: str = MetaTask.PROC_MODE_FULL,
fill_method: str = "max",
@@ -271,12 +271,16 @@ class MetaDatasetDS(MetaTaskDataset):
- str: the name of the experiment to store the performance of data
- InternalData: a prepared internal data
segments: Union[Dict[Text, Tuple], float]
the segments to divide data
both left and right
if the segment is a Dict
the segments to divide data
both left and right are included
if segments is a float:
the float represents the percentage of data for training
if segments is a string:
it will try its best to put its data in training and ensure that the date `segments` is in the test set
hist_step_n: int
length of historical steps for the meta infomation
Number of steps of the data similarity information
task_mode : str
Please refer to the docs of MetaTask
"""
@@ -383,10 +387,30 @@ class MetaDatasetDS(MetaTaskDataset):
if isinstance(self.segments, float):
train_task_n = int(len(self.meta_task_l) * self.segments)
if segment == "train":
return self.meta_task_l[:train_task_n]
train_tasks = self.meta_task_l[:train_task_n]
get_module_logger("MetaDatasetDS").info(f"The first train meta task: {train_tasks[0]}")
return train_tasks
elif segment == "test":
return self.meta_task_l[train_task_n:]
test_tasks = self.meta_task_l[train_task_n:]
get_module_logger("MetaDatasetDS").info(f"The first test meta task: {test_tasks[0]}")
return test_tasks
else:
raise NotImplementedError(f"This type of input is not supported")
elif isinstance(self.segments, str):
train_tasks = []
test_tasks = []
for t in self.meta_task_l:
test_end = t.task["dataset"]["kwargs"]["segments"]["test"][1]
if test_end is None or pd.Timestamp(test_end) < pd.Timestamp(self.segments):
train_tasks.append(t)
else:
test_tasks.append(t)
get_module_logger("MetaDatasetDS").info(f"The first train meta task: {train_tasks[0]}")
get_module_logger("MetaDatasetDS").info(f"The first test meta task: {test_tasks[0]}")
if segment == "train":
return train_tasks
elif segment == "test":
return test_tasks
raise NotImplementedError(f"This type of input is not supported")
else:
raise NotImplementedError(f"This type of input is not supported")

View File

@@ -53,7 +53,12 @@ class MetaModelDS(MetaTaskModel):
max_epoch=100,
seed=43,
alpha=0.0,
loss_skip_thresh=50,
):
"""
loss_skip_size: int
The number of threshold to skip the loss calculation for each day.
"""
self.step = step
self.hist_step_n = hist_step_n
self.clip_method = clip_method
@@ -63,6 +68,7 @@ class MetaModelDS(MetaTaskModel):
self.max_epoch = max_epoch
self.fitted = False
self.alpha = alpha
self.loss_skip_thresh = loss_skip_thresh
torch.manual_seed(seed)
def run_epoch(self, phase, task_list, epoch, opt, loss_l, ignore_weight=False):
@@ -88,12 +94,14 @@ class MetaModelDS(MetaTaskModel):
criterion = nn.MSELoss()
loss = criterion(pred, meta_input["y_test"])
elif self.criterion == "ic_loss":
criterion = ICLoss()
criterion = ICLoss(self.loss_skip_thresh)
try:
loss = criterion(pred, meta_input["y_test"], meta_input["test_idx"], skip_size=50)
loss = criterion(pred, meta_input["y_test"], meta_input["test_idx"])
except ValueError as e:
get_module_logger("MetaModelDS").warning(f"Exception `{e}` when calculating IC loss")
continue
else:
raise ValueError(f"Unknown criterion: {self.criterion}")
assert not np.isnan(loss.detach().item()), "NaN loss!"

View File

@@ -10,7 +10,11 @@ from qlib.log import get_module_logger
class ICLoss(nn.Module):
def forward(self, pred, y, idx, skip_size=50):
def __init__(self, skip_size=50):
super().__init__()
self.skip_size = skip_size
def forward(self, pred, y, idx):
"""forward.
FIXME:
- Some times it will be a slightly different from the result from `pandas.corr()`
@@ -33,7 +37,7 @@ class ICLoss(nn.Module):
skip_n = 0
for start_i, end_i in zip(diff_point, diff_point[1:]):
pred_focus = pred[start_i:end_i] # TODO: just for fake
if pred_focus.shape[0] < skip_size:
if pred_focus.shape[0] < self.skip_size:
# skip some days which have very small amount of stock.
skip_n += 1
continue
@@ -50,6 +54,7 @@ class ICLoss(nn.Module):
)
ic_all += ic_day
if len(diff_point) - 1 - skip_n <= 0:
__import__("ipdb").set_trace()
raise ValueError("No enough data for calculating IC")
if skip_n > 0:
get_module_logger("ICLoss").info(

View File

@@ -63,6 +63,7 @@ class LinearModel(Model):
df_train = pd.concat([df_train, df_valid])
except KeyError:
get_module_logger("LinearModel").info("include_valid=True, but valid does not exist")
df_train = df_train.dropna()
if df_train.empty:
raise ValueError("Empty data from dataset, please check your dataset config.")
if reweighter is not None:

View File

@@ -160,6 +160,10 @@ class ALSTM(Model):
if self.metric in ("", "loss"):
return -self.loss_fn(pred[mask], label[mask])
elif self.metric == "mse":
mask = ~torch.isnan(label)
weight = torch.ones_like(label)
return -self.mse(pred[mask], label[mask], weight[mask])
raise ValueError("unknown metric `%s`" % self.metric)

View File

@@ -0,0 +1,663 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
from torch.utils.data import DataLoader, RandomSampler, StackDataset
import os
import numpy as np
import pandas as pd
from typing import Callable, Optional, Text, Union
from sklearn.metrics import roc_auc_score, mean_squared_error
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import StackDataset
from qlib.data.dataset.weight import Reweighter
from .pytorch_utils import count_parameters
from ...model.base import Model
from ...data.dataset import DatasetH, TSDatasetH
from ...data.dataset.handler import DataHandlerLP
from ...utils import (
auto_filter_kwargs,
init_instance_by_config,
unpack_archive_with_buffer,
save_multiple_parts_file,
get_or_create_path,
)
from ...log import get_module_logger
from ...workflow import R
from qlib.contrib.meta.data_selection.utils import ICLoss
from torch.nn import DataParallel
class GeneralPTNN(Model):
"""General Pytorch Neural Network Model
Parameters
----------
input_dim : int
input dimension
output_dim : int
output dimension
layers : tuple
layer sizes
lr : float
learning rate
optimizer : str
optimizer name
GPU : int
the GPU ID used for training
"""
def __init__(
self,
lr=0.001,
max_steps=300,
batch_size=2000,
early_stop_rounds=50,
eval_steps=20,
optimizer="gd",
loss="mse",
GPU=0,
seed=None,
weight_decay=0.0,
data_parall=False,
scheduler: Optional[Union[Callable]] = "default", # when it is Callable, it accept one argument named optimizer
init_model=None,
eval_train_metric=False,
pt_model_uri="qlib.contrib.model.pytorch_nn.Net",
pt_model_kwargs={
"input_dim": 360,
"layers": (256,),
},
valid_key=DataHandlerLP.DK_L,
# TODO: Infer Key is a more reasonable key. But it requires more detailed processing on label processing
):
# Set logger.
self.logger = get_module_logger("DNNModelPytorch")
self.logger.info("DNN pytorch version...")
# set hyper-parameters.
self.lr = lr
self.max_steps = max_steps
self.batch_size = batch_size
self.early_stop_rounds = early_stop_rounds
self.eval_steps = eval_steps
self.optimizer = optimizer.lower()
self.loss_type = loss
if isinstance(GPU, str):
self.device = torch.device(GPU)
else:
self.device = torch.device("cuda:%d" % (GPU) if torch.cuda.is_available() and GPU >= 0 else "cpu")
self.seed = seed
self.weight_decay = weight_decay
self.data_parall = data_parall
self.eval_train_metric = eval_train_metric
self.valid_key = valid_key
self.best_step = None
self.logger.info(
"DNN parameters setting:"
f"\nlr : {lr}"
f"\nmax_steps : {max_steps}"
f"\nbatch_size : {batch_size}"
f"\nearly_stop_rounds : {early_stop_rounds}"
f"\neval_steps : {eval_steps}"
f"\noptimizer : {optimizer}"
f"\nloss_type : {loss}"
f"\nseed : {seed}"
f"\ndevice : {self.device}"
f"\nuse_GPU : {self.use_gpu}"
f"\nweight_decay : {weight_decay}"
f"\nenable data parall : {self.data_parall}"
f"\npt_model_uri: {pt_model_uri}"
f"\npt_model_kwargs: {pt_model_kwargs}"
)
if self.seed is not None:
np.random.seed(self.seed)
torch.manual_seed(self.seed)
if loss not in {"mse", "binary"}:
raise NotImplementedError("loss {} is not supported!".format(loss))
self._scorer = mean_squared_error if loss == "mse" else roc_auc_score
if init_model is None:
self.dnn_model = init_instance_by_config({"class": pt_model_uri, "kwargs": pt_model_kwargs})
if self.data_parall:
self.dnn_model = DataParallel(self.dnn_model).to(self.device)
else:
self.dnn_model = init_model
self.logger.info("model:\n{:}".format(self.dnn_model))
self.logger.info("model size: {:.4f} MB".format(count_parameters(self.dnn_model)))
if optimizer.lower() == "adam":
self.train_optimizer = optim.Adam(self.dnn_model.parameters(), lr=self.lr, weight_decay=self.weight_decay)
elif optimizer.lower() == "gd":
self.train_optimizer = optim.SGD(self.dnn_model.parameters(), lr=self.lr, weight_decay=self.weight_decay)
else:
raise NotImplementedError("optimizer {} is not supported!".format(optimizer))
if scheduler == "default":
# Reduce learning rate when loss has stopped decrease
self.scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
self.train_optimizer,
mode="min",
factor=0.5,
patience=10,
verbose=True,
threshold=0.0001,
threshold_mode="rel",
cooldown=0,
min_lr=0.00001,
eps=1e-08,
)
elif scheduler is None:
self.scheduler = None
else:
self.scheduler = scheduler(optimizer=self.train_optimizer)
self.dnn_model.to(self.device)
@property
def use_gpu(self):
return self.device != torch.device("cpu")
def _eval_valid_dl(self, valid_loader, val_index):
with torch.no_grad():
self.dnn_model.eval()
val_loss = []
val_pred = []
val_label = []
for x_batch, y_batch in valid_loader:
x_batch = x_batch.to(self.device)
y_batch = y_batch.to(self.device)
cur_loss = self.get_loss(preds, y_batch, self.loss_type)
val_loss.append(cur_loss.detach().cpu().numpy().item())
val_loss = np.mean(val_loss)
val_pred = torch.cat(val_pred, axis=0).detach().cpu().numpy()
val_label = torch.cat(val_label, axis=0).detach().cpu().numpy()
val_metric = self.get_metric(val_pred, val_label, val_index).detach().cpu().numpy().item()
return val_loss, val_metric
def fit(
self,
dataset: Union[DatasetH, TSDatasetH],
verbose=True,
save_path=None,
):
ists = isinstance(dataset, TSDatasetH) # is this time series dataset
# prepare training
train_x = dataset.prepare("train", col_set="feature", data_key=DataHandlerLP.DK_L)
train_y = dataset.prepare("train", col_set="label", data_key=DataHandlerLP.DK_L)
train_ds = StackDataset(train_x, train_y)
train_sampler = RandomSampler(train_ds)
train_loader = DataLoader(train_ds, batch_size=self.batch_size, sampler=train_sampler)
# prepare validation
valid_x = dataset.prepare("train", col_set="feature", data_key=DataHandlerLP.DK_L)
valid_y = dataset.prepare("train", col_set="label", data_key=DataHandlerLP.DK_L)
valid_ds = StackDataset(valid_x, valid_y)
valid_loader = DataLoader(valid_ds, batch_size=self.batch_size, shuffle=False)
if ists:
val_index = valid_x.data_index
else:
val_index = valid_x.index
save_path = get_or_create_path(save_path)
stop_steps = 0
train_loss = 0
best_loss = np.inf
# train
self.logger.info("training...")
for step in range(1, self.max_steps + 1):
if stop_steps >= self.early_stop_rounds:
if verbose:
self.logger.info("\tearly stop")
break
loss = AverageMeter()
self.dnn_model.train()
self.train_optimizer.zero_grad()
for x_batch, y_batch in train_loader:
x_batch = x_batch.to(self.device)
y_batch = y_batch.to(self.device)
# forward
preds = self.dnn_model(x_batch)
cur_loss = self.get_loss(preds, y_batch, self.loss_type)
cur_loss.backward()
self.train_optimizer.step()
loss.update(cur_loss.item())
R.log_metrics(train_loss=loss.avg, step=step)
# validation
train_loss += loss.val
# for every `eval_steps` steps or at the last steps, we will evaluate the model.
if step % self.eval_steps == 0 or step == self.max_steps:
stop_steps += 1
train_loss /= self.eval_steps
val_loss, val_metric = self._eval_valid_dl(valid_loader, val_index)
R.log_metrics(val_loss=val_loss, step=step)
R.log_metrics(val_metric=val_metric, step=step)
if val_loss < best_loss:
if verbose:
self.logger.info(
"\tvalid loss update from {:.6f} to {:.6f}, save checkpoint.".format(
best_loss, val_loss
)
)
best_loss = val_loss
self.best_step = step
R.log_metrics(best_step=self.best_step, step=step)
stop_steps = 0
torch.save(self.dnn_model.state_dict(), save_path)
train_loss = 0
# update learning rate
if self.scheduler is not None:
auto_filter_kwargs(self.scheduler.step, warning=False)(metrics=val_loss, epoch=step)
R.log_metrics(lr=self.get_lr(), step=step)
# restore the optimal parameters after training
self.dnn_model.load_state_dict(torch.load(save_path, map_location=self.device))
if self.use_gpu:
torch.cuda.empty_cache()
def get_lr(self):
assert len(self.train_optimizer.param_groups) == 1
return self.train_optimizer.param_groups[0]["lr"]
def get_loss(self, pred, target, loss_type, w=None):
pred, target = pred.reshape(-1), target.reshape(-1)
if w is None:
# make it ones and the same size with pred
w = torch.ones_like(pred).to(pred.device)
if loss_type == "mse":
sqr_loss = torch.mul(pred - target, pred - target)
loss = torch.mul(sqr_loss, w).mean()
return loss
elif loss_type == "binary":
loss = nn.BCEWithLogitsLoss(weight=w)
return loss(pred, target)
else:
raise NotImplementedError("loss {} is not supported!".format(loss_type))
def get_metric(self, pred, target, index):
# NOTE: the order of the index must follow <datetime, instrument> sorted order
return -ICLoss()(pred, target, index) # pylint: disable=E1130
def _nn_predict(self, data, return_cpu=True):
"""Reusing predicting NN.
Scenarios
1) test inference (data may come from CPU and expect the output data is on CPU)
2) evaluation on training (data may come from GPU)
"""
if not isinstance(data, torch.Tensor):
if isinstance(data, pd.DataFrame):
data = data.values
data = torch.Tensor(data)
data = data.to(self.device)
preds = []
self.dnn_model.eval()
with torch.no_grad():
batch_size = 8096
for i in range(0, len(data), batch_size):
x = data[i : i + batch_size]
preds.append(self.dnn_model(x.to(self.device)).detach().reshape(-1))
if return_cpu:
preds = np.concatenate([pr.cpu().numpy() for pr in preds])
else:
preds = torch.cat(preds, axis=0)
return preds
def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"):
x_test_pd = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I)
preds = self._nn_predict(x_test_pd)
return pd.Series(preds.reshape(-1), index=x_test_pd.index)
class AverageMeter:
"""Computes and stores the average and current value"""
def __init__(self):
self.reset()
def reset(self):
self.val = 0
self.avg = 0
self.sum = 0
self.count = 0
def update(self, val, n=1):
self.val = val
self.sum += val * n
self.count += n
self.avg = self.sum / self.count
from ...model.utils import ConcatDataset
class GeneralPTNN(Model):
"""
Motivation:
We want to provide a Qlib General Pytorch Model Adaptor
You can reuse it for all kinds of Pytorch models.
It should include the training and predict process
Parameters
----------
d_feat : int
input dimension for each time step
metric: str
the evaluation metric used in early stop
optimizer : str
optimizer name
GPU : str
the GPU ID(s) used for training
"""
def __init__(
self,
n_epochs=200,
lr=0.001,
metric="",
batch_size=2000,
early_stop=20,
loss="mse",
optimizer="adam",
n_jobs=10,
GPU=0,
seed=None,
pt_model_uri="qlib.contrib.model.pytorch_gru_ts.GRUModel",
pt_model_kwargs={
"d_feat":6,
"hidden_size":64,
"num_layers":2,
"dropout":0.,
},
):
# Set logger.
self.logger = get_module_logger("GeneralPTNN")
self.logger.info("GeneralPTNN pytorch version...")
# set hyper-parameters.
self.n_epochs = n_epochs
self.lr = lr
self.metric = metric
self.batch_size = batch_size
self.early_stop = early_stop
self.optimizer = optimizer.lower()
self.loss = loss
self.device = torch.device("cuda:%d" % (GPU) if torch.cuda.is_available() and GPU >= 0 else "cpu")
self.n_jobs = n_jobs
self.seed = seed
self.pt_model_uri, self.pt_model_kwargs = pt_model_uri, pt_model_kwargs
self.dnn_model = init_instance_by_config({"class": pt_model_uri, "kwargs": pt_model_kwargs})
self.logger.info(
"GeneralPTNN parameters setting:"
"\nn_epochs : {}"
"\nlr : {}"
"\nmetric : {}"
"\nbatch_size : {}"
"\nearly_stop : {}"
"\noptimizer : {}"
"\nloss_type : {}"
"\ndevice : {}"
"\nn_jobs : {}"
"\nuse_GPU : {}"
"\nseed : {}"
"\npt_model_uri: {}"
"\npt_model_kwargs: {}".format(
n_epochs,
lr,
metric,
batch_size,
early_stop,
optimizer.lower(),
loss,
self.device,
n_jobs,
self.use_gpu,
seed,
pt_model_uri,
pt_model_kwargs,
)
)
if self.seed is not None:
np.random.seed(self.seed)
torch.manual_seed(self.seed)
self.logger.info("model:\n{:}".format(self.dnn_model))
self.logger.info("model size: {:.4f} MB".format(count_parameters(self.dnn_model)))
if optimizer.lower() == "adam":
self.train_optimizer = optim.Adam(self.dnn_model.parameters(), lr=self.lr)
elif optimizer.lower() == "gd":
self.train_optimizer = optim.SGD(self.dnn_model.parameters(), lr=self.lr)
else:
raise NotImplementedError("optimizer {} is not supported!".format(optimizer))
self.fitted = False
self.dnn_model.to(self.device)
@property
def use_gpu(self):
return self.device != torch.device("cpu")
def mse(self, pred, label, weight):
loss = weight * (pred - label) ** 2
return torch.mean(loss)
def loss_fn(self, pred, label, weight=None):
mask = ~torch.isnan(label)
if weight is None:
weight = torch.ones_like(label)
if self.loss == "mse":
return self.mse(pred[mask], label[mask], weight[mask])
raise ValueError("unknown loss `%s`" % self.loss)
def metric_fn(self, pred, label):
mask = torch.isfinite(label)
if self.metric in ("", "loss"):
return -self.loss_fn(pred[mask], label[mask])
raise ValueError("unknown metric `%s`" % self.metric)
def _get_fl(self, data: torch.Tensor):
"""
get feature and label from data
- Handle the different data shape of time series and tabular data
Parameters
----------
data : torch.Tensor
input data which maybe 3 dimension or 2 dimension
- 3dim: [batch_size, time_step, feature_dim]
- 2dim: [batch_size, feature_dim]
Returns
-------
Tuple[torch.Tensor, torch.Tensor]
"""
if data.dim() == 3:
# it is a time series dataset
feature = data[:, :, 0:-1].to(self.device)
label = data[:, -1, -1].to(self.device)
elif data.dim() == 2:
# it is a tabular dataset
feature = data[:, 0:-1].to(self.device)
label = data[:, -1].to(self.device)
else:
raise ValueError("Unsupported data shape.")
return feature, label
def train_epoch(self, data_loader):
self.dnn_model.train()
for data, weight in data_loader:
feature , label = self._get_fl(data)
pred = self.dnn_model(feature.float())
loss = self.loss_fn(pred, label, weight.to(self.device))
self.train_optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_value_(self.dnn_model.parameters(), 3.0)
self.train_optimizer.step()
def test_epoch(self, data_loader):
self.dnn_model.eval()
scores = []
losses = []
for data, weight in data_loader:
feature = data[:, :, 0:-1].to(self.device)
# feature[torch.isnan(feature)] = 0
label = data[:, -1, -1].to(self.device)
with torch.no_grad():
pred = self.dnn_model(feature.float())
loss = self.loss_fn(pred, label, weight.to(self.device))
losses.append(loss.item())
score = self.metric_fn(pred, label)
scores.append(score.item())
return np.mean(losses), np.mean(scores)
def fit(
self,
dataset: Union[DatasetH, TSDatasetH],
evals_result=dict(),
save_path=None,
reweighter=None,
):
ists = isinstance(dataset, TSDatasetH) # is this time series dataset
dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
if dl_train.empty or dl_valid.empty:
raise ValueError("Empty data from dataset, please check your dataset config.")
if reweighter is None:
wl_train = np.ones(len(dl_train))
wl_valid = np.ones(len(dl_valid))
elif isinstance(reweighter, Reweighter):
wl_train = reweighter.reweight(dl_train)
wl_valid = reweighter.reweight(dl_valid)
else:
raise ValueError("Unsupported reweighter type.")
# Preprocess for data. To align to Dataset Interface for DataLoader
if ists:
dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader
dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader
else:
# If it is a tabular, we convert the dataframe to numpy to be indexable by DataLoader
dl_train = dl_train.values
dl_valid = dl_valid.values
train_loader = DataLoader(
ConcatDataset(dl_train, wl_train),
batch_size=self.batch_size,
shuffle=True,
num_workers=self.n_jobs,
drop_last=True,
)
valid_loader = DataLoader(
ConcatDataset(dl_valid, wl_valid),
batch_size=self.batch_size,
shuffle=False,
num_workers=self.n_jobs,
drop_last=True,
)
del dl_train, dl_valid, wl_train, wl_valid
save_path = get_or_create_path(save_path)
stop_steps = 0
train_loss = 0
best_score = -np.inf
best_epoch = 0
evals_result["train"] = []
evals_result["valid"] = []
# train
self.logger.info("training...")
self.fitted = True
for step in range(self.n_epochs):
self.logger.info("Epoch%d:", step)
self.logger.info("training...")
self.train_epoch(train_loader)
self.logger.info("evaluating...")
train_loss, train_score = self.test_epoch(train_loader)
val_loss, val_score = self.test_epoch(valid_loader)
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
evals_result["train"].append(train_score)
evals_result["valid"].append(val_score)
if val_score > best_score:
best_score = val_score
stop_steps = 0
best_epoch = step
best_param = copy.deepcopy(self.dnn_model.state_dict())
else:
stop_steps += 1
if stop_steps >= self.early_stop:
self.logger.info("early stop")
break
self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
self.dnn_model.load_state_dict(best_param)
torch.save(best_param, save_path)
if self.use_gpu:
torch.cuda.empty_cache()
def predict(self, dataset: Union[DatasetH, TSDatasetH]):
if not self.fitted:
raise ValueError("model is not fitted yet!")
dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I)
dl_test.config(fillna_type="ffill+bfill")
test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs)
self.dnn_model.eval()
preds = []
for data in test_loader:
feature = data[:, :, 0:-1].to(self.device)
with torch.no_grad():
pred = self.dnn_model(feature.float()).detach().cpu().numpy()
preds.append(pred)
return pd.Series(np.concatenate(preds), index=dl_test.get_index())

View File

@@ -1,25 +1,25 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import copy
from typing import Text, Union
import numpy as np
import pandas as pd
from typing import Text, Union
import copy
from ...utils import get_or_create_path
from ...log import get_module_logger
import torch
import torch.nn as nn
import torch.optim as optim
from .pytorch_utils import count_parameters
from ...model.base import Model
from qlib.workflow import R
from ...data.dataset import DatasetH
from ...data.dataset.handler import DataHandlerLP
from ...log import get_module_logger
from ...model.base import Model
from ...utils import get_or_create_path
from .pytorch_utils import count_parameters
class GRU(Model):
@@ -212,16 +212,31 @@ class GRU(Model):
evals_result=dict(),
save_path=None,
):
df_train, df_valid, df_test = dataset.prepare(
["train", "valid", "test"],
col_set=["feature", "label"],
data_key=DataHandlerLP.DK_L,
)
if df_train.empty or df_valid.empty:
raise ValueError("Empty data from dataset, please check your dataset config.")
# prepare training and validation data
dfs = {
k: dataset.prepare(
k,
col_set=["feature", "label"],
data_key=DataHandlerLP.DK_L,
)
for k in ["train", "valid"]
if k in dataset.segments
}
df_train, df_valid = dfs.get("train", pd.DataFrame()), dfs.get("valid", pd.DataFrame())
# check if training data is empty
if df_train.empty:
raise ValueError("Empty training data from dataset, please check your dataset config.")
df_train = df_train.dropna()
x_train, y_train = df_train["feature"], df_train["label"]
x_valid, y_valid = df_valid["feature"], df_valid["label"]
# check if validation data is provided
if not df_valid.empty:
df_valid = df_valid.dropna()
x_valid, y_valid = df_valid["feature"], df_valid["label"]
else:
x_valid, y_valid = None, None
save_path = get_or_create_path(save_path)
stop_steps = 0
@@ -235,32 +250,42 @@ class GRU(Model):
self.logger.info("training...")
self.fitted = True
best_param = copy.deepcopy(self.gru_model.state_dict())
for step in range(self.n_epochs):
self.logger.info("Epoch%d:", step)
self.logger.info("training...")
self.train_epoch(x_train, y_train)
self.logger.info("evaluating...")
train_loss, train_score = self.test_epoch(x_train, y_train)
val_loss, val_score = self.test_epoch(x_valid, y_valid)
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
evals_result["train"].append(train_score)
evals_result["valid"].append(val_score)
if val_score > best_score:
best_score = val_score
stop_steps = 0
best_epoch = step
best_param = copy.deepcopy(self.gru_model.state_dict())
else:
stop_steps += 1
if stop_steps >= self.early_stop:
self.logger.info("early stop")
break
# evaluate on validation data if provided
if x_valid is not None and y_valid is not None:
val_loss, val_score = self.test_epoch(x_valid, y_valid)
self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
evals_result["valid"].append(val_score)
if val_score > best_score:
best_score = val_score
stop_steps = 0
best_epoch = step
best_param = copy.deepcopy(self.gru_model.state_dict())
else:
stop_steps += 1
if stop_steps >= self.early_stop:
self.logger.info("early stop")
break
self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
self.gru_model.load_state_dict(best_param)
torch.save(best_param, save_path)
# Logging
rec = R.get_recorder()
for k, v_l in evals_result.items():
for i, v in enumerate(v_l):
rec.log_metrics(step=i, **{k: v})
if self.use_gpu:
torch.cuda.empty_cache()
@@ -292,6 +317,7 @@ class GRU(Model):
class GRUModel(nn.Module):
def __init__(self, d_feat=6, hidden_size=64, num_layers=2, dropout=0.0):
super().__init__()

View File

@@ -1,5 +1,17 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
"""
Here we have a comprehensive set of analysis classes.
Here is an example.
.. code-block:: python
from qlib.contrib.report.data.ana import FeaMeanStd
fa = FeaMeanStd(ret_df)
fa.plot_all(wspace=0.3, sub_figsize=(12, 3), col_n=5)
"""
import pandas as pd
import numpy as np
from qlib.contrib.report.data.base import FeaAnalyser
@@ -152,6 +164,7 @@ class FeaSkewTurt(NumFeaAnalyser):
self._kurt[col].plot(ax=right_ax, label="kurt", color="green")
right_ax.set_xlabel("")
right_ax.set_ylabel("kurt")
right_ax.grid(None) # set the grid to None to avoid two layer of grid
h1, l1 = ax.get_legend_handles_labels()
h2, l2 = right_ax.get_legend_handles_labels()
@@ -171,12 +184,15 @@ class FeaMeanStd(NumFeaAnalyser):
ax.set_xlabel("")
ax.set_ylabel("mean")
ax.legend()
ax.tick_params(axis="x", rotation=90)
right_ax = ax.twinx()
self._std[col].plot(ax=right_ax, label="std", color="green")
right_ax.set_xlabel("")
right_ax.set_ylabel("std")
right_ax.tick_params(axis="x", rotation=90)
right_ax.grid(None) # set the grid to None to avoid two layer of grid
h1, l1 = ax.get_legend_handles_labels()
h2, l2 = right_ax.get_legend_handles_labels()

View File

@@ -14,6 +14,24 @@ from qlib.contrib.report.utils import sub_fig_generator
class FeaAnalyser:
def __init__(self, dataset: pd.DataFrame):
"""
Parameters
----------
dataset : pd.DataFrame
We often have multiple columns for dataset. Each column corresponds to one sub figure.
There will be a datatime column in the index levels.
Aggretation will be used for more summarized metrics overtime.
Here is an example of data:
.. code-block::
return
datetime instrument
2007-02-06 equity_tpx 0.010087
equity_spx 0.000786
"""
self._dataset = dataset
with TimeInspector.logt("calc_stat_values"):
self.calc_stat_values()

View File

@@ -4,7 +4,7 @@ import matplotlib.pyplot as plt
import pandas as pd
def sub_fig_generator(sub_fs=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None, sharex=False, sharey=False):
def sub_fig_generator(sub_figsize=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None, sharex=False, sharey=False):
"""sub_fig_generator.
it will return a generator, each row contains <col_n> sub graph
@@ -13,7 +13,7 @@ def sub_fig_generator(sub_fs=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None
Parameters
----------
sub_fs :
sub_figsize :
the figure size of each subgraph in <col_n> * <row_n> subgraphs
col_n :
the number of subgraph in each row; It will generating a new graph after generating <col_n> of subgraphs.
@@ -33,7 +33,7 @@ def sub_fig_generator(sub_fs=(3, 3), col_n=10, row_n=1, wspace=None, hspace=None
while True:
fig, axes = plt.subplots(
row_n, col_n, figsize=(sub_fs[0] * col_n, sub_fs[1] * row_n), sharex=sharex, sharey=sharey
row_n, col_n, figsize=(sub_figsize[0] * col_n, sub_figsize[1] * row_n), sharex=sharex, sharey=sharey
)
plt.subplots_adjust(wspace=wspace, hspace=hspace)
axes = axes.reshape(row_n, col_n)

View File

@@ -73,8 +73,8 @@ class Rolling:
The horizon of the prediction target.
This is used to override the prediction horizon of the file.
h_path : Optional[str]
the dumped data handler;
It may come from other data source. It will override the data handler in the config.
It is other data source that is dumped as a handler. It will override the data handler section in the config.
If it is not given, it will create a customized cache for the handler when `enable_handler_cache=True`
test_end : Optional[str]
the test end for the data. It is typically used together with the handler
You can do the same thing with task_ext_conf in a more complicated way
@@ -119,7 +119,7 @@ class Rolling:
with self.conf_path.open("r") as f:
return yaml.safe_load(f)
def _replace_hanler_with_cache(self, task: dict):
def _replace_handler_with_cache(self, task: dict):
"""
Due to the data processing part in original rolling is slow. So we have to
This class tries to add more feature
@@ -159,13 +159,20 @@ class Rolling:
# - get horizon automatically from the expression!!!!
raise NotImplementedError(f"This type of input is not supported")
else:
self.logger.info("The prediction horizon is overrided")
task["dataset"]["kwargs"]["handler"]["kwargs"]["label"] = [
"Ref($close, -{}) / Ref($close, -1) - 1".format(self.horizon + 1)
]
if enable_handler_cache and self.h_path is not None:
self.logger.info("Fail to override the horizon due to data handler cache")
else:
self.logger.info("The prediction horizon is overrided")
if isinstance(task["dataset"]["kwargs"]["handler"], dict):
task["dataset"]["kwargs"]["handler"]["kwargs"]["label"] = [
"Ref($close, -{}) / Ref($close, -1) - 1".format(self.horizon + 1)
]
else:
self.logger.warning("Try to automatically configure the lablel but failed.")
if enable_handler_cache:
task = self._replace_hanler_with_cache(task)
if self.h_path is not None or enable_handler_cache:
# if we already have provided data source or we want to create one
task = self._replace_handler_with_cache(task)
task = self._update_start_end_time(task)
if self.task_ext_conf is not None:
@@ -173,6 +180,16 @@ class Rolling:
self.logger.info(task)
return task
def run_basic_task(self):
"""
Run the basic task without rolling.
This is for fast testing for model tunning.
"""
task = self.basic_task()
print(task)
trainer = TrainerR(experiment_name=self.exp_name)
trainer([task])
def get_task_list(self) -> List[dict]:
"""return a batch of tasks for rolling."""
task = self.basic_task()

View File

@@ -80,6 +80,11 @@ class DDGDA(Rolling):
sim_task_model: UTIL_MODEL_TYPE = "gbdt",
meta_1st_train_end: Optional[str] = None,
alpha: float = 0.01,
loss_skip_thresh: int = 50,
fea_imp_n: Optional[int] = 30,
meta_data_proc: Optional[str] = "V01",
segments: Union[float, str] = 0.62,
hist_step_n: int = 30,
working_dir: Optional[Union[str, Path]] = None,
**kwargs,
):
@@ -94,6 +99,15 @@ class DDGDA(Rolling):
alpha: float
Setting the L2 regularization for ridge
The `alpha` is only passed to MetaModelDS (it is not passed to sim_task_model currently..)
loss_skip_thresh: int
The thresh to skip the loss calculation for each day. If the number of item is less than it, it will skip the loss on that day.
meta_data_proc : Optional[str]
How we process the meta dataset for learning meta model.
segments : Union[float, str]
if segments is a float:
The ratio of training data in the meta task dataset
if segments is a string:
it will try its best to put its data in training and ensure that the date `segments` is in the test set
"""
# NOTE:
# the horizon must match the meaning in the base task template
@@ -104,14 +118,22 @@ class DDGDA(Rolling):
super().__init__(**kwargs)
self.working_dir = self.conf_path.parent if working_dir is None else Path(working_dir)
self.proxy_hd = self.working_dir / "handler_proxy.pkl"
self.fea_imp_n = fea_imp_n
self.meta_data_proc = meta_data_proc
self.loss_skip_thresh = loss_skip_thresh
self.segments = segments
self.hist_step_n = hist_step_n
def _adjust_task(self, task: dict, astype: UTIL_MODEL_TYPE):
"""
some task are use for special purpose.
Base on the original task, we need to do some extra things.
For example:
- GBDT for calculating feature importance
- Linear or GBDT for calculating similarity
- Datset (well processed) that aligned to Linear that for meta learning
So we may need to change the dataset and model for the special purpose and other settings remains the same.
"""
# NOTE: here is just for aligning with previous implementation
# It is not necessary for the current implementation
@@ -119,12 +141,16 @@ class DDGDA(Rolling):
if astype == "gbdt":
task["model"] = LGBM_MODEL
if isinstance(handler, dict):
# We don't need preprocessing when using GBDT model
for k in ["infer_processors", "learn_processors"]:
if k in handler.setdefault("kwargs", {}):
handler["kwargs"].pop(k)
elif astype == "linear":
task["model"] = LINEAR_MODEL
handler["kwargs"].update(PROC_ARGS)
if isinstance(handler, dict):
handler["kwargs"].update(PROC_ARGS)
else:
self.logger.warning("The handler can't be adjusted.")
else:
raise ValueError(f"astype not supported: {astype}")
return task
@@ -155,12 +181,15 @@ class DDGDA(Rolling):
The meta model will be trained upon the proxy forecasting model.
This dataset is for the proxy forecasting model.
"""
topk = 30
fi = self._get_feature_importance()
col_selected = fi.nlargest(topk)
# NOTE: adjusting to `self.sim_task_model` just for aligning with previous implementation.
# In previous version. The data for proxy model is using sim_task_model's way for processing
task = self._adjust_task(self.basic_task(enable_handler_cache=False), self.sim_task_model)
task = replace_task_handler_with_cache(task, self.working_dir)
# if self.meta_data_proc is not None:
# else:
# # Otherwise, we don't need futher processing
# task = self.basic_task()
dataset = init_instance_by_config(task["dataset"])
prep_ds = dataset.prepare(slice(None), col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
@@ -168,12 +197,18 @@ class DDGDA(Rolling):
feature_df = prep_ds["feature"]
label_df = prep_ds["label"]
feature_selected = feature_df.loc[:, col_selected.index]
if self.fea_imp_n is not None:
fi = self._get_feature_importance()
col_selected = fi.nlargest(self.fea_imp_n)
feature_selected = feature_df.loc[:, col_selected.index]
else:
feature_selected = feature_df
feature_selected = feature_selected.groupby("datetime", group_keys=False).apply(
lambda df: (df - df.mean()).div(df.std())
)
feature_selected = feature_selected.fillna(0.0)
if self.meta_data_proc == "V01":
feature_selected = feature_selected.groupby("datetime", group_keys=False).apply(
lambda df: (df - df.mean()).div(df.std())
)
feature_selected = feature_selected.fillna(0.0)
df_all = {
"label": label_df.reindex(feature_selected.index),
@@ -223,7 +258,10 @@ class DDGDA(Rolling):
# 1) leverage the simplified proxy forecasting model to train meta model.
# - Only the dataset part is important, in current version of meta model will integrate the
# the train_start for training meta model does not necessarily align with final rolling
# NOTE:
# - The train_start for training meta model does not necessarily align with final rolling
# But please select a right time to make sure the finnal rolling tasks are not leaked in the training data.
# - The test_start is automatically aligned to the next day of test_end. Validation is ignored.
train_start = "2008-01-01" if self.train_start is None else self.train_start
train_end = "2010-12-31" if self.meta_1st_train_end is None else self.meta_1st_train_end
test_start = (pd.Timestamp(train_end) + pd.Timedelta(days=1)).strftime("%Y-%m-%d")
@@ -249,9 +287,9 @@ class DDGDA(Rolling):
kwargs = dict(
task_tpl=proxy_forecast_model_task,
step=self.step,
segments=0.62, # keep test period consistent with the dataset yaml
segments=self.segments, # keep test period consistent with the dataset yaml
trunc_days=1 + self.horizon,
hist_step_n=30,
hist_step_n=self.hist_step_n,
fill_method=fill_method,
rolling_ext_days=0,
)
@@ -268,7 +306,13 @@ class DDGDA(Rolling):
with R.start(experiment_name=self.meta_exp_name):
R.log_params(**kwargs)
mm = MetaModelDS(
step=self.step, hist_step_n=kwargs["hist_step_n"], lr=0.001, max_epoch=30, seed=43, alpha=self.alpha
step=self.step,
hist_step_n=kwargs["hist_step_n"],
lr=0.001,
max_epoch=30,
seed=43,
alpha=self.alpha,
loss_skip_thresh=self.loss_skip_thresh,
)
mm.fit(md)
R.save_objects(model=mm)

View File

@@ -35,7 +35,7 @@ class Client:
def connect_server(self):
"""Connect to server."""
try:
self.sio.connect("ws://" + self.server_host + ":" + str(self.server_port))
self.sio.connect(f"ws://{self.server_host}:{self.server_port}")
except socketio.exceptions.ConnectionError:
self.logger.error("Cannot connect to server - check your network or server status")

View File

@@ -616,7 +616,7 @@ class DatasetProvider(abc.ABC):
data = pd.DataFrame(obj)
if not data.empty and not np.issubdtype(data.index.dtype, np.dtype("M")):
# If the underlaying provides the data not in datatime formmat, we'll convert it into datetime format
# If the underlaying provides the data not in datetime format, we'll convert it into datetime format
_calendar = Cal.calendar(freq=freq)
data.index = _calendar[data.index.values.astype(int)]
data.index.names = ["datetime"]

View File

@@ -403,7 +403,7 @@ class TSDataSampler:
np.full((1, self.data_arr.shape[1]), np.nan, dtype=self.data_arr.dtype),
axis=0,
)
self.nan_idx = -1 # The last line is all NaN
self.nan_idx = len(self.data_arr) - 1 # The last line is all NaN; setting it to -1 can cause bug #1716
# the data type will be changed
# The index of usable data is between start_idx and end_idx

View File

@@ -7,7 +7,7 @@ from pathlib import Path
import warnings
import pandas as pd
from typing import Tuple, Union, List
from typing import Tuple, Union, List, Dict
from qlib.data import D
from qlib.utils import load_dataset, init_instance_by_config, time_to_slc_point
@@ -247,10 +247,14 @@ class StaticDataLoader(DataLoader, Serializable):
def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
self._maybe_load_raw_data()
# 1) Filter by instruments
if instruments is None:
df = self._data
else:
df = self._data.loc(axis=0)[:, instruments]
# 2) Filter by Datetime
if start_time is None and end_time is None:
return df # NOTE: avoid copy by loc
# pd.Timestamp(None) == NaT, use NaT as index can not fetch correct thing, so do not change None.
@@ -275,6 +279,55 @@ class StaticDataLoader(DataLoader, Serializable):
self._data = self._config
class NestedDataLoader(DataLoader):
"""
We have multiple DataLoader, we can use this class to combine them.
"""
def __init__(self, dataloader_l: List[Dict], join="left") -> None:
"""
Parameters
----------
dataloader_l : list[dict]
A list of dataloader, for exmaple
.. code-block:: python
nd = NestedDataLoader(
dataloader_l=[
{
"class": "qlib.contrib.data.loader.Alpha158DL",
}, {
"class": "qlib.contrib.data.loader.Alpha360DL",
"kwargs": {
"config": {
"label": ( ["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])
}
}
}
]
)
join :
it will pass to pd.concat when merging it.
"""
super().__init__()
self.data_loader_l = [
(dl if isinstance(dl, DataLoader) else init_instance_by_config(dl)) for dl in dataloader_l
]
self.join = join
def load(self, instruments=None, start_time=None, end_time=None) -> pd.DataFrame:
df_full = None
for dl in self.data_loader_l:
df_current = dl.load(instruments, start_time, end_time)
if df_full is None:
df_full = df_current
else:
df_full = pd.merge(df_full, df_current, left_index=True, right_index=True, how=self.join)
return df_full.sort_index(axis=1)
class DataLoaderDH(DataLoader):
"""DataLoaderDH
DataLoader based on (D)ata (H)andler

View File

@@ -9,7 +9,7 @@ if TYPE_CHECKING:
from qlib.data.dataset import DataHandler
def get_level_index(df: pd.DataFrame, level=Union[str, int]) -> int:
def get_level_index(df: pd.DataFrame, level: Union[str, int]) -> int:
"""
get the level index of `df` given `level`

View File

@@ -51,3 +51,6 @@ class MetaTask:
Return the **processed** meta_info
"""
return self.meta_info
def __repr__(self):
return f"MetaTask(task={self.task}, meta_info={self.meta_info})"

View File

@@ -41,7 +41,7 @@ def _log_task_info(task_config: dict):
def _exe_task(task_config: dict):
rec = R.get_recorder()
# model & dataset initiation
# model & dataset initialization
model: Model = init_instance_by_config(task_config["model"], accept_types=Model)
dataset: Dataset = init_instance_by_config(task_config["dataset"], accept_types=Dataset)
reweighter: Reweighter = task_config.get("reweighter", None)

View File

@@ -12,15 +12,11 @@ import datetime
from tqdm import tqdm
from pathlib import Path
from loguru import logger
from cryptography.fernet import Fernet
from qlib.utils import exists_qlib_data
class GetData:
REMOTE_URL = "https://qlibpublic.blob.core.windows.net/data/default/stock_data"
# "?" is not included in the token.
TOKEN = b"gAAAAABkmDhojHc0VSCDdNK1MqmRzNLeDFXe5hy8obHpa6SDQh4de6nW5gtzuD-fa6O_WZb0yyqYOL7ndOfJX_751W3xN5YB4-n-P22jK-t6ucoZqhT70KPD0Lf0_P328QPJVZ1gDnjIdjhi2YLOcP4BFTHLNYO0mvzszR8TKm9iT5AKRvuysWnpi8bbYwGU9zAcJK3x9EPL43hOGtxliFHcPNGMBoJW4g_ercdhi0-Qgv5_JLsV-29_MV-_AhuaYvJuN2dEywBy"
KEY = "EYcA8cgorA8X9OhyMwVfuFxn_1W3jGk6jCbs3L2oPoA="
REMOTE_URL = "https://github.com/SunsetWolf/qlib_dataset/releases/download"
def __init__(self, delete_zip_file=False):
"""
@@ -33,9 +29,45 @@ class GetData:
self.delete_zip_file = delete_zip_file
def merge_remote_url(self, file_name: str):
fernet = Fernet(self.KEY)
token = fernet.decrypt(self.TOKEN).decode()
return f"{self.REMOTE_URL}/{file_name}?{token}"
"""
Generate download links.
Parameters
----------
file_name: str
The name of the file to be downloaded.
The file name can be accompanied by a version number, (e.g.: v2/qlib_data_simple_cn_1d_latest.zip),
if no version number is attached, it will be downloaded from v0 by default.
"""
return f"{self.REMOTE_URL}/{file_name}" if "/" in file_name else f"{self.REMOTE_URL}/v0/{file_name}"
def download(self, url: str, target_path: [Path, str]):
"""
Download a file from the specified url.
Parameters
----------
url: str
The url of the data.
target_path: str
The location where the data is saved, including the file name.
"""
file_name = str(target_path).rsplit("/", maxsplit=1)[-1]
resp = requests.get(url, stream=True, timeout=60)
resp.raise_for_status()
if resp.status_code != 200:
raise requests.exceptions.HTTPError()
chunk_size = 1024
logger.warning(
f"The data for the example is collected from Yahoo Finance. Please be aware that the quality of the data might not be perfect. (You can refer to the original data source: https://finance.yahoo.com/lookup.)"
)
logger.info(f"{os.path.basename(file_name)} downloading......")
with tqdm(total=int(resp.headers.get("Content-Length", 0))) as p_bar:
with target_path.open("wb") as fp:
for chunk in resp.iter_content(chunk_size=chunk_size):
fp.write(chunk)
p_bar.update(chunk_size)
def download_data(self, file_name: str, target_dir: [Path, str], delete_old: bool = True):
"""
@@ -70,21 +102,7 @@ class GetData:
target_path = target_dir.joinpath(_target_file_name)
url = self.merge_remote_url(file_name)
resp = requests.get(url, stream=True, timeout=60)
resp.raise_for_status()
if resp.status_code != 200:
raise requests.exceptions.HTTPError()
chunk_size = 1024
logger.warning(
f"The data for the example is collected from Yahoo Finance. Please be aware that the quality of the data might not be perfect. (You can refer to the original data source: https://finance.yahoo.com/lookup.)"
)
logger.info(f"{os.path.basename(file_name)} downloading......")
with tqdm(total=int(resp.headers.get("Content-Length", 0))) as p_bar:
with target_path.open("wb") as fp:
for chunk in resp.iter_content(chunk_size=chunk_size):
fp.write(chunk)
p_bar.update(chunk_size)
self.download(url=url, target_path=target_path)
self._unzip(target_path, target_dir, delete_old)
if self.delete_zip_file:
@@ -99,7 +117,9 @@ class GetData:
return status
@staticmethod
def _unzip(file_path: Path, target_dir: Path, delete_old: bool = True):
def _unzip(file_path: [Path, str], target_dir: [Path, str], delete_old: bool = True):
file_path = Path(file_path)
target_dir = Path(target_dir)
if delete_old:
logger.warning(
f"will delete the old qlib data directory(features, instruments, calendars, features_cache, dataset_cache): {target_dir}"

View File

@@ -108,6 +108,12 @@ class Index:
self.index_map = self.idx_list = np.arange(idx_list)
self._is_sorted = True
else:
# Check if all elements in idx_list are of the same type
if not all(isinstance(x, type(idx_list[0])) for x in idx_list):
raise TypeError("All elements in idx_list must be of the same type")
# Check if all elements in idx_list are of the same datetime64 precision
if isinstance(idx_list[0], np.datetime64) and not all(x.dtype == idx_list[0].dtype for x in idx_list):
raise TypeError("All elements in idx_list must be of the same datetime64 precision")
self.idx_list = np.array(idx_list)
# NOTE: only the first appearance is indexed
self.index_map = dict(zip(self.idx_list, range(len(self))))
@@ -131,7 +137,12 @@ class Index:
if self.idx_list.dtype.type is np.datetime64:
if isinstance(item, pd.Timestamp):
# This happens often when creating index based on pandas.DatetimeIndex and query with pd.Timestamp
return item.to_numpy()
return item.to_numpy().astype(self.idx_list.dtype)
elif isinstance(item, np.datetime64):
# This happens often when creating index based on np.datetime64 and query with another precision
return item.astype(self.idx_list.dtype)
# NOTE: It is hard to consider every case at first.
# We just try to cover part of cases to make it more user-friendly
return item
def index(self, item) -> int:

View File

@@ -161,7 +161,13 @@ def init_instance_by_config(
# path like 'file:///<path to pickle file>/obj.pkl'
pr = urlparse(config)
if pr.scheme == "file":
pr_path = os.path.join(pr.netloc, pr.path) if bool(pr.path) else pr.netloc
# To enable relative path like file://data/a/b/c.pkl. pr.netloc will be data
path = pr.path
if pr.netloc != "":
path = path.lstrip("/")
pr_path = os.path.join(pr.netloc, path) if bool(pr.path) else pr.netloc
with open(os.path.normpath(pr_path), "rb") as f:
return pickle.load(f)
else:

View File

@@ -1,18 +1,20 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import logging
import sys
import os
from pathlib import Path
import sys
import fire
from jinja2 import Template, meta
import ruamel.yaml as yaml
import qlib
import fire
import ruamel.yaml as yaml
from qlib.config import C
from qlib.model.trainer import task_train
from qlib.utils.data import update_config
from qlib.log import get_module_logger
from qlib.model.trainer import task_train
from qlib.utils import set_log_with_config
from qlib.utils.data import update_config
set_log_with_config(C.logging_config)
logger = get_module_logger("qrun", logging.INFO)
@@ -47,6 +49,39 @@ def sys_config(config, config_path):
sys.path.append(str(Path(config_path).parent.resolve().absolute() / p))
def render_template(config_path: str) -> str:
"""
render the template based on the environment
Parameters
----------
config_path : str
configuration path
Returns
-------
str
the rendered content
"""
with open(config_path, "r") as f:
config = f.read()
# Set up the Jinja2 environment
template = Template(config)
# Parse the template to find undeclared variables
env = template.environment
parsed_content = env.parse(config)
variables = meta.find_undeclared_variables(parsed_content)
# Get context from os.environ according to the variables
context = {var: os.getenv(var, "") for var in variables if var in os.environ}
logger.info(f"Render the template with the context: {context}")
# Render the template with the context
rendered_content = template.render(context)
return rendered_content
# workflow handler function
def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"):
"""
@@ -67,8 +102,9 @@ def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"):
market: csi300
"""
with open(config_path) as fp:
config = yaml.safe_load(fp)
# Render the template
rendered_yaml = render_template(config_path)
config = yaml.safe_load(rendered_yaml)
base_config_path = config.get("BASE_CONFIG_PATH", None)
if base_config_path:

View File

@@ -242,7 +242,7 @@ class TimeAdjuster:
def shift(self, seg: tuple, step: int, rtype=SHIFT_SD) -> tuple:
"""
Shift the datatime of segment
Shift the datetime of segment
If there are None (which indicates unbounded index) in the segment, this method will return None.

View File

@@ -301,6 +301,7 @@ class Normalize:
na_values={col: symbol_na if col == self._symbol_field_name else default_na for col in columns},
)
# NOTE: It has been reported that there may be some problems here, and the specific issues will be dealt with when they are identified.
df = self._normalize_obj.normalize(df)
if df is not None and not df.empty:
if self._end_date is not None:

View File

@@ -9,7 +9,7 @@ pip install -r requirements.txt
```
## Usage of the dataset
> *Crypto dateset only support Data retrieval function but not support backtest function due to the lack of OHLC data.*
> *Crypto dataset only support Data retrieval function but not support backtest function due to the lack of OHLC data.*
## Collector Data

View File

@@ -15,7 +15,6 @@ from typing import Iterable, Tuple, List
import numpy as np
import pandas as pd
from lxml import etree
from loguru import logger
from yahooquery import Ticker
from tqdm import tqdm
@@ -190,17 +189,43 @@ def get_hs_stock_symbols() -> list:
global _HS_SYMBOLS # pylint: disable=W0603
def _get_symbol():
_res = set()
for _k, _v in (("ha", "ss"), ("sa", "sz"), ("gem", "sz")):
resp = requests.get(HS_SYMBOLS_URL.format(s_type=_k), timeout=None)
_res |= set(
map(
lambda x: "{}.{}".format(re.findall(r"\d+", x)[0], _v), # pylint: disable=W0640
etree.HTML(resp.text).xpath("//div[@class='result']/ul//li/a/text()"), # pylint: disable=I1101
)
)
time.sleep(3)
return _res
"""
Get the stock pool from a web page and process it into the format required by yahooquery.
Format of data retrieved from the web page: 600519, 000001
The data format required by yahooquery: 600519.ss, 000001.sz
Returns
-------
set: Returns the set of symbol codes.
Examples:
-------
{600000.ss, 600001.ss, 600002.ss, 600003.ss, ...}
"""
url = "http://99.push2.eastmoney.com/api/qt/clist/get?pn=1&pz=10000&po=1&np=1&fs=m:0+t:6,m:0+t:80,m:1+t:2,m:1+t:23,m:0+t:81+s:2048&fields=f12"
try:
resp = requests.get(url, timeout=None)
resp.raise_for_status()
except requests.exceptions.HTTPError as e:
raise requests.exceptions.HTTPError(f"Request to {url} failed with status code {resp.status_code}") from e
try:
_symbols = [_v["f12"] for _v in resp.json()["data"]["diff"]]
except Exception as e:
logger.warning("An error occurred while extracting data from the response.")
raise
if len(_symbols) < 3900:
raise ValueError("The complete list of stocks is not available.")
# Add suffix after the stock code to conform to yahooquery standard, otherwise the data will not be fetched.
_symbols = [
_symbol + ".ss" if _symbol.startswith("6") else _symbol + ".sz" if _symbol.startswith(("0", "3")) else None
for _symbol in _symbols
]
_symbols = [_symbol for _symbol in _symbols if _symbol is not None]
return set(_symbols)
if _HS_SYMBOLS is None:
symbols = set()

View File

@@ -796,6 +796,9 @@ class Run(BaseRun):
# get 1m data
$ python collector.py download_data --source_dir ~/.qlib/stock_data/source --region CN --start 2020-11-01 --end 2020-11-10 --delay 0.1 --interval 1m
"""
if self.interval == "1d" and pd.Timestamp(end) > pd.Timestamp(datetime.datetime.now().strftime("%Y-%m-%d")):
raise ValueError(f"end_date: {end} is greater than the current date.")
super(Run, self).download_data(max_collector_count, delay, start, end, check_data_length, limit_nums)
def normalize_data(

View File

@@ -46,7 +46,7 @@ if not _CYTHON_INSTALLED:
REQUIRED = [
"numpy>=1.12.0, <1.24",
"pandas>=0.25.1",
"scipy>=1.0.0",
"scipy>=1.7.3",
"requests>=2.18.0",
"sacred>=0.7.4",
"python-socketio",
@@ -82,7 +82,7 @@ REQUIRED = [
"dill",
"dataclasses;python_version<'3.7'",
"filelock",
"jinja2<3.1.0", # for passing the readthedocs workflow.
"jinja2",
"gym",
# Installing the latest version of protobuf for python versions below 3.8 will cause unit tests to fail.
"protobuf<=3.20.1;python_version<='3.8'",
@@ -166,6 +166,9 @@ setup(
"lxml",
"baostock",
"yahooquery",
# 2024-05-30 scs has released a new version: 3.2.4.post2,
# this version, causes qlib installation to fail, so we've limited the scs version a bit for now.
"scs<=3.2.4",
"beautifulsoup4",
# In version 0.4.11 of tianshou, the code:
# logits, hidden = self.actor(batch.obs, state=state, info=batch.info)

View File

@@ -0,0 +1,50 @@
# TODO:
# dump alpha 360 to dataframe and merge it with Alpha158
import sys
import unittest
import qlib
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent))
from qlib.data.dataset.loader import NestedDataLoader
from qlib.contrib.data.loader import Alpha158DL, Alpha360DL
class TestDataLoader(unittest.TestCase):
def test_nested_data_loader(self):
qlib.init()
nd = NestedDataLoader(
dataloader_l=[
{
"class": "qlib.contrib.data.loader.Alpha158DL",
},
{
"class": "qlib.contrib.data.loader.Alpha360DL",
"kwargs": {"config": {"label": (["Ref($close, -2)/Ref($close, -1) - 1"], ["LABEL0"])}},
},
]
)
# Of course you can use StaticDataLoader
dataset = nd.load()
assert dataset is not None
columns = dataset.columns.tolist()
columns_list = [tup[1] for tup in columns]
for col in Alpha158DL.get_feature_config()[1]:
assert col in columns_list
for col in Alpha360DL.get_feature_config()[1]:
assert col in columns_list
assert "LABEL0" in columns_list
# Then you can use it wth DataHandler;
if __name__ == "__main__":
unittest.main()

View File

@@ -5,8 +5,9 @@ import unittest
import pytest
import sys
from qlib.tests import TestAutoData
from qlib.data.dataset import TSDatasetH
from qlib.data.dataset import TSDatasetH, TSDataSampler
import numpy as np
import pandas as pd
import time
from qlib.data.dataset.handler import DataHandlerLP
@@ -98,6 +99,54 @@ class TestDataset(TestAutoData):
print(idx[i])
class TestTSDataSampler(unittest.TestCase):
def test_TSDataSampler(self):
"""
Test TSDataSampler for issue #1716
"""
datetime_list = ["2000-01-31", "2000-02-29", "2000-03-31", "2000-04-30", "2000-05-31"]
instruments = ["000001", "000002", "000003", "000004", "000005"]
index = pd.MultiIndex.from_product(
[pd.to_datetime(datetime_list), instruments], names=["datetime", "instrument"]
)
data = np.random.randn(len(datetime_list) * len(instruments))
test_df = pd.DataFrame(data=data, index=index, columns=["factor"])
dataset = TSDataSampler(test_df, datetime_list[0], datetime_list[-1], step_len=2)
print()
print("--------------dataset[0]--------------")
print(dataset[0])
print("--------------dataset[1]--------------")
print(dataset[1])
assert len(dataset[0]) == 2
self.assertTrue(np.isnan(dataset[0][0]))
self.assertEqual(dataset[0][1], dataset[1][0])
self.assertEqual(dataset[1][1], dataset[2][0])
self.assertEqual(dataset[2][1], dataset[3][0])
def test_TSDataSampler2(self):
"""
Extra test TSDataSampler to prevent incorrect filling of nan for the values at the front
"""
datetime_list = ["2000-01-31", "2000-02-29", "2000-03-31", "2000-04-30", "2000-05-31"]
instruments = ["000001", "000002", "000003", "000004", "000005"]
index = pd.MultiIndex.from_product(
[pd.to_datetime(datetime_list), instruments], names=["datetime", "instrument"]
)
data = np.random.randn(len(datetime_list) * len(instruments))
test_df = pd.DataFrame(data=data, index=index, columns=["factor"])
dataset = TSDataSampler(test_df, datetime_list[2], datetime_list[-1], step_len=3)
print()
print("--------------dataset[0]--------------")
print(dataset[0])
print("--------------dataset[1]--------------")
print(dataset[1])
for i in range(3):
self.assertFalse(np.isnan(dataset[0][i]))
self.assertFalse(np.isnan(dataset[1][i]))
self.assertEqual(dataset[0][1], dataset[1][0])
self.assertEqual(dataset[0][2], dataset[1][1])
if __name__ == "__main__":
unittest.main(verbosity=10)

View File

@@ -94,6 +94,24 @@ class IndexDataTest(unittest.TestCase):
print(sd)
self.assertTrue(sd.iloc[0] == 2)
# test different precisions of time data
timeindex = [
np.datetime64("2024-06-22T00:00:00.000000000"),
np.datetime64("2024-06-21T00:00:00.000000000"),
np.datetime64("2024-06-20T00:00:00.000000000"),
]
sd = idd.SingleData([1, 2, 3], index=timeindex)
self.assertTrue(
sd.index.index(np.datetime64("2024-06-21T00:00:00.000000000"))
== sd.index.index(np.datetime64("2024-06-21T00:00:00"))
)
self.assertTrue(sd.index.index(pd.Timestamp("2024-06-21 00:00")) == 1)
# Bad case: the input is not aligned
timeindex[1] = (np.datetime64("2024-06-21T00:00:00.00"),)
with self.assertRaises(TypeError):
sd = idd.SingleData([1, 2, 3], index=timeindex)
def test_ops(self):
sd1 = idd.SingleData([1, 2, 3, 4], index=["foo", "bar", "f", "g"])
sd2 = idd.SingleData([1, 2, 3, 4], index=["foo", "bar", "f", "g"])

View File

@@ -0,0 +1,86 @@
import unittest
from qlib.contrib.model.pytorch_general_nn import GeneralPTNN
from qlib.data.dataset import DatasetH, TSDatasetH
from qlib.data.dataset.handler import DataHandlerLP
from qlib.tests import TestAutoData
class TestNN(TestAutoData):
def test_both_dataset(self):
data_handler_config = {
"start_time": "2008-01-01",
"end_time": "2020-08-01",
"instruments": "csi300",
"data_loader": {
"class": "QlibDataLoader", # Assuming QlibDataLoader is a string reference to the class
"kwargs": {
"config": {
"feature": [
["$high", "$close", "$low"],
["H", "C", "L"]
],
"label": [
["Ref($close, -2)/Ref($close, -1) - 1"],
["LABEL0"]
]
},
"freq": "day"
}
},
# TODO: processors
"learn_processors": [
{
"class": "DropnaLabel",
},
{
"class": "CSZScoreNorm",
"kwargs": {
"fields_group": "label"
}
}
]
}
segments = {
"train": ["2008-01-01", "2014-12-31"],
"valid": ["2015-01-01", "2016-12-31"],
"test": ["2017-01-01", "2020-08-01"]
}
data_handler = DataHandlerLP(**data_handler_config)
# time-series dataset
tsds = TSDatasetH(handler=data_handler, segments=segments)
# tabular dataset
tbds = DatasetH(handler=data_handler, segments=segments)
model_l = [
GeneralPTNN(
n_epochs=2,
pt_model_uri="qlib.contrib.model.pytorch_gru_ts.GRUModel",
pt_model_kwargs={
"d_feat":3,
"hidden_size":8,
"num_layers":1,
"dropout":0.,
},
),
GeneralPTNN(
n_epochs=2,
pt_model_uri="qlib.contrib.model.pytorch_nn.Net", # it is a MLP
pt_model_kwargs={
"input_dim":3,
},
),
]
for ds, model in reversed(list(zip((tsds, tbds), model_l))):
model.fit(ds) # It works
model.predict(ds) # It works
break
if __name__ == "__main__":
unittest.main()

View File

@@ -27,7 +27,7 @@ def train(uri_path: str = None):
model performance
"""
# model initiaiton
# model initialization
model = init_instance_by_config(CSI300_GBDT_TASK["model"])
dataset = init_instance_by_config(CSI300_GBDT_TASK["dataset"])
# To test __repr__