1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-03 19:10:58 +08:00

Merge branch 'nested_decision_exe' of https://github.com/microsoft/qlib into rl-dummy

This commit is contained in:
v-mingzhehan
2021-07-27 14:32:36 +00:00
41 changed files with 2644 additions and 340 deletions

View File

@@ -12,7 +12,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [windows-latest, ubuntu-16.04, ubuntu-18.04, ubuntu-20.04, macos-latest]
os: [windows-latest, ubuntu-16.04, ubuntu-18.04, ubuntu-20.04]
python-version: [3.6, 3.7, 3.8, 3.9]
steps:
@@ -36,42 +36,36 @@ jobs:
shell: bash
# Test Qlib installed with pip
- name: Install Qlib with pip
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
$CONDA\\python.exe -m pip install numpy==1.19.5
$CONDA\\python.exe -m pip install pyqlib --ignore-installed ruamel.yaml numpy --user
else
sudo $CONDA/bin/python -m pip install numpy==1.19.5
sudo $CONDA/bin/python -m pip install pyqlib --ignore-installed ruamel.yaml numpy
fi
shell: bash
- name: Install Lightgbm for MacOS
if: runner.os == 'macOS'
run: |
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Microsoft/qlib/main/.github/brew_install.sh)"
HOMEBREW_NO_AUTO_UPDATE=1 brew install lightgbm
# - name: Install Qlib with pip
# run: |
# if [ "$RUNNER_OS" == "Windows" ]; then
# $CONDA\\python.exe -m pip install numpy==1.19.5
# $CONDA\\python.exe -m pip install pyqlib --ignore-installed ruamel.yaml numpy --user
# else
# sudo $CONDA/bin/python -m pip install numpy==1.19.5
# sudo $CONDA/bin/python -m pip install pyqlib --ignore-installed ruamel.yaml numpy
# fi
# shell: bash
- name: Test data downloads
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
$CONDA\\python.exe scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn
else
$CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn
fi
shell: bash
# - name: Test data downloads
# run: |
# if [ "$RUNNER_OS" == "Windows" ]; then
# $CONDA\\python.exe scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn
# else
# $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn
# fi
# shell: bash
- name: Test workflow by config (install from pip)
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
$CONDA\\python.exe qlib\\workflow\\cli.py examples\\benchmarks\\LightGBM\\workflow_config_lightgbm_Alpha158.yaml
$CONDA\\python.exe -m pip uninstall -y pyqlib
else
$CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml
sudo $CONDA/bin/python -m pip uninstall -y pyqlib
fi
shell: bash
# - name: Test workflow by config (install from pip)
# run: |
# if [ "$RUNNER_OS" == "Windows" ]; then
# $CONDA\\python.exe qlib\\workflow\\cli.py examples\\benchmarks\\LightGBM\\workflow_config_lightgbm_Alpha158.yaml
# $CONDA\\python.exe -m pip uninstall -y pyqlib
# else
# $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml
# sudo $CONDA/bin/python -m pip uninstall -y pyqlib
# fi
# shell: bash
# Test Qlib installed from source
- name: Install Qlib from source
@@ -89,6 +83,15 @@ jobs:
fi
shell: bash
- name: Test data downloads
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
$CONDA\\python.exe scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn
else
$CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn
fi
shell: bash
- name: Install test dependencies
run: |
if [ "$RUNNER_OS" == "Windows" ]; then
@@ -117,4 +120,4 @@ jobs:
else
$CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml
fi
shell: bash
shell: bash

77
.github/workflows/test_macos.yml vendored Normal file
View File

@@ -0,0 +1,77 @@
# There are some issues (in the downloading data phase) on MacOS when running with other tests. So we split it into an individual config.
name: Test MacOS

# Run on pushes to, and pull requests against, the main branch.
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  build:
    runs-on: macos-latest
    strategy:
      matrix:
        # Versions are quoted so they stay strings (an unquoted 3.10 would be read as the float 3.1).
        python-version: ["3.6", "3.7", "3.8", "3.9"]

    steps:
      - uses: actions/checkout@v2

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v2
        with:
          python-version: ${{ matrix.python-version }}

      - name: Lint with Black
        run: |
          cd ..
          sudo $CONDA/bin/python -m pip install black
          $CONDA/bin/python -m black qlib -l 120 --check --diff

      # Test Qlib installed with pip
      # (the pip-install steps below are currently disabled)
      # - name: Install Qlib with pip
      #   run: |
      #     sudo $CONDA/bin/python -m pip install numpy==1.19.5
      #     sudo $CONDA/bin/python -m pip install pyqlib --ignore-installed ruamel.yaml numpy

      - name: Install Lightgbm for MacOS
        run: |
          /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Microsoft/qlib/main/.github/brew_install.sh)"
          HOMEBREW_NO_AUTO_UPDATE=1 brew install lightgbm

      # - name: Test data downloads
      #   run: |
      #     $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn

      # - name: Test workflow by config (install from pip)
      #   run: |
      #     $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml
      #     sudo $CONDA/bin/python -m pip uninstall -y pyqlib

      # Test Qlib installed from source
      - name: Install Qlib from source
        run: |
          sudo $CONDA/bin/python -m pip install --upgrade cython
          sudo $CONDA/bin/python -m pip install numpy jupyter jupyter_contrib_nbextensions
          sudo $CONDA/bin/python -m pip install -U scipy scikit-learn  # installing without this line will cause errors on GitHub Actions, while installing locally won't
          sudo $CONDA/bin/python setup.py install

      - name: Test data downloads
        run: |
          $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn

      - name: Install test dependencies
        run: |
          sudo $CONDA/bin/python -m pip install --upgrade pip
          sudo $CONDA/bin/python -m pip install -U pyopenssl idna
          sudo $CONDA/bin/python -m pip install black pytest

      - name: Unit tests with Pytest
        run: |
          cd tests
          $CONDA/bin/python -m pytest . --durations=0

      - name: Test workflow by config (install from source)
        run: |
          $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml

View File

@@ -11,6 +11,8 @@
Recent released features
| Feature | Status |
| -- | ------ |
| Transformer & Localformer | [Released](https://github.com/microsoft/qlib/pull/508) on July 22, 2021 |
| Release Qlib v0.7.0 | [Released](https://github.com/microsoft/qlib/releases/tag/v0.7.0) on July 12, 2021 |
| TCTS Model | [Released](https://github.com/microsoft/qlib/pull/491) on July 1, 2021 |
| Online serving and automatic model rolling | :star: [Released](https://github.com/microsoft/qlib/pull/290) on May 17, 2021 |
| DoubleEnsemble Model | [Released](https://github.com/microsoft/qlib/pull/286) on Mar 2, 2021 |
@@ -290,6 +292,8 @@ Here is a list of models built on `Qlib`.
- [TabNet based on pytorch (Sercan O. Arik, et al. AAAI 2019)](qlib/contrib/model/pytorch_tabnet.py)
- [DoubleEnsemble based on LightGBM (Chuheng Zhang, et al. ICDM 2020)](qlib/contrib/model/double_ensemble.py)
- [TCTS based on pytorch (Xueqing Wu, et al. ICML 2021)](qlib/contrib/model/pytorch_tcts.py)
- [Transformer based on pytorch (Ashish Vaswani, et al. NeurIPS 2017)](qlib/contrib/model/pytorch_transformer.py)
- [Localformer based on pytorch (Juyong Jiang, et al.)](qlib/contrib/model/pytorch_localformer.py)
Your PR of new Quant models is highly welcomed.
@@ -389,7 +393,12 @@ Join IM discussion groups:
# Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a
This project welcomes contributions and suggestions.
**Here are some
[code standards](docs/developer/code_standard.rst) when you submit a pull request.**
Most contributions require you to agree to a
Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us
the right to use your contribution. For details, visit https://cla.opensource.microsoft.com.

View File

@@ -179,6 +179,7 @@ After conversion, users can find their Qlib format data in the directory `~/.qli
The Restoration factor. Normally, ``factor = adjusted_price / original_price``, `adjusted price` reference: `split adjusted <https://www.investopedia.com/terms/s/splitadjusted.asp>`_
In the convention of `Qlib` data processing, `open, close, high, low, volume, money and factor` will be set to NaN if the stock is suspended.
If you want to use your own alpha-factor that can't be calculated from OHLCV (such as PE, EPS and so on), you can add it to the CSV files together with the OHLCV fields and then dump it to the Qlib format data.
Stock Pool (Market)
--------------------------------

View File

@@ -21,6 +21,8 @@ which including `Online Manager <#Online Manager>`_, `Online Strategy <#Online S
If you have many models or `task` needs to be managed, please consider `Task Management <../advanced/task_management.html>`_.
The `examples <https://github.com/microsoft/qlib/tree/main/examples/online_srv>`_ are based on some components in `Task Management <../advanced/task_management.html>`_ such as ``TrainerRM`` or ``Collector``.
**NOTE**: Users should keep their data source updated to support online serving. For example, Qlib provides `a batch of scripts <https://github.com/microsoft/qlib/blob/main/scripts/data_collector/yahoo/README.md#automatic-update-of-daily-frequency-datafrom-yahoo-finance>`_ to help users update Yahoo daily data.
Online Manager
=============
@@ -43,4 +45,4 @@ Updater
=============
.. automodule:: qlib.workflow.online.update
:members:
:members:

View File

@@ -0,0 +1,20 @@
.. _code_standard:
=================================
Code Standard
=================================
Docstring
=================================
Please use the Numpy Style.
Continuous Integration
=================================
Continuous Integration (CI) tools help you stick to the quality standards by running tests every time you push a new commit and reporting the results to a pull request.
A common error is the mixed use of spaces and tabs. You can fix it by running the following commands in the command line.
.. code-block:: bash
pip install black
python -m black . -l 120

View File

@@ -0,0 +1,3 @@
numpy==1.17.4
pandas==1.1.2
torch==1.2.0

View File

@@ -0,0 +1,82 @@
# Workflow config: LocalformerModel (pytorch_localformer_ts) on the Alpha158 handler,
# restricted to the 20 feature columns listed under FilterCol.
qlib_init:
  provider_uri: "~/.qlib/qlib_data/cn_data"
  region: cn

# Anchors reused further down via *market / *benchmark.
market: &market csi300
benchmark: &benchmark SH000300

# Handler settings, passed to the dataset handler below as *data_handler_config.
data_handler_config: &data_handler_config
  start_time: 2008-01-01
  end_time: 2020-08-01
  fit_start_time: 2008-01-01
  fit_end_time: 2014-12-31
  instruments: *market
  infer_processors:
    - class: FilterCol
      kwargs:
        fields_group: feature
        col_list: [
          "RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10",
          "ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5",
          "RSQR20", "CORD60", "CORD10", "CORR20", "KLOW"
        ]
    - class: RobustZScoreNorm
      kwargs:
        fields_group: feature
        clip_outlier: true
    - class: Fillna
      kwargs:
        fields_group: feature
  learn_processors:
    - class: DropnaLabel
    - class: CSRankNorm
      kwargs:
        fields_group: label
  label: ["Ref($close, -2) / Ref($close, -1) - 1"]

# Strategy and backtest settings, consumed by PortAnaRecord as *port_analysis_config.
port_analysis_config: &port_analysis_config
  strategy:
    class: TopkDropoutStrategy
    module_path: qlib.contrib.strategy.strategy
    kwargs:
      topk: 50
      n_drop: 5
  backtest:
    verbose: false
    limit_threshold: 0.095
    account: 100000000
    benchmark: *benchmark
    deal_price: close
    open_cost: 0.0005
    close_cost: 0.0015
    min_cost: 5

task:
  model:
    class: LocalformerModel
    module_path: qlib.contrib.model.pytorch_localformer_ts
    kwargs:
      seed: 0
      n_jobs: 20
  dataset:
    class: TSDatasetH
    module_path: qlib.data.dataset
    kwargs:
      handler:
        class: Alpha158
        module_path: qlib.contrib.data.handler
        kwargs: *data_handler_config
      segments:
        train: [2008-01-01, 2014-12-31]
        valid: [2015-01-01, 2016-12-31]
        test: [2017-01-01, 2020-08-01]
      step_len: 20
  record:
    - class: SignalRecord
      module_path: qlib.workflow.record_temp
      kwargs: {}
    - class: SigAnaRecord
      module_path: qlib.workflow.record_temp
      kwargs:
        ana_long_short: false
        ann_scaler: 252
    - class: PortAnaRecord
      module_path: qlib.workflow.record_temp
      kwargs:
        config: *port_analysis_config

View File

@@ -0,0 +1,73 @@
# Workflow config: LocalformerModel (pytorch_localformer) on the Alpha360 handler.
qlib_init:
  provider_uri: "~/.qlib/qlib_data/cn_data"
  region: cn

# Anchors reused further down via *market / *benchmark.
market: &market csi300
benchmark: &benchmark SH000300

# Handler settings, passed to the dataset handler below as *data_handler_config.
data_handler_config: &data_handler_config
  start_time: 2008-01-01
  end_time: 2020-08-01
  fit_start_time: 2008-01-01
  fit_end_time: 2014-12-31
  instruments: *market
  infer_processors:
    - class: RobustZScoreNorm
      kwargs:
        fields_group: feature
        clip_outlier: true
    - class: Fillna
      kwargs:
        fields_group: feature
  learn_processors:
    - class: DropnaLabel
    - class: CSRankNorm
      kwargs:
        fields_group: label
  label: ["Ref($close, -2) / Ref($close, -1) - 1"]

# Strategy and backtest settings, consumed by PortAnaRecord as *port_analysis_config.
port_analysis_config: &port_analysis_config
  strategy:
    class: TopkDropoutStrategy
    module_path: qlib.contrib.strategy.strategy
    kwargs:
      topk: 50
      n_drop: 5
  backtest:
    verbose: false
    limit_threshold: 0.095
    account: 100000000
    benchmark: *benchmark
    deal_price: close
    open_cost: 0.0005
    close_cost: 0.0015
    min_cost: 5

task:
  model:
    class: LocalformerModel
    module_path: qlib.contrib.model.pytorch_localformer
    kwargs:
      d_feat: 6
      seed: 0
  dataset:
    class: DatasetH
    module_path: qlib.data.dataset
    kwargs:
      handler:
        class: Alpha360
        module_path: qlib.contrib.data.handler
        kwargs: *data_handler_config
      segments:
        train: [2008-01-01, 2014-12-31]
        valid: [2015-01-01, 2016-12-31]
        test: [2017-01-01, 2020-08-01]
  record:
    - class: SignalRecord
      module_path: qlib.workflow.record_temp
      kwargs: {}
    - class: SigAnaRecord
      module_path: qlib.workflow.record_temp
      kwargs:
        ana_long_short: false
        ann_scaler: 252
    - class: PortAnaRecord
      module_path: qlib.workflow.record_temp
      kwargs:
        config: *port_analysis_config

View File

@@ -1,6 +1,6 @@
# Benchmarks Performance
Here are the results of each benchmark model running on Qlib's `Alpha360` and `Alpha158` dataset with China's A shared-stock & CSI300 data respectively. The values of each metric are the mean and std calculated based on 20 runs.
Here are the results of each benchmark model running on Qlib's `Alpha360` and `Alpha158` dataset with China's A shared-stock & CSI300 data respectively. The values of each metric are the mean and std calculated based on 20 runs with different random seeds.
The numbers shown below demonstrate the performance of the entire `workflow` of each model. We will update the `workflow` as well as models in the near future for better results.
@@ -23,6 +23,8 @@ The numbers shown below demonstrate the performance of the entire `workflow` of
| DoubleEnsemble (Chuheng Zhang, et al.) | Alpha360 | 0.0407±0.00| 0.3053±0.00 | 0.0490±0.00 | 0.3840±0.00 | 0.0380±0.02 | 0.5000±0.21 | -0.0984±0.02 |
| TabNet (Sercan O. Arik, et al.)| Alpha360 | 0.0192±0.00 | 0.1401±0.00| 0.0291±0.00 | 0.2163±0.00 | -0.0258±0.00 | -0.2961±0.00| -0.1429±0.00 |
| TCTS (Xueqing Wu, et al.)| Alpha360 | 0.0485±0.00 | 0.3689±0.04| 0.0586±0.00 | 0.4669±0.02 | 0.0816±0.02 | 1.1572±0.30| -0.0689±0.02 |
| Transformer (Ashish Vaswani, et al.)| Alpha360 | 0.0141±0.00 | 0.0917±0.02| 0.0331±0.00 | 0.2357±0.03 | -0.0259±0.03 | -0.3323±0.43| -0.1763±0.07 |
| Localformer (Juyong Jiang, et al.)| Alpha360 | 0.0408±0.00 | 0.2988±0.03| 0.0538±0.00 | 0.4105±0.02 | 0.0275±0.03 | 0.3464±0.37| -0.1182±0.03 |
## Alpha158 dataset
| Model Name | Dataset | IC | ICIR | Rank IC | Rank ICIR | Annualized Return | Information Ratio | Max Drawdown |
@@ -39,6 +41,8 @@ The numbers shown below demonstrate the performance of the entire `workflow` of
| GATs (Petar Velickovic, et al.) | Alpha158 (with selected 20 features) | 0.0349±0.00 | 0.2511±0.01| 0.0457±0.00 | 0.3537±0.01 | 0.0578±0.02 | 0.8221±0.25| -0.0824±0.02 |
| DoubleEnsemble (Chuheng Zhang, et al.) | Alpha158 | 0.0544±0.00 | 0.4338±0.01 | 0.0523±0.00 | 0.4257±0.01 | 0.1253±0.01 | 1.4105±0.14 | -0.0902±0.01 |
| TabNet (Sercan O. Arik, et al.)| Alpha158 | 0.0383±0.00 | 0.3414±0.00| 0.0388±0.00 | 0.3460±0.00 | 0.0226±0.00 | 0.2652±0.00| -0.1072±0.00 |
| Transformer (Ashish Vaswani, et al.)| Alpha158 | 0.0274±0.00 | 0.2166±0.04| 0.0409±0.00 | 0.3342±0.04 | 0.0204±0.03 | 0.2888±0.40| -0.1216±0.04 |
| Localformer (Juyong Jiang, et al.)| Alpha158 | 0.0355±0.00 | 0.2747±0.04| 0.0466±0.00 | 0.3762±0.03 | 0.0506±0.02 | 0.7447±0.34| -0.0875±0.02 |
- The selected 20 features are based on the feature importance of a lightgbm-based model.
- The base model of DoubleEnsemble is LGBM.

View File

@@ -0,0 +1,3 @@
numpy==1.17.4
pandas==1.1.2
torch==1.2.0

View File

@@ -0,0 +1,82 @@
# Workflow config: TransformerModel (pytorch_transformer_ts) on the Alpha158 handler,
# restricted to the 20 feature columns listed under FilterCol.
qlib_init:
  provider_uri: "~/.qlib/qlib_data/cn_data"
  region: cn

# Anchors reused further down via *market / *benchmark.
market: &market csi300
benchmark: &benchmark SH000300

# Handler settings, passed to the dataset handler below as *data_handler_config.
data_handler_config: &data_handler_config
  start_time: 2008-01-01
  end_time: 2020-08-01
  fit_start_time: 2008-01-01
  fit_end_time: 2014-12-31
  instruments: *market
  infer_processors:
    - class: FilterCol
      kwargs:
        fields_group: feature
        col_list: [
          "RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10",
          "ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5",
          "RSQR20", "CORD60", "CORD10", "CORR20", "KLOW"
        ]
    - class: RobustZScoreNorm
      kwargs:
        fields_group: feature
        clip_outlier: true
    - class: Fillna
      kwargs:
        fields_group: feature
  learn_processors:
    - class: DropnaLabel
    - class: CSRankNorm
      kwargs:
        fields_group: label
  label: ["Ref($close, -2) / Ref($close, -1) - 1"]

# Strategy and backtest settings, consumed by PortAnaRecord as *port_analysis_config.
port_analysis_config: &port_analysis_config
  strategy:
    class: TopkDropoutStrategy
    module_path: qlib.contrib.strategy.strategy
    kwargs:
      topk: 50
      n_drop: 5
  backtest:
    verbose: false
    limit_threshold: 0.095
    account: 100000000
    benchmark: *benchmark
    deal_price: close
    open_cost: 0.0005
    close_cost: 0.0015
    min_cost: 5

task:
  model:
    class: TransformerModel
    module_path: qlib.contrib.model.pytorch_transformer_ts
    kwargs:
      seed: 0
      n_jobs: 20
  dataset:
    class: TSDatasetH
    module_path: qlib.data.dataset
    kwargs:
      handler:
        class: Alpha158
        module_path: qlib.contrib.data.handler
        kwargs: *data_handler_config
      segments:
        train: [2008-01-01, 2014-12-31]
        valid: [2015-01-01, 2016-12-31]
        test: [2017-01-01, 2020-08-01]
      step_len: 20
  record:
    - class: SignalRecord
      module_path: qlib.workflow.record_temp
      kwargs: {}
    - class: SigAnaRecord
      module_path: qlib.workflow.record_temp
      kwargs:
        ana_long_short: false
        ann_scaler: 252
    - class: PortAnaRecord
      module_path: qlib.workflow.record_temp
      kwargs:
        config: *port_analysis_config

View File

@@ -0,0 +1,73 @@
# Workflow config: TransformerModel (pytorch_transformer) on the Alpha360 handler.
qlib_init:
  provider_uri: "~/.qlib/qlib_data/cn_data"
  region: cn

# Anchors reused further down via *market / *benchmark.
market: &market csi300
benchmark: &benchmark SH000300

# Handler settings, passed to the dataset handler below as *data_handler_config.
data_handler_config: &data_handler_config
  start_time: 2008-01-01
  end_time: 2020-08-01
  fit_start_time: 2008-01-01
  fit_end_time: 2014-12-31
  instruments: *market
  infer_processors:
    - class: RobustZScoreNorm
      kwargs:
        fields_group: feature
        clip_outlier: true
    - class: Fillna
      kwargs:
        fields_group: feature
  learn_processors:
    - class: DropnaLabel
    - class: CSRankNorm
      kwargs:
        fields_group: label
  label: ["Ref($close, -2) / Ref($close, -1) - 1"]

# Strategy and backtest settings, consumed by PortAnaRecord as *port_analysis_config.
port_analysis_config: &port_analysis_config
  strategy:
    class: TopkDropoutStrategy
    module_path: qlib.contrib.strategy.strategy
    kwargs:
      topk: 50
      n_drop: 5
  backtest:
    verbose: false
    limit_threshold: 0.095
    account: 100000000
    benchmark: *benchmark
    deal_price: close
    open_cost: 0.0005
    close_cost: 0.0015
    min_cost: 5

task:
  model:
    class: TransformerModel
    module_path: qlib.contrib.model.pytorch_transformer
    kwargs:
      d_feat: 6
      seed: 0
  dataset:
    class: DatasetH
    module_path: qlib.data.dataset
    kwargs:
      handler:
        class: Alpha360
        module_path: qlib.contrib.data.handler
        kwargs: *data_handler_config
      segments:
        train: [2008-01-01, 2014-12-31]
        valid: [2015-01-01, 2016-12-31]
        test: [2017-01-01, 2020-08-01]
  record:
    - class: SignalRecord
      module_path: qlib.workflow.record_temp
      kwargs: {}
    - class: SigAnaRecord
      module_path: qlib.workflow.record_temp
      kwargs:
        ana_long_short: false
        ann_scaler: 252
    - class: PortAnaRecord
      module_path: qlib.workflow.record_temp
      kwargs:
        config: *port_analysis_config

View File

@@ -99,8 +99,6 @@ class HighFreqHandler(DataHandlerLP):
]
names += ["$volume_1"]
fields += ["Cut({0}, 240, None)".format(template_paused.format("Date($close)"))]
names += ["date"]
return fields, names

View File

@@ -33,6 +33,9 @@ class HighFreqNorm(Processor):
self.feature_vmin[name] = np.nanmin(part_values)
def __call__(self, df_features):
df_features["date"] = pd.to_datetime(
df_features.index.get_level_values(level="datetime").to_series().dt.date.values
)
df_features.set_index("date", append=True, drop=True, inplace=True)
df_values = df_features.values
names = {

View File

@@ -23,7 +23,6 @@ from qlib.config import REG_CN
from qlib.workflow import R
from qlib.tests.data import GetData
# init qlib
provider_uri = "~/.qlib/qlib_data/cn_data"
exp_folder_name = "run_all_model_records"
@@ -40,6 +39,7 @@ exp_manager = {
GetData().qlib_data(target_dir=provider_uri, region=REG_CN, exists_skip=True)
qlib.init(provider_uri=provider_uri, region=REG_CN, exp_manager=exp_manager)
# decorator to check the arguments
def only_allow_defined_args(function_to_decorate):
@functools.wraps(function_to_decorate)
@@ -92,7 +92,8 @@ def create_env():
# function to execute the cmd
def execute(cmd):
def execute(cmd, wait_when_err=False):
print("Running CMD:", cmd)
with subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True, shell=True) as p:
for line in p.stdout:
sys.stdout.write(line.split("\b")[0])
@@ -102,6 +103,8 @@ def execute(cmd):
sys.stdout.write("\b" * 10 + "\b".join(line.split("\b")[1:-1]))
if p.returncode != 0:
if wait_when_err:
input("Press Enter to Continue")
return p.stderr
else:
return None
@@ -184,7 +187,15 @@ def gen_and_save_md_table(metrics, dataset):
# function to run the all the models
@only_allow_defined_args
def run(times=1, models=None, dataset="Alpha360", exclude=False):
def run(
times=1,
models=None,
dataset="Alpha360",
exclude=False,
qlib_uri: str = "git+https://github.com/microsoft/qlib#egg=pyqlib",
wait_before_rm_env: bool = False,
wait_when_err: bool = False,
):
"""
Please be aware that this function can only work under Linux. MacOS and Windows will be supported in the future.
Any PR to enhance this method is highly welcomed. Besides, this script doesn't support parallel running the same model
@@ -200,6 +211,13 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False):
determines whether the model being used is excluded or included.
dataset : str
determines the dataset to be used for each model.
qlib_uri : str
the uri to install qlib with pip
it could be a URL on the web or a local path
wait_before_rm_env : bool
wait before remove environment.
wait_when_err : bool
wait when errors raised when executing commands
Usage:
-------
@@ -240,32 +258,36 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False):
sys.stderr.write("\n")
# install requirements.txt
sys.stderr.write("Installing requirements.txt...\n")
execute(f"{python_path} -m pip install -r {req_path}")
execute(f"{python_path} -m pip install -r {req_path}", wait_when_err=wait_when_err)
sys.stderr.write("\n")
# setup gpu for tft
if fn == "TFT":
execute(
f"conda install -y --prefix {env_path} anaconda cudatoolkit=10.0 && conda install -y --prefix {env_path} cudnn"
f"conda install -y --prefix {env_path} anaconda cudatoolkit=10.0 && conda install -y --prefix {env_path} cudnn",
wait_when_err=wait_when_err,
)
sys.stderr.write("\n")
# install qlib
sys.stderr.write("Installing qlib...\n")
execute(f"{python_path} -m pip install --upgrade pip") # TODO: FIX ME!
execute(f"{python_path} -m pip install --upgrade cython") # TODO: FIX ME!
execute(f"{python_path} -m pip install --upgrade pip", wait_when_err=wait_when_err) # TODO: FIX ME!
execute(f"{python_path} -m pip install --upgrade cython", wait_when_err=wait_when_err) # TODO: FIX ME!
if fn == "TFT":
execute(
f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall --ignore-installed PyYAML -e git+https://github.com/microsoft/qlib#egg=pyqlib"
f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall --ignore-installed PyYAML -e {qlib_uri}",
wait_when_err=wait_when_err,
) # TODO: FIX ME!
else:
execute(
f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall -e git+https://github.com/microsoft/qlib#egg=pyqlib"
f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall -e {qlib_uri}",
wait_when_err=wait_when_err,
) # TODO: FIX ME!
sys.stderr.write("\n")
# run workflow_by_config for multiple times
for i in range(times):
sys.stderr.write(f"Running the model: {fn} for iteration {i+1}...\n")
errs = execute(
f"{python_path} {env_path / 'src/pyqlib/qlib/workflow/cli.py'} {yaml_path} {fn} {exp_folder_name}"
f"{python_path} {env_path / 'bin' / 'qrun'} {yaml_path} {fn} {exp_folder_name}",
wait_when_err=wait_when_err,
)
if errs is not None:
_errs = errors.get(fn, {})
@@ -274,6 +296,8 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False):
sys.stderr.write("\n")
# remove env
sys.stderr.write(f"Deleting the environment: {env_path}...\n")
if wait_before_rm_env:
input("Press Enter to Continue")
shutil.rmtree(env_path)
# getting all results
sys.stderr.write(f"Retrieving results...\n")

View File

@@ -2,7 +2,7 @@
# Licensed under the MIT License.
__version__ = "0.6.3.99"
__version__ = "0.7.0.99"
__version__bak = __version__ # This version is backup for QlibConfig.reset_qlib_version

View File

@@ -185,7 +185,7 @@ def backtest(
exchange_kwargs={},
pos_type: str = "Position",
):
"""initialize the strategy and executor, then backtest funciton for the interaction of the outermost strategy and executor in the nested decision execution
"""initialize the strategy and executor, then backtest function for the interaction of the outermost strategy and executor in the nested decision execution
Parameters
----------

View File

@@ -1,9 +1,8 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import annotations
import copy
from typing import Dict, List, Tuple
from typing import Dict, List, Tuple, TYPE_CHECKING
from qlib.utils import init_instance_by_config
import warnings
import pandas as pd
@@ -11,7 +10,9 @@ import pandas as pd
from .position import BasePosition, InfPosition, Position
from .report import Report, Indicator
from .order import BaseTradeDecision, Order
from .exchange import Exchange
if TYPE_CHECKING:
from .exchange import Exchange
"""
rtn & earning in the Account
@@ -73,6 +74,18 @@ class Account:
pos_type: str = "Position",
port_metr_enabled: bool = True,
):
"""the trade account of backtest.
Parameters
----------
init_cash : float, optional
initial cash, by default 1e9
position_dict : Dict[stock_id, {"amount": int, "price"(optional): float}], optional
initial stocks with amount and price,
if there is no price key in the dict of stocks, it will be filled by latest close price from qlib.
by default {}.
"""
self._pos_type = pos_type
self._port_metr_enabled = port_metr_enabled
@@ -109,7 +122,7 @@ class Account:
self.report = Report(freq, benchmark_config)
self.positions = {}
# trading related matric(e.g. high-frequency trading)
# trading related metrics(e.g. high-frequency trading)
self.indicator = Indicator()
def reset(self, freq=None, benchmark_config=None, init_report=False, port_metr_enabled: bool = None):
@@ -161,7 +174,7 @@ class Account:
self.accum_info.add_return_value(profit) # note here do not consider cost
def update_order(self, order, trade_val, cost, trade_price):
if not self.is_port_metr_enabled():
if self.current.skip_update():
# TODO: supporting polymorphism for account
# updating order for infinite position is meaningless
return
@@ -289,7 +302,7 @@ class Account:
if atomic is True and trade_info is None:
raise ValueError("trade_info is necessary in atomic executor")
elif atomic is False and inner_order_indicators is None:
raise ValueError("inner_order_indicators is necessary in unatomic executor")
raise ValueError("inner_order_indicators is necessary in un-atomic executor")
# TODO: `update_bar_count` and `update_current` should placed in Position and be merged.
self.update_bar_count()

View File

@@ -1,11 +1,15 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import annotations
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .account import Account
from qlib.backtest.position import Position
from qlib.backtest.position import BasePosition, Position
import random
import logging
from typing import List, Tuple, Union
from typing import List, Tuple, Union, Callable, Iterable
import numpy as np
import pandas as pd
@@ -16,6 +20,7 @@ from ..config import C, REG_CN
from ..utils.resam import resam_ts_data, ts_data_last
from ..log import get_module_logger
from .order import Order, OrderDir, OrderHelper
from .high_performance_ds import PandasQuote
class Exchange:
@@ -33,6 +38,7 @@ class Exchange:
close_cost=0.0025,
min_cost=5,
extra_quote=None,
quote_cls=PandasQuote,
**kwargs,
):
"""__init__
@@ -103,10 +109,11 @@ class Exchange:
# TODO: the quote, trade_dates, codes are not necessary.
# It is just for performance consideration.
self.limit_type = self._get_limit_type(limit_threshold)
if limit_threshold is None:
if C.region == REG_CN:
self.logger.warning(f"limit_threshold not set. The stocks hit the limit may be bought/sold")
elif self._get_limit_type(limit_threshold) == self.LT_FLT and abs(limit_threshold) > 0.1:
elif self.limit_type == self.LT_FLT and abs(limit_threshold) > 0.1:
if C.region == REG_CN:
self.logger.warning(f"limit_threshold may not be set to a reasonable value")
@@ -128,10 +135,9 @@ class Exchange:
# $change is for calculating the limit of the stock
necessary_fields = {self.buy_price, self.sell_price, "$close", "$change", "$factor", "$volume"}
if self._get_limit_type(limit_threshold) == self.LT_TP_EXP:
if self.limit_type == self.LT_TP_EXP:
for exp in limit_threshold:
necessary_fields.add(exp)
subscribe_fields = list(necessary_fields | set(subscribe_fields))
all_fields = list(necessary_fields | set(subscribe_fields))
self.all_fields = all_fields
@@ -141,39 +147,43 @@ class Exchange:
self.limit_threshold: Union[Tuple[str, str], float, None] = limit_threshold
self.volume_threshold = volume_threshold
self.extra_quote = extra_quote
self.set_quote(codes, start_time, end_time)
self.get_quote_from_qlib()
def set_quote(self, codes, start_time, end_time):
if len(codes) == 0:
codes = D.instruments()
# init quote by quote_df
self.quote_cls = quote_cls
self.quote = self.quote_cls(self.quote_df)
self.quote = D.features(codes, self.all_fields, start_time, end_time, freq=self.freq, disk_cache=True).dropna(
subset=["$close"]
)
self.quote.columns = self.all_fields
def get_quote_from_qlib(self):
# get stock data from qlib
if len(self.codes) == 0:
self.codes = D.instruments()
self.quote_df = D.features(
self.codes, self.all_fields, self.start_time, self.end_time, freq=self.freq, disk_cache=True
).dropna(subset=["$close"])
self.quote_df.columns = self.all_fields
# check buy_price data and sell_price data
for attr in "buy_price", "sell_price":
pstr = getattr(self, attr) # price string
if self.quote[pstr].isna().any():
if self.quote_df[pstr].isna().any():
self.logger.warning("{} field data contains nan.".format(pstr))
if self.quote["$factor"].isna().any():
# update trade_w_adj_price
if self.quote_df["$factor"].isna().any():
# The 'factor.day.bin' file not exists, and `factor` field contains `nan`
# Use adjusted price
self.trade_w_adj_price = True
self.logger.warning("factor.day.bin file not exists or factor contains `nan`. Order using adjusted_price.")
if self.trade_unit is not None:
self.logger.warning(f"trade unit {self.trade_unit} is not supported in adjusted_price mode.")
else:
# The `factor.day.bin` file exists and all data `close` and `factor` are not `nan`
# Use normal price
self.trade_w_adj_price = False
# update limit
self._update_limit()
self._update_limit(self.limit_threshold)
quote_df = self.quote
# concat extra_quote
if self.extra_quote is not None:
# process extra_quote
if "$close" not in self.extra_quote:
@@ -192,21 +202,15 @@ class Exchange:
if "limit_buy" not in self.extra_quote.columns:
self.extra_quote["limit_buy"] = False
self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.")
assert set(self.extra_quote.columns) == set(quote_df.columns) - {"$change"}
quote_df = pd.concat([quote_df, self.extra_quote], sort=False, axis=0)
quote_dict = {}
for stock_id, stock_val in quote_df.groupby(level="instrument"):
quote_dict[stock_id] = stock_val.droplevel(level="instrument")
self.quote = quote_dict
assert set(self.extra_quote.columns) == set(self.quote_df.columns) - {"$change"}
self.quote_df = pd.concat([self.quote_df, extra_quote], sort=False, axis=0)
LT_TP_EXP = "(exp)" # Tuple[str, str]
LT_FLT = "float" # float
LT_NONE = "none" # none
def _get_limit_type(self, limit_threshold):
"""get limit type"""
if isinstance(limit_threshold, Tuple):
return self.LT_TP_EXP
elif isinstance(limit_threshold, float):
@@ -216,19 +220,19 @@ class Exchange:
else:
raise NotImplementedError(f"This type of `limit_threshold` is not supported")
def _update_limit(self):
def _update_limit(self, limit_threshold):
# check limit_threshold
lt_type = self._get_limit_type(self.limit_threshold)
if lt_type == self.LT_NONE:
self.quote["limit_buy"] = False
self.quote["limit_sell"] = False
elif lt_type == self.LT_TP_EXP:
limit_type = self._get_limit_type(limit_threshold)
if limit_type == self.LT_NONE:
self.quote_df["limit_buy"] = False
self.quote_df["limit_sell"] = False
elif limit_type == self.LT_TP_EXP:
# set limit
self.quote["limit_buy"] = self.quote[self.limit_threshold[0]]
self.quote["limit_sell"] = self.quote[self.limit_threshold[1]]
elif lt_type == self.LT_FLT:
self.quote["limit_buy"] = self.quote["$change"].ge(self.limit_threshold)
self.quote["limit_sell"] = self.quote["$change"].le(-self.limit_threshold) # pylint: disable=E1130
self.quote_df["limit_buy"] = self.quote_df[limit_threshold[0]]
self.quote_df["limit_sell"] = self.quote_df[limit_threshold[1]]
elif limit_type == self.LT_FLT:
self.quote_df["limit_buy"] = self.quote_df["$change"].ge(limit_threshold)
self.quote_df["limit_sell"] = self.quote_df["$change"].le(-limit_threshold) # pylint: disable=E1130
def check_stock_limit(self, stock_id, start_time, end_time, direction=None):
"""
@@ -242,20 +246,20 @@ class Exchange:
"""
if direction is None:
buy_limit = resam_ts_data(self.quote[stock_id]["limit_buy"], start_time, end_time, method="all")
sell_limit = resam_ts_data(self.quote[stock_id]["limit_sell"], start_time, end_time, method="all")
buy_limit = self.quote.get_data(stock_id, start_time, end_time, fields="limit_buy", method="all")
sell_limit = self.quote.get_data(stock_id, start_time, end_time, fields="limit_sell", method="all")
return buy_limit or sell_limit
elif direction == Order.BUY:
return resam_ts_data(self.quote[stock_id]["limit_buy"], start_time, end_time, method="all")
return self.quote.get_data(stock_id, start_time, end_time, fields="limit_buy", method="all")
elif direction == Order.SELL:
return resam_ts_data(self.quote[stock_id]["limit_sell"], start_time, end_time, method="all")
return self.quote.get_data(stock_id, start_time, end_time, fields="limit_sell", method="all")
else:
raise ValueError(f"direction {direction} is not supported!")
def check_stock_suspended(self, stock_id, start_time, end_time):
# is suspended
if stock_id in self.quote:
return resam_ts_data(self.quote[stock_id], start_time, end_time, method=None) is None
if stock_id in self.quote.get_all_stock():
return self.quote.get_data(stock_id, start_time, end_time) is None
else:
return True
@@ -278,7 +282,7 @@ class Exchange:
else:
return True
def deal_order(self, order, trade_account=None, position=None):
def deal_order(self, order, trade_account: Account = None, position: BasePosition = None):
"""
Deal order when the actual transaction
@@ -289,13 +293,12 @@ class Exchange:
:param position: position to be updated after dealing the order.
:return: trade_val, trade_cost, trade_price
"""
# need to check order first
# TODO: check the order unit limit in the exchange!!!!
# The order limit is related to the adj factor and the cur_amount.
# factor = self.quote[(order.stock_id, order.trade_date)]['$factor']
# cur_amount = trade_account.current.get_stock_amount(order.stock_id)
# check order first.
if self.check_order(order) is False:
raise AttributeError("need to check order first")
order.deal_amount = 0.0
# using np.nan instead of None to make it more convenient to should the value in format string
return 0.0, 0.0, np.nan
if trade_account is not None and position is not None:
raise ValueError("trade_account and position can only choose one")
@@ -304,25 +307,29 @@ class Exchange:
trade_val, trade_cost = self._calc_trade_info_by_order(
order, trade_account.current if trade_account else position
)
# update account
if order.deal_amount > 1e-5:
# If the order can only be deal 0 aomount. Nothing to be updated
# Otherwise, it will result some stock with 0 amount in the position
# If the order can only be deal 0 amount. Nothing to be updated
# Otherwise, it will result in
# 1) some stock with 0 amount in the position
# 2) `trade_unit` of trade_cost will be lost in user account
if trade_account:
trade_account.update_order(order=order, trade_val=trade_val, cost=trade_cost, trade_price=trade_price)
elif position:
position.update_order(order=order, trade_val=trade_val, cost=trade_cost, trade_price=trade_price)
else:
# if dealing is not successful, the trade_cost should be zero
trade_cost = 0
return trade_val, trade_cost, trade_price
def get_quote_info(self, stock_id, start_time, end_time, method=ts_data_last):
return resam_ts_data(self.quote[stock_id], start_time, end_time, method=method)
return self.quote.get_data(stock_id, start_time, end_time, method=method)
def get_close(self, stock_id, start_time, end_time, method=ts_data_last):
return resam_ts_data(self.quote[stock_id]["$close"], start_time, end_time, method=method)
return self.quote.get_data(stock_id, start_time, end_time, fields="$close", method=method)
def get_volume(self, stock_id, start_time, end_time, method="sum"):
return resam_ts_data(self.quote[stock_id]["$volume"], start_time, end_time, method=method)
return self.quote.get_data(stock_id, start_time, end_time, fields="$volume", method=method)
def get_deal_price(self, stock_id, start_time, end_time, direction: OrderDir, method=ts_data_last):
if direction == OrderDir.SELL:
@@ -331,7 +338,7 @@ class Exchange:
pstr = self.buy_price
else:
raise NotImplementedError(f"This type of input is not supported")
deal_price = resam_ts_data(self.quote[stock_id][pstr], start_time, end_time, method=method)
deal_price = self.quote.get_data(stock_id, start_time, end_time, fields=pstr, method=method)
if method is not None and (np.isclose(deal_price, 0.0) or np.isnan(deal_price)):
self.logger.warning(f"(stock_id:{stock_id}, trade_time:{(start_time, end_time)}, {pstr}): {deal_price}!!!")
self.logger.warning(f"setting deal_price to close price")
@@ -346,10 +353,10 @@ class Exchange:
`None`: if the stock is suspended `None` may be returned
`float`: return factor if the factor exists
"""
assert (start_time is not None and end_time is not None, "the time range must be given")
if stock_id not in self.quote:
assert start_time is not None and end_time is not None, "the time range must be given"
if stock_id not in self.quote.get_all_stock():
return None
return resam_ts_data(self.quote[stock_id]["$factor"], start_time, end_time, method=ts_data_last)
return self.quote.get_data(stock_id, start_time, end_time, fields="$factor", method=ts_data_last)
def generate_amount_position_from_weight_position(
self, weight_position, cash, start_time, end_time, direction=OrderDir.BUY
@@ -509,7 +516,7 @@ class Exchange:
)
return value
def _get_factor_or_raise_erorr(self, factor: float = None, stock_id: str = None, start_time=None, end_time=None):
def _get_factor_or_raise_error(self, factor: float = None, stock_id: str = None, start_time=None, end_time=None):
"""Please refer to the docs of get_amount_of_trade_unit"""
if factor is None:
if stock_id is not None and start_time is not None and end_time is not None:
@@ -537,7 +544,7 @@ class Exchange:
the end time of trading range
"""
if not self.trade_w_adj_price and self.trade_unit is not None:
factor = self._get_factor_or_raise_erorr(
factor = self._get_factor_or_raise_error(
factor=factor, stock_id=stock_id, start_time=start_time, end_time=end_time
)
return self.trade_unit / factor
@@ -556,7 +563,7 @@ class Exchange:
"""
if not self.trade_w_adj_price and self.trade_unit is not None:
# the minimal amount is 1. Add 0.1 for solving precision problem.
factor = self._get_factor_or_raise_erorr(
factor = self._get_factor_or_raise_error(
factor=factor, stock_id=stock_id, start_time=start_time, end_time=end_time
)
return (deal_amount * factor + 0.1) // self.trade_unit * self.trade_unit / factor
@@ -626,7 +633,7 @@ class Exchange:
order.stock_id, order.start_time, order.end_time, order.deal_amount
)
trade_val = order.deal_amount * trade_price
trade_cost = trade_val * self.open_cost
trade_cost = max(trade_val * self.open_cost, self.min_cost)
else:
raise NotImplementedError("order type {} error".format(order.type))

View File

@@ -1,5 +1,6 @@
from abc import abstractclassmethod, abstractmethod
import copy
from qlib.backtest.position import BasePosition
from qlib.log import get_module_logger
from types import GeneratorType
from qlib.backtest.account import Account
@@ -32,6 +33,7 @@ class BaseExecutor:
track_data: bool = False,
trade_exchange: Exchange = None,
common_infra: CommonInfrastructure = None,
settle_type=BasePosition.ST_NO,
**kwargs,
):
"""
@@ -95,6 +97,8 @@ class BaseExecutor:
- trade_exchange : Exchange, optional
exchange that provides market info
settle_type : str
Please refer to the docs of BasePosition.settle_start
"""
self.time_per_step = time_per_step
self.indicator_config = indicator_config
@@ -104,6 +108,7 @@ class BaseExecutor:
self._trade_exchange = trade_exchange
self.level_infra = LevelInfrastructure()
self.level_infra.reset_infra(common_infra=common_infra)
self._settle_type = settle_type
self.reset(start_time=start_time, end_time=end_time, common_infra=common_infra)
if common_infra is None:
get_module_logger("BaseExecutor").warning(f"`common_infra` is not set for {self}")
@@ -235,6 +240,9 @@ class BaseExecutor:
if atomic and trade_decision.get_range_limit(default_value=None) is not None:
raise ValueError("atomic executor doesn't support specify `range_limit`")
if self._settle_type != BasePosition.ST_NO:
self.trade_account.current.settle_start(self._settle_type)
obj = self._collect_data(trade_decision=trade_decision, level=level)
if isinstance(obj, GeneratorType):
@@ -256,6 +264,10 @@ class BaseExecutor:
)
self.trade_calendar.step()
if self._settle_type != BasePosition.ST_NO:
self.trade_account.current.settle_commit()
if return_value is not None:
return_value.update({"execute_result": res})
return res
@@ -366,7 +378,7 @@ class NestedExecutor(BaseExecutor):
trade_decision = self._update_trade_decision(trade_decision)
if trade_decision.empty() and self._skip_empty_decision:
# give one chance for outer stategy to update the strategy
# give one chance for outer strategy to update the strategy
# - For updating some information in the sub executor(the strategy have no knowledge of the inner
# executor when generating the decision)
break
@@ -393,7 +405,7 @@ class NestedExecutor(BaseExecutor):
execute_result.extend(_inner_execute_result)
inner_order_indicators.append(
self.inner_executor.trade_account.get_trade_indicator().get_order_indicator()
self.inner_executor.trade_account.get_trade_indicator().get_order_indicator(raw=True)
)
else:
# do nothing and just step forward
@@ -409,6 +421,9 @@ class NestedExecutor(BaseExecutor):
class SimulatorExecutor(BaseExecutor):
"""Executor that simulate the true market"""
# TODO: TT_SERIAL & TT_PARAL will be replaced by feature fix_pos now.
# Please remove them in the future.
# available trade_types
TT_SERIAL = "serial"
## The orders will be executed serially in a sequence
@@ -486,42 +501,22 @@ class SimulatorExecutor(BaseExecutor):
execute_result = []
for order in self._get_order_iterator(trade_decision):
if self.trade_exchange.check_order(order) is True:
# execute the order.
# NOTE: The trade_account will be changed in this function
trade_val, trade_cost, trade_price = self.trade_exchange.deal_order(
order, trade_account=self.trade_account
# execute the order.
# NOTE: The trade_account will be changed in this function
trade_val, trade_cost, trade_price = self.trade_exchange.deal_order(order, trade_account=self.trade_account)
execute_result.append((order, trade_val, trade_cost, trade_price))
if self.verbose:
print(
"[I {:%Y-%m-%d %H:%M:%S}]: {} {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}, cash {:.2f}.".format(
trade_start_time,
"sell" if order.direction == Order.SELL else "buy",
order.stock_id,
trade_price,
order.amount,
order.deal_amount,
order.factor,
trade_val,
self.trade_account.get_cash(),
)
)
execute_result.append((order, trade_val, trade_cost, trade_price))
if self.verbose:
if order.direction == Order.SELL: # sell
print(
"[I {:%Y-%m-%d %H:%M:%S}]: sell {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}.".format(
trade_start_time,
order.stock_id,
trade_price,
order.amount,
order.deal_amount,
order.factor,
trade_val,
)
)
else:
print(
"[I {:%Y-%m-%d %H:%M:%S}]: buy {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}.".format(
trade_start_time,
order.stock_id,
trade_price,
order.amount,
order.deal_amount,
order.factor,
trade_val,
)
)
else:
if self.verbose:
print("[W {:%Y-%m-%d %H:%M:%S}]: {} wrong.".format(trade_start_time, order.stock_id))
# do nothing
pass
return execute_result, {"trade_info": execute_result}

View File

@@ -0,0 +1,449 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import logging
from typing import List, Text, Tuple, Union, Callable, Iterable, Dict
from collections import OrderedDict
import inspect
import pandas as pd
from ..utils.resam import resam_ts_data
from ..log import get_module_logger
class BaseQuote:
    """Abstract interface of the quote container queried during backtest.

    Concrete subclasses decide how the quote data is stored; this base class
    only fixes the query API (`get_all_stock` and `get_data`).
    """

    def __init__(self, quote_df: pd.DataFrame):
        # The base class does not keep `quote_df`; it only prepares a logger.
        # Subclasses are responsible for storing the data in their own layout.
        self.logger = get_module_logger("online operator", level=logging.INFO)

    def get_all_stock(self) -> Iterable:
        """Return all stock codes.

        Return
        ------
        Iterable
            all stock codes
        """
        raise NotImplementedError("Please implement the `get_all_stock` method")

    def get_data(
        self,
        stock_id: Union[str, list],
        start_time: Union[pd.Timestamp, str],
        end_time: Union[pd.Timestamp, str],
        fields: Union[str, list] = None,
        method: Union[str, Callable] = None,
    ) -> Union[None, float, pd.Series, pd.DataFrame]:
        """Fetch the given fields of stock data between start_time and end_time,
        then apply `method` to the selected data.

        Example:

        .. code-block::

                                    $close      $volume
            instrument  datetime
            SH600000    2010-01-04  86.778313   16162960.0
                        2010-01-05  87.433578   28117442.0
                        2010-01-06  85.713585   23632884.0
                        2010-01-07  83.788803   20813402.0
                        2010-01-08  84.730675   16044853.0

            SH600655    2010-01-04  2699.567383  158193.328125
                        2010-01-08  2612.359619   77501.406250
                        2010-01-11  2712.982422  160852.390625
                        2010-01-12  2788.688232  164587.937500
                        2010-01-13  2790.604004  145460.453125

            print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last"))

                        $close      $volume
            instrument
            SH600000    87.433578 28117442.0
            SH600655    2699.567383  158193.328125

            print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last"))

            $close 87.433578
            $volume 28117442.0

            print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last"))

            87.433578

        Parameters
        ----------
        stock_id : Union[str, list]
            the stock code(s) to query
        start_time : Union[pd.Timestamp, str]
            closed start time for backtest
        end_time : Union[pd.Timestamp, str]
            closed end time for backtest
        fields : Union[str, List]
            the columns of data to fetch
        method : Union[str, Callable]
            the method apply to data.
            e.g [None, "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last]

        Return
        ----------
        Union[None, float, pd.Series, pd.DataFrame]
            The resampled DataFrame/Series/value, return None when the resampled data is empty.
        """
        raise NotImplementedError("Please implement the `get_data` method")
class PandasQuote(BaseQuote):
    """Quote container that keeps one pandas object per stock in a dict."""

    def __init__(self, quote_df: pd.DataFrame):
        super().__init__(quote_df=quote_df)
        # Split the frame by the "instrument" index level once, so later
        # lookups are a plain dict access keyed by stock code.
        self.data = {
            code: frame.droplevel(level="instrument")
            for code, frame in quote_df.groupby(level="instrument")
        }

    def get_all_stock(self):
        """Return the stock codes that have quote data."""
        return self.data.keys()

    def get_data(self, stock_id, start_time, end_time, fields=None, method=None):
        """Resample the data of `stock_id` within [start_time, end_time].

        `fields` selects column(s) before resampling; None keeps every column.
        Raises ValueError when `fields` is neither None, str nor list.
        """
        if fields is not None and not isinstance(fields, (str, list)):
            raise ValueError(f"fields must be None, str or list")
        stock_data = self.data[stock_id]
        selected = stock_data if fields is None else stock_data[fields]
        return resam_ts_data(selected, start_time, end_time, method=method)
class BaseSingleMetric:
    """
    The data structure of the single metric.
    The following methods are used for computing metrics in one indicator.

    Every method except `__radd__` is abstract and raises NotImplementedError;
    `__radd__` is defined in terms of `__add__`.
    """

    def __init__(self, metric: Union[dict, pd.Series]):
        raise NotImplementedError("Please implement the `__init__` method")

    def __add__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `__add__` method")

    def __radd__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        # Reflected addition reuses `__add__`, so `number + metric` works as
        # soon as a subclass implements `__add__`.
        return self + other

    def __sub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `__sub__` method")

    def __rsub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `__rsub__` method")

    def __mul__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `__mul__` method")

    def __truediv__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `__truediv__` method")

    def __eq__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `__eq__` method")

    def __gt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `__gt__` method")

    def __lt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `__lt__` method")

    def __len__(self) -> int:
        raise NotImplementedError("Please implement the `__len__` method")

    def sum(self) -> float:
        raise NotImplementedError("Please implement the `sum` method")

    def mean(self) -> float:
        raise NotImplementedError("Please implement the `mean` method")

    def count(self) -> int:
        """Return the count of the single metric, NaN is not included."""
        raise NotImplementedError("Please implement the `count` method")

    def abs(self) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `abs` method")

    def astype(self, type: type) -> "BaseSingleMetric":
        raise NotImplementedError("Please implement the `astype` method")

    @property
    def empty(self) -> bool:
        """If metric is empty, return True."""
        raise NotImplementedError("Please implement the `empty` method")

    def add(self, other: "BaseSingleMetric", fill_value: float = None) -> "BaseSingleMetric":
        """Replace np.NaN with fill_value in two metrics and add them."""
        raise NotImplementedError("Please implement the `add` method")

    def replace(self, replace_dict: dict) -> "BaseSingleMetric":
        """Replace the value of metric according to replace_dict."""
        raise NotImplementedError("Please implement the `replace` method")

    def apply(self, func: Callable) -> "BaseSingleMetric":
        """Replace the value of metric with func(metric).

        Currently, the func is only qlib/backtest/order/Order.parse_dir.
        """
        raise NotImplementedError("Please implement the 'apply' method")
class BaseOrderIndicator:
    """
    The data structure of order indicator.

    !!!NOTE: There are two ways to organize the data structure. Please choose a better way.
        1. One way is using BaseSingleMetric to represent each metric. For example, the data
        structure of PandasOrderIndicator is Dict[str, PandasSingleMetric]. It uses
        PandasSingleMetric based on pd.Series to represent each metric.
        2. The other way doesn't use BaseSingleMetric to represent each metric. The data
        structure of PandasOrderIndicator is a whole matrix. It means it is not necessary
        to inherit the BaseSingleMetric.
    """

    def assign(self, col: str, metric: Union[dict, pd.Series]):
        """Assign one metric.

        The base implementation does nothing and returns None.

        Parameters
        ----------
        col : str
            the metric name of one metric.
        metric : Union[dict, pd.Series]
            the metric data.
        """

    def transfer(self, func: Callable, new_col: str = None) -> Union[None, BaseSingleMetric]:
        """Compute a new metric from existing metrics.

        The base implementation does nothing and returns None.

        Parameters
        ----------
        func : Callable
            the func of computing new metric.
            the kwargs of func will be replaced with metric data by name in this function.
            e.g.
                def func(pa):
                    return (pa > 0).astype(int).sum() / pa.count()
        new_col : str, optional
            New metric will be assigned in the data if new_col is not None, by default None.

        Return
        ----------
        BaseSingleMetric
            new metric.
        """

    def get_metric_series(self, metric: str) -> pd.Series:
        """Return the single metric in pd.Series format.

        The base implementation does nothing and returns None.

        Parameters
        ----------
        metric : str
            the metric name.

        Return
        ----------
        pd.Series
            the single metric.
            If there is no metric name in the data, return pd.Series().
        """

    @staticmethod
    def sum_all_indicators(
        indicators: list, metrics: Union[str, List[str]], fill_value: float = None
    ) -> Dict[str, BaseSingleMetric]:
        """Sum indicators with the same metrics.

        The base implementation does nothing and returns None.

        Parameters
        ----------
        indicators : List[BaseOrderIndicator]
            the list of all inner indicators.
        metrics : Union[str, List[str]]
            all metrics that need to be summed.
        fill_value : float, optional
            fill np.NaN with value. By default None.

        Return
        ----------
        Dict[str: PandasSingleMetric]
            a dict of metric name and data.
        """

    def to_series(self) -> Dict[Text, pd.Series]:
        """Return the metrics as pandas series.

        for example: { "ffr":
                SH600068    NaN
                SH600079    1.0
                SH600266    NaN
                           ...
                SZ300692    NaN
                SZ300719    NaN,
                ...
                 }
        """
        raise NotImplementedError("Please implement the `to_series` method")
class PandasSingleMetric:
    """Each SingleMetric is based on pd.Series.

    Arithmetic and comparison operators accept either a plain number
    (int/float) or another PandasSingleMetric and return a new
    PandasSingleMetric wrapping the element-wise pandas result. Any other
    operand type yields `NotImplemented`, so Python can try the reflected
    operation or raise TypeError.
    """

    def __init__(self, metric: Union[dict, pd.Series]):
        """Wrap `metric`; a dict is converted to a pd.Series.

        Raises
        ------
        ValueError
            if `metric` is neither a dict nor a pd.Series.
        """
        if isinstance(metric, dict):
            self.metric = pd.Series(metric)
        elif isinstance(metric, pd.Series):
            self.metric = metric
        else:
            raise ValueError(f"metric must be dict or pd.Series")

    @staticmethod
    def _unwrap(other):
        """Return the raw operand for a binary operation, or NotImplemented."""
        if isinstance(other, PandasSingleMetric):
            return other.metric
        if isinstance(other, (int, float)):
            return other
        return NotImplemented

    def __add__(self, other):
        rhs = self._unwrap(other)
        if rhs is NotImplemented:
            return NotImplemented
        return PandasSingleMetric(self.metric + rhs)

    def __radd__(self, other):
        # Reflected addition (e.g. `0 + metric`, as used by the builtin `sum`).
        # Mirrors BaseSingleMetric.__radd__, which delegates to `__add__`.
        return self + other

    def __sub__(self, other):
        rhs = self._unwrap(other)
        if rhs is NotImplemented:
            return NotImplemented
        return PandasSingleMetric(self.metric - rhs)

    def __rsub__(self, other):
        lhs = self._unwrap(other)
        if lhs is NotImplemented:
            return NotImplemented
        return PandasSingleMetric(lhs - self.metric)

    def __mul__(self, other):
        rhs = self._unwrap(other)
        if rhs is NotImplemented:
            return NotImplemented
        return PandasSingleMetric(self.metric * rhs)

    def __truediv__(self, other):
        rhs = self._unwrap(other)
        if rhs is NotImplemented:
            return NotImplemented
        return PandasSingleMetric(self.metric / rhs)

    def __eq__(self, other):
        # Element-wise equality, returned as a metric of booleans.
        rhs = self._unwrap(other)
        if rhs is NotImplemented:
            return NotImplemented
        return PandasSingleMetric(self.metric == rhs)

    def __gt__(self, other):
        # `self > other` must use `>`; the earlier implementation used `<`,
        # which silently inverted every greater-than comparison.
        rhs = self._unwrap(other)
        if rhs is NotImplemented:
            return NotImplemented
        return PandasSingleMetric(self.metric > rhs)

    def __lt__(self, other):
        # `self < other` must use `<`; the earlier implementation used `>`.
        rhs = self._unwrap(other)
        if rhs is NotImplemented:
            return NotImplemented
        return PandasSingleMetric(self.metric < rhs)

    def __len__(self):
        return len(self.metric)

    def sum(self):
        return self.metric.sum()

    def mean(self):
        return self.metric.mean()

    def count(self):
        """Return the number of non-NaN values (pd.Series.count)."""
        return self.metric.count()

    def abs(self):
        return PandasSingleMetric(self.metric.abs())

    def astype(self, type):
        return PandasSingleMetric(self.metric.astype(type))

    @property
    def empty(self):
        """True when the underlying series is empty."""
        return self.metric.empty

    def add(self, other, fill_value=None):
        """Add another metric, passing `fill_value` through to pd.Series.add."""
        return PandasSingleMetric(self.metric.add(other.metric, fill_value=fill_value))

    def replace(self, replace_dict: dict):
        """Replace values according to `replace_dict` (pd.Series.replace)."""
        return PandasSingleMetric(self.metric.replace(replace_dict))

    def apply(self, func: Callable):
        """Apply `func` to every value (pd.Series.apply)."""
        return PandasSingleMetric(self.metric.apply(func))
class PandasOrderIndicator(BaseOrderIndicator):
    """
    Order indicator stored as OrderedDict(str: PandasSingleMetric).

    Each key is a metric name; each value is a PandasSingleMetric, which wraps
    a pd.Series.
    """

    def __init__(self):
        self.data: Dict[str, PandasSingleMetric] = OrderedDict()

    def assign(self, col: str, metric: Union[dict, pd.Series]):
        """Store `metric` under the name `col`, wrapped as a PandasSingleMetric."""
        self.data[col] = PandasSingleMetric(metric)

    def transfer(self, func: Callable, new_col: str = None) -> Union[None, PandasSingleMetric]:
        """Call `func` with the stored metrics whose names match its parameters.

        The result is stored under `new_col` when it is given (and None is
        returned); otherwise the result itself is returned.
        """
        arg_names = inspect.signature(func).parameters.keys()
        result = func(**{name: self.data[name] for name in arg_names})
        if new_col is None:
            return result
        self.data[new_col] = result

    def get_metric_series(self, metric: str) -> Union[pd.Series]:
        """Return the pd.Series of `metric`, or an empty pd.Series if it is unknown."""
        return self.data[metric].metric if metric in self.data else pd.Series()

    @staticmethod
    def sum_all_indicators(
        indicators: list, metrics: Union[str, List[str]], fill_value=None
    ) -> Dict[str, PandasSingleMetric]:
        """Sum the named metric(s) across `indicators`.

        Returns a dict mapping each metric name to the summed pd.Series.
        """
        names = [metrics] if isinstance(metrics, str) else metrics
        summed = {}
        for name in names:
            # Start from an empty metric and fold every indicator into it.
            total = PandasSingleMetric({})
            for one_indicator in indicators:
                total = total.add(one_indicator.data[name], fill_value)
            summed[name] = total.metric
        return summed

    def to_series(self):
        """Return {metric name: pd.Series} for every stored metric."""
        return {name: single.metric for name, single in self.data.items()}

View File

@@ -59,12 +59,19 @@ class Order:
# 3) results
# - users should not care about these values
# - they are set by the backtest system after finishing the results.
# What the value should be about in all kinds of cases
# - not tradable: the deal_amount == 0 , factor is None
# - the stock is suspended and the entire order fails. No cost for this order
# - dealed or partially dealed: deal_amount >= 0 and factor is not None
deal_amount: Optional[float] = None # `deal_amount` is a non-negative value
factor: Optional[float] = None
# TODO:
# a status field to indicate the dealing result of the order
# FIXME:
# for compatible now.
# Plese remove them in the future
# Please remove them in the future
SELL: ClassVar[OrderDir] = OrderDir.SELL
BUY: ClassVar[OrderDir] = OrderDir.BUY
@@ -72,6 +79,7 @@ class Order:
if self.direction not in {Order.SELL, Order.BUY}:
raise NotImplementedError("direction not supported, `Order.SELL` for sell, `Order.BUY` for buy")
self.deal_amount = 0
self.factor = None
@property
def amount_delta(self) -> float:

View File

@@ -4,10 +4,14 @@
import copy
import pathlib
from typing import Dict, List
from typing import Dict, List, Union
import pandas as pd
from datetime import timedelta
import numpy as np
from .order import Order
from ..data.data import D
class BasePosition:
@@ -16,8 +20,8 @@ class BasePosition:
Please refer to the `Position` class for the position
"""
def __init__(self, cash=0.0, *args, **kwargs) -> None:
pass
def __init__(self, cash=0.0, *args, **kwargs):
self._settle_type = self.ST_NO
def skip_update(self) -> bool:
"""
@@ -120,13 +124,16 @@ class BasePosition:
"""
raise NotImplementedError(f"Please implement the `get_stock_amount` method")
def get_cash(self) -> float:
def get_cash(self, include_settle: bool = False) -> float:
"""
Returns
-------
float:
the cash in position
the available(tradable) cash in position
include_settle:
will the unsettled(delayed) cash included
Default: not include those unavailable cash
"""
raise NotImplementedError(f"Please implement the `get_cash` method")
@@ -184,6 +191,37 @@ class BasePosition:
"""
raise NotImplementedError(f"Please implement the `add_count_all` method")
ST_CASH = "cash"
ST_NO = None
def settle_start(self, settle_type: str):
    """
    settlement start
    It will act like start and commit a transaction

    Parameters
    ----------
    settle_type : str
        Should we delay the settlement in each execution (each execution will make the executor a step forward)

        - "cash": make the cash settlement delayed.

            - The cash you get can't be used in current step (e.g. you can't sell a stock to get cash to buy another
                stock)

        - None: no settlement mechanism

        - TODO: other assets will be supported in the future.

    Raises
    ------
    NotImplementedError
        always; subclasses must override this method.
    """
    # The message previously named a non-existent `settle_conf` method.
    raise NotImplementedError("Please implement the `settle_start` method")
def settle_commit(self):
    """
    settlement commit

    Counterpart of `settle_start`. It takes no arguments.
    This base implementation always raises NotImplementedError;
    subclasses must override it.
    """
    raise NotImplementedError("Please implement the `settle_commit` method")
class Position(BasePosition):
"""Position
@@ -199,7 +237,22 @@ class Position(BasePosition):
}
"""
def __init__(self, cash=0, position_dict={}):
def __init__(self, cash: float = 0, position_dict: Dict[str, Dict[str, float]] = {}):
"""Init position by cash and position_dict.
Parameters
----------
start_time :
the start time of backtest. It's for filling the initial value of stocks.
cash : float, optional
initial cash in account, by default 0
position_dict : Dict[stock_id, {"amount": int, "price"(optional): float}], optional
initial stocks with parameters amount and price,
if there is no price key in the dict of stocks, it will be filled by _fill_stock_value.
by default {}.
"""
super().__init__()
# NOTE: The position dict must be copied!!!
# Otherwise the initial value
self.init_cash = cash
@@ -207,6 +260,50 @@ class Position(BasePosition):
self.position["cash"] = cash
self.position["now_account_value"] = self.calculate_value()
def _fill_stock_value(
self, position_dict: dict, start_time: Union[str, pd.Timestamp], freq: str, last_days: int = 30
):
"""fill the stock value by the close price of latest last_days from qlib.
Parameters
----------
position_dict : Dict[stock_id, {"amount": int, "price": float}]
initial holding stocks.
start_time :
the start time of backtest.
last_days : int, optional
the days to get the latest close price, by default 30.
Return
----------
Dict[stock_id, {"amount": int, "price": float}]
initial holding stocks with filled price.
"""
stock_list = []
for stock in position_dict:
if ("price" not in position_dict[stock]) or (position_dict[stock]["price"] is None):
stock_list.append(stock)
if len(stock_list) == 0:
return position_dict
start_time = pd.Timestamp(start_time)
# note that start time is 2020-01-01 00:00:00 if raw start time is "2020-01-01"
price_end_time = start_time
price_start_time = start_time - timedelta(days=last_days)
price_df = D.features(
stock_list, ["$close"], price_start_time, price_end_time, freq=freq, disk_cache=True
).dropna()
price_dict = price_df.groupby(["instrument"]).tail(1).reset_index(level=1, drop=True)["$close"].to_dict()
if len(price_dict) < len(stock_list):
raise ValueError(f"there is no close price in qlib")
for stock in stock_list:
position_dict[stock]["price"] = price_dict[stock]
return position_dict
def _init_stock(self, stock_id, amount, price=None):
"""
initialization the stock in current position
@@ -250,7 +347,13 @@ class Position(BasePosition):
elif abs(self.position[stock_id]["amount"]) <= 1e-5:
self._del_stock(stock_id)
self.position["cash"] += trade_val - cost
new_cash = trade_val - cost
if self._settle_type == self.ST_CASH:
self.position["cash_delay"] += new_cash
elif self._settle_type == self.ST_NO:
self.position["cash"] += new_cash
else:
raise NotImplementedError(f"This type of input is not supported")
def _del_stock(self, stock_id):
del self.position[stock_id]
@@ -278,9 +381,6 @@ class Position(BasePosition):
def update_stock_weight(self, stock_id, weight):
self.position[stock_id]["weight"] = weight
def update_cash(self, cash):
self.position["cash"] = cash
def calculate_stock_value(self):
stock_list = self.get_stock_list()
value = 0
@@ -290,11 +390,11 @@ class Position(BasePosition):
def calculate_value(self):
value = self.calculate_stock_value()
value += self.position["cash"]
value += self.position["cash"] + self.position.get("cash_delay", 0.0)
return value
def get_stock_list(self):
stock_list = list(set(self.position.keys()) - {"cash", "now_account_value"})
stock_list = list(set(self.position.keys()) - {"cash", "now_account_value", "cash_delay"})
return stock_list
def get_stock_price(self, code):
@@ -313,8 +413,11 @@ class Position(BasePosition):
def get_stock_weight(self, code):
return self.position[code]["weight"]
def get_cash(self):
return self.position["cash"]
def get_cash(self, include_settle=False):
    """Return the cash held in this position.

    Parameters
    ----------
    include_settle : bool, optional
        when True, the amount stored under "cash_delay" (0.0 if that key is
        absent) is added to the returned cash, by default False.

    Returns
    -------
    the value stored under "cash", optionally plus the "cash_delay" amount.
    """
    if not include_settle:
        return self.position["cash"]
    return self.position["cash"] + self.position.get("cash_delay", 0.0)
def get_stock_amount_dict(self):
"""generate stock amount dict {stock_id : amount of stock}"""
@@ -326,7 +429,7 @@ class Position(BasePosition):
def get_stock_weight_dict(self, only_stock=False):
"""get_stock_weight_dict
generate stock weight fict {stock_id : value weight of stock in the position}
generate stock weight dict {stock_id : value weight of stock in the position}
it is meaningful in the beginning or the end of each trade date
:param only_stock: If only_stock=True, the weight of each stock in total stock will be returned
@@ -355,49 +458,20 @@ class Position(BasePosition):
for stock_code, weight in weight_dict.items():
self.update_stock_weight(stock_code, weight)
def save_position(self, path):
path = pathlib.Path(path)
p = copy.deepcopy(self.position)
cash = pd.Series(dtype=float)
cash["init_cash"] = self.init_cash
cash["cash"] = p["cash"]
cash["now_account_value"] = p["now_account_value"]
del p["cash"]
del p["now_account_value"]
positions = pd.DataFrame.from_dict(p, orient="index")
with pd.ExcelWriter(path) as writer:
positions.to_excel(writer, sheet_name="position")
cash.to_excel(writer, sheet_name="info")
def settle_start(self, settle_type):
    """Enter a settlement period of the given type.

    Parameters
    ----------
    settle_type :
        the settlement type to switch to. For `ST_CASH` a "cash_delay" entry
        initialised to 0.0 is created in the position dict.

    Raises
    ------
    AssertionError
        if a settlement is already in progress (the current type is not `ST_NO`).
    """
    already_settling = self._settle_type != self.ST_NO
    assert not already_settling, "Currently, settlement can't be nested!!!!!"
    self._settle_type = settle_type
    if settle_type == self.ST_CASH:
        self.position["cash_delay"] = 0.0
def load_position(self, path):
"""load position information from a file
should have format below
sheet "position"
columns: ['stock', f'count_{bar}', 'amount', 'price', 'weight']
f'count_{bar}': <how many bars the security has been hold>,
'amount': <the amount of the security>,
'price': <the close price of security in the last trading day>,
'weight': <the security weight of total position value>,
sheet "cash"
index: ['init_cash', 'cash', 'now_account_value']
'init_cash': <inital cash when account was created>,
'cash': <current cash in account>,
'now_account_value': <current total account value, should equal to sum(price[stock]*amount[stock])>
"""
path = pathlib.Path(path)
positions = pd.read_excel(open(path, "rb"), sheet_name="position", index_col=0)
cash_record = pd.read_excel(open(path, "rb"), sheet_name="info", index_col=0)
positions = positions.to_dict(orient="index")
init_cash = cash_record.loc["init_cash"].values[0]
cash = cash_record.loc["cash"].values[0]
now_account_value = cash_record.loc["now_account_value"].values[0]
# assign values
self.position = {}
self.init_cash = init_cash
self.position = positions
self.position["cash"] = cash
self.position["now_account_value"] = now_account_value
def settle_commit(self):
    """Finish the current settlement period, if any.

    For `ST_CASH` the amount accumulated under "cash_delay" is moved into "cash"
    and the "cash_delay" entry is removed; afterwards the settle type is reset
    to `ST_NO`. Nothing happens when no settlement is in progress.

    Raises
    ------
    NotImplementedError
        if the current settle type is neither `ST_NO` nor `ST_CASH`.
    """
    if self._settle_type == self.ST_NO:
        # No settlement is running, so there is nothing to commit.
        return
    if self._settle_type != self.ST_CASH:
        raise NotImplementedError(f"This type of input is not supported")
    self.position["cash"] = self.position["cash"] + self.position["cash_delay"]
    self.position.pop("cash_delay")
    self._settle_type = self.ST_NO
class InfPosition(BasePosition):
@@ -440,7 +514,7 @@ class InfPosition(BasePosition):
def get_stock_amount(self, code) -> float:
return np.inf
def get_cash(self) -> float:
def get_cash(self, include_settle=False) -> float:
return np.inf
def get_stock_amount_dict(self) -> Dict:
@@ -454,3 +528,9 @@ class InfPosition(BasePosition):
def update_weight_all(self):
raise NotImplementedError(f"InfPosition doesn't support update_weight_all")
def settle_start(self, settle_type: str):
pass
def settle_commit(self):
pass

View File

@@ -5,8 +5,7 @@
from collections import OrderedDict
from logging import warning
import pathlib
from typing import Dict, List, Tuple
import warnings
from typing import Dict, List, Tuple, Union, Callable
import numpy as np
import pandas as pd
@@ -17,10 +16,12 @@ from qlib.backtest.exchange import Exchange
from qlib.backtest.order import BaseTradeDecision, Order, OrderDir
from qlib.backtest.utils import TradeCalendarManager
from .high_performance_ds import PandasOrderIndicator
from ..data import D
from ..tests.config import CSI300_BENCH
from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data
from ..utils.time import Freq
from .order import IdxTradeRange
class Report:
@@ -62,6 +63,7 @@ class Report:
- Else, it represent end time of benchmark, by default None
"""
self.init_vars()
self.init_bench(freq=freq, benchmark_config=benchmark_config)
@@ -253,10 +255,12 @@ class Indicator:
"""
def __init__(self):
def __init__(self, order_indicator_cls=PandasOrderIndicator):
self.order_indicator_cls = order_indicator_cls
# order indicator is metrics for a single order for a specific step
self.order_indicator_his = OrderedDict()
self.order_indicator: Dict[str, pd.Series] = OrderedDict()
self.order_indicator = self.order_indicator_cls()
# trade indicator is metrics for all orders for a specific step
self.trade_indicator_his = OrderedDict()
@@ -266,13 +270,13 @@ class Indicator:
# def reset(self, trade_calendar: TradeCalendarManager):
def reset(self):
self.order_indicator = OrderedDict()
self.order_indicator = self.order_indicator_cls()
self.trade_indicator = OrderedDict()
# self._trade_calendar = trade_calendar
def record(self, trade_start_time):
self.order_indicator_his[trade_start_time] = self.order_indicator
self.trade_indicator_his[trade_start_time] = self.trade_indicator
self.order_indicator_his[trade_start_time] = self.get_order_indicator()
self.trade_indicator_his[trade_start_time] = self.get_trade_indicator()
def _update_order_trade_info(self, trade_info: list):
amount = dict()
@@ -281,6 +285,7 @@ class Indicator:
trade_value = dict()
trade_cost = dict()
trade_dir = dict()
pa = dict()
for order, _trade_val, _trade_cost, _trade_price in trade_info:
amount[order.stock_id] = order.amount_delta
@@ -289,66 +294,64 @@ class Indicator:
trade_value[order.stock_id] = _trade_val * order.sign
trade_cost[order.stock_id] = _trade_cost
trade_dir[order.stock_id] = order.direction
# The PA in the innermost layer is meanless
pa[order.stock_id] = 0
self.order_indicator["amount"] = self.order_indicator["inner_amount"] = pd.Series(amount)
self.order_indicator["deal_amount"] = pd.Series(deal_amount)
self.order_indicator.assign("amount", amount)
self.order_indicator.assign("inner_amount", amount)
self.order_indicator.assign("deal_amount", deal_amount)
# NOTE: trade_price and baseline price will be same on the lowest-level
self.order_indicator["trade_price"] = pd.Series(trade_price)
self.order_indicator["trade_value"] = pd.Series(trade_value)
self.order_indicator["trade_cost"] = pd.Series(trade_cost)
self.order_indicator["trade_dir"] = pd.Series(trade_dir)
self.order_indicator.assign("trade_price", trade_price)
self.order_indicator.assign("trade_value", trade_value)
self.order_indicator.assign("trade_cost", trade_cost)
self.order_indicator.assign("trade_dir", trade_dir)
self.order_indicator.assign("pa", pa)
def _update_order_fulfill_rate(self):
self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"]
def func(deal_amount, amount):
# deal_amount is np.NaN when there is no inner decision. So full fill rate is 0.
tmp_deal_amount = deal_amount.replace({np.NaN: 0})
return tmp_deal_amount / amount
def _update_order_price_advantage(self):
# NOTE:
# trade_price and baseline price will be same on the lowest-level
# So Pa should be 0 or do nothing
self.order_indicator["pa"] = 0
self.order_indicator.transfer(func, "ffr")
def update_order_indicators(self, trade_info: list):
self._update_order_trade_info(trade_info=trade_info)
self._update_order_fulfill_rate()
self._update_order_price_advantage()
def _agg_order_trade_info(self, inner_order_indicators: List[Dict[str, pd.Series]]):
inner_amount = pd.Series()
deal_amount = pd.Series()
trade_price = pd.Series()
trade_value = pd.Series()
trade_cost = pd.Series()
trade_dir = pd.Series()
for _order_indicator in inner_order_indicators:
inner_amount = inner_amount.add(_order_indicator["inner_amount"], fill_value=0)
deal_amount = deal_amount.add(_order_indicator["deal_amount"], fill_value=0)
trade_price = trade_price.add(
_order_indicator["trade_price"] * _order_indicator["deal_amount"], fill_value=0
)
trade_value = trade_value.add(_order_indicator["trade_value"], fill_value=0)
trade_cost = trade_cost.add(_order_indicator["trade_cost"], fill_value=0)
trade_dir = trade_dir.add(_order_indicator["trade_dir"], fill_value=0)
# calculate total trade amount with each inner order indicator.
def trade_amount_func(deal_amount, trade_price):
return deal_amount * trade_price
trade_dir = trade_dir.apply(Order.parse_dir)
for indicator in inner_order_indicators:
indicator.transfer(trade_amount_func, "trade_price")
self.order_indicator["inner_amount"] = inner_amount
self.order_indicator["deal_amount"] = deal_amount
trade_price /= self.order_indicator["deal_amount"]
self.order_indicator["trade_price"] = trade_price
self.order_indicator["trade_value"] = trade_value
self.order_indicator["trade_cost"] = trade_cost
self.order_indicator["trade_dir"] = trade_dir
# sum inner order indicators with same metric.
all_metric = ["inner_amount", "deal_amount", "trade_price", "trade_value", "trade_cost", "trade_dir"]
metric_dict = self.order_indicator_cls.sum_all_indicators(inner_order_indicators, all_metric, fill_value=0)
for metric in metric_dict:
self.order_indicator.assign(metric, metric_dict[metric])
def func(trade_price, deal_amount):
# trade_price is np.NaN instead of inf when deal_amount is zero.
tmp_deal_amount = deal_amount.replace({0: np.NaN})
return trade_price / tmp_deal_amount
self.order_indicator.transfer(func, "trade_price")
def func_apply(trade_dir):
return trade_dir.apply(Order.parse_dir)
self.order_indicator.transfer(func_apply, "trade_dir")
def _update_trade_amount(self, outer_trade_decision: BaseTradeDecision):
# NOTE: these indicator is designed for order execution, so the
decision: List[Order] = outer_trade_decision.get_decision()
if decision is None:
self.order_indicator["amount"] = pd.Series()
if len(decision) == 0:
self.order_indicator.assign("amount", {})
else:
self.order_indicator["amount"] = pd.Series({order.stock_id: order.amount_delta for order in decision})
def _agg_order_fulfill_rate(self):
self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"]
self.order_indicator.assign("amount", {order.stock_id: order.amount_delta for order in decision})
def _get_base_vol_pri(
self,
@@ -368,10 +371,12 @@ class Indicator:
agg = pa_config.get("agg", "twap").lower()
price = pa_config.get("price", "deal_price").lower()
# NOTE: IndexTradeRange is not supported!!!!! Because inner index is not available
trade_start_time, trade_end_time = decision.trade_range.clip_time_range(
start_time=trade_start_time, end_time=trade_end_time
)
if decision.trade_range is not None:
if isinstance(decision.trade_range, IdxTradeRange):
raise TypeError(f"IdxTradeRange is not supported")
trade_start_time, trade_end_time = decision.trade_range.clip_time_range(
start_time=trade_start_time, end_time=trade_end_time
)
if price == "deal_price":
price_s = trade_exchange.get_deal_price(
@@ -429,17 +434,16 @@ class Indicator:
"price": "$close", # TODO: this is not supported now!!!!!
# default to use deal price of the exchange
}
"""
# TODO: I think there are potentials to be optimized
trade_dir = self.order_indicator["trade_dir"]
trade_dir = self.order_indicator.get_metric_series("trade_dir")
if len(trade_dir) > 0:
bp_all, bv_all = [], []
# <step, inst, (base_volume | base_price)>
for oi, (dec, start, end) in zip(inner_order_indicators, decision_list):
bp_s = oi.get("base_price", pd.Series()).reindex(trade_dir.index)
bv_s = oi.get("base_volume", pd.Series()).reindex(trade_dir.index)
bp_s = oi.get_metric_series("base_price").reindex(trade_dir.index)
bv_s = oi.get_metric_series("base_volume").reindex(trade_dir.index)
bp_new, bv_new = {}, {}
for pr, v, (inst, direction) in zip(bp_s.values, bv_s.values, trade_dir.items()):
if np.isnan(pr):
@@ -463,17 +467,24 @@ class Indicator:
bp_all = pd.concat(bp_all, axis=1)
bv_all = pd.concat(bv_all, axis=1)
self.order_indicator["base_volume"] = bv_all.sum(axis=1)
self.order_indicator["base_price"] = (bp_all * bv_all).sum(axis=1) / self.order_indicator["base_volume"]
base_volume = bv_all.sum(axis=1)
self.order_indicator.assign("base_volume", base_volume)
self.order_indicator.assign("base_price", (bp_all * bv_all).sum(axis=1) / base_volume)
def _agg_order_price_advantage(self):
if not self.order_indicator["trade_price"].empty:
sign = 1 - self.order_indicator["trade_dir"] * 2
self.order_indicator["pa"] = sign * (
self.order_indicator["trade_price"] / self.order_indicator["base_price"] - 1
)
def if_empty_func(trade_price):
return trade_price.empty
if_empty = self.order_indicator.transfer(if_empty_func)
if not if_empty:
def func(trade_dir, trade_price, base_price):
sign = 1 - trade_dir * 2
return sign * (trade_price / base_price - 1)
self.order_indicator.transfer(func, "pa")
else:
self.order_indicator["pa"] = pd.Series()
self.order_indicator.assign("pa", {})
def agg_order_indicators(
self,
@@ -485,55 +496,74 @@ class Indicator:
):
self._agg_order_trade_info(inner_order_indicators)
self._update_trade_amount(outer_trade_decision)
self._agg_order_fulfill_rate()
self._update_order_fulfill_rate()
pa_config = indicator_config.get("pa_config", {})
self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config)
self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO
self._agg_order_price_advantage()
def _cal_trade_fulfill_rate(self, method="mean"):
if method == "mean":
return self.order_indicator["ffr"].mean()
def func(ffr):
return ffr.mean()
elif method == "amount_weighted":
weights = self.order_indicator["deal_amount"].abs()
return (self.order_indicator["ffr"] * weights).sum() / weights.sum()
def func(ffr, deal_amount):
return (ffr * deal_amount.abs()).sum() / (deal_amount.abs().sum())
elif method == "value_weighted":
weights = self.order_indicator["trade_value"].abs()
return (self.order_indicator["ffr"] * weights).sum() / weights.sum()
def func(ffr, trade_value):
return (ffr * trade_value.abs()).sum() / (trade_value.abs().sum())
else:
raise ValueError(f"method {method} is not supported!")
return self.order_indicator.transfer(func)
def _cal_trade_price_advantage(self, method="mean"):
pa_order = self.order_indicator["pa"]
if isinstance(pa_order, (int, float)):
# pa from atomic executor
return pa_order
if method == "mean":
return pa_order.mean()
def func(pa):
return pa.mean()
elif method == "amount_weighted":
weights = self.order_indicator["deal_amount"].abs()
return (pa_order * weights).sum() / weights.sum()
def func(pa, deal_amount):
return (pa * deal_amount.abs()).sum() / (deal_amount.abs().sum())
elif method == "value_weighted":
weights = self.order_indicator["trade_value"].abs()
return (pa_order * weights).sum() / weights.sum()
def func(pa, trade_value):
return (pa * trade_value.abs()).sum() / (trade_value.abs().sum())
else:
raise ValueError(f"method {method} is not supported!")
return self.order_indicator.transfer(func)
def _cal_trade_positive_rate(self):
pa_order = self.order_indicator["pa"]
if isinstance(pa_order, (int, float)):
# pa from atomic executor
return pa_order
return (pa_order > 0).astype(int).sum() / pa_order.count()
def func(pa):
return (pa > 0).astype(int).sum() / pa.count()
return self.order_indicator.transfer(func)
def _cal_deal_amount(self):
return self.order_indicator["deal_amount"].abs().sum()
def func(deal_amount):
return deal_amount.abs().sum()
return self.order_indicator.transfer(func)
def _cal_trade_value(self):
return self.order_indicator["trade_value"].abs().sum()
def func(trade_value):
return trade_value.abs().sum()
return self.order_indicator.transfer(func)
def _cal_trade_order_count(self):
return self.order_indicator["amount"].count()
def func(amount):
return amount.count()
return self.order_indicator.transfer(func)
def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}):
show_indicator = indicator_config.get("show_indicator", False)
@@ -558,8 +588,10 @@ class Indicator:
)
)
def get_order_indicator(self):
return self.order_indicator
def get_order_indicator(self, raw: bool = False):
if raw:
return self.order_indicator
return self.order_indicator.to_series()
def get_trade_indicator(self):
return self.trade_indicator

View File

@@ -0,0 +1,331 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import os
import numpy as np
import pandas as pd
from typing import Text, Union
import copy
import math
from ...utils import get_or_create_path
from ...log import get_module_logger
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from .pytorch_utils import count_parameters
from ...model.base import Model
from ...data.dataset import DatasetH, TSDatasetH
from ...data.dataset.handler import DataHandlerLP
from torch.nn.modules.container import ModuleList
# qrun examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml
class LocalformerModel(Model):
    """Localformer model trained on flat 2-D feature tables.

    Wraps the `Transformer` network defined in this module and provides
    mini-batch training with early stopping (`fit`) and batched inference
    (`predict`).
    """

    def __init__(
        self,
        d_feat: int = 20,
        d_model: int = 64,
        batch_size: int = 2048,
        nhead: int = 2,
        num_layers: int = 2,
        dropout: float = 0,
        n_epochs=100,
        lr=0.0001,
        metric="",
        early_stop=5,
        loss="mse",
        optimizer="adam",
        reg=1e-3,
        n_jobs=10,
        GPU=0,
        seed=None,
        **kwargs
    ):
        """Build the network and its optimizer.

        Parameters
        ----------
        d_feat, d_model, nhead, num_layers, dropout :
            forwarded to `Transformer`.
        batch_size : int
            number of rows per mini-batch in training, evaluation and prediction.
        n_epochs : int
            maximum number of training epochs.
        lr : float
            learning rate of the optimizer.
        metric : str
            evaluation metric; only "" and "loss" are supported (negative loss).
        early_stop : int
            number of consecutive non-improving epochs before training stops.
        loss : str
            loss name; only "mse" is supported.
        optimizer : str
            "adam" or "gd" (case-insensitive).
        reg : float
            weight decay passed to the optimizer.
        n_jobs : int
            stored on the instance; not used by this class.
        GPU : int
            CUDA device index; the CPU is used when CUDA is unavailable or GPU < 0.
        seed : int, optional
            seed for numpy and torch; no seeding is done when None.

        Raises
        ------
        NotImplementedError
            if `optimizer` is neither "adam" nor "gd".
        """
        # set hyper-parameters.
        self.d_model = d_model
        self.dropout = dropout
        self.n_epochs = n_epochs
        self.lr = lr
        self.reg = reg
        self.metric = metric
        self.batch_size = batch_size
        self.early_stop = early_stop
        self.optimizer = optimizer.lower()
        self.loss = loss
        self.n_jobs = n_jobs
        self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu")
        self.seed = seed
        self.logger = get_module_logger("TransformerModel")
        self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device))

        if self.seed is not None:
            np.random.seed(self.seed)
            torch.manual_seed(self.seed)

        self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device)
        if optimizer.lower() == "adam":
            self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        elif optimizer.lower() == "gd":
            self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        else:
            raise NotImplementedError("optimizer {} is not supported!".format(optimizer))

        self.fitted = False
        self.model.to(self.device)

    @property
    def use_gpu(self):
        """Whether the model runs on a non-CPU device."""
        return self.device != torch.device("cpu")

    def mse(self, pred, label):
        """Mean squared error between `pred` and `label`."""
        loss = (pred.float() - label.float()) ** 2
        return torch.mean(loss)

    def loss_fn(self, pred, label):
        """Compute the configured loss, ignoring entries whose label is NaN.

        Raises
        ------
        ValueError
            if `self.loss` is not a supported loss name.
        """
        mask = ~torch.isnan(label)

        if self.loss == "mse":
            return self.mse(pred[mask], label[mask])

        raise ValueError("unknown loss `%s`" % self.loss)

    def metric_fn(self, pred, label):
        """Compute the evaluation score (higher is better) on finite labels.

        Raises
        ------
        ValueError
            if `self.metric` is not a supported metric name.
        """
        mask = torch.isfinite(label)

        if self.metric == "" or self.metric == "loss":
            return -self.loss_fn(pred[mask], label[mask])

        raise ValueError("unknown metric `%s`" % self.metric)

    def train_epoch(self, x_train, y_train):
        """Run one optimisation pass over shuffled mini-batches.

        The trailing batch is skipped when it holds fewer than `batch_size` rows.
        """
        x_train_values = x_train.values
        y_train_values = np.squeeze(y_train.values)

        self.model.train()

        indices = np.arange(len(x_train_values))
        np.random.shuffle(indices)

        for i in range(len(indices))[:: self.batch_size]:
            # only full batches are used for training
            if len(indices) - i < self.batch_size:
                break

            feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float().to(self.device)
            label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float().to(self.device)

            pred = self.model(feature)
            loss = self.loss_fn(pred, label)

            self.train_optimizer.zero_grad()
            loss.backward()
            # clip every gradient element to [-3, 3] before the update
            torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0)
            self.train_optimizer.step()

    def test_epoch(self, data_x, data_y):
        """Evaluate the model and return `(mean loss, mean score)` over full batches."""
        # prepare evaluation data
        x_values = data_x.values
        y_values = np.squeeze(data_y.values)

        self.model.eval()

        scores = []
        losses = []

        indices = np.arange(len(x_values))

        for i in range(len(indices))[:: self.batch_size]:
            # only full batches are evaluated
            if len(indices) - i < self.batch_size:
                break

            feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float().to(self.device)
            label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float().to(self.device)

            with torch.no_grad():
                pred = self.model(feature)
                loss = self.loss_fn(pred, label)
                losses.append(loss.item())

                score = self.metric_fn(pred, label)
                scores.append(score.item())

        return np.mean(losses), np.mean(scores)

    def fit(
        self,
        dataset: DatasetH,
        evals_result=None,
        save_path=None,
    ):
        """Train the model with early stopping on the validation score.

        Parameters
        ----------
        dataset : DatasetH
            dataset providing the "train", "valid" and "test" segments.
        evals_result : dict, optional
            container that receives the per-epoch "train" and "valid" scores.
            A fresh dict is used when omitted.
        save_path : optional
            passed to `get_or_create_path`; the best parameters are saved there.
        """
        # A `dict()` default would be shared by every call; create it per call instead.
        if evals_result is None:
            evals_result = {}

        df_train, df_valid, df_test = dataset.prepare(
            ["train", "valid", "test"],
            col_set=["feature", "label"],
            data_key=DataHandlerLP.DK_L,
        )

        x_train, y_train = df_train["feature"], df_train["label"]
        x_valid, y_valid = df_valid["feature"], df_valid["label"]

        save_path = get_or_create_path(save_path)
        stop_steps = 0
        train_loss = 0
        best_score = -np.inf
        best_epoch = 0
        # Start from the current weights so `best_param` is always defined, even when
        # no epoch improves on `best_score` (e.g. n_epochs == 0 or NaN validation scores).
        best_param = copy.deepcopy(self.model.state_dict())
        evals_result["train"] = []
        evals_result["valid"] = []

        # train
        self.logger.info("training...")
        self.fitted = True

        for step in range(self.n_epochs):
            self.logger.info("Epoch%d:", step)
            self.logger.info("training...")
            self.train_epoch(x_train, y_train)
            self.logger.info("evaluating...")
            train_loss, train_score = self.test_epoch(x_train, y_train)
            val_loss, val_score = self.test_epoch(x_valid, y_valid)
            self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
            evals_result["train"].append(train_score)
            evals_result["valid"].append(val_score)

            if val_score > best_score:
                best_score = val_score
                stop_steps = 0
                best_epoch = step
                best_param = copy.deepcopy(self.model.state_dict())
            else:
                stop_steps += 1
                if stop_steps >= self.early_stop:
                    self.logger.info("early stop")
                    break

        self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
        self.model.load_state_dict(best_param)
        torch.save(best_param, save_path)

        if self.use_gpu:
            torch.cuda.empty_cache()

    def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"):
        """Predict on one dataset segment and return a Series indexed like the features.

        Raises
        ------
        ValueError
            if the model has not been fitted.
        """
        if not self.fitted:
            raise ValueError("model is not fitted yet!")

        x_test = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I)
        index = x_test.index
        self.model.eval()
        x_values = x_test.values
        sample_num = x_values.shape[0]
        preds = []

        for begin in range(sample_num)[:: self.batch_size]:
            # the last batch may be shorter than batch_size
            if sample_num - begin < self.batch_size:
                end = sample_num
            else:
                end = begin + self.batch_size

            x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device)

            with torch.no_grad():
                pred = self.model(x_batch).detach().cpu().numpy()

            preds.append(pred)

        return pd.Series(np.concatenate(preds), index=index)
class PositionalEncoding(nn.Module):
    """Additive sinusoidal positional encoding for inputs laid out as [T, N, F]."""

    def __init__(self, d_model, max_len=1000):
        """Precompute the encoding table.

        Parameters
        ----------
        d_model : int
            size of the last (feature) dimension of the inputs; may be odd.
        max_len : int, optional
            largest first-dimension length supported, by default 1000.
        """
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even columns, so trim the
        # cosine term to fit; for even d_model the slice is a no-op.
        pe[:, 1::2] = torch.cos(position * div_term)[:, : d_model // 2]
        # [max_len, d_model] --> [max_len, 1, d_model] so it broadcasts over the batch axis
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pe", pe)

    def forward(self, x):
        """Add the encoding of the first `x.size(0)` positions to `x`."""
        # [T, N, F]
        return x + self.pe[: x.size(0), :]
def _get_clones(module, N):
return ModuleList([copy.deepcopy(module) for i in range(N)])
class LocalformerEncoder(nn.Module):
    """Stack of encoder layers, each fed with its input plus a Conv1d branch of that input."""

    __constants__ = ["norm"]

    def __init__(self, encoder_layer, num_layers, d_model):
        """Clone `encoder_layer` and a kernel-3 Conv1d `num_layers` times each."""
        super(LocalformerEncoder, self).__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        self.conv = _get_clones(nn.Conv1d(d_model, d_model, kernel_size=3, stride=1, padding=1), num_layers)
        self.num_layers = num_layers

    def forward(self, src, mask):
        """Run every (conv, encoder layer) pair in order.

        Returns the output of the last encoder layer plus the last conv branch.
        """
        hidden = src
        local = src

        for attn_layer, conv_layer in zip(self.layers, self.conv):
            # [T, N, F] --> [N, F, T] for Conv1d, then back to [T, N, F]
            local = conv_layer(hidden.permute(1, 2, 0)).permute(2, 0, 1)
            hidden = attn_layer(hidden + local, src_mask=mask)

        return hidden + local
class Transformer(nn.Module):
    """Localformer network: linear embedding, positional encoding, Localformer encoder, GRU, linear head."""

    def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None):
        """Build the sub-modules.

        Parameters
        ----------
        d_feat : int
            number of features per time step of the flattened input.
        d_model : int
            hidden size used by the embedding, encoder and GRU.
        nhead : int
            number of attention heads of the encoder layer.
        num_layers : int
            number of encoder layers and of GRU layers.
        dropout : float
            dropout used by the encoder layer and the GRU.
        device : optional
            stored on the module; not used by `forward`.
        """
        super(Transformer, self).__init__()
        self.rnn = nn.GRU(
            input_size=d_model,
            hidden_size=d_model,
            num_layers=num_layers,
            batch_first=False,
            dropout=dropout,
        )
        self.feature_layer = nn.Linear(d_feat, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout)
        self.transformer_encoder = LocalformerEncoder(self.encoder_layer, num_layers=num_layers, d_model=d_model)
        self.decoder_layer = nn.Linear(d_model, 1)
        self.device = device
        self.d_feat = d_feat

    def forward(self, src):
        """Map a flat batch [N, F*T] to one prediction per row, shape [N]."""
        # src [N, F*T] --> [N, T, F]
        src = src.reshape(len(src), self.d_feat, -1).permute(0, 2, 1)
        src = self.feature_layer(src)

        # src [N, T, F] --> [T, N, F], [60, 512, 8]
        src = src.transpose(1, 0)  # not batch first

        mask = None

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, mask)  # [60, 512, 8]

        output, _ = self.rnn(output)

        # [T, N, F] --> [N, T, F]; only the last time step feeds the head
        output = self.decoder_layer(output.transpose(1, 0)[:, -1, :])  # [512, 1]

        # Drop only the trailing size-1 axis: a bare squeeze() would also collapse a
        # batch of one into a 0-d tensor, which np.concatenate and mask indexing reject.
        return output.squeeze(-1)

View File

@@ -0,0 +1,308 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import os
import numpy as np
import pandas as pd
import copy
import math
from ...utils import get_or_create_path
from ...log import get_module_logger
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from .pytorch_utils import count_parameters
from ...model.base import Model
from ...data.dataset import DatasetH, TSDatasetH
from ...data.dataset.handler import DataHandlerLP
from torch.nn.modules.container import ModuleList
class LocalformerModel(Model):
    """Localformer model trained on time-series samples served through a DataLoader.

    Wraps the `Transformer` network defined in this module and provides training
    with early stopping (`fit`) and batched inference (`predict`).
    """

    def __init__(
        self,
        d_feat: int = 20,
        d_model: int = 64,
        batch_size: int = 8192,
        nhead: int = 2,
        num_layers: int = 2,
        dropout: float = 0,
        n_epochs=100,
        lr=0.0001,
        metric="",
        early_stop=5,
        loss="mse",
        optimizer="adam",
        reg=1e-3,
        n_jobs=10,
        GPU=0,
        seed=None,
        **kwargs
    ):
        """Build the network and its optimizer.

        Parameters
        ----------
        d_feat, d_model, nhead, num_layers, dropout :
            forwarded to `Transformer`.
        batch_size : int
            batch size of every DataLoader created by this class.
        n_epochs : int
            maximum number of training epochs.
        lr : float
            learning rate of the optimizer.
        metric : str
            evaluation metric; only "" and "loss" are supported (negative loss).
        early_stop : int
            number of consecutive non-improving epochs before training stops.
        loss : str
            loss name; only "mse" is supported.
        optimizer : str
            "adam" or "gd" (case-insensitive).
        reg : float
            weight decay passed to the optimizer.
        n_jobs : int
            number of DataLoader worker processes.
        GPU : int
            CUDA device index; the CPU is used when CUDA is unavailable or GPU < 0.
        seed : int, optional
            seed for numpy and torch; no seeding is done when None.

        Raises
        ------
        NotImplementedError
            if `optimizer` is neither "adam" nor "gd".
        """
        # set hyper-parameters.
        self.d_model = d_model
        self.dropout = dropout
        self.n_epochs = n_epochs
        self.lr = lr
        self.reg = reg
        self.metric = metric
        self.batch_size = batch_size
        self.early_stop = early_stop
        self.optimizer = optimizer.lower()
        self.loss = loss
        self.n_jobs = n_jobs
        self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu")
        self.seed = seed
        self.logger = get_module_logger("TransformerModel")
        self.logger.info(
            "Improved Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)
        )

        if self.seed is not None:
            np.random.seed(self.seed)
            torch.manual_seed(self.seed)

        self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device)
        if optimizer.lower() == "adam":
            self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        elif optimizer.lower() == "gd":
            self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        else:
            raise NotImplementedError("optimizer {} is not supported!".format(optimizer))

        self.fitted = False
        self.model.to(self.device)

    @property
    def use_gpu(self):
        """Whether the model runs on a non-CPU device."""
        return self.device != torch.device("cpu")

    def mse(self, pred, label):
        """Mean squared error between `pred` and `label`."""
        loss = (pred.float() - label.float()) ** 2
        return torch.mean(loss)

    def loss_fn(self, pred, label):
        """Compute the configured loss, ignoring entries whose label is NaN.

        Raises
        ------
        ValueError
            if `self.loss` is not a supported loss name.
        """
        mask = ~torch.isnan(label)

        if self.loss == "mse":
            return self.mse(pred[mask], label[mask])

        raise ValueError("unknown loss `%s`" % self.loss)

    def metric_fn(self, pred, label):
        """Compute the evaluation score (higher is better) on finite labels.

        Raises
        ------
        ValueError
            if `self.metric` is not a supported metric name.
        """
        mask = torch.isfinite(label)

        if self.metric == "" or self.metric == "loss":
            return -self.loss_fn(pred[mask], label[mask])

        raise ValueError("unknown metric `%s`" % self.metric)

    def train_epoch(self, data_loader):
        """Run one optimisation pass over every batch of `data_loader`."""
        self.model.train()

        for data in data_loader:
            # last column is the label; the label of the final time step is the target
            feature = data[:, :, 0:-1].to(self.device)
            label = data[:, -1, -1].to(self.device)

            pred = self.model(feature.float())  # .float()
            loss = self.loss_fn(pred, label)

            self.train_optimizer.zero_grad()
            loss.backward()
            # clip every gradient element to [-3, 3] before the update
            torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0)
            self.train_optimizer.step()

    def test_epoch(self, data_loader):
        """Evaluate the model and return `(mean loss, mean score)` over the batches."""
        self.model.eval()

        scores = []
        losses = []

        for data in data_loader:
            feature = data[:, :, 0:-1].to(self.device)
            label = data[:, -1, -1].to(self.device)

            with torch.no_grad():
                pred = self.model(feature.float())  # .float()
                loss = self.loss_fn(pred, label)
                losses.append(loss.item())

                score = self.metric_fn(pred, label)
                scores.append(score.item())

        return np.mean(losses), np.mean(scores)

    def fit(
        self,
        dataset: DatasetH,
        evals_result=None,
        save_path=None,
    ):
        """Train the model with early stopping on the validation score.

        Parameters
        ----------
        dataset : DatasetH
            dataset providing the "train" and "valid" segments.
        evals_result : dict, optional
            container that receives the per-epoch "train" and "valid" scores.
            A fresh dict is used when omitted.
        save_path : optional
            passed to `get_or_create_path`; the best parameters are saved there.
        """
        # A `dict()` default would be shared by every call; create it per call instead.
        if evals_result is None:
            evals_result = {}

        dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
        dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)

        dl_train.config(fillna_type="ffill+bfill")  # process nan brought by dataloader
        dl_valid.config(fillna_type="ffill+bfill")  # process nan brought by dataloader

        train_loader = DataLoader(
            dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True
        )
        valid_loader = DataLoader(
            dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True
        )

        save_path = get_or_create_path(save_path)

        stop_steps = 0
        train_loss = 0
        best_score = -np.inf
        best_epoch = 0
        # Start from the current weights so `best_param` is always defined, even when
        # no epoch improves on `best_score` (e.g. n_epochs == 0 or NaN validation scores).
        best_param = copy.deepcopy(self.model.state_dict())
        evals_result["train"] = []
        evals_result["valid"] = []

        # train
        self.logger.info("training...")
        self.fitted = True

        for step in range(self.n_epochs):
            self.logger.info("Epoch%d:", step)
            self.logger.info("training...")
            self.train_epoch(train_loader)
            self.logger.info("evaluating...")
            train_loss, train_score = self.test_epoch(train_loader)
            val_loss, val_score = self.test_epoch(valid_loader)
            self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
            evals_result["train"].append(train_score)
            evals_result["valid"].append(val_score)

            if val_score > best_score:
                best_score = val_score
                stop_steps = 0
                best_epoch = step
                best_param = copy.deepcopy(self.model.state_dict())
            else:
                stop_steps += 1
                if stop_steps >= self.early_stop:
                    self.logger.info("early stop")
                    break

        self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
        self.model.load_state_dict(best_param)
        torch.save(best_param, save_path)

        if self.use_gpu:
            torch.cuda.empty_cache()

    def predict(self, dataset):
        """Predict on the "test" segment and return a Series indexed by the dataset index.

        Raises
        ------
        ValueError
            if the model has not been fitted.
        """
        if not self.fitted:
            raise ValueError("model is not fitted yet!")

        dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I)
        dl_test.config(fillna_type="ffill+bfill")
        test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs)
        self.model.eval()
        preds = []

        for data in test_loader:
            # drop the trailing label column; only features are fed to the model
            feature = data[:, :, 0:-1].to(self.device)

            with torch.no_grad():
                pred = self.model(feature.float()).detach().cpu().numpy()

            preds.append(pred)

        return pd.Series(np.concatenate(preds), index=dl_test.get_index())
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a time-major input.

    Args:
        d_model: size of the last (feature) dimension of the input.
        max_len: number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, max_len=1000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # An odd d_model has one fewer odd column than even columns, so trim
        # the angle table; for an even d_model the slice keeps every column.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # [max_len, d_model] --> [max_len, 1, d_model] so it broadcasts over the batch axis
        pe = pe.unsqueeze(0).transpose(0, 1)
        # a buffer follows the module across devices but is not a trainable parameter
        self.register_buffer("pe", pe)

    def forward(self, x):
        # [T, N, F]
        return x + self.pe[: x.size(0), :]
def _get_clones(module, N):
return ModuleList([copy.deepcopy(module) for i in range(N)])
class LocalformerEncoder(nn.Module):
    """Stack of encoder layers, each fed the running state plus a 1-D convolution of it.

    Args:
        encoder_layer: layer that is deep-copied ``num_layers`` times.
        num_layers: number of (convolution, encoder layer) pairs.
        d_model: channel count of every convolution (input and output).
    """

    __constants__ = ["norm"]

    def __init__(self, encoder_layer, num_layers, d_model):
        super(LocalformerEncoder, self).__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        # kernel size 3, stride 1, padding 1: the time length is preserved
        self.conv = _get_clones(nn.Conv1d(d_model, d_model, 3, 1, 1), num_layers)
        self.num_layers = num_layers

    def forward(self, src, mask):
        hidden = src
        local = src
        for idx, layer in enumerate(self.layers):
            # [T, N, F] --> [N, F, T], the channel-first layout Conv1d expects
            conv_in = hidden.permute(1, 2, 0)
            # [N, F, T] --> [T, N, F]
            local = self.conv[idx](conv_in).permute(2, 0, 1)
            hidden = layer(hidden + local, src_mask=mask)
        # the last convolution output is added once more to the final state
        return hidden + local
class Transformer(nn.Module):
    """Linear embedding -> positional encoding -> Localformer encoder -> GRU -> linear head.

    Args:
        d_feat: number of input features per time step.
        d_model: width of the embedding, encoder and GRU.
        nhead: number of attention heads in each encoder layer.
        num_layers: number of encoder layers and of GRU layers.
        dropout: dropout rate used by the encoder layers and the GRU.
        device: stored on the instance; not used inside this class.
    """

    def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None):
        super(Transformer, self).__init__()
        self.rnn = nn.GRU(
            input_size=d_model,
            hidden_size=d_model,
            num_layers=num_layers,
            batch_first=False,
            dropout=dropout,
        )
        self.feature_layer = nn.Linear(d_feat, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout)
        self.transformer_encoder = LocalformerEncoder(self.encoder_layer, num_layers=num_layers, d_model=d_model)
        self.decoder_layer = nn.Linear(d_model, 1)
        self.device = device
        self.d_feat = d_feat

    def forward(self, src):
        # src [N, T, F], [512, 60, 6]
        src = self.feature_layer(src)  # [512, 60, 8]

        # src [N, T, F] --> [T, N, F], [60, 512, 8]
        src = src.transpose(1, 0)  # not batch first

        mask = None

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, mask)  # [60, 512, 8]

        output, _ = self.rnn(output)

        # [T, N, F] --> [N, T, F]; only the last time step is decoded
        output = self.decoder_layer(output.transpose(1, 0)[:, -1, :])  # [512, 1]

        # Drop only the trailing size-1 axis. A bare squeeze() would also drop the
        # batch axis when N == 1 and return a 0-d tensor instead of shape [1].
        return output.squeeze(-1)

View File

@@ -0,0 +1,294 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import os
import numpy as np
import pandas as pd
from typing import Text, Union
import copy
import math
from ...utils import get_or_create_path
from ...log import get_module_logger
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from .pytorch_utils import count_parameters
from ...model.base import Model
from ...data.dataset import DatasetH, TSDatasetH
from ...data.dataset.handler import DataHandlerLP
# qrun examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml
class TransformerModel(Model):
    """Transformer model trained on flat feature frames with manual mini-batching.

    Args:
        d_feat: number of features per time step, forwarded to ``Transformer``.
        d_model: width of the transformer.
        batch_size: rows per mini-batch for training, evaluation and prediction.
        nhead: number of attention heads.
        num_layers: number of encoder layers.
        dropout: dropout rate of the encoder layers.
        n_epochs: maximum number of training epochs.
        lr: optimizer learning rate.
        metric: validation metric; ``""`` and ``"loss"`` (negative loss) are supported.
        early_stop: number of consecutive non-improving epochs before training stops.
        loss: training loss; only ``"mse"`` is supported.
        optimizer: ``"adam"`` or ``"gd"`` (plain SGD), case-insensitive.
        reg: weight decay passed to the optimizer.
        n_jobs: stored on the instance; not used by this class.
        GPU: CUDA device index; a negative value or missing CUDA selects the CPU.
        seed: when given, seeds both numpy and torch.
        **kwargs: accepted and ignored.
    """

    def __init__(
        self,
        d_feat: int = 20,
        d_model: int = 64,
        batch_size: int = 2048,
        nhead: int = 2,
        num_layers: int = 2,
        dropout: float = 0,
        n_epochs=100,
        lr=0.0001,
        metric="",
        early_stop=5,
        loss="mse",
        optimizer="adam",
        reg=1e-3,
        n_jobs=10,
        GPU=0,
        seed=None,
        **kwargs
    ):
        # set hyper-parameters.
        self.d_model = d_model
        self.dropout = dropout
        self.n_epochs = n_epochs
        self.lr = lr
        self.reg = reg
        self.metric = metric
        self.batch_size = batch_size
        self.early_stop = early_stop
        self.optimizer = optimizer.lower()
        self.loss = loss
        self.n_jobs = n_jobs
        self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu")
        self.seed = seed
        self.logger = get_module_logger("TransformerModel")
        self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device))

        # seed before the network is built so weight initialisation is reproducible
        if self.seed is not None:
            np.random.seed(self.seed)
            torch.manual_seed(self.seed)

        self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device)
        if optimizer.lower() == "adam":
            self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        elif optimizer.lower() == "gd":
            self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        else:
            raise NotImplementedError("optimizer {} is not supported!".format(optimizer))

        self.fitted = False
        self.model.to(self.device)

    @property
    def use_gpu(self):
        """Whether the model lives on a non-CPU device."""
        return self.device != torch.device("cpu")

    def mse(self, pred, label):
        """Mean squared error between ``pred`` and ``label``."""
        loss = (pred.float() - label.float()) ** 2
        return torch.mean(loss)

    def loss_fn(self, pred, label):
        """Compute the configured loss, ignoring samples whose label is NaN.

        Raises:
            ValueError: if ``self.loss`` is not a supported loss name.
        """
        mask = ~torch.isnan(label)

        if self.loss == "mse":
            return self.mse(pred[mask], label[mask])

        raise ValueError("unknown loss `%s`" % self.loss)

    def metric_fn(self, pred, label):
        """Compute the configured score (higher is better) over finite labels.

        Raises:
            ValueError: if ``self.metric`` is not a supported metric name.
        """
        mask = torch.isfinite(label)

        if self.metric == "" or self.metric == "loss":
            # negate so that a larger score always means a better model
            return -self.loss_fn(pred[mask], label[mask])

        raise ValueError("unknown metric `%s`" % self.metric)

    def train_epoch(self, x_train, y_train):
        """Run one optimisation pass over shuffled, full-size mini-batches."""
        x_train_values = x_train.values
        y_train_values = np.squeeze(y_train.values)

        self.model.train()

        indices = np.arange(len(x_train_values))
        np.random.shuffle(indices)

        for i in range(len(indices))[:: self.batch_size]:
            # the trailing partial batch is skipped
            if len(indices) - i < self.batch_size:
                break

            feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float().to(self.device)
            label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float().to(self.device)

            pred = self.model(feature)
            loss = self.loss_fn(pred, label)

            self.train_optimizer.zero_grad()
            loss.backward()
            # clamp every gradient element to [-3, 3] before the update
            torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0)
            self.train_optimizer.step()

    def test_epoch(self, data_x, data_y):
        """Evaluate on full-size mini-batches and return ``(mean loss, mean score)``."""
        # prepare evaluation data
        x_values = data_x.values
        y_values = np.squeeze(data_y.values)

        self.model.eval()

        scores = []
        losses = []

        indices = np.arange(len(x_values))

        for i in range(len(indices))[:: self.batch_size]:
            # the trailing partial batch is skipped, as in train_epoch
            if len(indices) - i < self.batch_size:
                break

            feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float().to(self.device)
            label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float().to(self.device)

            with torch.no_grad():
                pred = self.model(feature)
                loss = self.loss_fn(pred, label)
                losses.append(loss.item())

                score = self.metric_fn(pred, label)
                scores.append(score.item())

        return np.mean(losses), np.mean(scores)

    def fit(
        self,
        dataset: DatasetH,
        evals_result=None,
        save_path=None,
    ):
        """Train with early stopping on the validation score, then restore and save the best weights.

        Args:
            dataset: provides the ``"train"`` and ``"valid"`` segments.
            evals_result: optional dict that receives the per-epoch ``"train"`` and
                ``"valid"`` scores; a fresh dict is used when omitted.
            save_path: where the best state dict is saved (resolved by ``get_or_create_path``).
        """
        # A ``dict()`` default would be one object shared by every call.
        if evals_result is None:
            evals_result = {}

        # only the segments that are actually used are prepared
        df_train, df_valid = dataset.prepare(
            ["train", "valid"],
            col_set=["feature", "label"],
            data_key=DataHandlerLP.DK_L,
        )

        x_train, y_train = df_train["feature"], df_train["label"]
        x_valid, y_valid = df_valid["feature"], df_valid["label"]

        save_path = get_or_create_path(save_path)
        stop_steps = 0
        best_score = -np.inf
        best_epoch = 0
        best_param = None
        evals_result["train"] = []
        evals_result["valid"] = []

        # train
        self.logger.info("training...")

        for step in range(self.n_epochs):
            self.logger.info("Epoch%d:", step)
            self.logger.info("training...")
            self.train_epoch(x_train, y_train)
            self.logger.info("evaluating...")
            train_loss, train_score = self.test_epoch(x_train, y_train)
            val_loss, val_score = self.test_epoch(x_valid, y_valid)
            self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
            evals_result["train"].append(train_score)
            evals_result["valid"].append(val_score)

            if val_score > best_score:
                best_score = val_score
                stop_steps = 0
                best_epoch = step
                best_param = copy.deepcopy(self.model.state_dict())
            else:
                stop_steps += 1
                if stop_steps >= self.early_stop:
                    self.logger.info("early stop")
                    break

        # No epoch improved on -inf (zero epochs, or every score was NaN):
        # keep the current weights instead of failing on an unbound name.
        if best_param is None:
            best_param = copy.deepcopy(self.model.state_dict())

        self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
        self.model.load_state_dict(best_param)
        torch.save(best_param, save_path)
        # mark as fitted only after training completed without raising
        self.fitted = True

        if self.use_gpu:
            torch.cuda.empty_cache()

    def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"):
        """Predict for ``segment`` and return a Series aligned with the feature index.

        Raises:
            ValueError: if the model has not been fitted.
        """
        if not self.fitted:
            raise ValueError("model is not fitted yet!")

        x_test = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I)
        index = x_test.index
        self.model.eval()
        x_values = x_test.values
        sample_num = x_values.shape[0]
        preds = []

        for begin in range(sample_num)[:: self.batch_size]:
            # the last batch may be shorter than batch_size
            if sample_num - begin < self.batch_size:
                end = sample_num
            else:
                end = begin + self.batch_size

            x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device)

            with torch.no_grad():
                pred = self.model(x_batch).detach().cpu().numpy()

            # a one-row batch may come back as a 0-d array, which np.concatenate
            # rejects; flatten so every batch contributes a 1-d array
            preds.append(pred.reshape(-1))

        return pd.Series(np.concatenate(preds), index=index)
class PositionalEncoding(nn.Module):
    """Sinusoidal position encoding added to a ``[T, N, F]`` input.

    Args:
        d_model: feature width ``F`` of the input.
        max_len: number of time positions precomputed at construction.
    """

    def __init__(self, d_model, max_len=1000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model the odd-index slice is one column narrower than the
        # angle table; trimming keeps the assignment valid and is a no-op otherwise.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # [max_len, d_model] --> [max_len, 1, d_model]
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # [T, N, F]
        return x + self.pe[: x.size(0), :]
class Transformer(nn.Module):
    """Linear embedding -> positional encoding -> transformer encoder -> linear head.

    The input is a flat ``[N, F*T]`` tensor that ``forward`` reshapes using ``d_feat``.

    Args:
        d_feat: number of features per time step.
        d_model: width of the embedding and encoder.
        nhead: number of attention heads in each encoder layer.
        num_layers: number of encoder layers.
        dropout: dropout rate of the encoder layers.
        device: stored on the instance; not used inside this class.
    """

    def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None):
        super(Transformer, self).__init__()
        self.feature_layer = nn.Linear(d_feat, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.decoder_layer = nn.Linear(d_model, 1)
        self.device = device
        self.d_feat = d_feat

    def forward(self, src):
        # src [N, F*T] --> [N, T, F]
        src = src.reshape(len(src), self.d_feat, -1).permute(0, 2, 1)
        src = self.feature_layer(src)

        # src [N, T, F] --> [T, N, F], [60, 512, 8]
        src = src.transpose(1, 0)  # not batch first

        mask = None

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, mask)  # [60, 512, 8]

        # [T, N, F] --> [N, T, F]; only the last time step is decoded
        output = self.decoder_layer(output.transpose(1, 0)[:, -1, :])  # [512, 1]

        # Drop only the trailing size-1 axis. A bare squeeze() would also drop the
        # batch axis when N == 1 and return a 0-d tensor instead of shape [1].
        return output.squeeze(-1)

View File

@@ -0,0 +1,269 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import os
import numpy as np
import pandas as pd
import copy
import math
from ...utils import get_or_create_path
from ...log import get_module_logger
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from .pytorch_utils import count_parameters
from ...model.base import Model
from ...data.dataset import DatasetH, TSDatasetH
from ...data.dataset.handler import DataHandlerLP
class TransformerModel(Model):
    """Transformer model trained on windowed samples served through a ``DataLoader``.

    Args:
        d_feat: number of features per time step, forwarded to ``Transformer``.
        d_model: width of the transformer.
        batch_size: samples per mini-batch for training, evaluation and prediction.
        nhead: number of attention heads.
        num_layers: number of encoder layers.
        dropout: dropout rate of the encoder layers.
        n_epochs: maximum number of training epochs.
        lr: optimizer learning rate.
        metric: validation metric; ``""`` and ``"loss"`` (negative loss) are supported.
        early_stop: number of consecutive non-improving epochs before training stops.
        loss: training loss; only ``"mse"`` is supported.
        optimizer: ``"adam"`` or ``"gd"`` (plain SGD), case-insensitive.
        reg: weight decay passed to the optimizer.
        n_jobs: number of ``DataLoader`` worker processes.
        GPU: CUDA device index; a negative value or missing CUDA selects the CPU.
        seed: when given, seeds both numpy and torch.
        **kwargs: accepted and ignored.
    """

    def __init__(
        self,
        d_feat: int = 20,
        d_model: int = 64,
        batch_size: int = 8192,
        nhead: int = 2,
        num_layers: int = 2,
        dropout: float = 0,
        n_epochs=100,
        lr=0.0001,
        metric="",
        early_stop=5,
        loss="mse",
        optimizer="adam",
        reg=1e-3,
        n_jobs=10,
        GPU=0,
        seed=None,
        **kwargs
    ):
        # set hyper-parameters.
        self.d_model = d_model
        self.dropout = dropout
        self.n_epochs = n_epochs
        self.lr = lr
        self.reg = reg
        self.metric = metric
        self.batch_size = batch_size
        self.early_stop = early_stop
        self.optimizer = optimizer.lower()
        self.loss = loss
        self.n_jobs = n_jobs
        self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu")
        self.seed = seed
        self.logger = get_module_logger("TransformerModel")
        self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device))

        # seed before the network is built so weight initialisation is reproducible
        if self.seed is not None:
            np.random.seed(self.seed)
            torch.manual_seed(self.seed)

        self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device)
        if optimizer.lower() == "adam":
            self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        elif optimizer.lower() == "gd":
            self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg)
        else:
            raise NotImplementedError("optimizer {} is not supported!".format(optimizer))

        self.fitted = False
        self.model.to(self.device)

    @property
    def use_gpu(self):
        """Whether the model lives on a non-CPU device."""
        return self.device != torch.device("cpu")

    def mse(self, pred, label):
        """Mean squared error between ``pred`` and ``label``."""
        loss = (pred.float() - label.float()) ** 2
        return torch.mean(loss)

    def loss_fn(self, pred, label):
        """Compute the configured loss, ignoring samples whose label is NaN.

        Raises:
            ValueError: if ``self.loss`` is not a supported loss name.
        """
        mask = ~torch.isnan(label)

        if self.loss == "mse":
            return self.mse(pred[mask], label[mask])

        raise ValueError("unknown loss `%s`" % self.loss)

    def metric_fn(self, pred, label):
        """Compute the configured score (higher is better) over finite labels.

        Raises:
            ValueError: if ``self.metric`` is not a supported metric name.
        """
        mask = torch.isfinite(label)

        if self.metric == "" or self.metric == "loss":
            # negate so that a larger score always means a better model
            return -self.loss_fn(pred[mask], label[mask])

        raise ValueError("unknown metric `%s`" % self.metric)

    def train_epoch(self, data_loader):
        """Run one optimisation pass over every batch of ``data_loader``."""
        self.model.train()

        for data in data_loader:
            # all columns but the last are inputs; the label is the last column
            # of the last time step
            feature = data[:, :, 0:-1].to(self.device)
            label = data[:, -1, -1].to(self.device)

            pred = self.model(feature.float())  # .float()
            loss = self.loss_fn(pred, label)

            self.train_optimizer.zero_grad()
            loss.backward()
            # clamp every gradient element to [-3, 3] before the update
            torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0)
            self.train_optimizer.step()

    def test_epoch(self, data_loader):
        """Evaluate on ``data_loader`` and return ``(mean loss, mean score)``."""
        self.model.eval()

        scores = []
        losses = []

        for data in data_loader:
            feature = data[:, :, 0:-1].to(self.device)
            label = data[:, -1, -1].to(self.device)

            with torch.no_grad():
                pred = self.model(feature.float())  # .float()
                loss = self.loss_fn(pred, label)
                losses.append(loss.item())

                score = self.metric_fn(pred, label)
                scores.append(score.item())

        return np.mean(losses), np.mean(scores)

    def fit(
        self,
        dataset: DatasetH,
        evals_result=None,
        save_path=None,
    ):
        """Train with early stopping on the validation score, then restore and save the best weights.

        Args:
            dataset: provides the ``"train"`` and ``"valid"`` segments.
            evals_result: optional dict that receives the per-epoch ``"train"`` and
                ``"valid"`` scores; a fresh dict is used when omitted.
            save_path: where the best state dict is saved (resolved by ``get_or_create_path``).
        """
        # A ``dict()`` default would be one object shared by every call.
        if evals_result is None:
            evals_result = {}

        dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)
        dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L)

        dl_train.config(fillna_type="ffill+bfill")  # process nan brought by dataloader
        dl_valid.config(fillna_type="ffill+bfill")  # process nan brought by dataloader

        # drop_last=True: an incomplete final batch is discarded by both loaders
        train_loader = DataLoader(
            dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True
        )
        valid_loader = DataLoader(
            dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True
        )

        save_path = get_or_create_path(save_path)

        stop_steps = 0
        best_score = -np.inf
        best_epoch = 0
        best_param = None
        evals_result["train"] = []
        evals_result["valid"] = []

        # train
        self.logger.info("training...")

        for step in range(self.n_epochs):
            self.logger.info("Epoch%d:", step)
            self.logger.info("training...")
            self.train_epoch(train_loader)
            self.logger.info("evaluating...")
            train_loss, train_score = self.test_epoch(train_loader)
            val_loss, val_score = self.test_epoch(valid_loader)
            self.logger.info("train %.6f, valid %.6f" % (train_score, val_score))
            evals_result["train"].append(train_score)
            evals_result["valid"].append(val_score)

            if val_score > best_score:
                best_score = val_score
                stop_steps = 0
                best_epoch = step
                best_param = copy.deepcopy(self.model.state_dict())
            else:
                stop_steps += 1
                if stop_steps >= self.early_stop:
                    self.logger.info("early stop")
                    break

        # No epoch improved on -inf (zero epochs, or every score was NaN):
        # keep the current weights instead of failing on an unbound name.
        if best_param is None:
            best_param = copy.deepcopy(self.model.state_dict())

        self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch))
        self.model.load_state_dict(best_param)
        torch.save(best_param, save_path)
        # mark as fitted only after training completed without raising
        self.fitted = True

        if self.use_gpu:
            torch.cuda.empty_cache()

    def predict(self, dataset):
        """Predict for the ``"test"`` segment, indexed by ``dl_test.get_index()``.

        Raises:
            ValueError: if the model has not been fitted.
        """
        if not self.fitted:
            raise ValueError("model is not fitted yet!")

        dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I)
        dl_test.config(fillna_type="ffill+bfill")
        test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs)
        self.model.eval()
        preds = []

        for data in test_loader:
            feature = data[:, :, 0:-1].to(self.device)

            with torch.no_grad():
                pred = self.model(feature.float()).detach().cpu().numpy()

            # a one-sample batch may come back as a 0-d array, which np.concatenate
            # rejects; flatten so every batch contributes a 1-d array
            preds.append(pred.reshape(-1))

        return pd.Series(np.concatenate(preds), index=dl_test.get_index())
class PositionalEncoding(nn.Module):
    """Precompute a sinusoidal position table and add it to ``[T, N, F]`` inputs.

    Args:
        d_model: feature width ``F`` of the input.
        max_len: number of time positions held in the table.
    """

    def __init__(self, d_model, max_len=1000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term
        pe[:, 0::2] = torch.sin(angles)
        # Odd d_model: there are d_model // 2 odd columns but one more angle
        # column, so trim before assigning (no effect when d_model is even).
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # [max_len, d_model] --> [max_len, 1, d_model]
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # [T, N, F]
        return x + self.pe[: x.size(0), :]
class Transformer(nn.Module):
    """Linear embedding -> positional encoding -> transformer encoder -> linear head.

    Args:
        d_feat: number of input features per time step.
        d_model: width of the embedding and encoder.
        nhead: number of attention heads in each encoder layer.
        num_layers: number of encoder layers.
        dropout: dropout rate of the encoder layers.
        device: stored on the instance; not used inside this class.
    """

    def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None):
        super(Transformer, self).__init__()
        self.feature_layer = nn.Linear(d_feat, d_model)
        self.pos_encoder = PositionalEncoding(d_model)
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)
        self.decoder_layer = nn.Linear(d_model, 1)
        self.device = device
        self.d_feat = d_feat

    def forward(self, src):
        # src [N, T, F], [512, 60, 6]
        src = self.feature_layer(src)  # [512, 60, 8]

        # src [N, T, F] --> [T, N, F], [60, 512, 8]
        src = src.transpose(1, 0)  # not batch first

        mask = None

        src = self.pos_encoder(src)
        output = self.transformer_encoder(src, mask)  # [60, 512, 8]

        # [T, N, F] --> [N, T, F]; only the last time step is decoded
        output = self.decoder_layer(output.transpose(1, 0)[:, -1, :])  # [512, 1]

        # Drop only the trailing size-1 axis. A bare squeeze() would also drop the
        # batch axis when N == 1 and return a 0-d tensor instead of shape [1].
        return output.squeeze(-1)

View File

@@ -18,7 +18,12 @@ from qlib.backtest.utils import get_start_end_idx
class TWAPStrategy(BaseStrategy):
"""TWAP Strategy for trading"""
"""TWAP Strategy for trading
NOTE:
- This TWAP strategy will round up (ceiling) when trading. This will make the TWAP trading strategy produce the order
earlier when the total trade unit of amount is less than the trading step
"""
def reset(self, outer_trade_decision: BaseTradeDecision = None, **kwargs):
"""
@@ -58,11 +63,11 @@ class TWAPStrategy(BaseStrategy):
trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step)
order_list = []
for order in self.outer_trade_decision.get_decision():
# if not tradable, continue
if not self.trade_exchange.is_stock_tradable(
stock_id=order.stock_id, start_time=trade_start_time, end_time=trade_end_time
):
continue
# Don't peek the future information
# if not self.trade_exchange.is_stock_tradable(
# stock_id=order.stock_id, start_time=trade_start_time, end_time=trade_end_time
# ):
# continue
_amount_trade_unit = self.trade_exchange.get_amount_of_trade_unit(
stock_id=order.stock_id, start_time=order.start_time, end_time=order.end_time
)

View File

@@ -1056,13 +1056,21 @@ class ClientProvider(BaseProvider):
"""
def __init__(self):
def is_instance_of_provider(instance: object, cls: type):
if isinstance(instance, Wrapper):
p = getattr(instance, "_provider", None)
return False if p is None else isinstance(p, cls)
return isinstance(instance, cls)
from .client import Client
self.client = Client(C.flask_server, C.flask_port)
self.logger = get_module_logger(self.__class__.__name__)
if isinstance(Cal, ClientCalendarProvider):
if is_instance_of_provider(Cal, ClientCalendarProvider):
Cal.set_conn(self.client)
if isinstance(Inst, ClientInstrumentProvider):
if is_instance_of_provider(Inst, ClientInstrumentProvider):
Inst.set_conn(self.client)
if hasattr(DatasetD, "provider"):
DatasetD.provider.set_conn(self.client)

View File

@@ -1,17 +1,20 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
# Base exception class
class QlibException(Exception):
def __init__(self, message):
super(QlibException, self).__init__(message)
# Error type for reinitialization when starting an experiment
class RecorderInitializationError(QlibException):
"""Error type for re-initialization when starting an experiment"""
pass
# Error type for Recorder when can not load object
class LoadObjectError(QlibException):
"""Error type for Recorder when can not load object"""
pass

View File

@@ -53,7 +53,8 @@ def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"):
exp_manager["kwargs"]["uri"] = "file:" + str(Path(os.getcwd()).resolve() / uri_folder)
qlib.init(**config.get("qlib_init"), exp_manager=exp_manager)
task_train(config.get("task"), experiment_name=experiment_name)
recorder = task_train(config.get("task"), experiment_name=experiment_name)
recorder.save_objects(config=config)
# function to run workflow by config

View File

@@ -325,7 +325,7 @@ class MLflowExperiment(Experiment):
UNLIMITED = 50000 # FIXME: Mlflow can only list 50000 records at most!!!!!!!
def list_recorders(self, max_results: int = UNLIMITED, status: Union[str, None] = None):
def list_recorders(self, max_results: int = UNLIMITED, status: Union[str, None] = None, filter_string: str = ""):
"""
Parameters
----------
@@ -334,8 +334,12 @@ class MLflowExperiment(Experiment):
status : str
the criteria based on status to filter results.
`None` indicates no filtering.
filter_string : str
An mlflow-supported filter string such as 'params."my_param"="a" and tags."my_tag"="b"'; using it helps reduce the number of runs returned.
"""
runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results)
runs = self._client.search_runs(
self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results, filter_string=filter_string
)
recorders = dict()
for i in range(len(runs)):
recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i])

View File

@@ -139,6 +139,7 @@ class RecorderCollector(Collector):
rec_filter_func=None,
artifacts_path={"pred": "pred.pkl"},
artifacts_key=None,
list_kwargs={},
):
"""
Init RecorderCollector.
@@ -150,6 +151,7 @@ class RecorderCollector(Collector):
rec_filter_func (Callable, optional): filter the recorder by return True or False. Defaults to None.
artifacts_path (dict, optional): The artifacts name and its path in Recorder. Defaults to {"pred": "pred.pkl"}.
artifacts_key (str or List, optional): the artifacts key you want to get. If None, get all artifacts.
list_kwargs (dict): keyword arguments passed to the list_recorders function.
"""
super().__init__(process_list=process_list)
if isinstance(experiment, str):
@@ -163,6 +165,7 @@ class RecorderCollector(Collector):
self.rec_key_func = rec_key_func
self.artifacts_key = artifacts_key
self.rec_filter_func = rec_filter_func
self.list_kwargs = list_kwargs
def collect(self, artifacts_key=None, rec_filter_func=None, only_exist=True) -> dict:
"""
@@ -187,7 +190,7 @@ class RecorderCollector(Collector):
collect_dict = {}
# filter records
recs = self.experiment.list_recorders()
recs = self.experiment.list_recorders(**self.list_kwargs)
recs_flt = {}
for rid, rec in recs.items():
if rec_filter_func is None or rec_filter_func(rec):

View File

@@ -1,10 +1,14 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import sys, traceback, signal, atexit, logging
import atexit
import logging
import sys
import traceback
from ..log import get_module_logger
from . import R
from .recorder import Recorder
from ..log import get_module_logger
logger = get_module_logger("workflow", logging.INFO)

View File

@@ -78,6 +78,7 @@ def future_calendar_collector(qlib_dir: [str, Path], freq: str = "day"):
data_list.append(_row_data[0])
data_list = sorted(data_list)
date_list = generate_qlib_calendar(data_list, freq=freq)
date_list = sorted(set(daily_calendar.loc[:, 0].values.tolist() + date_list))
write_calendar_to_qlib(qlib_dir, date_list, freq=freq)
bs.logout()
logger.info(f"get trading dates success: {start_year}-01-01 to {end_year}-12-31")

View File

@@ -71,7 +71,7 @@ pip install -r requirements.txt
- examples:
```bash
# cn 1d data
python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1d --start 2020-01-01 --end 2020-12-31 --delay 1 --interval 1d --region US
python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1d --start 2020-01-01 --end 2020-12-31 --delay 1 --interval 1d --region CN
# cn 1min data
python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1min --delay 1 --interval 1min --region CN
# us 1d data

View File

@@ -283,6 +283,16 @@ class YahooNormalize(BaseNormalize):
COLUMNS = ["open", "close", "high", "low", "volume"]
DAILY_FORMAT = "%Y-%m-%d"
@staticmethod
def calc_change(df: pd.DataFrame, last_close: float) -> pd.Series:
df = df.copy()
_tmp_series = df["close"].fillna(method="ffill")
_tmp_shift_series = _tmp_series.shift(1)
if last_close is not None:
_tmp_shift_series.iloc[0] = float(last_close)
change_series = _tmp_series / _tmp_shift_series - 1
return change_series
@staticmethod
def normalize_yahoo(
df: pd.DataFrame,
@@ -310,11 +320,29 @@ class YahooNormalize(BaseNormalize):
)
df.sort_index(inplace=True)
df.loc[(df["volume"] <= 0) | np.isnan(df["volume"]), set(df.columns) - {symbol_field_name}] = np.nan
_tmp_series = df["close"].fillna(method="ffill")
_tmp_shift_series = _tmp_series.shift(1)
if last_close is not None:
_tmp_shift_series.iloc[0] = float(last_close)
df["change"] = _tmp_series / _tmp_shift_series - 1
change_series = YahooNormalize.calc_change(df, last_close)
# NOTE: The data obtained by Yahoo finance sometimes has exceptions
# WARNING: If it is normal for a `symbol(exchange)` to differ by a factor of *89* to *111* for consecutive trading days,
# WARNING: the logic in the following line needs to be modified
_count = 0
while True:
# NOTE: may appear unusual for many days in a row
change_series = YahooNormalize.calc_change(df, last_close)
_mask = (change_series >= 89) & (change_series <= 111)
if not _mask.any():
break
_tmp_cols = ["high", "close", "low", "open", "adjclose"]
df.loc[_mask, _tmp_cols] = df.loc[_mask, _tmp_cols] / 100
_count += 1
if _count >= 10:
_symbol = df.loc[df[symbol_field_name].first_valid_index()]["symbol"]
logger.warning(
f"{_symbol} `change` is abnormal for {_count} consecutive days, please check the specific data file carefully"
)
df["change"] = YahooNormalize.calc_change(df, last_close)
columns += ["change"]
df.loc[(df["volume"] <= 0) | np.isnan(df["volume"]), columns] = np.nan
@@ -852,7 +880,7 @@ class Run(BaseRun):
if self.interval.lower() == "1min":
if qlib_data_1d_dir is None or not Path(qlib_data_1d_dir).expanduser().exists():
raise ValueError(
"If normalize 1min, the qlib_data_1d_dir parameter must be set: --qlib_data_1d_dir <user qlib 1d data >, Reference: https://github.com/zhupr/qlib/tree/support_extend_data/scripts/data_collector/yahoo#automatic-update-of-daily-frequency-datafrom-yahoo-finance"
"If normalize 1min, the qlib_data_1d_dir parameter must be set: --qlib_data_1d_dir <user qlib 1d data >, Reference: https://github.com/microsoft/qlib/tree/main/scripts/data_collector/yahoo#automatic-update-of-daily-frequency-datafrom-yahoo-finance"
)
super(Run, self).normalize_data(
date_field_name, symbol_field_name, end_date=end_date, qlib_data_1d_dir=qlib_data_1d_dir

View File

@@ -244,6 +244,10 @@ class DumpDataBase:
if df is None or df.empty:
logger.warning(f"{code} data is None or empty")
return
# try to remove dup rows or it will cause exception when reindex.
df = df.drop_duplicates(self.date_field_name)
# features save dir
features_dir = self._features_dir.joinpath(code_to_fname(code).lower())
features_dir.mkdir(parents=True, exist_ok=True)

View File

@@ -11,7 +11,7 @@ NAME = "pyqlib"
DESCRIPTION = "A Quantitative-research Platform"
REQUIRES_PYTHON = ">=3.5.0"
VERSION = "0.6.3.99"
VERSION = "0.7.0.99"
# Detect Cython
try: