diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e7b775bf4..7a78d2d9a 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -12,7 +12,7 @@ jobs: runs-on: ${{ matrix.os }} strategy: matrix: - os: [windows-latest, ubuntu-16.04, ubuntu-18.04, ubuntu-20.04, macos-latest] + os: [windows-latest, ubuntu-16.04, ubuntu-18.04, ubuntu-20.04] python-version: [3.6, 3.7, 3.8, 3.9] steps: @@ -46,12 +46,6 @@ jobs: sudo $CONDA/bin/python -m pip install pyqlib --ignore-installed ruamel.yaml numpy fi shell: bash - - - name: Install Lightgbm for MacOS - if: runner.os == 'macOS' - run: | - /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Microsoft/qlib/main/.github/brew_install.sh)" - HOMEBREW_NO_AUTO_UPDATE=1 brew install lightgbm - name: Test data downloads run: | diff --git a/.github/workflows/test_macos.yml b/.github/workflows/test_macos.yml new file mode 100644 index 000000000..57aa87ded --- /dev/null +++ b/.github/workflows/test_macos.yml @@ -0,0 +1,73 @@ +# There are some issues (in the downloading data phase) on MacOS when running with other tests. So we split it into an individual config. +name: Test MacOS + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +jobs: + build: + + runs-on: macos-latest + strategy: + matrix: + python-version: [3.6, 3.7, 3.8, 3.9] + + steps: + - uses: actions/checkout@v2 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + + - name: Lint with Black + run: | + cd .. + sudo $CONDA/bin/python -m pip install black + $CONDA/bin/python -m black qlib -l 120 --check --diff + + # Test Qlib installed with pip + - name: Install Qlib with pip + run: | + sudo $CONDA/bin/python -m pip install numpy==1.19.5 + sudo $CONDA/bin/python -m pip install pyqlib --ignore-installed ruamel.yaml numpy + + - name: Install Lightgbm for MacOS + run: | + /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Microsoft/qlib/main/.github/brew_install.sh)" + HOMEBREW_NO_AUTO_UPDATE=1 brew install lightgbm + + - name: Test data downloads + run: | + $CONDA/bin/python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --interval 1d --region cn + + - name: Test workflow by config (install from pip) + run: | + $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml + sudo $CONDA/bin/python -m pip uninstall -y pyqlib + + # Test Qlib installed from source + - name: Install Qlib from source + run: | + sudo $CONDA/bin/python -m pip install --upgrade cython + sudo $CONDA/bin/python -m pip install numpy jupyter jupyter_contrib_nbextensions + sudo $CONDA/bin/python -m pip install -U scipy scikit-learn # installing without this line will cause errors on GitHub Actions, while instsalling locally won't + sudo $CONDA/bin/python setup.py install + + - name: Install test dependencies + run: | + sudo $CONDA/bin/python -m pip install --upgrade pip + sudo $CONDA/bin/python -m pip install -U pyopenssl idna + sudo $CONDA/bin/python -m pip install black pytest + + - name: Unit tests with Pytest + run: | + cd tests + $CONDA/bin/python -m pytest . --durations=0 + + - name: Test workflow by config (install from source) + run: | + $CONDA/bin/python qlib/workflow/cli.py examples/benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml diff --git a/README.md b/README.md index b68cdaf10..422046c13 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ Recent released features | Feature | Status | | -- | ------ | +| Transformer & Localformer | [Released](https://github.com/microsoft/qlib/pull/508) on July 22, 2021 | +| Release Qlib v0.7.0 | [Released](https://github.com/microsoft/qlib/releases/tag/v0.7.0) on July 12, 2021 | | TCTS Model | [Released](https://github.com/microsoft/qlib/pull/491) on July 1, 2021 | | Online serving and automatic model rolling | :star: [Released](https://github.com/microsoft/qlib/pull/290) on May 17, 2021 | | DoubleEnsemble Model | [Released](https://github.com/microsoft/qlib/pull/286) on Mar 2, 2021 | @@ -290,6 +292,8 @@ Here is a list of models built on `Qlib`. - [TabNet based on pytorch (Sercan O. Arik, et al. AAAI 2019)](qlib/contrib/model/pytorch_tabnet.py) - [DoubleEnsemble based on LightGBM (Chuheng Zhang, et al. ICDM 2020)](qlib/contrib/model/double_ensemble.py) - [TCTS based on pytorch (Xueqing Wu, et al. ICML 2021)](qlib/contrib/model/pytorch_tcts.py) +- [Transformer based on pytorch (Ashish Vaswani, et al. NeurIPS 2017)](qlib/contrib/model/pytorch_transformer.py) +- [Localformer based on pytorch (Juyong Jiang, et al.)](qlib/contrib/model/pytorch_localformer.py) Your PR of new Quant models is highly welcomed. @@ -389,7 +393,12 @@ Join IM discussion groups: # Contributing -This project welcomes contributions and suggestions. Most contributions require you to agree to a +This project welcomes contributions and suggestions. +**Here are some +[code standards](docs/developer/code_standard.rst) when you submit a pull request.** + + +Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the right to use your contribution. For details, visit https://cla.opensource.microsoft.com. diff --git a/docs/component/data.rst b/docs/component/data.rst index 73072f369..a3dc74052 100644 --- a/docs/component/data.rst +++ b/docs/component/data.rst @@ -179,6 +179,7 @@ After conversion, users can find their Qlib format data in the directory `~/.qli The Restoration factor. Normally, ``factor = adjusted_price / original_price``, `adjusted price` reference: `split adjusted `_ In the convention of `Qlib` data processing, `open, close, high, low, volume, money and factor` will be set to NaN if the stock is suspended. + If you want to use your own alpha-factor which can't be calculate by OCHLV, like PE, EPS and so on, you could add it to the CSV files with OHCLV together and then dump it to the Qlib format data. Stock Pool (Market) -------------------------------- diff --git a/docs/component/online.rst b/docs/component/online.rst index accc936dd..22a6afaf9 100644 --- a/docs/component/online.rst +++ b/docs/component/online.rst @@ -21,6 +21,8 @@ which including `Online Manager <#Online Manager>`_, `Online Strategy <#Online S If you have many models or `task` needs to be managed, please consider `Task Management <../advanced/task_management.html>`_. The `examples `_ are based on some components in `Task Management <../advanced/task_management.html>`_ such as ``TrainerRM`` or ``Collector``. +**NOTE**: User should keep his data source updated to support online serving. For example, Qlib provides `a batch of scripts `_ to help users update Yahoo daily data. + Online Manager ============= @@ -43,4 +45,4 @@ Updater ============= .. automodule:: qlib.workflow.online.update - :members: \ No newline at end of file + :members: diff --git a/docs/developer/code_standard.rst b/docs/developer/code_standard.rst new file mode 100644 index 000000000..23ea713ba --- /dev/null +++ b/docs/developer/code_standard.rst @@ -0,0 +1,20 @@ +.. _code_standard: + +================================= +Code Standard +================================= + +Docstring +================================= +Please use the Numpy Style. + +Continuous Integration +================================= +Continuous Integration (CI) tools help you stick to the quality standards by running tests every time you push a new commit and reporting the results to a pull request. + +A common error is the mixed use of space and tab. You can fix the bug by inputing the following code in the command line. + +.. code-block:: python + + pip install black + python -m black . -l 120 \ No newline at end of file diff --git a/examples/benchmarks/Localformer/requirements.txt b/examples/benchmarks/Localformer/requirements.txt new file mode 100644 index 000000000..d5b918797 --- /dev/null +++ b/examples/benchmarks/Localformer/requirements.txt @@ -0,0 +1,3 @@ +numpy==1.17.4 +pandas==1.1.2 +torch==1.2.0 \ No newline at end of file diff --git a/examples/benchmarks/Localformer/workflow_config_localformer_Alpha158.yaml b/examples/benchmarks/Localformer/workflow_config_localformer_Alpha158.yaml new file mode 100644 index 000000000..d7e967333 --- /dev/null +++ b/examples/benchmarks/Localformer/workflow_config_localformer_Alpha158.yaml @@ -0,0 +1,82 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/cn_data" + region: cn +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + end_time: 2020-08-01 + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + infer_processors: + - class: FilterCol + kwargs: + fields_group: feature + col_list: ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10", + "ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5", + "RSQR20", "CORD60", "CORD10", "CORR20", "KLOW" + ] + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label + label: ["Ref($close, -2) / Ref($close, -1) - 1"] + +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: LocalformerModel + module_path: qlib.contrib.model.pytorch_localformer_ts + kwargs: + seed: 0 + n_jobs: 20 + dataset: + class: TSDatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha158 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + step_len: 20 + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config diff --git a/examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml b/examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml new file mode 100644 index 000000000..1c8489461 --- /dev/null +++ b/examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml @@ -0,0 +1,73 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/cn_data" + region: cn +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + end_time: 2020-08-01 + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + infer_processors: + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label + label: ["Ref($close, -2) / Ref($close, -1) - 1"] +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: LocalformerModel + module_path: qlib.contrib.model.pytorch_localformer + kwargs: + d_feat: 6 + seed: 0 + dataset: + class: DatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha360 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config \ No newline at end of file diff --git a/examples/benchmarks/README.md b/examples/benchmarks/README.md index 1920a6a3b..ee2c0a833 100644 --- a/examples/benchmarks/README.md +++ b/examples/benchmarks/README.md @@ -1,6 +1,6 @@ # Benchmarks Performance -Here are the results of each benchmark model running on Qlib's `Alpha360` and `Alpha158` dataset with China's A shared-stock & CSI300 data respectively. The values of each metric are the mean and std calculated based on 20 runs. +Here are the results of each benchmark model running on Qlib's `Alpha360` and `Alpha158` dataset with China's A shared-stock & CSI300 data respectively. The values of each metric are the mean and std calculated based on 20 runs with different random seeds. The numbers shown below demonstrate the performance of the entire `workflow` of each model. We will update the `workflow` as well as models in the near future for better results. @@ -23,6 +23,8 @@ The numbers shown below demonstrate the performance of the entire `workflow` of | DoubleEnsemble (Chuheng Zhang, et al.) | Alpha360 | 0.0407±0.00| 0.3053±0.00 | 0.0490±0.00 | 0.3840±0.00 | 0.0380±0.02 | 0.5000±0.21 | -0.0984±0.02 | | TabNet (Sercan O. Arik, et al.)| Alpha360 | 0.0192±0.00 | 0.1401±0.00| 0.0291±0.00 | 0.2163±0.00 | -0.0258±0.00 | -0.2961±0.00| -0.1429±0.00 | | TCTS (Xueqing Wu, et al.)| Alpha360 | 0.0485±0.00 | 0.3689±0.04| 0.0586±0.00 | 0.4669±0.02 | 0.0816±0.02 | 1.1572±0.30| -0.0689±0.02 | +| Transformer (Ashish Vaswani, et al.)| Alpha360 | 0.0141±0.00 | 0.0917±0.02| 0.0331±0.00 | 0.2357±0.03 | -0.0259±0.03 | -0.3323±0.43| -0.1763±0.07 | +| Localformer (Juyong Jiang, et al.)| Alpha360 | 0.0408±0.00 | 0.2988±0.03| 0.0538±0.00 | 0.4105±0.02 | 0.0275±0.03 | 0.3464±0.37| -0.1182±0.03 | ## Alpha158 dataset | Model Name | Dataset | IC | ICIR | Rank IC | Rank ICIR | Annualized Return | Information Ratio | Max Drawdown | @@ -39,6 +41,8 @@ The numbers shown below demonstrate the performance of the entire `workflow` of | GATs (Petar Velickovic, et al.) | Alpha158 (with selected 20 features) | 0.0349±0.00 | 0.2511±0.01| 0.0457±0.00 | 0.3537±0.01 | 0.0578±0.02 | 0.8221±0.25| -0.0824±0.02 | | DoubleEnsemble (Chuheng Zhang, et al.) | Alpha158 | 0.0544±0.00 | 0.4338±0.01 | 0.0523±0.00 | 0.4257±0.01 | 0.1253±0.01 | 1.4105±0.14 | -0.0902±0.01 | | TabNet (Sercan O. Arik, et al.)| Alpha158 | 0.0383±0.00 | 0.3414±0.00| 0.0388±0.00 | 0.3460±0.00 | 0.0226±0.00 | 0.2652±0.00| -0.1072±0.00 | +| Transformer (Ashish Vaswani, et al.)| Alpha158 | 0.0274±0.00 | 0.2166±0.04| 0.0409±0.00 | 0.3342±0.04 | 0.0204±0.03 | 0.2888±0.40| -0.1216±0.04 | +| Localformer (Juyong Jiang, et al.)| Alpha158 | 0.0355±0.00 | 0.2747±0.04| 0.0466±0.00 | 0.3762±0.03 | 0.0506±0.02 | 0.7447±0.34| -0.0875±0.02 | - The selected 20 features are based on the feature importance of a lightgbm-based model. - The base model of DoubleEnsemble is LGBM. diff --git a/examples/benchmarks/Transformer/requirements.txt b/examples/benchmarks/Transformer/requirements.txt new file mode 100644 index 000000000..d5b918797 --- /dev/null +++ b/examples/benchmarks/Transformer/requirements.txt @@ -0,0 +1,3 @@ +numpy==1.17.4 +pandas==1.1.2 +torch==1.2.0 \ No newline at end of file diff --git a/examples/benchmarks/Transformer/workflow_config_transformer_Alpha158.yaml b/examples/benchmarks/Transformer/workflow_config_transformer_Alpha158.yaml new file mode 100644 index 000000000..54707386f --- /dev/null +++ b/examples/benchmarks/Transformer/workflow_config_transformer_Alpha158.yaml @@ -0,0 +1,82 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/cn_data" + region: cn +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + end_time: 2020-08-01 + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + infer_processors: + - class: FilterCol + kwargs: + fields_group: feature + col_list: ["RESI5", "WVMA5", "RSQR5", "KLEN", "RSQR10", "CORR5", "CORD5", "CORR10", + "ROC60", "RESI10", "VSTD5", "RSQR60", "CORR60", "WVMA60", "STD5", + "RSQR20", "CORD60", "CORD10", "CORR20", "KLOW" + ] + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label + label: ["Ref($close, -2) / Ref($close, -1) - 1"] + +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: TransformerModel + module_path: qlib.contrib.model.pytorch_transformer_ts + kwargs: + seed: 0 + n_jobs: 20 + dataset: + class: TSDatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha158 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + step_len: 20 + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config diff --git a/examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml b/examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml new file mode 100644 index 000000000..e568a1b30 --- /dev/null +++ b/examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml @@ -0,0 +1,73 @@ +qlib_init: + provider_uri: "~/.qlib/qlib_data/cn_data" + region: cn +market: &market csi300 +benchmark: &benchmark SH000300 +data_handler_config: &data_handler_config + start_time: 2008-01-01 + end_time: 2020-08-01 + fit_start_time: 2008-01-01 + fit_end_time: 2014-12-31 + instruments: *market + infer_processors: + - class: RobustZScoreNorm + kwargs: + fields_group: feature + clip_outlier: true + - class: Fillna + kwargs: + fields_group: feature + learn_processors: + - class: DropnaLabel + - class: CSRankNorm + kwargs: + fields_group: label + label: ["Ref($close, -2) / Ref($close, -1) - 1"] +port_analysis_config: &port_analysis_config + strategy: + class: TopkDropoutStrategy + module_path: qlib.contrib.strategy.strategy + kwargs: + topk: 50 + n_drop: 5 + backtest: + verbose: False + limit_threshold: 0.095 + account: 100000000 + benchmark: *benchmark + deal_price: close + open_cost: 0.0005 + close_cost: 0.0015 + min_cost: 5 +task: + model: + class: TransformerModel + module_path: qlib.contrib.model.pytorch_transformer + kwargs: + d_feat: 6 + seed: 0 + dataset: + class: DatasetH + module_path: qlib.data.dataset + kwargs: + handler: + class: Alpha360 + module_path: qlib.contrib.data.handler + kwargs: *data_handler_config + segments: + train: [2008-01-01, 2014-12-31] + valid: [2015-01-01, 2016-12-31] + test: [2017-01-01, 2020-08-01] + record: + - class: SignalRecord + module_path: qlib.workflow.record_temp + kwargs: {} + - class: SigAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + ana_long_short: False + ann_scaler: 252 + - class: PortAnaRecord + module_path: qlib.workflow.record_temp + kwargs: + config: *port_analysis_config \ No newline at end of file diff --git a/examples/highfreq/highfreq_handler.py b/examples/highfreq/highfreq_handler.py index 19bb2550b..c15c3ec41 100644 --- a/examples/highfreq/highfreq_handler.py +++ b/examples/highfreq/highfreq_handler.py @@ -99,8 +99,6 @@ class HighFreqHandler(DataHandlerLP): ] names += ["$volume_1"] - fields += ["Cut({0}, 240, None)".format(template_paused.format("Date($close)"))] - names += ["date"] return fields, names diff --git a/examples/highfreq/highfreq_processor.py b/examples/highfreq/highfreq_processor.py index f0ab0dec2..62065469b 100644 --- a/examples/highfreq/highfreq_processor.py +++ b/examples/highfreq/highfreq_processor.py @@ -33,6 +33,9 @@ class HighFreqNorm(Processor): self.feature_vmin[name] = np.nanmin(part_values) def __call__(self, df_features): + df_features["date"] = pd.to_datetime( + df_features.index.get_level_values(level="datetime").to_series().dt.date.values + ) df_features.set_index("date", append=True, drop=True, inplace=True) df_values = df_features.values names = { diff --git a/examples/run_all_model.py b/examples/run_all_model.py index c79fee004..1284d8e99 100644 --- a/examples/run_all_model.py +++ b/examples/run_all_model.py @@ -23,7 +23,6 @@ from qlib.config import REG_CN from qlib.workflow import R from qlib.tests.data import GetData - # init qlib provider_uri = "~/.qlib/qlib_data/cn_data" exp_folder_name = "run_all_model_records" @@ -40,6 +39,7 @@ exp_manager = { GetData().qlib_data(target_dir=provider_uri, region=REG_CN, exists_skip=True) qlib.init(provider_uri=provider_uri, region=REG_CN, exp_manager=exp_manager) + # decorator to check the arguments def only_allow_defined_args(function_to_decorate): @functools.wraps(function_to_decorate) @@ -92,7 +92,8 @@ def create_env(): # function to execute the cmd -def execute(cmd): +def execute(cmd, wait_when_err=False): + print("Running CMD:", cmd) with subprocess.Popen(cmd, stdout=subprocess.PIPE, bufsize=1, universal_newlines=True, shell=True) as p: for line in p.stdout: sys.stdout.write(line.split("\b")[0]) @@ -102,6 +103,8 @@ def execute(cmd): sys.stdout.write("\b" * 10 + "\b".join(line.split("\b")[1:-1])) if p.returncode != 0: + if wait_when_err: + input("Press Enter to Continue") return p.stderr else: return None @@ -184,7 +187,15 @@ def gen_and_save_md_table(metrics, dataset): # function to run the all the models @only_allow_defined_args -def run(times=1, models=None, dataset="Alpha360", exclude=False): +def run( + times=1, + models=None, + dataset="Alpha360", + exclude=False, + qlib_uri: str = "git+https://github.com/microsoft/qlib#egg=pyqlib", + wait_before_rm_env: bool = False, + wait_when_err: bool = False, +): """ Please be aware that this function can only work under Linux. MacOS and Windows will be supported in the future. Any PR to enhance this method is highly welcomed. Besides, this script doesn't support parrallel running the same model @@ -200,6 +211,13 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False): determines whether the model being used is excluded or included. dataset : str determines the dataset to be used for each model. + qlib_uri : str + the uri to install qlib with pip + it could be url on the we or local path + wait_before_rm_env : bool + wait before remove environment. + wait_when_err : bool + wait when errors raised when executing commands Usage: ------- @@ -240,32 +258,36 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False): sys.stderr.write("\n") # install requirements.txt sys.stderr.write("Installing requirements.txt...\n") - execute(f"{python_path} -m pip install -r {req_path}") + execute(f"{python_path} -m pip install -r {req_path}", wait_when_err=wait_when_err) sys.stderr.write("\n") # setup gpu for tft if fn == "TFT": execute( - f"conda install -y --prefix {env_path} anaconda cudatoolkit=10.0 && conda install -y --prefix {env_path} cudnn" + f"conda install -y --prefix {env_path} anaconda cudatoolkit=10.0 && conda install -y --prefix {env_path} cudnn", + wait_when_err=wait_when_err, ) sys.stderr.write("\n") # install qlib sys.stderr.write("Installing qlib...\n") - execute(f"{python_path} -m pip install --upgrade pip") # TODO: FIX ME! - execute(f"{python_path} -m pip install --upgrade cython") # TODO: FIX ME! + execute(f"{python_path} -m pip install --upgrade pip", wait_when_err=wait_when_err) # TODO: FIX ME! + execute(f"{python_path} -m pip install --upgrade cython", wait_when_err=wait_when_err) # TODO: FIX ME! if fn == "TFT": execute( - f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall --ignore-installed PyYAML -e git+https://github.com/microsoft/qlib#egg=pyqlib" + f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall --ignore-installed PyYAML -e {qlib_uri}", + wait_when_err=wait_when_err, ) # TODO: FIX ME! else: execute( - f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall -e git+https://github.com/microsoft/qlib#egg=pyqlib" + f"cd {env_path} && {python_path} -m pip install --upgrade --force-reinstall -e {qlib_uri}", + wait_when_err=wait_when_err, ) # TODO: FIX ME! sys.stderr.write("\n") # run workflow_by_config for multiple times for i in range(times): sys.stderr.write(f"Running the model: {fn} for iteration {i+1}...\n") errs = execute( - f"{python_path} {env_path / 'src/pyqlib/qlib/workflow/cli.py'} {yaml_path} {fn} {exp_folder_name}" + f"{python_path} {env_path / 'bin' / 'qrun'} {yaml_path} {fn} {exp_folder_name}", + wait_when_err=wait_when_err, ) if errs is not None: _errs = errors.get(fn, {}) @@ -274,6 +296,8 @@ def run(times=1, models=None, dataset="Alpha360", exclude=False): sys.stderr.write("\n") # remove env sys.stderr.write(f"Deleting the environment: {env_path}...\n") + if wait_before_rm_env: + input("Press Enter to Continue") shutil.rmtree(env_path) # getting all results sys.stderr.write(f"Retrieving results...\n") diff --git a/qlib/__init__.py b/qlib/__init__.py index 5f45f4557..6f76bbcaa 100644 --- a/qlib/__init__.py +++ b/qlib/__init__.py @@ -2,7 +2,7 @@ # Licensed under the MIT License. -__version__ = "0.6.3.99" +__version__ = "0.7.0.99" __version__bak = __version__ # This version is backup for QlibConfig.reset_qlib_version diff --git a/qlib/contrib/model/pytorch_localformer.py b/qlib/contrib/model/pytorch_localformer.py new file mode 100644 index 000000000..2ec56067f --- /dev/null +++ b/qlib/contrib/model/pytorch_localformer.py @@ -0,0 +1,331 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +from typing import Text, Union +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP +from torch.nn.modules.container import ModuleList + +# qrun examples/benchmarks/Localformer/workflow_config_localformer_Alpha360.yaml ” + + +class LocalformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 2048, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values) + + self.model.train() + + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + + pred = self.model(feature) + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) + + self.model.eval() + + scores = [] + losses = [] + + indices = np.arange(len(x_values)) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(feature) + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], + col_set=["feature", "label"], + data_key=DataHandlerLP.DK_L, + ) + + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + + save_path = get_or_create_path(save_path) + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(x_train, y_train) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + x_test = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I) + index = x_test.index + self.model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] + preds = [] + + for begin in range(sample_num)[:: self.batch_size]: + + if sample_num - begin < self.batch_size: + end = sample_num + else: + end = begin + self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(x_batch).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=index) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +def _get_clones(module, N): + return ModuleList([copy.deepcopy(module) for i in range(N)]) + + +class LocalformerEncoder(nn.Module): + __constants__ = ["norm"] + + def __init__(self, encoder_layer, num_layers, d_model): + super(LocalformerEncoder, self).__init__() + self.layers = _get_clones(encoder_layer, num_layers) + self.conv = _get_clones(nn.Conv1d(d_model, d_model, 3, 1, 1), num_layers) + self.num_layers = num_layers + + def forward(self, src, mask): + output = src + out = src + + for i, mod in enumerate(self.layers): + # [T, N, F] --> [N, T, F] --> [N, F, T] + out = output.transpose(1, 0).transpose(2, 1) + out = self.conv[i](out).transpose(2, 1).transpose(1, 0) + + output = mod(output + out, src_mask=mask) + + return output + out + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.rnn = nn.GRU( + input_size=d_model, + hidden_size=d_model, + num_layers=num_layers, + batch_first=False, + dropout=dropout, + ) + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = LocalformerEncoder(self.encoder_layer, num_layers=num_layers, d_model=d_model) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, F*T] --> [N, T, F] + src = src.reshape(len(src), self.d_feat, -1).permute(0, 2, 1) + src = self.feature_layer(src) + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + output, _ = self.rnn(output) + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/contrib/model/pytorch_localformer_ts.py b/qlib/contrib/model/pytorch_localformer_ts.py new file mode 100644 index 000000000..683a9bd4f --- /dev/null +++ b/qlib/contrib/model/pytorch_localformer_ts.py @@ -0,0 +1,308 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP +from torch.nn.modules.container import ModuleList + + +class LocalformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 8192, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info( + "Improved Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device) + ) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, data_loader): + + self.model.train() + + for data in data_loader: + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_loader): + + self.model.eval() + + scores = [] + losses = [] + + for data in data_loader: + + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + + dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader + dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader + + train_loader = DataLoader( + dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True + ) + valid_loader = DataLoader( + dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True + ) + + save_path = get_or_create_path(save_path) + + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(train_loader) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(train_loader) + val_loss, val_score = self.test_epoch(valid_loader) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I) + dl_test.config(fillna_type="ffill+bfill") + test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs) + self.model.eval() + preds = [] + + for data in test_loader: + feature = data[:, :, 0:-1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=dl_test.get_index()) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +def _get_clones(module, N): + return ModuleList([copy.deepcopy(module) for i in range(N)]) + + +class LocalformerEncoder(nn.Module): + __constants__ = ["norm"] + + def __init__(self, encoder_layer, num_layers, d_model): + super(LocalformerEncoder, self).__init__() + self.layers = _get_clones(encoder_layer, num_layers) + self.conv = _get_clones(nn.Conv1d(d_model, d_model, 3, 1, 1), num_layers) + self.num_layers = num_layers + + def forward(self, src, mask): + output = src + out = src + + for i, mod in enumerate(self.layers): + # [T, N, F] --> [N, T, F] --> [N, F, T] + out = output.transpose(1, 0).transpose(2, 1) + out = self.conv[i](out).transpose(2, 1).transpose(1, 0) + + output = mod(output + out, src_mask=mask) + + return output + out + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.rnn = nn.GRU( + input_size=d_model, + hidden_size=d_model, + num_layers=num_layers, + batch_first=False, + dropout=dropout, + ) + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = LocalformerEncoder(self.encoder_layer, num_layers=num_layers, d_model=d_model) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, T, F], [512, 60, 6] + src = self.feature_layer(src) # [512, 60, 8] + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + output, _ = self.rnn(output) + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/contrib/model/pytorch_transformer.py b/qlib/contrib/model/pytorch_transformer.py new file mode 100644 index 000000000..53ebff3c5 --- /dev/null +++ b/qlib/contrib/model/pytorch_transformer.py @@ -0,0 +1,294 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +from typing import Text, Union +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP + +# qrun examples/benchmarks/Transformer/workflow_config_transformer_Alpha360.yaml ” + + +class TransformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 2048, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, x_train, y_train): + + x_train_values = x_train.values + y_train_values = np.squeeze(y_train.values) + + self.model.train() + + indices = np.arange(len(x_train_values)) + np.random.shuffle(indices) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_train_values[indices[i : i + self.batch_size]]).float().to(self.device) + + pred = self.model(feature) + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_x, data_y): + + # prepare training data + x_values = data_x.values + y_values = np.squeeze(data_y.values) + + self.model.eval() + + scores = [] + losses = [] + + indices = np.arange(len(x_values)) + + for i in range(len(indices))[:: self.batch_size]: + + if len(indices) - i < self.batch_size: + break + + feature = torch.from_numpy(x_values[indices[i : i + self.batch_size]]).float().to(self.device) + label = torch.from_numpy(y_values[indices[i : i + self.batch_size]]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(feature) + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + df_train, df_valid, df_test = dataset.prepare( + ["train", "valid", "test"], + col_set=["feature", "label"], + data_key=DataHandlerLP.DK_L, + ) + + x_train, y_train = df_train["feature"], df_train["label"] + x_valid, y_valid = df_valid["feature"], df_valid["label"] + + save_path = get_or_create_path(save_path) + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(x_train, y_train) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(x_train, y_train) + val_loss, val_score = self.test_epoch(x_valid, y_valid) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset: DatasetH, segment: Union[Text, slice] = "test"): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + x_test = dataset.prepare(segment, col_set="feature", data_key=DataHandlerLP.DK_I) + index = x_test.index + self.model.eval() + x_values = x_test.values + sample_num = x_values.shape[0] + preds = [] + + for begin in range(sample_num)[:: self.batch_size]: + + if sample_num - begin < self.batch_size: + end = sample_num + else: + end = begin + self.batch_size + + x_batch = torch.from_numpy(x_values[begin:end]).float().to(self.device) + + with torch.no_grad(): + pred = self.model(x_batch).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=index) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, F*T] --> [N, T, F] + src = src.reshape(len(src), self.d_feat, -1).permute(0, 2, 1) + src = self.feature_layer(src) + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/contrib/model/pytorch_transformer_ts.py b/qlib/contrib/model/pytorch_transformer_ts.py new file mode 100644 index 000000000..c53564903 --- /dev/null +++ b/qlib/contrib/model/pytorch_transformer_ts.py @@ -0,0 +1,269 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + + +from __future__ import division +from __future__ import print_function + +import os +import numpy as np +import pandas as pd +import copy +import math +from ...utils import get_or_create_path +from ...log import get_module_logger + +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader + +from .pytorch_utils import count_parameters +from ...model.base import Model +from ...data.dataset import DatasetH, TSDatasetH +from ...data.dataset.handler import DataHandlerLP + + +class TransformerModel(Model): + def __init__( + self, + d_feat: int = 20, + d_model: int = 64, + batch_size: int = 8192, + nhead: int = 2, + num_layers: int = 2, + dropout: float = 0, + n_epochs=100, + lr=0.0001, + metric="", + early_stop=5, + loss="mse", + optimizer="adam", + reg=1e-3, + n_jobs=10, + GPU=0, + seed=None, + **kwargs + ): + + # set hyper-parameters. + self.d_model = d_model + self.dropout = dropout + self.n_epochs = n_epochs + self.lr = lr + self.reg = reg + self.metric = metric + self.batch_size = batch_size + self.early_stop = early_stop + self.optimizer = optimizer.lower() + self.loss = loss + self.n_jobs = n_jobs + self.device = torch.device("cuda:%d" % GPU if torch.cuda.is_available() and GPU >= 0 else "cpu") + self.seed = seed + self.logger = get_module_logger("TransformerModel") + self.logger.info("Naive Transformer:" "\nbatch_size : {}" "\ndevice : {}".format(self.batch_size, self.device)) + + if self.seed is not None: + np.random.seed(self.seed) + torch.manual_seed(self.seed) + + self.model = Transformer(d_feat, d_model, nhead, num_layers, dropout, self.device) + if optimizer.lower() == "adam": + self.train_optimizer = optim.Adam(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + elif optimizer.lower() == "gd": + self.train_optimizer = optim.SGD(self.model.parameters(), lr=self.lr, weight_decay=self.reg) + else: + raise NotImplementedError("optimizer {} is not supported!".format(optimizer)) + + self.fitted = False + self.model.to(self.device) + + @property + def use_gpu(self): + return self.device != torch.device("cpu") + + def mse(self, pred, label): + loss = (pred.float() - label.float()) ** 2 + return torch.mean(loss) + + def loss_fn(self, pred, label): + mask = ~torch.isnan(label) + + if self.loss == "mse": + return self.mse(pred[mask], label[mask]) + + raise ValueError("unknown loss `%s`" % self.loss) + + def metric_fn(self, pred, label): + + mask = torch.isfinite(label) + + if self.metric == "" or self.metric == "loss": + return -self.loss_fn(pred[mask], label[mask]) + + raise ValueError("unknown metric `%s`" % self.metric) + + def train_epoch(self, data_loader): + + self.model.train() + + for data in data_loader: + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + + self.train_optimizer.zero_grad() + loss.backward() + torch.nn.utils.clip_grad_value_(self.model.parameters(), 3.0) + self.train_optimizer.step() + + def test_epoch(self, data_loader): + + self.model.eval() + + scores = [] + losses = [] + + for data in data_loader: + + feature = data[:, :, 0:-1].to(self.device) + label = data[:, -1, -1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()) # .float() + loss = self.loss_fn(pred, label) + losses.append(loss.item()) + + score = self.metric_fn(pred, label) + scores.append(score.item()) + + return np.mean(losses), np.mean(scores) + + def fit( + self, + dataset: DatasetH, + evals_result=dict(), + save_path=None, + ): + + dl_train = dataset.prepare("train", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + dl_valid = dataset.prepare("valid", col_set=["feature", "label"], data_key=DataHandlerLP.DK_L) + + dl_train.config(fillna_type="ffill+bfill") # process nan brought by dataloader + dl_valid.config(fillna_type="ffill+bfill") # process nan brought by dataloader + + train_loader = DataLoader( + dl_train, batch_size=self.batch_size, shuffle=True, num_workers=self.n_jobs, drop_last=True + ) + valid_loader = DataLoader( + dl_valid, batch_size=self.batch_size, shuffle=False, num_workers=self.n_jobs, drop_last=True + ) + + save_path = get_or_create_path(save_path) + + stop_steps = 0 + train_loss = 0 + best_score = -np.inf + best_epoch = 0 + evals_result["train"] = [] + evals_result["valid"] = [] + + # train + self.logger.info("training...") + self.fitted = True + + for step in range(self.n_epochs): + self.logger.info("Epoch%d:", step) + self.logger.info("training...") + self.train_epoch(train_loader) + self.logger.info("evaluating...") + train_loss, train_score = self.test_epoch(train_loader) + val_loss, val_score = self.test_epoch(valid_loader) + self.logger.info("train %.6f, valid %.6f" % (train_score, val_score)) + evals_result["train"].append(train_score) + evals_result["valid"].append(val_score) + + if val_score > best_score: + best_score = val_score + stop_steps = 0 + best_epoch = step + best_param = copy.deepcopy(self.model.state_dict()) + else: + stop_steps += 1 + if stop_steps >= self.early_stop: + self.logger.info("early stop") + break + + self.logger.info("best score: %.6lf @ %d" % (best_score, best_epoch)) + self.model.load_state_dict(best_param) + torch.save(best_param, save_path) + + if self.use_gpu: + torch.cuda.empty_cache() + + def predict(self, dataset): + if not self.fitted: + raise ValueError("model is not fitted yet!") + + dl_test = dataset.prepare("test", col_set=["feature", "label"], data_key=DataHandlerLP.DK_I) + dl_test.config(fillna_type="ffill+bfill") + test_loader = DataLoader(dl_test, batch_size=self.batch_size, num_workers=self.n_jobs) + self.model.eval() + preds = [] + + for data in test_loader: + feature = data[:, :, 0:-1].to(self.device) + + with torch.no_grad(): + pred = self.model(feature.float()).detach().cpu().numpy() + + preds.append(pred) + + return pd.Series(np.concatenate(preds), index=dl_test.get_index()) + + +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=1000): + super(PositionalEncoding, self).__init__() + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + pe = pe.unsqueeze(0).transpose(0, 1) + self.register_buffer("pe", pe) + + def forward(self, x): + # [T, N, F] + return x + self.pe[: x.size(0), :] + + +class Transformer(nn.Module): + def __init__(self, d_feat=6, d_model=8, nhead=4, num_layers=2, dropout=0.5, device=None): + super(Transformer, self).__init__() + self.feature_layer = nn.Linear(d_feat, d_model) + self.pos_encoder = PositionalEncoding(d_model) + self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=nhead, dropout=dropout) + self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers) + self.decoder_layer = nn.Linear(d_model, 1) + self.device = device + self.d_feat = d_feat + + def forward(self, src): + # src [N, T, F], [512, 60, 6] + src = self.feature_layer(src) # [512, 60, 8] + + # src [N, T, F] --> [T, N, F], [60, 512, 8] + src = src.transpose(1, 0) # not batch first + + mask = None + + src = self.pos_encoder(src) + output = self.transformer_encoder(src, mask) # [60, 512, 8] + + # [T, N, F] --> [N, T*F] + output = self.decoder_layer(output.transpose(1, 0)[:, -1, :]) # [512, 1] + + return output.squeeze() diff --git a/qlib/data/data.py b/qlib/data/data.py index d6735b4e6..ccd35006b 100644 --- a/qlib/data/data.py +++ b/qlib/data/data.py @@ -1056,13 +1056,21 @@ class ClientProvider(BaseProvider): """ def __init__(self): + def is_instance_of_provider(instance: object, cls: type): + if isinstance(instance, Wrapper): + p = getattr(instance, "_provider", None) + + return False if p is None else isinstance(p, cls) + + return isinstance(instance, cls) + from .client import Client self.client = Client(C.flask_server, C.flask_port) self.logger = get_module_logger(self.__class__.__name__) - if isinstance(Cal, ClientCalendarProvider): + if is_instance_of_provider(Cal, ClientCalendarProvider): Cal.set_conn(self.client) - if isinstance(Inst, ClientInstrumentProvider): + if is_instance_of_provider(Inst, ClientInstrumentProvider): Inst.set_conn(self.client) if hasattr(DatasetD, "provider"): DatasetD.provider.set_conn(self.client) diff --git a/qlib/workflow/cli.py b/qlib/workflow/cli.py index 879c0aaeb..16e5b6296 100644 --- a/qlib/workflow/cli.py +++ b/qlib/workflow/cli.py @@ -53,7 +53,8 @@ def workflow(config_path, experiment_name="workflow", uri_folder="mlruns"): exp_manager["kwargs"]["uri"] = "file:" + str(Path(os.getcwd()).resolve() / uri_folder) qlib.init(**config.get("qlib_init"), exp_manager=exp_manager) - task_train(config.get("task"), experiment_name=experiment_name) + recorder = task_train(config.get("task"), experiment_name=experiment_name) + recorder.save_objects(config=config) # function to run worklflow by config diff --git a/qlib/workflow/exp.py b/qlib/workflow/exp.py index 627b5ff82..fcf6cd8d1 100644 --- a/qlib/workflow/exp.py +++ b/qlib/workflow/exp.py @@ -325,7 +325,7 @@ class MLflowExperiment(Experiment): UNLIMITED = 50000 # FIXME: Mlflow can only list 50000 records at most!!!!!!! - def list_recorders(self, max_results: int = UNLIMITED, status: Union[str, None] = None): + def list_recorders(self, max_results: int = UNLIMITED, status: Union[str, None] = None, filter_string: str = ""): """ Parameters ---------- @@ -334,8 +334,12 @@ class MLflowExperiment(Experiment): status : str the criteria based on status to filter results. `None` indicates no filtering. + filter_string : str + mlflow supported filter string like 'params."my_param"="a" and tags."my_tag"="b"', use this will help to reduce too much run number. """ - runs = self._client.search_runs(self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results) + runs = self._client.search_runs( + self.id, run_view_type=ViewType.ACTIVE_ONLY, max_results=max_results, filter_string=filter_string + ) recorders = dict() for i in range(len(runs)): recorder = MLflowRecorder(self.id, self._uri, mlflow_run=runs[i]) diff --git a/qlib/workflow/task/collect.py b/qlib/workflow/task/collect.py index 36ccf434d..467281666 100644 --- a/qlib/workflow/task/collect.py +++ b/qlib/workflow/task/collect.py @@ -139,6 +139,7 @@ class RecorderCollector(Collector): rec_filter_func=None, artifacts_path={"pred": "pred.pkl"}, artifacts_key=None, + list_kwargs={}, ): """ Init RecorderCollector. @@ -150,6 +151,7 @@ class RecorderCollector(Collector): rec_filter_func (Callable, optional): filter the recorder by return True or False. Defaults to None. artifacts_path (dict, optional): The artifacts name and its path in Recorder. Defaults to {"pred": "pred.pkl", "IC": "sig_analysis/ic.pkl"}. artifacts_key (str or List, optional): the artifacts key you want to get. If None, get all artifacts. + list_kwargs (str): arguments for list_recorders function. """ super().__init__(process_list=process_list) if isinstance(experiment, str): @@ -163,6 +165,7 @@ class RecorderCollector(Collector): self.rec_key_func = rec_key_func self.artifacts_key = artifacts_key self.rec_filter_func = rec_filter_func + self.list_kwargs = list_kwargs def collect(self, artifacts_key=None, rec_filter_func=None, only_exist=True) -> dict: """ @@ -187,7 +190,7 @@ class RecorderCollector(Collector): collect_dict = {} # filter records - recs = self.experiment.list_recorders() + recs = self.experiment.list_recorders(**self.list_kwargs) recs_flt = {} for rid, rec in recs.items(): if rec_filter_func is None or rec_filter_func(rec): diff --git a/qlib/workflow/utils.py b/qlib/workflow/utils.py index 5a93eacca..6e1e76529 100644 --- a/qlib/workflow/utils.py +++ b/qlib/workflow/utils.py @@ -1,10 +1,14 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. -import sys, traceback, signal, atexit, logging +import atexit +import logging +import sys +import traceback + +from ..log import get_module_logger from . import R from .recorder import Recorder -from ..log import get_module_logger logger = get_module_logger("workflow", logging.INFO) diff --git a/scripts/data_collector/contrib/future_trading_date_collector/future_trading_date_collector.py b/scripts/data_collector/contrib/future_trading_date_collector/future_trading_date_collector.py index 8df0a4972..939ba7f6a 100644 --- a/scripts/data_collector/contrib/future_trading_date_collector/future_trading_date_collector.py +++ b/scripts/data_collector/contrib/future_trading_date_collector/future_trading_date_collector.py @@ -78,6 +78,7 @@ def future_calendar_collector(qlib_dir: [str, Path], freq: str = "day"): data_list.append(_row_data[0]) data_list = sorted(data_list) date_list = generate_qlib_calendar(data_list, freq=freq) + date_list = sorted(set(daily_calendar.loc[:, 0].values.tolist() + date_list)) write_calendar_to_qlib(qlib_dir, date_list, freq=freq) bs.logout() logger.info(f"get trading dates success: {start_year}-01-01 to {end_year}-12-31") diff --git a/scripts/data_collector/yahoo/README.md b/scripts/data_collector/yahoo/README.md index 6cc630e87..50f731e38 100644 --- a/scripts/data_collector/yahoo/README.md +++ b/scripts/data_collector/yahoo/README.md @@ -71,7 +71,7 @@ pip install -r requirements.txt - examples: ```bash # cn 1d data - python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1d --start 2020-01-01 --end 2020-12-31 --delay 1 --interval 1d --region US + python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1d --start 2020-01-01 --end 2020-12-31 --delay 1 --interval 1d --region CN # cn 1min data python collector.py download_data --source_dir ~/.qlib/stock_data/source/cn_1min --delay 1 --interval 1min --region CN # us 1d data diff --git a/scripts/data_collector/yahoo/collector.py b/scripts/data_collector/yahoo/collector.py index 6a128a5be..feb28a94f 100644 --- a/scripts/data_collector/yahoo/collector.py +++ b/scripts/data_collector/yahoo/collector.py @@ -283,6 +283,16 @@ class YahooNormalize(BaseNormalize): COLUMNS = ["open", "close", "high", "low", "volume"] DAILY_FORMAT = "%Y-%m-%d" + @staticmethod + def calc_change(df: pd.DataFrame, last_close: float) -> pd.Series: + df = df.copy() + _tmp_series = df["close"].fillna(method="ffill") + _tmp_shift_series = _tmp_series.shift(1) + if last_close is not None: + _tmp_shift_series.iloc[0] = float(last_close) + change_series = _tmp_series / _tmp_shift_series - 1 + return change_series + @staticmethod def normalize_yahoo( df: pd.DataFrame, @@ -310,11 +320,29 @@ class YahooNormalize(BaseNormalize): ) df.sort_index(inplace=True) df.loc[(df["volume"] <= 0) | np.isnan(df["volume"]), set(df.columns) - {symbol_field_name}] = np.nan - _tmp_series = df["close"].fillna(method="ffill") - _tmp_shift_series = _tmp_series.shift(1) - if last_close is not None: - _tmp_shift_series.iloc[0] = float(last_close) - df["change"] = _tmp_series / _tmp_shift_series - 1 + + change_series = YahooNormalize.calc_change(df, last_close) + # NOTE: The data obtained by Yahoo finance sometimes has exceptions + # WARNING: If it is normal for a `symbol(exchange)` to differ by a factor of *89* to *111* for consecutive trading days, + # WARNING: the logic in the following line needs to be modified + _count = 0 + while True: + # NOTE: may appear unusual for many days in a row + change_series = YahooNormalize.calc_change(df, last_close) + _mask = (change_series >= 89) & (change_series <= 111) + if not _mask.any(): + break + _tmp_cols = ["high", "close", "low", "open", "adjclose"] + df.loc[_mask, _tmp_cols] = df.loc[_mask, _tmp_cols] / 100 + _count += 1 + if _count >= 10: + _symbol = df.loc[df[symbol_field_name].first_valid_index()]["symbol"] + logger.warning( + f"{_symbol} `change` is abnormal for {_count} consecutive days, please check the specific data file carefully" + ) + + df["change"] = YahooNormalize.calc_change(df, last_close) + columns += ["change"] df.loc[(df["volume"] <= 0) | np.isnan(df["volume"]), columns] = np.nan @@ -852,7 +880,7 @@ class Run(BaseRun): if self.interval.lower() == "1min": if qlib_data_1d_dir is None or not Path(qlib_data_1d_dir).expanduser().exists(): raise ValueError( - "If normalize 1min, the qlib_data_1d_dir parameter must be set: --qlib_data_1d_dir , Reference: https://github.com/zhupr/qlib/tree/support_extend_data/scripts/data_collector/yahoo#automatic-update-of-daily-frequency-datafrom-yahoo-finance" + "If normalize 1min, the qlib_data_1d_dir parameter must be set: --qlib_data_1d_dir , Reference: https://github.com/microsoft/qlib/tree/main/scripts/data_collector/yahoo#automatic-update-of-daily-frequency-datafrom-yahoo-finance" ) super(Run, self).normalize_data( date_field_name, symbol_field_name, end_date=end_date, qlib_data_1d_dir=qlib_data_1d_dir diff --git a/scripts/dump_bin.py b/scripts/dump_bin.py index 83daa28bc..8e9878895 100644 --- a/scripts/dump_bin.py +++ b/scripts/dump_bin.py @@ -244,6 +244,10 @@ class DumpDataBase: if df is None or df.empty: logger.warning(f"{code} data is None or empty") return + + # try to remove dup rows or it will cause exception when reindex. + df = df.drop_duplicates(self.date_field_name) + # features save dir features_dir = self._features_dir.joinpath(code_to_fname(code).lower()) features_dir.mkdir(parents=True, exist_ok=True) diff --git a/setup.py b/setup.py index 2dead9fba..9673fea2a 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ NAME = "pyqlib" DESCRIPTION = "A Quantitative-research Platform" REQUIRES_PYTHON = ">=3.5.0" -VERSION = "0.6.3.99" +VERSION = "0.7.0.99" # Detect Cython try: