diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e7b775bf4..29265b1eb 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [windows-latest, ubuntu-16.04, ubuntu-18.04, ubuntu-20.04, macos-latest] + os: [windows-latest, ubuntu-16.04, ubuntu-18.04, ubuntu-20.04] python-version: [3.6, 3.7, 3.8, 3.9] steps: @@ -36,42 +36,36 @@ jobs: shell: bash # Test Qlib installed with pip - - name: Install Qlib with pip - run: | - if [ "$RUNNER_OS" == "Windows" ]; then - $CONDA\\python.exe -m pip install numpy==1.19.5 - $CONDA\\python.exe -m pip install pyqlib --ignore-installed ruamel.yaml numpy --user - else - sudo $CONDA/bin/python -m pip install numpy==1.19.5 - sudo $CONDA/bin/python -m pip install pyqlib --ignore-installed ruamel.yaml numpy - fi - shell: bash - - - name: Install Lightgbm for MacOS - if: runner.os == 'macOS' - run: | - /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Microsoft/qlib/main/.github/brew_install.sh)" - HOMEBREW_NO_AUTO_UPDATE=1 brew install lightgbm + # - name: Install Qlib with pip + # run: | + # if [ "$RUNNER_OS" == "Windows" ]; then + # $CONDA\\python.exe -m pip install numpy==1.19.5 + # $CONDA\\python.exe -m pip install pyqlib --ignore-installed ruamel.yaml numpy --user + # else + # sudo $CONDA/bin/python -m pip install numpy==1.19.5 + # sudo $CONDA/bin/python -m pip install pyqlib --ignore-installed ruamel.yaml numpy + # fi + # shell: bash - - name: Test data downloads - run: | - if [ "$RUNNER_OS" == "Windows" ]; then - $CONDA\\python.exe scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn - else - $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn - fi - shell: bash + # - name: Test data downloads + # run: | + # if [ "$RUNNER_OS" == "Windows" ]; then + # $CONDA\\python.exe scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn + # else + # $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn + # fi + # shell: bash - - name: Test workflow by config (install from pip) - run: | - if [ "$RUNNER_OS" == "Windows" ]; then - $CONDA\\python.exe qlib\\workflow\\cli.py examples\\benchmarks\\LightGBM\\workflow_config_lightgbm_Alpha158.yaml - $CONDA\\python.exe -m pip uninstall -y pyqlib - else - $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml - sudo $CONDA/bin/python -m pip uninstall -y pyqlib - fi - shell: bash + # - name: Test workflow by config (install from pip) + # run: | + # if [ "$RUNNER_OS" == "Windows" ]; then + # $CONDA\\python.exe qlib\\workflow\\cli.py examples\\benchmarks\\LightGBM\\workflow_config_lightgbm_Alpha158.yaml + # $CONDA\\python.exe -m pip uninstall -y pyqlib + # else + # $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml + # sudo $CONDA/bin/python -m pip uninstall -y pyqlib + # fi + # shell: bash # Test Qlib installed from source - name: Install Qlib from source @@ -89,6 +83,15 @@ jobs: fi shell: bash + - name: Test data downloads + run: | + if [ "$RUNNER_OS" == "Windows" ]; then + $CONDA\\python.exe scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn + else + $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn + fi + shell: bash + - name: Install test dependencies run: | if [ "$RUNNER_OS" == "Windows" ]; then @@ -117,4 +120,4 @@ jobs: else $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml fi - shell: bash \ No newline at end of file + shell: bash diff --git a/.github/workflows/test_macos.yml b/.github/workflows/test_macos.yml new file mode 100644 index 000000000..e52c27786 --- /dev/null +++ b/.github/workflows/test_macos.yml @@ -0,0 +1,77 @@ +# There are some issues (in the downloading data phase) on MacOS when running with other tests. So we split it into an individual config. +name: Test MacOS + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + build: + + runs-on: macos-latest + strategy: + matrix: + python-version: [3.6, 3.7, 3.8, 3.9] + + steps: + - uses: actions/checkout@v2 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Lint with Black + run: | + cd .. + sudo $CONDA/bin/python -m pip install black + $CONDA/bin/python -m black qlib -l 120 --check --diff + + # Test Qlib installed with pip + # - name: Install Qlib with pip + # run: | + # sudo $CONDA/bin/python -m pip install numpy==1.19.5 + # sudo $CONDA/bin/python -m pip install pyqlib --ignore-installed ruamel.yaml numpy + + - name: Install Lightgbm for MacOS + run: | + /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Microsoft/qlib/main/.github/brew_install.sh)" + HOMEBREW_NO_AUTO_UPDATE=1 brew install lightgbm + + # - name: Test data downloads + # run: | + # $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn + + # - name: Test workflow by config (install from pip) + # run: | + # $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml + # sudo $CONDA/bin/python -m pip uninstall -y pyqlib + + # Test Qlib installed from source + - name: Install Qlib from source + run: | + sudo $CONDA/bin/python -m pip install --upgrade cython + sudo $CONDA/bin/python -m pip install numpy jupyter jupyter_contrib_nbextensions + sudo $CONDA/bin/python -m pip install -U scipy scikit-learn # installing without this line will cause errors on GitHub Actions, while instsalling locally won't + sudo $CONDA/bin/python setup.py install + + - name: Test data downloads + run: | + $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn + + - name: Install test dependencies + run: | + sudo $CONDA/bin/python -m pip install --upgrade pip + sudo $CONDA/bin/python -m pip install -U pyopenssl idna + sudo $CONDA/bin/python -m pip install black pytest + + - name: Unit tests with Pytest + run: | + cd tests + $CONDA/bin/python -m pytest . --durations=0 + + - name: Test workflow by config (install from source) + run: | + $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml diff --git a/README.md b/README.md index b68cdaf10..422046c13 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ Recent released features | Feature | Status | | -- | ------ | +| Transformer & Localformer | [Released](https://github.com/microsoft/qlib/pull/508) on July 22, 2021 | +| Release Qlib v0.7.0 | [Released](https://github.com/microsoft/qlib/releases/tag/v0.7.0) on July 12, 2021 | | TCTS Model | [Released](https://github.com/microsoft/qlib/pull/491) on July 1, 2021 | | Online serving and automatic model rolling | :star: [Released](https://github.com/microsoft/qlib/pull/290) on May 17, 2021 | | DoubleEnsemble Model | [Released](https://github.com/microsoft/qlib/pull/286) on Mar 2, 2021 | @@ -290,6 +292,8 @@ Here is a list of models built on `Qlib`. - [TabNet based on pytorch (Sercan O. Arik, et al. AAAI 2019)](qlib/contrib/model/pytorch_tabnet.py) - [DoubleEnsemble based on LightGBM (Chuheng Zhang, et al. ICDM 2020)](qlib/contrib/model/double_ensemble.py) - [TCTS based on pytorch (Xueqing Wu, et al. ICML 2021)](qlib/contrib/model/pytorch_tcts.py) +- [Transformer based on pytorch (Ashish Vaswani, et al. NeurIPS 2017)](qlib/contrib/model/pytorch_transformer.py) +- [Localformer based on pytorch (Juyong Jiang, et al.)](qlib/contrib/model/pytorch_localformer.py) Your PR of new Quant models is highly welcomed. @@ -389,7 +393,12 @@ Join IM discussion groups: # Contributing -This project welcomes contributions and suggestions. Most contributions require you to agree to a +This project welcomes contributions and suggestions. +**Here are some +[code standards](docs/developer/code_standard.rst) when you submit a pull request.** + + +Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the right to use your contribution. For details, visit https://cla.opensource.microsoft.com. diff --git a/docs/component/data.rst b/docs/component/data.rst index 73072f369..a3dc74052 100644 --- a/docs/component/data.rst +++ b/docs/component/data.rst @@ -179,6 +179,7 @@ After conversion, users can find their Qlib format data in the directory `~/.qli The Restoration factor. Normally, ``factor = adjusted_price / original_price``, `adjusted price` reference: `split adjusted `_ In the convention of `Qlib` data processing, `open, close, high, low, volume, money and factor` will be set to NaN if the stock is suspended. + If you want to use your own alpha-factor which can't be calculate by OCHLV, like PE, EPS and so on, you could add it to the CSV files with OHCLV together and then dump it to the Qlib format data. Stock Pool (Market) -------------------------------- diff --git a/docs/component/online.rst b/docs/component/online.rst index accc936dd..22a6afaf9 100644 --- a/docs/component/online.rst +++ b/docs/component/online.rst @@ -21,6 +21,8 @@ which including `Online Manager <#Online Manager>`_, `Online Strategy <#Online S If you have many models or `task` needs to be managed, please consider `Task Management <../advanced/task_management.html>`_. The `examples `_ are based on some components in `Task Management <../advanced/task_management.html>`_ such as ``TrainerRM`` or ``Collector``. +**NOTE**: User should keep his data source updated to support online serving. For example, Qlib provides `a batch of scripts `_ to help users update Yahoo daily data. + Online Manager ============= @@ -43,4 +45,4 @@ Updater ============= .. automodule:: qlib.workflow.online.update - :members: \ No newline at end of file + :members: diff --git a/docs/developer/code_standard.rst b/docs/developer/code_standard.rst new file mode 100644 index 000000000..23ea713ba --- /dev/null +++ b/docs/developer/code_standard.rst @@ -0,0 +1,20 @@ +.. _code_standard: + +================================= +Code Standard +================================= + +Docstring +================================= +Please use the Numpy Style. + +Continuous Integration +================================= +Continuous Integration (CI) tools help you stick to the quality standards by running tests every time you push a new commit and reporting the results to a pull request. + +A common error is the mixed use of space and tab. You can fix the bug by inputing the following code in the command line. + +.. code-block:: python + + pip install black + python -m black . -l 120 \ No newline at end of file diff --git a/examples/benchmarks/Localformer/requirements.txt b/examples/benchmarks/Localformer/requirements.txt new file mode 100644 index 000000000..d5b918797 --- /dev/null +++ b/examples/benchmarks/Localformer/requirements.txt @@ -0,0 +1,3 @@ +numpy==1.17.4 +pandas==1.1.2 +torch==1.2.0 \ No newline at end of file diff --git a/examples/benchmarks/Localformer/workflow_config_localformer_Alpha158.yaml b/examples/benchmarks/Localformer/workflow_config_localformer_Alpha158.yaml new file mode 100644 index 000000000..d7e967333 --- /dev/null +++ b/examples/benchmarks/Localformer/workflow_config_localformer_Alpha158.yaml @@ -0,0 +1,82 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/cn_data" + region: cn +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + end_time: 2020-08-01 + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + infer_processors: + - class: FilterCol + kwargs: + fields_group: feature + col_list: ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10", + "ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5", + "RSQR20", "CORD60", "CORD10", "CORR20", "KLOW" + ] + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label + label: ["Ref($close, -2) / Ref($close, -1) - 1"] + +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: LocalformerModel + module_path: qlib.contrib.model.pytorch_localformer_ts + kwargs: + seed: 0 + n_jobs: 20 + dataset: + class: TSDatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha158 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + step_len: 20 + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config diff --git a/examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml b/examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml new file mode 100644 index 000000000..1c8489461 --- /dev/null +++ b/examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml @@ -0,0 +1,73 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/cn_data" + region: cn +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + end_time: 2020-08-01 + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + infer_processors: + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label + label: ["Ref($close, -2) / Ref($close, -1) - 1"] +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: LocalformerModel + module_path: qlib.contrib.model.pytorch_localformer + kwargs: + d_feat: 6 + seed: 0 + dataset: + class: DatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha360 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config \ No newline at end of file diff --git a/examples/benchmarks/README.md b/examples/benchmarks/README.md index 1920a6a3b..ee2c0a833 100644 --- a/examples/benchmarks/README.md +++ b/examples/benchmarks/README.md @@ -1,6 +1,6 @@ # Benchmarks Performance -Here are the results of each benchmark model running on Qlib's `Alpha360` and `Alpha158` dataset with China's A shared-stock & CSI300 data respectively. The values of each metric are the mean and std calculated based on 20 runs. +Here are the results of each benchmark model running on Qlib's `Alpha360` and `Alpha158` dataset with China's A shared-stock & CSI300 data respectively. The values of each metric are the mean and std calculated based on 20 runs with different random seeds. The numbers shown below demonstrate the performance of the entire `workflow` of each model. We will update the `workflow` as well as models in the near future for better results. @@ -23,6 +23,8 @@ The numbers shown below demonstrate the performance of the entire `workflow` of | DoubleEnsemble (Chuheng Zhang, et al.) | Alpha360 | 0.0407±0.00| 0.3053±0.00 | 0.0490±0.00 | 0.3840±0.00 | 0.0380±0.02 | 0.5000±0.21 | -0.0984±0.02 | | TabNet (Sercan O. Arik, et al.)| Alpha360 | 0.0192±0.00 | 0.1401±0.00| 0.0291±0.00 | 0.2163±0.00 | -0.0258±0.00 | -0.2961±0.00| -0.1429±0.00 | | TCTS (Xueqing Wu, et al.)| Alpha360 | 0.0485±0.00 | 0.3689±0.04| 0.0586±0.00 | 0.4669±0.02 | 0.0816±0.02 | 1.1572±0.30| -0.0689±0.02 | +| Transformer (Ashish Vaswani, et al.)| Alpha360 | 0.0141±0.00 | 0.0917±0.02| 0.0331±0.00 | 0.2357±0.03 | -0.0259±0.03 | -0.3323±0.43| -0.1763±0.07 | +| Localformer (Juyong Jiang, et al.)| Alpha360 | 0.0408±0.00 | 0.2988±0.03| 0.0538±0.00 | 0.4105±0.02 | 0.0275±0.03 | 0.3464±0.37| -0.1182±0.03 | ## Alpha158 dataset | Model Name | Dataset | IC | ICIR | Rank IC | Rank ICIR | Annualized Return | Information Ratio | Max Drawdown | @@ -39,6 +41,8 @@ The numbers shown below demonstrate the performance of the entire `workflow` of | GATs (Petar Velickovic, et al.) | Alpha158 (with selected 20 features) | 0.0349±0.00 | 0.2511±0.01| 0.0457±0.00 | 0.3537±0.01 | 0.0578±0.02 | 0.8221±0.25| -0.0824±0.02 | | DoubleEnsemble (Chuheng Zhang, et al.) | Alpha158 | 0.0544±0.00 | 0.4338±0.01 | 0.0523±0.00 | 0.4257±0.01 | 0.1253±0.01 | 1.4105±0.14 | -0.0902±0.01 | | TabNet (Sercan O. Arik, et al.)| Alpha158 | 0.0383±0.00 | 0.3414±0.00| 0.0388±0.00 | 0.3460±0.00 | 0.0226±0.00 | 0.2652±0.00| -0.1072±0.00 | +| Transformer (Ashish Vaswani, et al.)| Alpha158 | 0.0274±0.00 | 0.2166±0.04| 0.0409±0.00 | 0.3342±0.04 | 0.0204±0.03 | 0.2888±0.40| -0.1216±0.04 | +| Localformer (Juyong Jiang, et al.)| Alpha158 | 0.0355±0.00 | 0.2747±0.04| 0.0466±0.00 | 0.3762±0.03 | 0.0506±0.02 | 0.7447±0.34| -0.0875±0.02 | - The selected 20 features are based on the feature importance of a lightgbm-based model. - The base model of DoubleEnsemble is LGBM. diff --git a/examples/benchmarks/Transformer/requirements.txt b/examples/benchmarks/Transformer/requirements.txt new file mode 100644 index 000000000..d5b918797 --- /dev/null +++ b/examples/benchmarks/Transformer/requirements.txt @@ -0,0 +1,3 @@ +numpy==1.17.4 +pandas==1.1.2 +torch==1.2.0 \ No newline at end of file diff --git a/examples/benchmarks/Transformer/workflow_config_transformer_Alpha158.yaml b/examples/benchmarks/Transformer/workflow_config_transformer_Alpha158.yaml new file mode 100644 index 000000000..54707386f --- /dev/null +++ b/examples/benchmarks/Transformer/workflow_config_transformer_Alpha158.yaml @@ -0,0 +1,82 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/cn_data" + region: cn +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + end_time: 2020-08-01 + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + infer_processors: + - class: FilterCol + kwargs: + fields_group: feature + col_list: ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10", + "ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5", + "RSQR20", "CORD60", "CORD10", "CORR20", "KLOW" + ] + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label + label: ["Ref($close, -2) / Ref($close, -1) - 1"] + +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: TransformerModel + module_path: qlib.contrib.model.pytorch_transformer_ts + kwargs: + seed: 0 + n_jobs: 20 + dataset: + class: TSDatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha158 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + step_len: 20 + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config diff --git a/examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml b/examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml new file mode 100644 index 000000000..e568a1b30 --- /dev/null +++ b/examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml @@ -0,0 +1,73 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/cn_data" + region: cn +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + end_time: 2020-08-01 + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + infer_processors: + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label + label: ["Ref($close, -2) / Ref($close, -1) - 1"] +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: TransformerModel + module_path: qlib.contrib.model.pytorch_transformer + kwargs: + d_feat: 6 + seed: 0 + dataset: + class: DatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha360 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config \ No newline at end of file diff --git a/examples/highfreq/highfreq_handler.py b/examples/highfreq/highfreq_handler.py index 19bb2550b..c15c3ec41 100644 --- a/examples/highfreq/highfreq_handler.py +++ b/examples/highfreq/highfreq_handler.py @@ -99,8 +99,6 @@ class HighFreqHandler(DataHandlerLP): ] names += ["$volume_1"] - fields += ["Cut({0}, 240, None)".format(template_paused.format("Date($close)"))] - names += ["date"] return fields, names diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py index f0ab0dec2..62065469b 100644 --- a/examples/highfreq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -33,6 +33,9 @@ class HighFreqNorm(Processor): self.feature_vmin[name] = np.nanmin(part_values) def __call__(self, df_features): + df_features["date"] = pd.to_datetime( + df_features.index.get_level_values(level="datetime").to_series().dt.date.values + ) df_features.set_index("date", append=True, drop=True, inplace=True) df_values = df_features.values names = { diff --git a/examples/run_all_model.py b/examples/run_all_model.py index c79fee004..1284d8e99 100644 --- a/examples/run_all_model.py +++ b/examples/run_all_model.py @@ -23,7 +23,6 @@ from qlib.config import REG_CN from qlib.workflow import R from qlib.tests.data import GetData - # init qlib provider_uri = "~/.qlib/qlib_data/cn_data" exp_folder_name = "run_all_model_records" @@ -40,6 +39,7 @@ exp_manager = { GetData().qlib_data(target_dir=provider_uri, region=REG_CN, exists_skip=True) qlib.init(provider_uri=provider_uri, region=REG_CN, exp_manager=exp_manager) + # decorator to check the arguments def only_allow_defined_args(function_to_decorate): @functools.wraps(function_to_decorate) @@ -92,7 +92,8 @@ def create_env(): # function to execute the cmd -def execute(cmd): +def execute(cmd, wait_when_err=False): + print("Running CMD:", cmd) with subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True, shell=True) as p: for line in p.stdout: sys.stdout.write(line.split("\b")[0]) @@ -102,6 +103,8 @@ def execute(cmd): sys.stdout.write("\b" * 10 + "\b".join(line.split("\b")[1:-1])) if p.returncode != 0: + if wait_when_err: + input("Press Enter to Continue") return p.stderr else: return None @@ -184,7 +187,15 @@ def gen_and_save_md_table(metrics, dataset): # function to run the all the models @only_allow_defined_args -def run(times=1, models=None, dataset="Alpha360", exclude=False): +def run( + times=1, + models=None, + dataset="Alpha360", + exclude=False, + qlib_uri: str = "git+https://github.com/microsoft/qlib#egg=pyqlib", + wait_before_rm_env: bool = False, + wait_when_err: bool = False, +): """ Please be aware that this function can only work under Linux. MacOS and Windows will be supported in the future. Any PR to enhance this method is highly welcomed. Besides, this script doesn't support parrallel running the same model @@ -200,6 +211,13 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False): determines whether the model being used is excluded or included. dataset : str determines the dataset to be used for each model. + qlib_uri : str + the uri to install qlib with pip + it could be url on the we or local path + wait_before_rm_env : bool + wait before remove environment. + wait_when_err : bool + wait when errors raised when executing commands Usage: ------- @@ -240,32 +258,36 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False): sys.stderr.write("\n") # install requirements.txt sys.stderr.write("Installing requirements.txt...\n") - execute(f"{python_path} -m pip install -r {req_path}") + execute(f"{python_path} -m pip install -r {req_path}", wait_when_err=wait_when_err) sys.stderr.write("\n") # setup gpu for tft if fn == "TFT": execute( - f"conda install -y --prefix {env_path} anaconda cudatoolkit=10.0 && conda install -y --prefix {env_path} cudnn" + f"conda install -y --prefix {env_path} anaconda cudatoolkit=10.0 && conda install -y --prefix {env_path} cudnn", + wait_when_err=wait_when_err, ) sys.stderr.write("\n") # install qlib sys.stderr.write("Installing qlib...\n") - execute(f"{python_path} -m pip install --upgrade pip") # TODO: FIX ME! - execute(f"{python_path} -m pip install --upgrade cython") # TODO: FIX ME! + execute(f"{python_path} -m pip install --upgrade pip", wait_when_err=wait_when_err) # TODO: FIX ME! + execute(f"{python_path} -m pip install --upgrade cython", wait_when_err=wait_when_err) # TODO: FIX ME! if fn == "TFT": execute( - f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall --ignore-installed PyYAML -e git+https://github.com/microsoft/qlib#egg=pyqlib" + f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall --ignore-installed PyYAML -e {qlib_uri}", + wait_when_err=wait_when_err, ) # TODO: FIX ME! else: execute( - f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall -e git+https://github.com/microsoft/qlib#egg=pyqlib" + f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall -e {qlib_uri}", + wait_when_err=wait_when_err, ) # TODO: FIX ME! sys.stderr.write("\n") # run workflow_by_config for multiple times for i in range(times): sys.stderr.write(f"Running the model: {fn} for iteration {i+1}...\n") errs = execute( - f"{python_path} {env_path / 'src/pyqlib/qlib/workflow/cli.py'} {yaml_path} {fn} {exp_folder_name}" + f"{python_path} {env_path / 'bin' / 'qrun'} {yaml_path} {fn} {exp_folder_name}", + wait_when_err=wait_when_err, ) if errs is not None: _errs = errors.get(fn, {}) @@ -274,6 +296,8 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False): sys.stderr.write("\n") # remove env sys.stderr.write(f"Deleting the environment: {env_path}...\n") + if wait_before_rm_env: + input("Press Enter to Continue") shutil.rmtree(env_path) # getting all results sys.stderr.write(f"Retrieving results...\n") diff --git a/qlib/__init__.py b/qlib/__init__.py index 5f45f4557..6f76bbcaa 100644 --- a/qlib/__init__.py +++ b/qlib/__init__.py @@ -2,7 +2,7 @@ # Licensed under the MIT License. -__version__ = "0.6.3.99" +__version__ = "0.7.0.99" __version__bak = __version__ # This version is backup for QlibConfig.reset_qlib_version diff --git a/qlib/backtest/__init__.py b/qlib/backtest/__init__.py index dbfbd4a0e..1babd08c7 100644 --- a/qlib/backtest/__init__.py +++ b/qlib/backtest/__init__.py @@ -185,7 +185,7 @@ def backtest( exchange_kwargs={}, pos_type: str = "Position", ): - """initialize the strategy and executor, then backtest funciton for the interaction of the outermost strategy and executor in the nested decision execution + """initialize the strategy and executor, then backtest function for the interaction of the outermost strategy and executor in the nested decision execution Parameters ---------- diff --git a/qlib/backtest/account.py b/qlib/backtest/account.py index 9b9a25c23..542c0fba2 100644 --- a/qlib/backtest/account.py +++ b/qlib/backtest/account.py @@ -1,9 +1,8 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. - - +from __future__ import annotations import copy -from typing import Dict, List, Tuple +from typing import Dict, List, Tuple, TYPE_CHECKING from qlib.utils import init_instance_by_config import warnings import pandas as pd @@ -11,7 +10,9 @@ import pandas as pd from .position import BasePosition, InfPosition, Position from .report import Report, Indicator from .order import BaseTradeDecision, Order -from .exchange import Exchange + +if TYPE_CHECKING: + from .exchange import Exchange """ rtn & earning in the Account @@ -73,6 +74,18 @@ class Account: pos_type: str = "Position", port_metr_enabled: bool = True, ): + """the trade account of backtest. + + Parameters + ---------- + init_cash : float, optional + initial cash, by default 1e9 + position_dict : Dict[stock_id, {"amount": int, "price"(optional): float}], optional + initial stocks with amount and price, + if there is no price key in the dict of stocks, it will be filled by latest close price from qlib. + by default {}. + """ + self._pos_type = pos_type self._port_metr_enabled = port_metr_enabled @@ -109,7 +122,7 @@ class Account: self.report = Report(freq, benchmark_config) self.positions = {} - # trading related matric(e.g. high-frequency trading) + # trading related metrics(e.g. high-frequency trading) self.indicator = Indicator() def reset(self, freq=None, benchmark_config=None, init_report=False, port_metr_enabled: bool = None): @@ -161,7 +174,7 @@ class Account: self.accum_info.add_return_value(profit) # note here do not consider cost def update_order(self, order, trade_val, cost, trade_price): - if not self.is_port_metr_enabled(): + if self.current.skip_update(): # TODO: supporting polymorphism for account # updating order for infinite position is meaningless return @@ -289,7 +302,7 @@ class Account: if atomic is True and trade_info is None: raise ValueError("trade_info is necessary in atomic executor") elif atomic is False and inner_order_indicators is None: - raise ValueError("inner_order_indicators is necessary in unatomic executor") + raise ValueError("inner_order_indicators is necessary in un-atomic executor") # TODO: `update_bar_count` and `update_current` should placed in Position and be merged. self.update_bar_count() diff --git a/qlib/backtest/exchange.py b/qlib/backtest/exchange.py index ea1d012eb..9044179e0 100644 --- a/qlib/backtest/exchange.py +++ b/qlib/backtest/exchange.py @@ -1,11 +1,15 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from __future__ import annotations +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from .account import Account -from qlib.backtest.position import Position +from qlib.backtest.position import BasePosition, Position import random import logging -from typing import List, Tuple, Union +from typing import List, Tuple, Union, Callable, Iterable import numpy as np import pandas as pd @@ -16,6 +20,7 @@ from ..config import C, REG_CN from ..utils.resam import resam_ts_data, ts_data_last from ..log import get_module_logger from .order import Order, OrderDir, OrderHelper +from .high_performance_ds import PandasQuote class Exchange: @@ -33,6 +38,7 @@ class Exchange: close_cost=0.0025, min_cost=5, extra_quote=None, + quote_cls=PandasQuote, **kwargs, ): """__init__ @@ -103,10 +109,11 @@ class Exchange: # TODO: the quote, trade_dates, codes are not necessray. # It is just for performance consideration. + self.limit_type = self._get_limit_type(limit_threshold) if limit_threshold is None: if C.region == REG_CN: self.logger.warning(f"limit_threshold not set. The stocks hit the limit may be bought/sold") - elif self._get_limit_type(limit_threshold) == self.LT_FLT and abs(limit_threshold) > 0.1: + elif self.limit_type == self.LT_FLT and abs(limit_threshold) > 0.1: if C.region == REG_CN: self.logger.warning(f"limit_threshold may not be set to a reasonable value") @@ -128,10 +135,9 @@ class Exchange: # $change is for calculating the limit of the stock necessary_fields = {self.buy_price, self.sell_price, "$close", "$change", "$factor", "$volume"} - if self._get_limit_type(limit_threshold) == self.LT_TP_EXP: + if self.limit_type == self.LT_TP_EXP: for exp in limit_threshold: necessary_fields.add(exp) - subscribe_fields = list(necessary_fields | set(subscribe_fields)) all_fields = list(necessary_fields | set(subscribe_fields)) self.all_fields = all_fields @@ -141,39 +147,43 @@ class Exchange: self.limit_threshold: Union[Tuple[str, str], float, None] = limit_threshold self.volume_threshold = volume_threshold self.extra_quote = extra_quote - self.set_quote(codes, start_time, end_time) + self.get_quote_from_qlib() - def set_quote(self, codes, start_time, end_time): - if len(codes) == 0: - codes = D.instruments() + # init quote by quote_df + self.quote_cls = quote_cls + self.quote = self.quote_cls(self.quote_df) - self.quote = D.features(codes, self.all_fields, start_time, end_time, freq=self.freq, disk_cache=True).dropna( - subset=["$close"] - ) - self.quote.columns = self.all_fields + def get_quote_from_qlib(self): + # get stock data from qlib + if len(self.codes) == 0: + self.codes = D.instruments() + self.quote_df = D.features( + self.codes, self.all_fields, self.start_time, self.end_time, freq=self.freq, disk_cache=True + ).dropna(subset=["$close"]) + self.quote_df.columns = self.all_fields + # check buy_price data and sell_price data for attr in "buy_price", "sell_price": pstr = getattr(self, attr) # price string - if self.quote[pstr].isna().any(): + if self.quote_df[pstr].isna().any(): self.logger.warning("{} field data contains nan.".format(pstr)) - if self.quote["$factor"].isna().any(): + # update trade_w_adj_price + if self.quote_df["$factor"].isna().any(): # The 'factor.day.bin' file not exists, and `factor` field contains `nan` # Use adjusted price self.trade_w_adj_price = True self.logger.warning("factor.day.bin file not exists or factor contains `nan`. Order using adjusted_price.") if self.trade_unit is not None: self.logger.warning(f"trade unit {self.trade_unit} is not supported in adjusted_price mode.") - else: # The `factor.day.bin` file exists and all data `close` and `factor` are not `nan` # Use normal price self.trade_w_adj_price = False - # update limit - self._update_limit() + self._update_limit(self.limit_threshold) - quote_df = self.quote + # concat extra_quote if self.extra_quote is not None: # process extra_quote if "$close" not in self.extra_quote: @@ -192,21 +202,15 @@ class Exchange: if "limit_buy" not in self.extra_quote.columns: self.extra_quote["limit_buy"] = False self.logger.warning("No limit_buy set for extra_quote. All stock will be able to be bought.") - - assert set(self.extra_quote.columns) == set(quote_df.columns) - {"$change"} - quote_df = pd.concat([quote_df, self.extra_quote], sort=False, axis=0) - - quote_dict = {} - for stock_id, stock_val in quote_df.groupby(level="instrument"): - quote_dict[stock_id] = stock_val.droplevel(level="instrument") - - self.quote = quote_dict + assert set(self.extra_quote.columns) == set(self.quote_df.columns) - {"$change"} + self.quote_df = pd.concat([self.quote_df, extra_quote], sort=False, axis=0) LT_TP_EXP = "(exp)" # Tuple[str, str] LT_FLT = "float" # float LT_NONE = "none" # none def _get_limit_type(self, limit_threshold): + """get limit type""" if isinstance(limit_threshold, Tuple): return self.LT_TP_EXP elif isinstance(limit_threshold, float): @@ -216,19 +220,19 @@ class Exchange: else: raise NotImplementedError(f"This type of `limit_threshold` is not supported") - def _update_limit(self): + def _update_limit(self, limit_threshold): # check limit_threshold - lt_type = self._get_limit_type(self.limit_threshold) - if lt_type == self.LT_NONE: - self.quote["limit_buy"] = False - self.quote["limit_sell"] = False - elif lt_type == self.LT_TP_EXP: + limit_type = self._get_limit_type(limit_threshold) + if limit_type == self.LT_NONE: + self.quote_df["limit_buy"] = False + self.quote_df["limit_sell"] = False + elif limit_type == self.LT_TP_EXP: # set limit - self.quote["limit_buy"] = self.quote[self.limit_threshold[0]] - self.quote["limit_sell"] = self.quote[self.limit_threshold[1]] - elif lt_type == self.LT_FLT: - self.quote["limit_buy"] = self.quote["$change"].ge(self.limit_threshold) - self.quote["limit_sell"] = self.quote["$change"].le(-self.limit_threshold) # pylint: disable=E1130 + self.quote_df["limit_buy"] = self.quote_df[limit_threshold[0]] + self.quote_df["limit_sell"] = self.quote_df[limit_threshold[1]] + elif limit_type == self.LT_FLT: + self.quote_df["limit_buy"] = self.quote_df["$change"].ge(limit_threshold) + self.quote_df["limit_sell"] = self.quote_df["$change"].le(-limit_threshold) # pylint: disable=E1130 def check_stock_limit(self, stock_id, start_time, end_time, direction=None): """ @@ -242,20 +246,20 @@ class Exchange: """ if direction is None: - buy_limit = resam_ts_data(self.quote[stock_id]["limit_buy"], start_time, end_time, method="all") - sell_limit = resam_ts_data(self.quote[stock_id]["limit_sell"], start_time, end_time, method="all") + buy_limit = self.quote.get_data(stock_id, start_time, end_time, fields="limit_buy", method="all") + sell_limit = self.quote.get_data(stock_id, start_time, end_time, fields="limit_sell", method="all") return buy_limit or sell_limit elif direction == Order.BUY: - return resam_ts_data(self.quote[stock_id]["limit_buy"], start_time, end_time, method="all") + return self.quote.get_data(stock_id, start_time, end_time, fields="limit_buy", method="all") elif direction == Order.SELL: - return resam_ts_data(self.quote[stock_id]["limit_sell"], start_time, end_time, method="all") + return self.quote.get_data(stock_id, start_time, end_time, fields="limit_sell", method="all") else: raise ValueError(f"direction {direction} is not supported!") def check_stock_suspended(self, stock_id, start_time, end_time): # is suspended - if stock_id in self.quote: - return resam_ts_data(self.quote[stock_id], start_time, end_time, method=None) is None + if stock_id in self.quote.get_all_stock(): + return self.quote.get_data(stock_id, start_time, end_time) is None else: return True @@ -278,7 +282,7 @@ class Exchange: else: return True - def deal_order(self, order, trade_account=None, position=None): + def deal_order(self, order, trade_account: Account = None, position: BasePosition = None): """ Deal order when the actual transaction @@ -289,13 +293,12 @@ class Exchange: :param position: position to be updated after dealing the order. :return: trade_val, trade_cost, trade_price """ - # need to check order first - # TODO: check the order unit limit in the exchange!!!! - # The order limit is related to the adj factor and the cur_amount. - # factor = self.quote[(order.stock_id, order.trade_date)]['$factor'] - # cur_amount = trade_account.current.get_stock_amount(order.stock_id) + # check order first. if self.check_order(order) is False: - raise AttributeError("need to check order first") + order.deal_amount = 0.0 + # using np.nan instead of None to make it more convenient to should the value in format string + return 0.0, 0.0, np.nan + if trade_account is not None and position is not None: raise ValueError("trade_account and position can only choose one") @@ -304,25 +307,29 @@ class Exchange: trade_val, trade_cost = self._calc_trade_info_by_order( order, trade_account.current if trade_account else position ) - # update account if order.deal_amount > 1e-5: - # If the order can only be deal 0 aomount. Nothing to be updated - # Otherwise, it will result some stock with 0 amount in the position + # If the order can only be deal 0 amount. Nothing to be updated + # Otherwise, it will result in + # 1) some stock with 0 amount in the position + # 2) `trade_unit` of trade_cost will be lost in user account if trade_account: trade_account.update_order(order=order, trade_val=trade_val, cost=trade_cost, trade_price=trade_price) elif position: position.update_order(order=order, trade_val=trade_val, cost=trade_cost, trade_price=trade_price) + else: + # if dealing is not successful, the trade_cost should be zero + trade_cost = 0 return trade_val, trade_cost, trade_price def get_quote_info(self, stock_id, start_time, end_time, method=ts_data_last): - return resam_ts_data(self.quote[stock_id], start_time, end_time, method=method) + return self.quote.get_data(stock_id, start_time, end_time, method=method) def get_close(self, stock_id, start_time, end_time, method=ts_data_last): - return resam_ts_data(self.quote[stock_id]["$close"], start_time, end_time, method=method) + return self.quote.get_data(stock_id, start_time, end_time, fields="$close", method=method) def get_volume(self, stock_id, start_time, end_time, method="sum"): - return resam_ts_data(self.quote[stock_id]["$volume"], start_time, end_time, method=method) + return self.quote.get_data(stock_id, start_time, end_time, fields="$volume", method=method) def get_deal_price(self, stock_id, start_time, end_time, direction: OrderDir, method=ts_data_last): if direction == OrderDir.SELL: @@ -331,7 +338,7 @@ class Exchange: pstr = self.buy_price else: raise NotImplementedError(f"This type of input is not supported") - deal_price = resam_ts_data(self.quote[stock_id][pstr], start_time, end_time, method=method) + deal_price = self.quote.get_data(stock_id, start_time, end_time, fields=pstr, method=method) if method is not None and (np.isclose(deal_price, 0.0) or np.isnan(deal_price)): self.logger.warning(f"(stock_id:{stock_id}, trade_time:{(start_time, end_time)}, {pstr}): {deal_price}!!!") self.logger.warning(f"setting deal_price to close price") @@ -346,10 +353,10 @@ class Exchange: `None`: if the stock is suspended `None` may be returned `float`: return factor if the factor exists """ - assert (start_time is not None and end_time is not None, "the time range must be given") - if stock_id not in self.quote: + assert start_time is not None and end_time is not None, "the time range must be given" + if stock_id not in self.quote.get_all_stock(): return None - return resam_ts_data(self.quote[stock_id]["$factor"], start_time, end_time, method=ts_data_last) + return self.quote.get_data(stock_id, start_time, end_time, fields="$factor", method=ts_data_last) def generate_amount_position_from_weight_position( self, weight_position, cash, start_time, end_time, direction=OrderDir.BUY @@ -509,7 +516,7 @@ class Exchange: ) return value - def _get_factor_or_raise_erorr(self, factor: float = None, stock_id: str = None, start_time=None, end_time=None): + def _get_factor_or_raise_error(self, factor: float = None, stock_id: str = None, start_time=None, end_time=None): """Please refer to the docs of get_amount_of_trade_unit""" if factor is None: if stock_id is not None and start_time is not None and end_time is not None: @@ -537,7 +544,7 @@ class Exchange: the end time of trading range """ if not self.trade_w_adj_price and self.trade_unit is not None: - factor = self._get_factor_or_raise_erorr( + factor = self._get_factor_or_raise_error( factor=factor, stock_id=stock_id, start_time=start_time, end_time=end_time ) return self.trade_unit / factor @@ -556,7 +563,7 @@ class Exchange: """ if not self.trade_w_adj_price and self.trade_unit is not None: # the minimal amount is 1. Add 0.1 for solving precision problem. - factor = self._get_factor_or_raise_erorr( + factor = self._get_factor_or_raise_error( factor=factor, stock_id=stock_id, start_time=start_time, end_time=end_time ) return (deal_amount * factor + 0.1) // self.trade_unit * self.trade_unit / factor @@ -626,7 +633,7 @@ class Exchange: order.stock_id, order.start_time, order.end_time, order.deal_amount ) trade_val = order.deal_amount * trade_price - trade_cost = trade_val * self.open_cost + trade_cost = max(trade_val * self.open_cost, self.min_cost) else: raise NotImplementedError("order type {} error".format(order.type)) diff --git a/qlib/backtest/executor.py b/qlib/backtest/executor.py index 999e6d8a7..0121a904e 100644 --- a/qlib/backtest/executor.py +++ b/qlib/backtest/executor.py @@ -1,5 +1,6 @@ from abc import abstractclassmethod, abstractmethod import copy +from qlib.backtest.position import BasePosition from qlib.log import get_module_logger from types import GeneratorType from qlib.backtest.account import Account @@ -32,6 +33,7 @@ class BaseExecutor: track_data: bool = False, trade_exchange: Exchange = None, common_infra: CommonInfrastructure = None, + settle_type=BasePosition.ST_NO, **kwargs, ): """ @@ -95,6 +97,8 @@ class BaseExecutor: - trade_exchange : Exchange, optional exchange that provides market info + settle_type : str + Please refer to the docs of BasePosition.settle_start """ self.time_per_step = time_per_step self.indicator_config = indicator_config @@ -104,6 +108,7 @@ class BaseExecutor: self._trade_exchange = trade_exchange self.level_infra = LevelInfrastructure() self.level_infra.reset_infra(common_infra=common_infra) + self._settle_type = settle_type self.reset(start_time=start_time, end_time=end_time, common_infra=common_infra) if common_infra is None: get_module_logger("BaseExecutor").warning(f"`common_infra` is not set for {self}") @@ -235,6 +240,9 @@ class BaseExecutor: if atomic and trade_decision.get_range_limit(default_value=None) is not None: raise ValueError("atomic executor doesn't support specify `range_limit`") + if self._settle_type != BasePosition.ST_NO: + self.trade_account.current.settle_start(self._settle_type) + obj = self._collect_data(trade_decision=trade_decision, level=level) if isinstance(obj, GeneratorType): @@ -256,6 +264,10 @@ class BaseExecutor: ) self.trade_calendar.step() + + if self._settle_type != BasePosition.ST_NO: + self.trade_account.current.settle_commit() + if return_value is not None: return_value.update({"execute_result": res}) return res @@ -366,7 +378,7 @@ class NestedExecutor(BaseExecutor): trade_decision = self._update_trade_decision(trade_decision) if trade_decision.empty() and self._skip_empty_decision: - # give one chance for outer stategy to update the strategy + # give one chance for outer strategy to update the strategy # - For updating some information in the sub executor(the strategy have no knowledge of the inner # executor when generating the decision) break @@ -393,7 +405,7 @@ class NestedExecutor(BaseExecutor): execute_result.extend(_inner_execute_result) inner_order_indicators.append( - self.inner_executor.trade_account.get_trade_indicator().get_order_indicator() + self.inner_executor.trade_account.get_trade_indicator().get_order_indicator(raw=True) ) else: # do nothing and just step forward @@ -409,6 +421,9 @@ class NestedExecutor(BaseExecutor): class SimulatorExecutor(BaseExecutor): """Executor that simulate the true market""" + # TODO: TT_SERIAL & TT_PARAL will be replaced by feature fix_pos now. + # Please remove them in the future. + # available trade_types TT_SERIAL = "serial" ## The orders will be executed serially in a sequence @@ -486,42 +501,22 @@ class SimulatorExecutor(BaseExecutor): execute_result = [] for order in self._get_order_iterator(trade_decision): - if self.trade_exchange.check_order(order) is True: - # execute the order. - # NOTE: The trade_account will be changed in this function - trade_val, trade_cost, trade_price = self.trade_exchange.deal_order( - order, trade_account=self.trade_account + # execute the order. + # NOTE: The trade_account will be changed in this function + trade_val, trade_cost, trade_price = self.trade_exchange.deal_order(order, trade_account=self.trade_account) + execute_result.append((order, trade_val, trade_cost, trade_price)) + if self.verbose: + print( + "[I {:%Y-%m-%d %H:%M:%S}]: {} {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}, cash {:.2f}.".format( + trade_start_time, + "sell" if order.direction == Order.SELL else "buy", + order.stock_id, + trade_price, + order.amount, + order.deal_amount, + order.factor, + trade_val, + self.trade_account.get_cash(), + ) ) - execute_result.append((order, trade_val, trade_cost, trade_price)) - if self.verbose: - if order.direction == Order.SELL: # sell - print( - "[I {:%Y-%m-%d %H:%M:%S}]: sell {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}.".format( - trade_start_time, - order.stock_id, - trade_price, - order.amount, - order.deal_amount, - order.factor, - trade_val, - ) - ) - else: - print( - "[I {:%Y-%m-%d %H:%M:%S}]: buy {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}.".format( - trade_start_time, - order.stock_id, - trade_price, - order.amount, - order.deal_amount, - order.factor, - trade_val, - ) - ) - - else: - if self.verbose: - print("[W {:%Y-%m-%d %H:%M:%S}]: {} wrong.".format(trade_start_time, order.stock_id)) - # do nothing - pass return execute_result, {"trade_info": execute_result} diff --git a/qlib/backtest/high_performance_ds.py b/qlib/backtest/high_performance_ds.py new file mode 100644 index 000000000..c60d3f97e --- /dev/null +++ b/qlib/backtest/high_performance_ds.py @@ -0,0 +1,449 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +import logging +from typing import List, Text, Tuple, Union, Callable, Iterable, Dict +from collections import OrderedDict + +import inspect +import pandas as pd + +from ..utils.resam import resam_ts_data +from ..log import get_module_logger + + +class BaseQuote: + def __init__(self, quote_df: pd.DataFrame): + self.logger = get_module_logger("online operator", level=logging.INFO) + + def get_all_stock(self) -> Iterable: + """return all stock codes + + Return + ------ + Iterable + all stock codes + """ + + raise NotImplementedError(f"Please implement the `get_all_stock` method") + + def get_data( + self, + stock_id: Union[str, list], + start_time: Union[pd.Timestamp, str], + end_time: Union[pd.Timestamp, str], + fields: Union[str, list] = None, + method: Union[str, Callable] = None, + ) -> Union[None, float, pd.Series, pd.DataFrame]: + """get the specific fields of stock data during start time and end_time, + and apply method to the data. + + Example: + .. code-block:: + $close $volume + instrument datetime + SH600000 2010-01-04 86.778313 16162960.0 + 2010-01-05 87.433578 28117442.0 + 2010-01-06 85.713585 23632884.0 + 2010-01-07 83.788803 20813402.0 + 2010-01-08 84.730675 16044853.0 + + SH600655 2010-01-04 2699.567383 158193.328125 + 2010-01-08 2612.359619 77501.406250 + 2010-01-11 2712.982422 160852.390625 + 2010-01-12 2788.688232 164587.937500 + 2010-01-13 2790.604004 145460.453125 + + print(get_data(stock_id=["SH600000", "SH600655"], start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) + + $close $volume + instrument + SH600000 87.433578 28117442.0 + SH600655 2699.567383 158193.328125 + + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields=["$close", "$volume"], method="last")) + + $close 87.433578 + $volume 28117442.0 + + print(get_data(stock_id="SH600000", start_time="2010-01-04", end_time="2010-01-05", fields="$close", method="last")) + + 87.433578 + + Parameters + ---------- + stock_id: Union[str, list] + start_time : Union[pd.Timestamp, str] + closed start time for backtest + end_time : Union[pd.Timestamp, str] + closed end time for backtest + fields : Union[str, List] + the columns of data to fetch + method : Union[str, Callable] + the method apply to data. + e.g [None, "last", "all", "sum", "mean", "any", qlib/utils/resam.py/ts_data_last] + + Return + ---------- + Union[None, float, pd.Series, pd.DataFrame] + The resampled DataFrame/Series/value, return None when the resampled data is empty. + """ + + raise NotImplementedError(f"Please implement the `get_data` method") + + +class PandasQuote(BaseQuote): + def __init__(self, quote_df: pd.DataFrame): + super().__init__(quote_df=quote_df) + quote_dict = {} + for stock_id, stock_val in quote_df.groupby(level="instrument"): + quote_dict[stock_id] = stock_val.droplevel(level="instrument") + self.data = quote_dict + + def get_all_stock(self): + return self.data.keys() + + def get_data(self, stock_id, start_time, end_time, fields=None, method=None): + if fields is None: + return resam_ts_data(self.data[stock_id], start_time, end_time, method=method) + elif isinstance(fields, (str, list)): + return resam_ts_data(self.data[stock_id][fields], start_time, end_time, method=method) + else: + raise ValueError(f"fields must be None, str or list") + + +class BaseSingleMetric: + """ + The data structure of the single metric. + The following methods are used for computing metrics in one indicator. + """ + + def __init__(self, metric: Union[dict, pd.Series]): + raise NotImplementedError(f"Please implement the `__init__` method") + + def __add__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `__add__` method") + + def __radd__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + return self + other + + def __sub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `__sub__` method") + + def __rsub__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `__rsub__` method") + + def __mul__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `__mul__` method") + + def __truediv__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `__truediv__` method") + + def __eq__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `__eq__` method") + + def __gt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `__gt__` method") + + def __lt__(self, other: Union["BaseSingleMetric", int, float]) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `__lt__` method") + + def __len__(self) -> int: + raise NotImplementedError(f"Please implement the `__len__` method") + + def sum(self) -> float: + raise NotImplementedError(f"Please implement the `sum` method") + + def mean(self) -> float: + raise NotImplementedError(f"Please implement the `mean` method") + + def count(self) -> int: + """Return the count of the single metric, NaN is not included.""" + + raise NotImplementedError(f"Please implement the `count` method") + + def abs(self) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `abs` method") + + def astype(self, type: type) -> "BaseSingleMetric": + raise NotImplementedError(f"Please implement the `astype` method") + + @property + def empty(self) -> bool: + """If metric is empyt, return True.""" + + raise NotImplementedError(f"Please implement the `empty` method") + + def add(self, other: "BaseSingleMetric", fill_value: float = None) -> "BaseSingleMetric": + """Replace np.NaN with fill_value in two metrics and add them.""" + + raise NotImplementedError(f"Please implement the `add` method") + + def replace(self, replace_dict: dict) -> "BaseSingleMetric": + """Replace the value of metric according to replace_dict.""" + + raise NotImplementedError(f"Please implement the `replace` method") + + def apply(self, func: dict) -> "BaseSingleMetric": + """Replace the value of metric with func(metric). + Currently, the func is only qlib/backtest/order/Order.parse_dir. + """ + + raise NotImplementedError(f"Please implement the 'apply' method") + + +class BaseOrderIndicator: + """ + The data structure of order indicator. + !!!NOTE: There are two ways to organize the data structure. Please choose a better way. + 1. One way is using BaseSingleMetric to represent each metric. For example, the data + structure of PandasOrderIndicator is Dict[str, PandasSingleMetric]. It uses + PandasSingleMetric based on pd.Series to represent each metric. + 2. The another way doesn't use BaseSingleMetric to represent each metric. The data + structure of PandasOrderIndicator is a whole matrix. It means you are not neccesary + to inherit the BaseSingleMetric. + """ + + def assign(self, col: str, metric: Union[dict, pd.Series]): + """assign one metric. + + Parameters + ---------- + col : str + the metric name of one metric. + metric : Union[dict, pd.Series] + the metric data. + """ + + pass + + def transfer(self, func: Callable, new_col: str = None) -> Union[None, BaseSingleMetric]: + """compute new metric with existing metrics. + + Parameters + ---------- + func : Callable + the func of computing new metric. + the kwargs of func will be replaced with metric data by name in this function. + e.g. + def func(pa): + return (pa > 0).astype(int).sum() / pa.count() + new_col : str, optional + New metric will be assigned in the data if new_col is not None, by default None. + + Return + ---------- + BaseSingleMetric + new metric. + """ + + pass + + def get_metric_series(self, metric: str) -> pd.Series: + """return the single metric with pd.Series format. + + Parameters + ---------- + metric : str + the metric name. + + Return + ---------- + pd.Series + the single metric. + If there is no metric name in the data, return pd.Series(). + """ + + pass + + @staticmethod + def sum_all_indicators( + indicators: list, metrics: Union[str, List[str]], fill_value: float = None + ) -> Dict[str, BaseSingleMetric]: + """sum indicators with the same metrics. + + Parameters + ---------- + indicators : List[BaseOrderIndicator] + the list of all inner indicators. + metrics : Union[str, List[str]] + all metrics needs ot be sumed. + fill_value : float, optional + fill np.NaN with value. By default None. + + Return + ---------- + Dict[str: PandasSingleMetric] + a dict of metric name and data. + """ + + pass + + def to_series(self) -> Dict[Text, pd.Series]: + """return the metrics as pandas series + + for example: { "ffr": + SH600068 NaN + SH600079 1.0 + SH600266 NaN + ... + SZ300692 NaN + SZ300719 NaN, + ... + } + """ + raise NotImplementedError(f"Please implement the `to_series` method") + + +class PandasSingleMetric: + """Each SingleMetric is based on pd.Series.""" + + def __init__(self, metric: Union[dict, pd.Series]): + if isinstance(metric, dict): + self.metric = pd.Series(metric) + elif isinstance(metric, pd.Series): + self.metric = metric + else: + raise ValueError(f"metric must be dict or pd.Series") + + def __add__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric + other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric + other.metric) + else: + return NotImplemented + + def __sub__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric - other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric - other.metric) + else: + return NotImplemented + + def __rsub__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(other - self.metric) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(other.metric - self.metric) + else: + return NotImplemented + + def __mul__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric * other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric * other.metric) + else: + return NotImplemented + + def __truediv__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric / other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric / other.metric) + else: + return NotImplemented + + def __eq__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric == other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric == other.metric) + else: + return NotImplemented + + def __gt__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric < other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric < other.metric) + else: + return NotImplemented + + def __lt__(self, other): + if isinstance(other, (int, float)): + return PandasSingleMetric(self.metric > other) + elif isinstance(other, PandasSingleMetric): + return PandasSingleMetric(self.metric > other.metric) + else: + return NotImplemented + + def __len__(self): + return len(self.metric) + + def sum(self): + return self.metric.sum() + + def mean(self): + return self.metric.mean() + + def count(self): + return self.metric.count() + + def abs(self): + return PandasSingleMetric(self.metric.abs()) + + def astype(self, type): + return PandasSingleMetric(self.metric.astype(type)) + + @property + def empty(self): + return self.metric.empty + + def add(self, other, fill_value=None): + return PandasSingleMetric(self.metric.add(other.metric, fill_value=fill_value)) + + def replace(self, replace_dict: dict): + return PandasSingleMetric(self.metric.replace(replace_dict)) + + def apply(self, func: Callable): + return PandasSingleMetric(self.metric.apply(func)) + + +class PandasOrderIndicator(BaseOrderIndicator): + """ + The data structure is OrderedDict(str: PandasSingleMetric). + Each PandasSingleMetric based on pd.Series is one metric. + Str is the name of metric. + """ + + def __init__(self): + self.data: Dict[str, PandasSingleMetric] = OrderedDict() + + def assign(self, col: str, metric: Union[dict, pd.Series]): + self.data[col] = PandasSingleMetric(metric) + + def transfer(self, func: Callable, new_col: str = None) -> Union[None, PandasSingleMetric]: + func_sig = inspect.signature(func).parameters.keys() + func_kwargs = {sig: self.data[sig] for sig in func_sig} + tmp_metric = func(**func_kwargs) + if new_col is not None: + self.data[new_col] = tmp_metric + else: + return tmp_metric + + def get_metric_series(self, metric: str) -> Union[pd.Series]: + if metric in self.data: + return self.data[metric].metric + else: + return pd.Series() + + @staticmethod + def sum_all_indicators( + indicators: list, metrics: Union[str, List[str]], fill_value=None + ) -> Dict[str, PandasSingleMetric]: + metric_dict = {} + if isinstance(metrics, str): + metrics = [metrics] + for metric in metrics: + tmp_metric = PandasSingleMetric({}) + for indicator in indicators: + tmp_metric = tmp_metric.add(indicator.data[metric], fill_value) + metric_dict[metric] = tmp_metric.metric + return metric_dict + + def to_series(self): + return {k: v.metric for k, v in self.data.items()} diff --git a/qlib/backtest/order.py b/qlib/backtest/order.py index 816bb6fa0..abd02554a 100644 --- a/qlib/backtest/order.py +++ b/qlib/backtest/order.py @@ -59,12 +59,19 @@ class Order: # 3) results # - users should not care about these values # - they are set by the backtest system after finishing the results. + # What the value should be about in all kinds of cases + # - not tradable: the deal_amount == 0 , factor is None + # - the stock is suspended and the entire order fails. No cost for this order + # - dealed or partially dealed: deal_amount >= 0 and factor is not None deal_amount: Optional[float] = None # `deal_amount` is a non-negative value factor: Optional[float] = None + # TODO: + # a status field to indicate the dealing result of the order + # FIXME: # for compatible now. - # Plese remove them in the future + # Please remove them in the future SELL: ClassVar[OrderDir] = OrderDir.SELL BUY: ClassVar[OrderDir] = OrderDir.BUY @@ -72,6 +79,7 @@ class Order: if self.direction not in {Order.SELL, Order.BUY}: raise NotImplementedError("direction not supported, `Order.SELL` for sell, `Order.BUY` for buy") self.deal_amount = 0 + self.factor = None @property def amount_delta(self) -> float: diff --git a/qlib/backtest/position.py b/qlib/backtest/position.py index 7c32edc81..e4f1ab40c 100644 --- a/qlib/backtest/position.py +++ b/qlib/backtest/position.py @@ -4,10 +4,14 @@ import copy import pathlib -from typing import Dict, List +from typing import Dict, List, Union + import pandas as pd +from datetime import timedelta import numpy as np + from .order import Order +from ..data.data import D class BasePosition: @@ -16,8 +20,8 @@ class BasePosition: Please refer to the `Position` class for the position """ - def __init__(self, cash=0.0, *args, **kwargs) -> None: - pass + def __init__(self, cash=0.0, *args, **kwargs): + self._settle_type = self.ST_NO def skip_update(self) -> bool: """ @@ -120,13 +124,16 @@ class BasePosition: """ raise NotImplementedError(f"Please implement the `get_stock_amount` method") - def get_cash(self) -> float: + def get_cash(self, include_settle: bool = False) -> float: """ Returns ------- float: - the cash in position + the available(tradable) cash in position + include_settle: + will the unsettled(delayed) cash included + Default: not include those unavailable cash """ raise NotImplementedError(f"Please implement the `get_cash` method") @@ -184,6 +191,37 @@ class BasePosition: """ raise NotImplementedError(f"Please implement the `add_count_all` method") + ST_CASH = "cash" + ST_NO = None + + def settle_start(self, settle_type: str): + """ + settlement start + It will act like start and commit a transaction + + Parameters + ---------- + settle_type : str + Should we make delay the settlement in each execution (each execution will make the executor a step forward) + - "cash": make the cash settlement delayed. + - The cash you get can't be used in current step (e.g. you can't sell a stock to get cash to buy another + stock) + - None: not settlement mechanism + - TODO: other assets will be supported in the future. + """ + raise NotImplementedError(f"Please implement the `settle_conf` method") + + def settle_commit(self): + """ + settlement commit + + Parameters + ---------- + settle_type : str + please refer to the documents of Executor + """ + raise NotImplementedError(f"Please implement the `settle_commit` method") + class Position(BasePosition): """Position @@ -199,7 +237,22 @@ class Position(BasePosition): } """ - def __init__(self, cash=0, position_dict={}): + def __init__(self, cash: float = 0, position_dict: Dict[str, Dict[str, float]] = {}): + """Init position by cash and position_dict. + + Parameters + ---------- + start_time : + the start time of backtest. It's for filling the initial value of stocks. + cash : float, optional + initial cash in account, by default 0 + position_dict : Dict[stock_id, {"amount": int, "price"(optional): float}], optional + initial stocks with parameters amount and price, + if there is no price key in the dict of stocks, it will be filled by _fill_stock_value. + by default {}. + """ + super().__init__() + # NOTE: The position dict must be copied!!! # Otherwise the initial value self.init_cash = cash @@ -207,6 +260,50 @@ class Position(BasePosition): self.position["cash"] = cash self.position["now_account_value"] = self.calculate_value() + def _fill_stock_value( + self, position_dict: dict, start_time: Union[str, pd.Timestamp], freq: str, last_days: int = 30 + ): + """fill the stock value by the close price of latest last_days from qlib. + + Parameters + ---------- + position_dict : Dict[stock_id, {"amount": int, "price": float}] + initial holding stocks. + start_time : + the start time of backtest. + last_days : int, optional + the days to get the latest close price, by default 30. + + Return + ---------- + Dict[stock_id, {"amount": int, "price": float}] + initial holding stocks with filled price. + """ + + stock_list = [] + for stock in position_dict: + if ("price" not in position_dict[stock]) or (position_dict[stock]["price"] is None): + stock_list.append(stock) + + if len(stock_list) == 0: + return position_dict + + start_time = pd.Timestamp(start_time) + # note that start time is 2020-01-01 00:00:00 if raw start time is "2020-01-01" + price_end_time = start_time + price_start_time = start_time - timedelta(days=last_days) + price_df = D.features( + stock_list, ["$close"], price_start_time, price_end_time, freq=freq, disk_cache=True + ).dropna() + price_dict = price_df.groupby(["instrument"]).tail(1).reset_index(level=1, drop=True)["$close"].to_dict() + + if len(price_dict) < len(stock_list): + raise ValueError(f"there is no close price in qlib") + + for stock in stock_list: + position_dict[stock]["price"] = price_dict[stock] + return position_dict + def _init_stock(self, stock_id, amount, price=None): """ initialization the stock in current position @@ -250,7 +347,13 @@ class Position(BasePosition): elif abs(self.position[stock_id]["amount"]) <= 1e-5: self._del_stock(stock_id) - self.position["cash"] += trade_val - cost + new_cash = trade_val - cost + if self._settle_type == self.ST_CASH: + self.position["cash_delay"] += new_cash + elif self._settle_type == self.ST_NO: + self.position["cash"] += new_cash + else: + raise NotImplementedError(f"This type of input is not supported") def _del_stock(self, stock_id): del self.position[stock_id] @@ -278,9 +381,6 @@ class Position(BasePosition): def update_stock_weight(self, stock_id, weight): self.position[stock_id]["weight"] = weight - def update_cash(self, cash): - self.position["cash"] = cash - def calculate_stock_value(self): stock_list = self.get_stock_list() value = 0 @@ -290,11 +390,11 @@ class Position(BasePosition): def calculate_value(self): value = self.calculate_stock_value() - value += self.position["cash"] + value += self.position["cash"] + self.position.get("cash_delay", 0.0) return value def get_stock_list(self): - stock_list = list(set(self.position.keys()) - {"cash", "now_account_value"}) + stock_list = list(set(self.position.keys()) - {"cash", "now_account_value", "cash_delay"}) return stock_list def get_stock_price(self, code): @@ -313,8 +413,11 @@ class Position(BasePosition): def get_stock_weight(self, code): return self.position[code]["weight"] - def get_cash(self): - return self.position["cash"] + def get_cash(self, include_settle=False): + cash = self.position["cash"] + if include_settle: + cash += self.position.get("cash_delay", 0.0) + return cash def get_stock_amount_dict(self): """generate stock amount dict {stock_id : amount of stock}""" @@ -326,7 +429,7 @@ class Position(BasePosition): def get_stock_weight_dict(self, only_stock=False): """get_stock_weight_dict - generate stock weight fict {stock_id : value weight of stock in the position} + generate stock weight dict {stock_id : value weight of stock in the position} it is meaningful in the beginning or the end of each trade date :param only_stock: If only_stock=True, the weight of each stock in total stock will be returned @@ -355,49 +458,20 @@ class Position(BasePosition): for stock_code, weight in weight_dict.items(): self.update_stock_weight(stock_code, weight) - def save_position(self, path): - path = pathlib.Path(path) - p = copy.deepcopy(self.position) - cash = pd.Series(dtype=float) - cash["init_cash"] = self.init_cash - cash["cash"] = p["cash"] - cash["now_account_value"] = p["now_account_value"] - del p["cash"] - del p["now_account_value"] - positions = pd.DataFrame.from_dict(p, orient="index") - with pd.ExcelWriter(path) as writer: - positions.to_excel(writer, sheet_name="position") - cash.to_excel(writer, sheet_name="info") + def settle_start(self, settle_type): + assert self._settle_type == self.ST_NO, "Currently, settlement can't be nested!!!!!" + self._settle_type = settle_type + if settle_type == self.ST_CASH: + self.position["cash_delay"] = 0.0 - def load_position(self, path): - """load position information from a file - should have format below - sheet "position" - columns: ['stock', f'count_{bar}', 'amount', 'price', 'weight'] - f'count_{bar}': , - 'amount': , - 'price': , - 'weight': , - - sheet "cash" - index: ['init_cash', 'cash', 'now_account_value'] - 'init_cash': , - 'cash': , - 'now_account_value': - """ - path = pathlib.Path(path) - positions = pd.read_excel(open(path, "rb"), sheet_name="position", index_col=0) - cash_record = pd.read_excel(open(path, "rb"), sheet_name="info", index_col=0) - positions = positions.to_dict(orient="index") - init_cash = cash_record.loc["init_cash"].values[0] - cash = cash_record.loc["cash"].values[0] - now_account_value = cash_record.loc["now_account_value"].values[0] - # assign values - self.position = {} - self.init_cash = init_cash - self.position = positions - self.position["cash"] = cash - self.position["now_account_value"] = now_account_value + def settle_commit(self): + if self._settle_type != self.ST_NO: + if self._settle_type == self.ST_CASH: + self.position["cash"] += self.position["cash_delay"] + del self.position["cash_delay"] + else: + raise NotImplementedError(f"This type of input is not supported") + self._settle_type = self.ST_NO class InfPosition(BasePosition): @@ -440,7 +514,7 @@ class InfPosition(BasePosition): def get_stock_amount(self, code) -> float: return np.inf - def get_cash(self) -> float: + def get_cash(self, include_settle=False) -> float: return np.inf def get_stock_amount_dict(self) -> Dict: @@ -454,3 +528,9 @@ class InfPosition(BasePosition): def update_weight_all(self): raise NotImplementedError(f"InfPosition doesn't support update_weight_all") + + def settle_start(self, settle_type: str): + pass + + def settle_commit(self): + pass diff --git a/qlib/backtest/report.py b/qlib/backtest/report.py index 84cae2568..2d188dd18 100644 --- a/qlib/backtest/report.py +++ b/qlib/backtest/report.py @@ -5,8 +5,7 @@ from collections import OrderedDict from logging import warning import pathlib -from typing import Dict, List, Tuple -import warnings +from typing import Dict, List, Tuple, Union, Callable import numpy as np import pandas as pd @@ -17,10 +16,12 @@ from qlib.backtest.exchange import Exchange from qlib.backtest.order import BaseTradeDecision, Order, OrderDir from qlib.backtest.utils import TradeCalendarManager +from .high_performance_ds import PandasOrderIndicator from ..data import D from ..tests.config import CSI300_BENCH from ..utils.resam import get_higher_eq_freq_feature, resam_ts_data from ..utils.time import Freq +from .order import IdxTradeRange class Report: @@ -62,6 +63,7 @@ class Report: - Else, it represent end time of benchmark, by default None """ + self.init_vars() self.init_bench(freq=freq, benchmark_config=benchmark_config) @@ -253,10 +255,12 @@ class Indicator: """ - def __init__(self): + def __init__(self, order_indicator_cls=PandasOrderIndicator): + self.order_indicator_cls = order_indicator_cls + # order indicator is metrics for a single order for a specific step self.order_indicator_his = OrderedDict() - self.order_indicator: Dict[str, pd.Series] = OrderedDict() + self.order_indicator = self.order_indicator_cls() # trade indicator is metrics for all orders for a specific step self.trade_indicator_his = OrderedDict() @@ -266,13 +270,13 @@ class Indicator: # def reset(self, trade_calendar: TradeCalendarManager): def reset(self): - self.order_indicator = OrderedDict() + self.order_indicator = self.order_indicator_cls() self.trade_indicator = OrderedDict() # self._trade_calendar = trade_calendar def record(self, trade_start_time): - self.order_indicator_his[trade_start_time] = self.order_indicator - self.trade_indicator_his[trade_start_time] = self.trade_indicator + self.order_indicator_his[trade_start_time] = self.get_order_indicator() + self.trade_indicator_his[trade_start_time] = self.get_trade_indicator() def _update_order_trade_info(self, trade_info: list): amount = dict() @@ -281,6 +285,7 @@ class Indicator: trade_value = dict() trade_cost = dict() trade_dir = dict() + pa = dict() for order, _trade_val, _trade_cost, _trade_price in trade_info: amount[order.stock_id] = order.amount_delta @@ -289,66 +294,64 @@ class Indicator: trade_value[order.stock_id] = _trade_val * order.sign trade_cost[order.stock_id] = _trade_cost trade_dir[order.stock_id] = order.direction + # The PA in the innermost layer is meanless + pa[order.stock_id] = 0 - self.order_indicator["amount"] = self.order_indicator["inner_amount"] = pd.Series(amount) - self.order_indicator["deal_amount"] = pd.Series(deal_amount) + self.order_indicator.assign("amount", amount) + self.order_indicator.assign("inner_amount", amount) + self.order_indicator.assign("deal_amount", deal_amount) # NOTE: trade_price and baseline price will be same on the lowest-level - self.order_indicator["trade_price"] = pd.Series(trade_price) - self.order_indicator["trade_value"] = pd.Series(trade_value) - self.order_indicator["trade_cost"] = pd.Series(trade_cost) - self.order_indicator["trade_dir"] = pd.Series(trade_dir) + self.order_indicator.assign("trade_price", trade_price) + self.order_indicator.assign("trade_value", trade_value) + self.order_indicator.assign("trade_cost", trade_cost) + self.order_indicator.assign("trade_dir", trade_dir) + self.order_indicator.assign("pa", pa) def _update_order_fulfill_rate(self): - self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"] + def func(deal_amount, amount): + # deal_amount is np.NaN when there is no inner decision. So full fill rate is 0. + tmp_deal_amount = deal_amount.replace({np.NaN: 0}) + return tmp_deal_amount / amount - def _update_order_price_advantage(self): - # NOTE: - # trade_price and baseline price will be same on the lowest-level - # So Pa should be 0 or do nothing - self.order_indicator["pa"] = 0 + self.order_indicator.transfer(func, "ffr") def update_order_indicators(self, trade_info: list): self._update_order_trade_info(trade_info=trade_info) self._update_order_fulfill_rate() - self._update_order_price_advantage() def _agg_order_trade_info(self, inner_order_indicators: List[Dict[str, pd.Series]]): - inner_amount = pd.Series() - deal_amount = pd.Series() - trade_price = pd.Series() - trade_value = pd.Series() - trade_cost = pd.Series() - trade_dir = pd.Series() - for _order_indicator in inner_order_indicators: - inner_amount = inner_amount.add(_order_indicator["inner_amount"], fill_value=0) - deal_amount = deal_amount.add(_order_indicator["deal_amount"], fill_value=0) - trade_price = trade_price.add( - _order_indicator["trade_price"] * _order_indicator["deal_amount"], fill_value=0 - ) - trade_value = trade_value.add(_order_indicator["trade_value"], fill_value=0) - trade_cost = trade_cost.add(_order_indicator["trade_cost"], fill_value=0) - trade_dir = trade_dir.add(_order_indicator["trade_dir"], fill_value=0) + # calculate total trade amount with each inner order indicator. + def trade_amount_func(deal_amount, trade_price): + return deal_amount * trade_price - trade_dir = trade_dir.apply(Order.parse_dir) + for indicator in inner_order_indicators: + indicator.transfer(trade_amount_func, "trade_price") - self.order_indicator["inner_amount"] = inner_amount - self.order_indicator["deal_amount"] = deal_amount - trade_price /= self.order_indicator["deal_amount"] - self.order_indicator["trade_price"] = trade_price - self.order_indicator["trade_value"] = trade_value - self.order_indicator["trade_cost"] = trade_cost - self.order_indicator["trade_dir"] = trade_dir + # sum inner order indicators with same metric. + all_metric = ["inner_amount", "deal_amount", "trade_price", "trade_value", "trade_cost", "trade_dir"] + metric_dict = self.order_indicator_cls.sum_all_indicators(inner_order_indicators, all_metric, fill_value=0) + for metric in metric_dict: + self.order_indicator.assign(metric, metric_dict[metric]) + + def func(trade_price, deal_amount): + # trade_price is np.NaN instead of inf when deal_amount is zero. + tmp_deal_amount = deal_amount.replace({0: np.NaN}) + return trade_price / tmp_deal_amount + + self.order_indicator.transfer(func, "trade_price") + + def func_apply(trade_dir): + return trade_dir.apply(Order.parse_dir) + + self.order_indicator.transfer(func_apply, "trade_dir") def _update_trade_amount(self, outer_trade_decision: BaseTradeDecision): # NOTE: these indicator is designed for order execution, so the decision: List[Order] = outer_trade_decision.get_decision() - if decision is None: - self.order_indicator["amount"] = pd.Series() + if len(decision) == 0: + self.order_indicator.assign("amount", {}) else: - self.order_indicator["amount"] = pd.Series({order.stock_id: order.amount_delta for order in decision}) - - def _agg_order_fulfill_rate(self): - self.order_indicator["ffr"] = self.order_indicator["deal_amount"] / self.order_indicator["amount"] + self.order_indicator.assign("amount", {order.stock_id: order.amount_delta for order in decision}) def _get_base_vol_pri( self, @@ -368,10 +371,12 @@ class Indicator: agg = pa_config.get("agg", "twap").lower() price = pa_config.get("price", "deal_price").lower() - # NOTE: IndexTradeRange is not supported!!!!! Because inner index is not available - trade_start_time, trade_end_time = decision.trade_range.clip_time_range( - start_time=trade_start_time, end_time=trade_end_time - ) + if decision.trade_range is not None: + if isinstance(decision.trade_range, IdxTradeRange): + raise TypeError(f"IdxTradeRange is not supported") + trade_start_time, trade_end_time = decision.trade_range.clip_time_range( + start_time=trade_start_time, end_time=trade_end_time + ) if price == "deal_price": price_s = trade_exchange.get_deal_price( @@ -429,17 +434,16 @@ class Indicator: "price": "$close", # TODO: this is not supported now!!!!! # default to use deal price of the exchange } - """ # TODO: I think there are potentials to be optimized - trade_dir = self.order_indicator["trade_dir"] + trade_dir = self.order_indicator.get_metric_series("trade_dir") if len(trade_dir) > 0: bp_all, bv_all = [], [] # for oi, (dec, start, end) in zip(inner_order_indicators, decision_list): - bp_s = oi.get("base_price", pd.Series()).reindex(trade_dir.index) - bv_s = oi.get("base_volume", pd.Series()).reindex(trade_dir.index) + bp_s = oi.get_metric_series("base_price").reindex(trade_dir.index) + bv_s = oi.get_metric_series("base_volume").reindex(trade_dir.index) bp_new, bv_new = {}, {} for pr, v, (inst, direction) in zip(bp_s.values, bv_s.values, trade_dir.items()): if np.isnan(pr): @@ -463,17 +467,24 @@ class Indicator: bp_all = pd.concat(bp_all, axis=1) bv_all = pd.concat(bv_all, axis=1) - self.order_indicator["base_volume"] = bv_all.sum(axis=1) - self.order_indicator["base_price"] = (bp_all * bv_all).sum(axis=1) / self.order_indicator["base_volume"] + base_volume = bv_all.sum(axis=1) + self.order_indicator.assign("base_volume", base_volume) + self.order_indicator.assign("base_price", (bp_all * bv_all).sum(axis=1) / base_volume) def _agg_order_price_advantage(self): - if not self.order_indicator["trade_price"].empty: - sign = 1 - self.order_indicator["trade_dir"] * 2 - self.order_indicator["pa"] = sign * ( - self.order_indicator["trade_price"] / self.order_indicator["base_price"] - 1 - ) + def if_empty_func(trade_price): + return trade_price.empty + + if_empty = self.order_indicator.transfer(if_empty_func) + if not if_empty: + + def func(trade_dir, trade_price, base_price): + sign = 1 - trade_dir * 2 + return sign * (trade_price / base_price - 1) + + self.order_indicator.transfer(func, "pa") else: - self.order_indicator["pa"] = pd.Series() + self.order_indicator.assign("pa", {}) def agg_order_indicators( self, @@ -485,55 +496,74 @@ class Indicator: ): self._agg_order_trade_info(inner_order_indicators) self._update_trade_amount(outer_trade_decision) - self._agg_order_fulfill_rate() + self._update_order_fulfill_rate() pa_config = indicator_config.get("pa_config", {}) - self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) + self._agg_base_price(inner_order_indicators, decision_list, trade_exchange, pa_config=pa_config) # TODO self._agg_order_price_advantage() def _cal_trade_fulfill_rate(self, method="mean"): if method == "mean": - return self.order_indicator["ffr"].mean() + + def func(ffr): + return ffr.mean() + elif method == "amount_weighted": - weights = self.order_indicator["deal_amount"].abs() - return (self.order_indicator["ffr"] * weights).sum() / weights.sum() + + def func(ffr, deal_amount): + return (ffr * deal_amount.abs()).sum() / (deal_amount.abs().sum()) + elif method == "value_weighted": - weights = self.order_indicator["trade_value"].abs() - return (self.order_indicator["ffr"] * weights).sum() / weights.sum() + + def func(ffr, trade_value): + return (ffr * trade_value.abs()).sum() / (trade_value.abs().sum()) + else: raise ValueError(f"method {method} is not supported!") + return self.order_indicator.transfer(func) def _cal_trade_price_advantage(self, method="mean"): - pa_order = self.order_indicator["pa"] - if isinstance(pa_order, (int, float)): - # pa from atomic executor - return pa_order - if method == "mean": - return pa_order.mean() + + def func(pa): + return pa.mean() + elif method == "amount_weighted": - weights = self.order_indicator["deal_amount"].abs() - return (pa_order * weights).sum() / weights.sum() + + def func(pa, deal_amount): + return (pa * deal_amount.abs()).sum() / (deal_amount.abs().sum()) + elif method == "value_weighted": - weights = self.order_indicator["trade_value"].abs() - return (pa_order * weights).sum() / weights.sum() + + def func(pa, trade_value): + return (pa * trade_value.abs()).sum() / (trade_value.abs().sum()) + else: raise ValueError(f"method {method} is not supported!") + return self.order_indicator.transfer(func) def _cal_trade_positive_rate(self): - pa_order = self.order_indicator["pa"] - if isinstance(pa_order, (int, float)): - # pa from atomic executor - return pa_order - return (pa_order > 0).astype(int).sum() / pa_order.count() + def func(pa): + return (pa > 0).astype(int).sum() / pa.count() + + return self.order_indicator.transfer(func) def _cal_deal_amount(self): - return self.order_indicator["deal_amount"].abs().sum() + def func(deal_amount): + return deal_amount.abs().sum() + + return self.order_indicator.transfer(func) def _cal_trade_value(self): - return self.order_indicator["trade_value"].abs().sum() + def func(trade_value): + return trade_value.abs().sum() + + return self.order_indicator.transfer(func) def _cal_trade_order_count(self): - return self.order_indicator["amount"].count() + def func(amount): + return amount.count() + + return self.order_indicator.transfer(func) def cal_trade_indicators(self, trade_start_time, freq, indicator_config={}): show_indicator = indicator_config.get("show_indicator", False) @@ -558,8 +588,10 @@ class Indicator: ) ) - def get_order_indicator(self): - return self.order_indicator + def get_order_indicator(self, raw: bool = False): + if raw: + return self.order_indicator + return self.order_indicator.to_series() def get_trade_indicator(self): return self.trade_indicator diff --git a/qlib/contrib/model/pytorch_localformer.py b/qlib/contrib/model/pytorch_localformer.py new file mode 100644 index 000000000..2ec56067f --- /dev/null +++ b/qlib/contrib/model/pytorch_localformer.py @@ -0,0 +1,331 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +from typing import Text, Union +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP +from torch.nn.modules.container import ModuleList + +# qrun examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml ” + + +class LocalformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 2048, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values) + + self.model.train() + + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + + pred = self.model(feature) + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) + + self.model.eval() + + scores = [] + losses = [] + + indices = np.arange(len(x_values)) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(feature) + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], + col_set=["feature", "label"], + data_key=DataHandlerLP.DK_L, + ) + + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + + save_path = get_or_create_path(save_path) + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(x_train, y_train) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + x_test = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I) + index = x_test.index + self.model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] + preds = [] + + for begin in range(sample_num)[:: self.batch_size]: + + if sample_num - begin < self.batch_size: + end = sample_num + else: + end = begin + self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(x_batch).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=index) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +def _get_clones(module, N): + return ModuleList([copy.deepcopy(module) for i in range(N)]) + + +class LocalformerEncoder(nn.Module): + __constants__ = ["norm"] + + def __init__(self, encoder_layer, num_layers, d_model): + super(LocalformerEncoder, self).__init__() + self.layers = _get_clones(encoder_layer, num_layers) + self.conv = _get_clones(nn.Conv1d(d_model, d_model, 3, 1, 1), num_layers) + self.num_layers = num_layers + + def forward(self, src, mask): + output = src + out = src + + for i, mod in enumerate(self.layers): + # [T, N, F] --> [N, T, F] --> [N, F, T] + out = output.transpose(1, 0).transpose(2, 1) + out = self.conv[i](out).transpose(2, 1).transpose(1, 0) + + output = mod(output + out, src_mask=mask) + + return output + out + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.rnn = nn.GRU( + input_size=d_model, + hidden_size=d_model, + num_layers=num_layers, + batch_first=False, + dropout=dropout, + ) + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = LocalformerEncoder(self.encoder_layer, num_layers=num_layers, d_model=d_model) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, F*T] --> [N, T, F] + src = src.reshape(len(src), self.d_feat, -1).permute(0, 2, 1) + src = self.feature_layer(src) + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + output, _ = self.rnn(output) + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/contrib/model/pytorch_localformer_ts.py b/qlib/contrib/model/pytorch_localformer_ts.py new file mode 100644 index 000000000..683a9bd4f --- /dev/null +++ b/qlib/contrib/model/pytorch_localformer_ts.py @@ -0,0 +1,308 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP +from torch.nn.modules.container import ModuleList + + +class LocalformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 8192, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info( + "Improved Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device) + ) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, data_loader): + + self.model.train() + + for data in data_loader: + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_loader): + + self.model.eval() + + scores = [] + losses = [] + + for data in data_loader: + + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + + dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader + dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader + + train_loader = DataLoader( + dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True + ) + valid_loader = DataLoader( + dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True + ) + + save_path = get_or_create_path(save_path) + + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(train_loader) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(train_loader) + val_loss, val_score = self.test_epoch(valid_loader) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I) + dl_test.config(fillna_type="ffill+bfill") + test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs) + self.model.eval() + preds = [] + + for data in test_loader: + feature = data[:, :, 0:-1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=dl_test.get_index()) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +def _get_clones(module, N): + return ModuleList([copy.deepcopy(module) for i in range(N)]) + + +class LocalformerEncoder(nn.Module): + __constants__ = ["norm"] + + def __init__(self, encoder_layer, num_layers, d_model): + super(LocalformerEncoder, self).__init__() + self.layers = _get_clones(encoder_layer, num_layers) + self.conv = _get_clones(nn.Conv1d(d_model, d_model, 3, 1, 1), num_layers) + self.num_layers = num_layers + + def forward(self, src, mask): + output = src + out = src + + for i, mod in enumerate(self.layers): + # [T, N, F] --> [N, T, F] --> [N, F, T] + out = output.transpose(1, 0).transpose(2, 1) + out = self.conv[i](out).transpose(2, 1).transpose(1, 0) + + output = mod(output + out, src_mask=mask) + + return output + out + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.rnn = nn.GRU( + input_size=d_model, + hidden_size=d_model, + num_layers=num_layers, + batch_first=False, + dropout=dropout, + ) + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = LocalformerEncoder(self.encoder_layer, num_layers=num_layers, d_model=d_model) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, T, F], [512, 60, 6] + src = self.feature_layer(src) # [512, 60, 8] + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + output, _ = self.rnn(output) + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/contrib/model/pytorch_transformer.py b/qlib/contrib/model/pytorch_transformer.py new file mode 100644 index 000000000..53ebff3c5 --- /dev/null +++ b/qlib/contrib/model/pytorch_transformer.py @@ -0,0 +1,294 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +from typing import Text, Union +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP + +# qrun examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml ” + + +class TransformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 2048, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values) + + self.model.train() + + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + + pred = self.model(feature) + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) + + self.model.eval() + + scores = [] + losses = [] + + indices = np.arange(len(x_values)) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(feature) + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], + col_set=["feature", "label"], + data_key=DataHandlerLP.DK_L, + ) + + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + + save_path = get_or_create_path(save_path) + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(x_train, y_train) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + x_test = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I) + index = x_test.index + self.model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] + preds = [] + + for begin in range(sample_num)[:: self.batch_size]: + + if sample_num - begin < self.batch_size: + end = sample_num + else: + end = begin + self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(x_batch).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=index) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, F*T] --> [N, T, F] + src = src.reshape(len(src), self.d_feat, -1).permute(0, 2, 1) + src = self.feature_layer(src) + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/contrib/model/pytorch_transformer_ts.py b/qlib/contrib/model/pytorch_transformer_ts.py new file mode 100644 index 000000000..c53564903 --- /dev/null +++ b/qlib/contrib/model/pytorch_transformer_ts.py @@ -0,0 +1,269 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP + + +class TransformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 8192, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, data_loader): + + self.model.train() + + for data in data_loader: + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_loader): + + self.model.eval() + + scores = [] + losses = [] + + for data in data_loader: + + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + + dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader + dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader + + train_loader = DataLoader( + dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True + ) + valid_loader = DataLoader( + dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True + ) + + save_path = get_or_create_path(save_path) + + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(train_loader) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(train_loader) + val_loss, val_score = self.test_epoch(valid_loader) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I) + dl_test.config(fillna_type="ffill+bfill") + test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs) + self.model.eval() + preds = [] + + for data in test_loader: + feature = data[:, :, 0:-1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=dl_test.get_index()) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, T, F], [512, 60, 6] + src = self.feature_layer(src) # [512, 60, 8] + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/contrib/strategy/rule_strategy.py b/qlib/contrib/strategy/rule_strategy.py index b42c4f578..7f04f444e 100644 --- a/qlib/contrib/strategy/rule_strategy.py +++ b/qlib/contrib/strategy/rule_strategy.py @@ -18,7 +18,12 @@ from qlib.backtest.utils import get_start_end_idx class TWAPStrategy(BaseStrategy): - """TWAP Strategy for trading""" + """TWAP Strategy for trading + + NOTE: + - This TWAP strategy will celling round when trading. This will make the TWAP trading strategy produce the order + ealier when the total trade unit of amount is less than the trading step + """ def reset(self, outer_trade_decision: BaseTradeDecision = None, **kwargs): """ @@ -58,11 +63,11 @@ class TWAPStrategy(BaseStrategy): trade_start_time, trade_end_time = self.trade_calendar.get_step_time(trade_step) order_list = [] for order in self.outer_trade_decision.get_decision(): - # if not tradable, continue - if not self.trade_exchange.is_stock_tradable( - stock_id=order.stock_id, start_time=trade_start_time, end_time=trade_end_time - ): - continue + # Don't peek the future information + # if not self.trade_exchange.is_stock_tradable( + # stock_id=order.stock_id, start_time=trade_start_time, end_time=trade_end_time + # ): + # continue _amount_trade_unit = self.trade_exchange.get_amount_of_trade_unit( stock_id=order.stock_id, start_time=order.start_time, end_time=order.end_time ) diff --git a/qlib/data/data.py b/qlib/data/data.py index d6735b4e6..ccd35006b 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -1056,13 +1056,21 @@ class ClientProvider(BaseProvider): """ def __init__(self): + def is_instance_of_provider(instance: object, cls: type): + if isinstance(instance, Wrapper): + p = getattr(instance, "_provider", None) + + return False if p is None else isinstance(p, cls) + + return isinstance(instance, cls) + from .client import Client self.client = Client(C.flask_server, C.flask_port) self.logger = get_module_logger(self.__class__.__name__) - if isinstance(Cal, ClientCalendarProvider): + if is_instance_of_provider(Cal, ClientCalendarProvider): Cal.set_conn(self.client) - if isinstance(Inst, ClientInstrumentProvider): + if is_instance_of_provider(Inst, ClientInstrumentProvider): Inst.set_conn(self.client) if hasattr(DatasetD, "provider"): DatasetD.provider.set_conn(self.client) diff --git a/qlib/utils/exceptions.py b/qlib/utils/exceptions.py index dad12506b..dd9b3eaf6 100644 --- a/qlib/utils/exceptions.py +++ b/qlib/utils/exceptions.py @@ -1,17 +1,20 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. + # Base exception class class QlibException(Exception): def __init__(self, message): super(QlibException, self).__init__(message) -# Error type for reinitialization when starting an experiment class RecorderInitializationError(QlibException): + """Error type for re-initialization when starting an experiment""" + pass -# Error type for Recorder when can not load object class LoadObjectError(QlibException): + """Error type for Recorder when can not load object""" + pass diff --git a/qlib/workflow/cli.py b/qlib/workflow/cli.py index 879c0aaeb..16e5b6296 100644 --- a/qlib/workflow/cli.py +++ b/qlib/workflow/cli.py @@ -53,7 +53,8 @@ def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"): exp_manager["kwargs"]["uri"] = "file:" + str(Path(os.getcwd()).resolve() / uri_folder) qlib.init(**config.get("qlib_init"), exp_manager=exp_manager) - task_train(config.get("task"), experiment_name=experiment_name) + recorder = task_train(config.get("task"), experiment_name=experiment_name) + recorder.save_objects(config=config) # function to run worklflow by config diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py index 627b5ff82..fcf6cd8d1 100644 --- a/qlib/workflow/exp.py +++ b/qlib/workflow/exp.py @@ -325,7 +325,7 @@ class MLflowExperiment(Experiment): UNLIMITED = 50000 # FIXME: Mlflow can only list 50000 records at most!!!!!!! - def list_recorders(self, max_results: int = UNLIMITED, status: Union[str, None] = None): + def list_recorders(self, max_results: int = UNLIMITED, status: Union[str, None] = None, filter_string: str = ""): """ Parameters ---------- @@ -334,8 +334,12 @@ class MLflowExperiment(Experiment): status : str the criteria based on status to filter results. `None` indicates no filtering. + filter_string : str + mlflow supported filter string like 'params."my_param"="a" and tags."my_tag"="b"', use this will help to reduce too much run number. """ - runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results) + runs = self._client.search_runs( + self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results, filter_string=filter_string + ) recorders = dict() for i in range(len(runs)): recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i]) diff --git a/qlib/workflow/task/collect.py b/qlib/workflow/task/collect.py index 36ccf434d..467281666 100644 --- a/qlib/workflow/task/collect.py +++ b/qlib/workflow/task/collect.py @@ -139,6 +139,7 @@ class RecorderCollector(Collector): rec_filter_func=None, artifacts_path={"pred": "pred.pkl"}, artifacts_key=None, + list_kwargs={}, ): """ Init RecorderCollector. @@ -150,6 +151,7 @@ class RecorderCollector(Collector): rec_filter_func (Callable, optional): filter the recorder by return True or False. Defaults to None. artifacts_path (dict, optional): The artifacts name and its path in Recorder. Defaults to {"pred": "pred.pkl", "IC": "sig_analysis/ic.pkl"}. artifacts_key (str or List, optional): the artifacts key you want to get. If None, get all artifacts. + list_kwargs (str): arguments for list_recorders function. """ super().__init__(process_list=process_list) if isinstance(experiment, str): @@ -163,6 +165,7 @@ class RecorderCollector(Collector): self.rec_key_func = rec_key_func self.artifacts_key = artifacts_key self.rec_filter_func = rec_filter_func + self.list_kwargs = list_kwargs def collect(self, artifacts_key=None, rec_filter_func=None, only_exist=True) -> dict: """ @@ -187,7 +190,7 @@ class RecorderCollector(Collector): collect_dict = {} # filter records - recs = self.experiment.list_recorders() + recs = self.experiment.list_recorders(**self.list_kwargs) recs_flt = {} for rid, rec in recs.items(): if rec_filter_func is None or rec_filter_func(rec): diff --git a/qlib/workflow/utils.py b/qlib/workflow/utils.py index 5a93eacca..6e1e76529 100644 --- a/qlib/workflow/utils.py +++ b/qlib/workflow/utils.py @@ -1,10 +1,14 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -import sys, traceback, signal, atexit, logging +import atexit +import logging +import sys +import traceback + +from ..log import get_module_logger from . import R from .recorder import Recorder -from ..log import get_module_logger logger = get_module_logger("workflow", logging.INFO) diff --git a/scripts/data_collector/contrib/future_trading_date_collector/future_trading_date_collector.py b/scripts/data_collector/contrib/future_trading_date_collector/future_trading_date_collector.py index 8df0a4972..939ba7f6a 100644 --- a/scripts/data_collector/contrib/future_trading_date_collector/future_trading_date_collector.py +++ b/scripts/data_collector/contrib/future_trading_date_collector/future_trading_date_collector.py @@ -78,6 +78,7 @@ def future_calendar_collector(qlib_dir: [str, Path], freq: str = "day"): data_list.append(_row_data[0]) data_list = sorted(data_list) date_list = generate_qlib_calendar(data_list, freq=freq) + date_list = sorted(set(daily_calendar.loc[:, 0].values.tolist() + date_list)) write_calendar_to_qlib(qlib_dir, date_list, freq=freq) bs.logout() logger.info(f"get trading dates success: {start_year}-01-01 to {end_year}-12-31") diff --git a/scripts/data_collector/yahoo/README.md b/scripts/data_collector/yahoo/README.md index 6cc630e87..50f731e38 100644 --- a/scripts/data_collector/yahoo/README.md +++ b/scripts/data_collector/yahoo/README.md @@ -71,7 +71,7 @@ pip install -r requirements.txt - examples: ```bash # cn 1d data - python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1d --start 2020-01-01 --end 2020-12-31 --delay 1 --interval 1d --region US + python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1d --start 2020-01-01 --end 2020-12-31 --delay 1 --interval 1d --region CN # cn 1min data python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1min --delay 1 --interval 1min --region CN # us 1d data diff --git a/scripts/data_collector/yahoo/collector.py b/scripts/data_collector/yahoo/collector.py index 6a128a5be..feb28a94f 100644 --- a/scripts/data_collector/yahoo/collector.py +++ b/scripts/data_collector/yahoo/collector.py @@ -283,6 +283,16 @@ class YahooNormalize(BaseNormalize): COLUMNS = ["open", "close", "high", "low", "volume"] DAILY_FORMAT = "%Y-%m-%d" + @staticmethod + def calc_change(df: pd.DataFrame, last_close: float) -> pd.Series: + df = df.copy() + _tmp_series = df["close"].fillna(method="ffill") + _tmp_shift_series = _tmp_series.shift(1) + if last_close is not None: + _tmp_shift_series.iloc[0] = float(last_close) + change_series = _tmp_series / _tmp_shift_series - 1 + return change_series + @staticmethod def normalize_yahoo( df: pd.DataFrame, @@ -310,11 +320,29 @@ class YahooNormalize(BaseNormalize): ) df.sort_index(inplace=True) df.loc[(df["volume"] <= 0) | np.isnan(df["volume"]), set(df.columns) - {symbol_field_name}] = np.nan - _tmp_series = df["close"].fillna(method="ffill") - _tmp_shift_series = _tmp_series.shift(1) - if last_close is not None: - _tmp_shift_series.iloc[0] = float(last_close) - df["change"] = _tmp_series / _tmp_shift_series - 1 + + change_series = YahooNormalize.calc_change(df, last_close) + # NOTE: The data obtained by Yahoo finance sometimes has exceptions + # WARNING: If it is normal for a `symbol(exchange)` to differ by a factor of *89* to *111* for consecutive trading days, + # WARNING: the logic in the following line needs to be modified + _count = 0 + while True: + # NOTE: may appear unusual for many days in a row + change_series = YahooNormalize.calc_change(df, last_close) + _mask = (change_series >= 89) & (change_series <= 111) + if not _mask.any(): + break + _tmp_cols = ["high", "close", "low", "open", "adjclose"] + df.loc[_mask, _tmp_cols] = df.loc[_mask, _tmp_cols] / 100 + _count += 1 + if _count >= 10: + _symbol = df.loc[df[symbol_field_name].first_valid_index()]["symbol"] + logger.warning( + f"{_symbol} `change` is abnormal for {_count} consecutive days, please check the specific data file carefully" + ) + + df["change"] = YahooNormalize.calc_change(df, last_close) + columns += ["change"] df.loc[(df["volume"] <= 0) | np.isnan(df["volume"]), columns] = np.nan @@ -852,7 +880,7 @@ class Run(BaseRun): if self.interval.lower() == "1min": if qlib_data_1d_dir is None or not Path(qlib_data_1d_dir).expanduser().exists(): raise ValueError( - "If normalize 1min, the qlib_data_1d_dir parameter must be set: --qlib_data_1d_dir , Reference: https://github.com/zhupr/qlib/tree/support_extend_data/scripts/data_collector/yahoo#automatic-update-of-daily-frequency-datafrom-yahoo-finance" + "If normalize 1min, the qlib_data_1d_dir parameter must be set: --qlib_data_1d_dir , Reference: https://github.com/microsoft/qlib/tree/main/scripts/data_collector/yahoo#automatic-update-of-daily-frequency-datafrom-yahoo-finance" ) super(Run, self).normalize_data( date_field_name, symbol_field_name, end_date=end_date, qlib_data_1d_dir=qlib_data_1d_dir diff --git a/scripts/dump_bin.py b/scripts/dump_bin.py index 83daa28bc..8e9878895 100644 --- a/scripts/dump_bin.py +++ b/scripts/dump_bin.py @@ -244,6 +244,10 @@ class DumpDataBase: if df is None or df.empty: logger.warning(f"{code} data is None or empty") return + + # try to remove dup rows or it will cause exception when reindex. + df = df.drop_duplicates(self.date_field_name) + # features save dir features_dir = self._features_dir.joinpath(code_to_fname(code).lower()) features_dir.mkdir(parents=True, exist_ok=True) diff --git a/setup.py b/setup.py index 2dead9fba..9673fea2a 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ NAME = "pyqlib" DESCRIPTION = "A Quantitative-research Platform" REQUIRES_PYTHON = ">=3.5.0" -VERSION = "0.6.3.99" +VERSION = "0.7.0.99" # Detect Cython try: