mirror of
https://github.com/microsoft/qlib.git
synced 2026-06-29 09:01:18 +08:00
Compare commits
28 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b86a30aae7 | ||
|
|
2c5a4691f3 | ||
|
|
54344c4426 | ||
|
|
303cdb8ce3 | ||
|
|
1a0ac1ab6d | ||
|
|
a79e446724 | ||
|
|
bdf1fb29a6 | ||
|
|
86e1265f69 | ||
|
|
628eb7fa73 | ||
|
|
2a1b512cd2 | ||
|
|
50e7901e87 | ||
|
|
3ba54cd1ab | ||
|
|
483d01f0c1 | ||
|
|
61836cba3d | ||
|
|
aeb5e40c77 | ||
|
|
116f0fa7a7 | ||
|
|
5296cce725 | ||
|
|
292fcc9e98 | ||
|
|
d3fbf066cf | ||
|
|
52ecb79e0b | ||
|
|
59c52eac0a | ||
|
|
f455305a2a | ||
|
|
a67f67db6e | ||
|
|
5c2e99aee3 | ||
|
|
2bb8a4ce0e | ||
|
|
7f274b1e4e | ||
|
|
2aee9e0145 | ||
|
|
a62e2ec4de |
2
.github/workflows/test_macos.yml
vendored
2
.github/workflows/test_macos.yml
vendored
@@ -60,7 +60,7 @@ jobs:
|
||||
python -m pip install --upgrade cython
|
||||
python -m pip install numpy jupyter jupyter_contrib_nbextensions
|
||||
python -m pip install -U scipy scikit-learn # installing without this line will cause errors on GitHub Actions, while instsalling locally won't
|
||||
python setup.py install
|
||||
pip install -e .
|
||||
- name: Install test dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
|
||||
@@ -17,5 +17,5 @@ python:
|
||||
version: 3.7
|
||||
install:
|
||||
- requirements: docs/requirements.txt
|
||||
- method: setuptools
|
||||
path: .
|
||||
- method: pip
|
||||
path: .
|
||||
|
||||
45
README.md
45
README.md
@@ -11,23 +11,24 @@
|
||||
Recent released features
|
||||
| Feature | Status |
|
||||
| -- | ------ |
|
||||
| Meta-Learning-based framework & DDG-DA | [Released](https://github.com/microsoft/qlib/pull/743) on Jan 10, 2022 |
|
||||
| Planning-based portfolio optimization | [Released](https://github.com/microsoft/qlib/pull/754) on Dec 28, 2021 |
|
||||
| Release Qlib v0.8.0 | [Released](https://github.com/microsoft/qlib/releases/tag/v0.8.0) on Dec 8, 2021 |
|
||||
| ADD model | [Released](https://github.com/microsoft/qlib/pull/704) on Nov 22, 2021 |
|
||||
| ADARNN model | [Released](https://github.com/microsoft/qlib/pull/689) on Nov 14, 2021 |
|
||||
| TCN model | [Released](https://github.com/microsoft/qlib/pull/668) on Nov 4, 2021 |
|
||||
| Nested Decision Framework | [Released](https://github.com/microsoft/qlib/pull/438) on Oct 1, 2021. [Example](https://github.com/microsoft/qlib/blob/main/examples/nested_decision_execution/workflow.py) and [Doc](https://qlib.readthedocs.io/en/latest/component/highfreq.html) |
|
||||
|Temporal Routing Adaptor (TRA) | [Released](https://github.com/microsoft/qlib/pull/531) on July 30, 2021 |
|
||||
| Transformer & Localformer | [Released](https://github.com/microsoft/qlib/pull/508) on July 22, 2021 |
|
||||
| Release Qlib v0.7.0 | [Released](https://github.com/microsoft/qlib/releases/tag/v0.7.0) on July 12, 2021 |
|
||||
| TCTS Model | [Released](https://github.com/microsoft/qlib/pull/491) on July 1, 2021 |
|
||||
| Online serving and automatic model rolling | :star: [Released](https://github.com/microsoft/qlib/pull/290) on May 17, 2021 |
|
||||
| DoubleEnsemble Model | [Released](https://github.com/microsoft/qlib/pull/286) on Mar 2, 2021 |
|
||||
| High-frequency data processing example | [Released](https://github.com/microsoft/qlib/pull/257) on Feb 5, 2021 |
|
||||
| High-frequency trading example | [Part of code released](https://github.com/microsoft/qlib/pull/227) on Jan 28, 2021 |
|
||||
| High-frequency data(1min) | [Released](https://github.com/microsoft/qlib/pull/221) on Jan 27, 2021 |
|
||||
| Tabnet Model | [Released](https://github.com/microsoft/qlib/pull/205) on Jan 22, 2021 |
|
||||
| Arctic Provider Backend & Orderbook data example | :hammer: [Rleased](https://github.com/microsoft/qlib/pull/744) on Jan 17, 2022 |
|
||||
| Meta-Learning-based framework & DDG-DA | :chart_with_upwards_trend: :hammer: [Released](https://github.com/microsoft/qlib/pull/743) on Jan 10, 2022 |
|
||||
| Planning-based portfolio optimization | :hammer: [Released](https://github.com/microsoft/qlib/pull/754) on Dec 28, 2021 |
|
||||
| Release Qlib v0.8.0 | :octocat: [Released](https://github.com/microsoft/qlib/releases/tag/v0.8.0) on Dec 8, 2021 |
|
||||
| ADD model | :chart_with_upwards_trend: [Released](https://github.com/microsoft/qlib/pull/704) on Nov 22, 2021 |
|
||||
| ADARNN model | :chart_with_upwards_trend: [Released](https://github.com/microsoft/qlib/pull/689) on Nov 14, 2021 |
|
||||
| TCN model | :chart_with_upwards_trend: [Released](https://github.com/microsoft/qlib/pull/668) on Nov 4, 2021 |
|
||||
| Nested Decision Framework | :hammer: [Released](https://github.com/microsoft/qlib/pull/438) on Oct 1, 2021. [Example](https://github.com/microsoft/qlib/blob/main/examples/nested_decision_execution/workflow.py) and [Doc](https://qlib.readthedocs.io/en/latest/component/highfreq.html) |
|
||||
| Temporal Routing Adaptor (TRA) | :chart_with_upwards_trend: [Released](https://github.com/microsoft/qlib/pull/531) on July 30, 2021 |
|
||||
| Transformer & Localformer | :chart_with_upwards_trend: [Released](https://github.com/microsoft/qlib/pull/508) on July 22, 2021 |
|
||||
| Release Qlib v0.7.0 | :octocat: [Released](https://github.com/microsoft/qlib/releases/tag/v0.7.0) on July 12, 2021 |
|
||||
| TCTS Model | :chart_with_upwards_trend: [Released](https://github.com/microsoft/qlib/pull/491) on July 1, 2021 |
|
||||
| Online serving and automatic model rolling | :hammer: [Released](https://github.com/microsoft/qlib/pull/290) on May 17, 2021 |
|
||||
| DoubleEnsemble Model | :chart_with_upwards_trend: [Released](https://github.com/microsoft/qlib/pull/286) on Mar 2, 2021 |
|
||||
| High-frequency data processing example | :hammer: [Released](https://github.com/microsoft/qlib/pull/257) on Feb 5, 2021 |
|
||||
| High-frequency trading example | :chart_with_upwards_trend: [Part of code released](https://github.com/microsoft/qlib/pull/227) on Jan 28, 2021 |
|
||||
| High-frequency data(1min) | :rice: [Released](https://github.com/microsoft/qlib/pull/221) on Jan 27, 2021 |
|
||||
| Tabnet Model | :chart_with_upwards_trend: [Released](https://github.com/microsoft/qlib/pull/205) on Jan 22, 2021 |
|
||||
|
||||
Features released before 2021 are not listed here.
|
||||
|
||||
@@ -72,7 +73,6 @@ Your feedbacks about the features are very important.
|
||||
| Feature | Status |
|
||||
| -- | ------ |
|
||||
| Point-in-Time database | Under review: https://github.com/microsoft/qlib/pull/343 |
|
||||
| Orderbook database | Under review: https://github.com/microsoft/qlib/pull/744 |
|
||||
|
||||
# Framework of Qlib
|
||||
|
||||
@@ -115,6 +115,7 @@ This table demonstrates the supported Python version of `Qlib`:
|
||||
1. **Conda** is suggested for managing your Python environment.
|
||||
1. Please pay attention that installing cython in Python 3.6 will raise some error when installing ``Qlib`` from source. If users use Python 3.6 on their machines, it is recommended to *upgrade* Python to version 3.7 or use `conda`'s Python to install ``Qlib`` from source.
|
||||
1. For Python 3.9, `Qlib` supports running workflows such as training models, doing backtest and plot most of the related figures (those included in [notebook](examples/workflow_by_code.ipynb)). However, plotting for the *model performance* is not supported for now and we will fix this when the dependent packages are upgraded in the future.
|
||||
1. `Qlib`Requires `tables` package, `hdf5` in tables does not support python3.9.
|
||||
|
||||
### Install with pip
|
||||
Users can easily install ``Qlib`` by pip according to the following command.
|
||||
@@ -136,17 +137,11 @@ Also, users can install the latest dev version ``Qlib`` by the source code accor
|
||||
```
|
||||
|
||||
* Clone the repository and install ``Qlib`` as follows.
|
||||
* If you haven't installed qlib by the command ``pip install pyqlib`` before:
|
||||
```bash
|
||||
git clone https://github.com/microsoft/qlib.git && cd qlib
|
||||
python setup.py install
|
||||
```
|
||||
* If you have already installed the stable version by the command ``pip install pyqlib``:
|
||||
```bash
|
||||
git clone https://github.com/microsoft/qlib.git && cd qlib
|
||||
pip install .
|
||||
```
|
||||
**Note**: **Only** the command ``pip install .`` **can** overwrite the stable version installed by ``pip install pyqlib``, while the command ``python setup.py install`` **can't**.
|
||||
**Note**: You can install Qlib with `python setup.py install` as well. But it is not the recommanded approach. It will skip `pip` and cause obscure problems. For example, **only** the command ``pip install .`` **can** overwrite the stable version installed by ``pip install pyqlib``, while the command ``python setup.py install`` **can't**.
|
||||
|
||||
**Tips**: If you fail to install `Qlib` or run the examples in your environment, comparing your steps and the [CI workflow](.github/workflows/test.yml) may help you find the problem.
|
||||
|
||||
|
||||
@@ -106,6 +106,9 @@ Example
|
||||
`SignalRecord` is the `Record Template` in ``Qlib``, please refer to `Workflow <recorder.html#record-template>`_.
|
||||
|
||||
Also, the above example has been given in ``examples/train_backtest_analyze.ipynb``.
|
||||
Technically, the meaning of the model prediction depends on the label setting designed by user.
|
||||
By default, the meaning of the score is normally the rating of the instruments by the forecasting model. The higher the score, the more profit the instruments.
|
||||
|
||||
|
||||
Custom Model
|
||||
===================
|
||||
|
||||
@@ -23,6 +23,10 @@ The `examples <https://github.com/microsoft/qlib/tree/main/examples/online_srv>`
|
||||
|
||||
**NOTE**: User should keep his data source updated to support online serving. For example, Qlib provides `a batch of scripts <https://github.com/microsoft/qlib/blob/main/scripts/data_collector/yahoo/README.md#automatic-update-of-daily-frequency-datafrom-yahoo-finance>`_ to help users update Yahoo daily data.
|
||||
|
||||
Known limitations currently
|
||||
- Currently, the daily updating prediction for the next trading day is supported. But generating orders for the next trading day is not supported due to the `limitations of public data <https://github.com/microsoft/qlib/issues/215#issuecomment-766293563>_`
|
||||
|
||||
|
||||
Online Manager
|
||||
=============
|
||||
|
||||
|
||||
@@ -29,6 +29,8 @@ Qlib provides a base class ``qlib.strategy.base.BaseStrategy``. All strategy cla
|
||||
|
||||
- `generate_order_list`
|
||||
Return the order list.
|
||||
The frequency to call this method depends on the executor frequency("time_per_step"="day" by default). But the trading frequency can be decided by users' implementation.
|
||||
For example, if the user wants to trading in weekly while the `time_per_step` is "day" in executor, user can return non-empty TradeDecision weekly(otherwise return empty like `this <https://github.com/microsoft/qlib/blob/main/qlib/contrib/strategy/signal_strategy.py#L132>`_ ).
|
||||
|
||||
Users can inherit `BaseStrategy` to customize their strategy class.
|
||||
|
||||
|
||||
@@ -124,9 +124,47 @@ Configuration File
|
||||
===================
|
||||
|
||||
Let's get into details of ``qrun`` in this section.
|
||||
|
||||
Before using ``qrun``, users need to prepare a configuration file. The following content shows how to prepare each part of the configuration file.
|
||||
|
||||
The design logic of the configuration file is very simple. It predefines fixed workflows and provide this yaml interface to users to define how to initialize each component.
|
||||
It follow the design of `init_instance_by_config <https://github.com/microsoft/qlib/blob/2aee9e0145decc3e71def70909639b5e5a6f4b58/qlib/utils/__init__.py#L264>`_ . It defines the initialization of each component of Qlib, which typically include the class and the initialization arguments.
|
||||
|
||||
For example, the following yaml and code are equivalent.
|
||||
|
||||
.. code-block:: YAML
|
||||
|
||||
model:
|
||||
class: LGBModel
|
||||
module_path: qlib.contrib.model.gbdt
|
||||
kwargs:
|
||||
loss: mse
|
||||
colsample_bytree: 0.8879
|
||||
learning_rate: 0.0421
|
||||
subsample: 0.8789
|
||||
lambda_l1: 205.6999
|
||||
lambda_l2: 580.9768
|
||||
max_depth: 8
|
||||
num_leaves: 210
|
||||
num_threads: 20
|
||||
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from qlib.contrib.model.gbdt import LGBModel
|
||||
kwargs = {
|
||||
"loss": "mse" ,
|
||||
"colsample_bytree": 0.8879,
|
||||
"learning_rate": 0.0421,
|
||||
"subsample": 0.8789,
|
||||
"lambda_l1": 205.6999,
|
||||
"lambda_l2": 580.9768,
|
||||
"max_depth": 8,
|
||||
"num_leaves": 210,
|
||||
"num_threads": 20,
|
||||
}
|
||||
LGBModel(kwargs)
|
||||
|
||||
|
||||
Qlib Init Section
|
||||
--------------------
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
pandas==1.1.2
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
pandas==1.1.2
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
catboost==0.24.3
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
lightgbm==3.1.0
|
||||
@@ -1,4 +1,4 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
pandas==1.1.2
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
pandas==1.1.2
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
lightgbm==3.1.0
|
||||
|
||||
@@ -1,3 +1,3 @@
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
pandas==1.1.2
|
||||
torch==1.2.0
|
||||
@@ -1,4 +1,4 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
pandas==1.1.2
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
@@ -1,5 +1,5 @@
|
||||
pandas==1.1.2
|
||||
numpy==1.17.4
|
||||
numpy==1.21.0
|
||||
scikit_learn==0.23.2
|
||||
torch==1.7.0
|
||||
seaborn
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
# Introduction
|
||||
This is the implementation of `DDG-DA` based on `Meta Controller` component provided by `Qlib`.
|
||||
|
||||
Please refer to the paper for more details: *DDG-DA: Data Distribution Generation for Predictable Concept Drift Adaptation* [[arXiv](https://arxiv.org/abs/2201.04038)]
|
||||
|
||||
|
||||
## Background
|
||||
In many real-world scenarios, we often deal with streaming data that is sequentially collected over time. Due to the non-stationary nature of the environment, the streaming data distribution may change in unpredictable ways, which is known as concept drift. To handle concept drift, previous methods first detect when/where the concept drift happens and then adapt models to fit the distribution of the latest data. However, there are still many cases that some underlying factors of environment evolution are predictable, making it possible to model the future concept drift trend of the streaming data, while such cases are not fully explored in previous work.
|
||||
|
||||
|
||||
@@ -147,6 +147,9 @@ class DDGDA:
|
||||
},
|
||||
# "record": ["qlib.workflow.record_temp.SignalRecord"]
|
||||
}
|
||||
# the proxy_forecast_model_task will be used to create meta tasks.
|
||||
# The test date of first task will be 2011-01-01. Each test segment will be about 20days
|
||||
# The tasks include all training tasks and test tasks.
|
||||
|
||||
# 2) preparing meta dataset
|
||||
kwargs = dict(
|
||||
|
||||
52
examples/orderbook_data/README.md
Normal file
52
examples/orderbook_data/README.md
Normal file
@@ -0,0 +1,52 @@
|
||||
# Introduction
|
||||
|
||||
This example tries to demonstrate how Qlib supports data without fixed shared frequency.
|
||||
|
||||
For example,
|
||||
- Daily prices volume data are fixed-frequency data. The data comes in a fixed frequency (i.e. daily)
|
||||
- Orders are not fixed data and they may come at any time point
|
||||
|
||||
To support such non-fixed-frequency, Qlib implements an Arctic-based backend.
|
||||
Here is an example to import and query data based on this backend.
|
||||
|
||||
# Installation
|
||||
|
||||
Please refer to [the installation docs](https://docs.mongodb.com/manual/installation/) of mongodb.
|
||||
Current version of script with default value tries to connect localhost **via default port without authentication**.
|
||||
|
||||
Run following command to install necessary libraries
|
||||
```
|
||||
pip install pytest coverage
|
||||
pip install arctic # NOTE: pip may fail to resolve the right package dependency !!! Please make sure the dependency are satisfied.
|
||||
```
|
||||
|
||||
# Importing example data
|
||||
|
||||
|
||||
1. (Optional) Please follow the first part of [this section](https://github.com/microsoft/qlib#data-preparation) to **get 1min data** of Qlib.
|
||||
2. Please follow following steps to download example data
|
||||
```bash
|
||||
cd examples/orderbook_data/
|
||||
wget http://fintech.msra.cn/stock_data/downloads/highfreq_orderboook_example_data.tar.bz2
|
||||
tar xf highfreq_orderboook_example_data.tar.bz2
|
||||
```
|
||||
|
||||
3. Please import the example data to your mongo db
|
||||
```bash
|
||||
cd examples/orderbook_data/
|
||||
python create_dataset.py initialize_library # Initialization Libraries
|
||||
python create_dataset.py import_data # Initialization Libraries
|
||||
```
|
||||
|
||||
# Query Examples
|
||||
|
||||
After importing these data, you run `example.py` to create some high-frequency features.
|
||||
```bash
|
||||
cd examples/orderbook_data/
|
||||
pytest -s --disable-warnings example.py # If you want run all examples
|
||||
pytest -s --disable-warnings example.py::TestClass::test_exp_10 # If you want to run specific example
|
||||
```
|
||||
|
||||
|
||||
# Known limitations
|
||||
Expression computing between different frequencies are not supported yet
|
||||
315
examples/orderbook_data/create_dataset.py
Executable file
315
examples/orderbook_data/create_dataset.py
Executable file
@@ -0,0 +1,315 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
"""
|
||||
NOTE:
|
||||
- This scripts is a demo to import example data import Qlib
|
||||
- !!!!!!!!!!!!!!!TODO!!!!!!!!!!!!!!!!!!!:
|
||||
- Its structure is not well designed and very ugly, your contribution is welcome to make importing dataset easier
|
||||
"""
|
||||
from datetime import date, datetime as dt
|
||||
import os
|
||||
from pathlib import Path
|
||||
import random
|
||||
import shutil
|
||||
import time
|
||||
import traceback
|
||||
|
||||
from arctic import Arctic, chunkstore
|
||||
import arctic
|
||||
from arctic import Arctic, CHUNK_STORE
|
||||
from arctic.chunkstore.chunkstore import CHUNK_SIZE
|
||||
import fire
|
||||
from joblib import Parallel, delayed, parallel
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from pandas import DataFrame
|
||||
from pandas.core.indexes.datetimes import date_range
|
||||
from pymongo.mongo_client import MongoClient
|
||||
|
||||
DIRNAME = Path(__file__).absolute().resolve().parent
|
||||
|
||||
# CONFIG
|
||||
N_JOBS = -1 # leaving one kernel free
|
||||
LOG_FILE_PATH = DIRNAME / "log_file"
|
||||
DATA_PATH = DIRNAME / "raw_data"
|
||||
DATABASE_PATH = DIRNAME / "orig_data"
|
||||
DATA_INFO_PATH = DIRNAME / "data_info"
|
||||
DATA_FINISH_INFO_PATH = DIRNAME / "./data_finish_info"
|
||||
DOC_TYPE = ["Tick", "Order", "OrderQueue", "Transaction", "Day", "Minute"]
|
||||
MAX_SIZE = 3000 * 1024 * 1024 * 1024
|
||||
ALL_STOCK_PATH = DATABASE_PATH / "all.txt"
|
||||
ARCTIC_SRV = "127.0.0.1"
|
||||
|
||||
|
||||
def get_library_name(doc_type):
|
||||
if str.lower(doc_type) == str.lower("Tick"):
|
||||
return "ticks"
|
||||
else:
|
||||
return str.lower(doc_type)
|
||||
|
||||
|
||||
def is_stock(exchange_place, code):
|
||||
if exchange_place == "SH" and code[0] != "6":
|
||||
return False
|
||||
if exchange_place == "SZ" and code[0] != "0" and code[:2] != "30":
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def add_one_stock_daily_data(filepath, type, exchange_place, arc, date):
|
||||
"""
|
||||
exchange_place: "SZ" OR "SH"
|
||||
type: "tick", "orderbook", ...
|
||||
filepath: the path of csv
|
||||
arc: arclink created by a process
|
||||
"""
|
||||
code = os.path.split(filepath)[-1].split(".csv")[0]
|
||||
if exchange_place == "SH" and code[0] != "6":
|
||||
return
|
||||
if exchange_place == "SZ" and code[0] != "0" and code[:2] != "30":
|
||||
return
|
||||
|
||||
df = pd.read_csv(filepath, encoding="gbk", dtype={"code": str})
|
||||
code = os.path.split(filepath)[-1].split(".csv")[0]
|
||||
|
||||
def format_time(day, hms):
|
||||
day = str(day)
|
||||
hms = str(hms)
|
||||
if hms[0] == "1": # >=10,
|
||||
return (
|
||||
"-".join([day[0:4], day[4:6], day[6:8]]) + " " + ":".join([hms[:2], hms[2:4], hms[4:6] + "." + hms[6:]])
|
||||
)
|
||||
else:
|
||||
return (
|
||||
"-".join([day[0:4], day[4:6], day[6:8]]) + " " + ":".join([hms[:1], hms[1:3], hms[3:5] + "." + hms[5:]])
|
||||
)
|
||||
|
||||
## Discard the entire row if wrong data timestamp encoutered.
|
||||
timestamp = list(zip(list(df["date"]), list(df["time"])))
|
||||
error_index_list = []
|
||||
for index, t in enumerate(timestamp):
|
||||
try:
|
||||
pd.Timestamp(format_time(t[0], t[1]))
|
||||
except Exception:
|
||||
error_index_list.append(index) ## The row number of the error line
|
||||
|
||||
# to-do: writting to logs
|
||||
|
||||
if len(error_index_list) > 0:
|
||||
print("error: {}, {}".format(filepath, len(error_index_list)))
|
||||
|
||||
df = df.drop(error_index_list)
|
||||
timestamp = list(zip(list(df["date"]), list(df["time"]))) ## The cleaned timestamp
|
||||
# generate timestamp
|
||||
pd_timestamp = pd.DatetimeIndex(
|
||||
[pd.Timestamp(format_time(timestamp[i][0], timestamp[i][1])) for i in range(len(df["date"]))]
|
||||
)
|
||||
df = df.drop(columns=["date", "time", "name", "code", "wind_code"])
|
||||
# df = pd.DataFrame(data=df.to_dict("list"), index=pd_timestamp)
|
||||
df["date"] = pd.to_datetime(pd_timestamp)
|
||||
df.set_index("date", inplace=True)
|
||||
|
||||
if str.lower(type) == "orderqueue":
|
||||
## extract ab1~ab50
|
||||
df["ab"] = [
|
||||
",".join([str(int(row["ab" + str(i + 1)])) for i in range(0, row["ab_items"])])
|
||||
for timestamp, row in df.iterrows()
|
||||
]
|
||||
df = df.drop(columns=["ab" + str(i) for i in range(1, 51)])
|
||||
|
||||
type = get_library_name(type)
|
||||
# arc.initialize_library(type, lib_type=CHUNK_STORE)
|
||||
lib = arc[type]
|
||||
|
||||
symbol = "".join([exchange_place, code])
|
||||
if symbol in lib.list_symbols():
|
||||
print("update {0}, date={1}".format(symbol, date))
|
||||
if df.empty == True:
|
||||
return error_index_list
|
||||
lib.update(symbol, df, chunk_size="D")
|
||||
else:
|
||||
print("write {0}, date={1}".format(symbol, date))
|
||||
lib.write(symbol, df, chunk_size="D")
|
||||
return error_index_list
|
||||
|
||||
|
||||
def add_one_stock_daily_data_wrapper(filepath, type, exchange_place, index, date):
|
||||
pid = os.getpid()
|
||||
code = os.path.split(filepath)[-1].split(".csv")[0]
|
||||
arc = Arctic(ARCTIC_SRV)
|
||||
try:
|
||||
if index % 100 == 0:
|
||||
print("index = {}, filepath = {}".format(index, filepath))
|
||||
error_index_list = add_one_stock_daily_data(filepath, type, exchange_place, arc, date)
|
||||
if error_index_list is not None and len(error_index_list) > 0:
|
||||
f = open(os.path.join(LOG_FILE_PATH, "temp_timestamp_error_{0}_{1}_{2}.txt".format(pid, date, type)), "a+")
|
||||
f.write("{}, {}, {}\n".format(filepath, error_index_list, exchange_place + "_" + code))
|
||||
f.close()
|
||||
|
||||
except Exception as e:
|
||||
info = traceback.format_exc()
|
||||
print("error:" + str(e))
|
||||
f = open(os.path.join(LOG_FILE_PATH, "temp_fail_{0}_{1}_{2}.txt".format(pid, date, type)), "a+")
|
||||
f.write("fail:" + str(filepath) + "\n" + str(e) + "\n" + str(info) + "\n")
|
||||
f.close()
|
||||
|
||||
finally:
|
||||
arc.reset()
|
||||
|
||||
|
||||
def add_data(tick_date, doc_type, stock_name_dict):
|
||||
pid = os.getpid()
|
||||
|
||||
if doc_type not in DOC_TYPE:
|
||||
print("doc_type not in {}".format(DOC_TYPE))
|
||||
return
|
||||
try:
|
||||
begin_time = time.time()
|
||||
os.system(f"cp {DATABASE_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} {DATA_PATH}/")
|
||||
|
||||
os.system(
|
||||
f"tar -xvzf {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} -C {DATA_PATH}/ {tick_date + '_' + doc_type}/SH"
|
||||
)
|
||||
os.system(
|
||||
f"tar -xvzf {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} -C {DATA_PATH}/ {tick_date + '_' + doc_type}/SZ"
|
||||
)
|
||||
os.system(f"chmod 777 {DATA_PATH}")
|
||||
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}")
|
||||
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SH")
|
||||
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SZ")
|
||||
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SH/{tick_date}")
|
||||
os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SZ/{tick_date}")
|
||||
|
||||
print("tick_date={}".format(tick_date))
|
||||
|
||||
temp_data_path_sh = os.path.join(DATA_PATH, tick_date + "_" + doc_type, "SH", tick_date)
|
||||
temp_data_path_sz = os.path.join(DATA_PATH, tick_date + "_" + doc_type, "SZ", tick_date)
|
||||
is_files_exist = {"sh": os.path.exists(temp_data_path_sh), "sz": os.path.exists(temp_data_path_sz)}
|
||||
|
||||
sz_files = (
|
||||
(
|
||||
set([i.split(".csv")[0] for i in os.listdir(temp_data_path_sz) if i[:2] == "30" or i[0] == "0"])
|
||||
& set(stock_name_dict["SZ"])
|
||||
)
|
||||
if is_files_exist["sz"]
|
||||
else set()
|
||||
)
|
||||
sz_file_nums = len(sz_files) if is_files_exist["sz"] else 0
|
||||
sh_files = (
|
||||
(
|
||||
set([i.split(".csv")[0] for i in os.listdir(temp_data_path_sh) if i[0] == "6"])
|
||||
& set(stock_name_dict["SH"])
|
||||
)
|
||||
if is_files_exist["sh"]
|
||||
else set()
|
||||
)
|
||||
sh_file_nums = len(sh_files) if is_files_exist["sh"] else 0
|
||||
print("sz_file_nums:{}, sh_file_nums:{}".format(sz_file_nums, sh_file_nums))
|
||||
|
||||
f = (DATA_INFO_PATH / "data_info_log_{}_{}".format(doc_type, tick_date)).open("w+")
|
||||
f.write("sz:{}, sh:{}, date:{}:".format(sz_file_nums, sh_file_nums, tick_date) + "\n")
|
||||
f.close()
|
||||
|
||||
if sh_file_nums > 0:
|
||||
# write is not thread-safe, update may be thread-safe
|
||||
Parallel(n_jobs=N_JOBS)(
|
||||
delayed(add_one_stock_daily_data_wrapper)(
|
||||
os.path.join(temp_data_path_sh, name + ".csv"), doc_type, "SH", index, tick_date
|
||||
)
|
||||
for index, name in enumerate(list(sh_files))
|
||||
)
|
||||
if sz_file_nums > 0:
|
||||
# write is not thread-safe, update may be thread-safe
|
||||
Parallel(n_jobs=N_JOBS)(
|
||||
delayed(add_one_stock_daily_data_wrapper)(
|
||||
os.path.join(temp_data_path_sz, name + ".csv"), doc_type, "SZ", index, tick_date
|
||||
)
|
||||
for index, name in enumerate(list(sz_files))
|
||||
)
|
||||
|
||||
os.system(f"rm -f {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)}")
|
||||
os.system(f"rm -rf {DATA_PATH}/{tick_date + '_' + doc_type}")
|
||||
total_time = time.time() - begin_time
|
||||
f = (DATA_FINISH_INFO_PATH / "data_info_finish_log_{}_{}".format(doc_type, tick_date)).open("w+")
|
||||
f.write("finish: date:{}, consume_time:{}, end_time: {}".format(tick_date, total_time, time.time()) + "\n")
|
||||
f.close()
|
||||
|
||||
except Exception as e:
|
||||
info = traceback.format_exc()
|
||||
print("date error:" + str(e))
|
||||
f = open(os.path.join(LOG_FILE_PATH, "temp_fail_{0}_{1}_{2}.txt".format(pid, tick_date, doc_type)), "a+")
|
||||
f.write("fail:" + str(tick_date) + "\n" + str(e) + "\n" + str(info) + "\n")
|
||||
f.close()
|
||||
|
||||
|
||||
class DSCreator:
|
||||
"""Dataset creator"""
|
||||
|
||||
def clear(self):
|
||||
client = MongoClient(ARCTIC_SRV)
|
||||
client.drop_database("arctic")
|
||||
|
||||
def initialize_library(self):
|
||||
arc = Arctic(ARCTIC_SRV)
|
||||
for doc_type in DOC_TYPE:
|
||||
arc.initialize_library(get_library_name(doc_type), lib_type=CHUNK_STORE)
|
||||
|
||||
def _get_empty_folder(self, fp: Path):
|
||||
fp = Path(fp)
|
||||
if fp.exists():
|
||||
shutil.rmtree(fp)
|
||||
fp.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def import_data(self, doc_type_l=["Tick", "Transaction", "Order"]):
|
||||
# clear all the old files
|
||||
for fp in LOG_FILE_PATH, DATA_INFO_PATH, DATA_FINISH_INFO_PATH, DATA_PATH:
|
||||
self._get_empty_folder(fp)
|
||||
|
||||
arc = Arctic(ARCTIC_SRV)
|
||||
for doc_type in DOC_TYPE:
|
||||
# arc.initialize_library(get_library_name(doc_type), lib_type=CHUNK_STORE)
|
||||
arc.set_quota(get_library_name(doc_type), MAX_SIZE)
|
||||
arc.reset()
|
||||
|
||||
# doc_type = 'Day'
|
||||
for doc_type in doc_type_l:
|
||||
date_list = list(set([int(path.split("_")[0]) for path in os.listdir(DATABASE_PATH) if doc_type in path]))
|
||||
date_list.sort()
|
||||
date_list = [str(date) for date in date_list]
|
||||
|
||||
f = open(ALL_STOCK_PATH, "r")
|
||||
stock_name_list = [lines.split("\t")[0] for lines in f.readlines()]
|
||||
f.close()
|
||||
stock_name_dict = {
|
||||
"SH": [stock_name[2:] for stock_name in stock_name_list if "SH" in stock_name],
|
||||
"SZ": [stock_name[2:] for stock_name in stock_name_list if "SZ" in stock_name],
|
||||
}
|
||||
|
||||
lib_name = get_library_name(doc_type)
|
||||
a = Arctic(ARCTIC_SRV)
|
||||
# a.initialize_library(lib_name, lib_type=CHUNK_STORE)
|
||||
|
||||
stock_name_exist = a[lib_name].list_symbols()
|
||||
lib = a[lib_name]
|
||||
initialize_count = 0
|
||||
for stock_name in stock_name_list:
|
||||
if stock_name not in stock_name_exist:
|
||||
initialize_count += 1
|
||||
# A placeholder for stocks
|
||||
pdf = pd.DataFrame(index=[pd.Timestamp("1900-01-01")])
|
||||
pdf.index.name = "date" # an col named date is necessary
|
||||
lib.write(stock_name, pdf)
|
||||
print("initialize count: {}".format(initialize_count))
|
||||
print("tasks: {}".format(date_list))
|
||||
a.reset()
|
||||
|
||||
# date_list = [files.split("_")[0] for files in os.listdir("./raw_data_price") if "tar" in files]
|
||||
# print(len(date_list))
|
||||
date_list = ["20201231"] # for test
|
||||
Parallel(n_jobs=min(2, len(date_list)))(
|
||||
delayed(add_data)(date, doc_type, stock_name_dict) for date in date_list
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
fire.Fire(DSCreator)
|
||||
312
examples/orderbook_data/example.py
Normal file
312
examples/orderbook_data/example.py
Normal file
@@ -0,0 +1,312 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
from arctic.arctic import Arctic
|
||||
import qlib
|
||||
from qlib.data import D
|
||||
import unittest
|
||||
|
||||
|
||||
class TestClass(unittest.TestCase):
|
||||
"""
|
||||
Useful commands
|
||||
- run all tests: pytest examples/orderbook_data/example.py
|
||||
- run a single test: pytest -s --pdb --disable-warnings examples/orderbook_data/example.py::TestClass::test_basic01
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
Configure for arctic
|
||||
"""
|
||||
provider_uri = "~/.qlib/qlib_data/yahoo_cn_1min"
|
||||
qlib.init(
|
||||
provider_uri=provider_uri,
|
||||
mem_cache_size_limit=1024 ** 3 * 2,
|
||||
mem_cache_type="sizeof",
|
||||
kernels=1,
|
||||
expression_provider={"class": "LocalExpressionProvider", "kwargs": {"time2idx": False}},
|
||||
feature_provider={
|
||||
"class": "ArcticFeatureProvider",
|
||||
"module_path": "qlib.contrib.data.data",
|
||||
"kwargs": {"uri": "127.0.0.1"},
|
||||
},
|
||||
dataset_provider={
|
||||
"class": "LocalDatasetProvider",
|
||||
"kwargs": {
|
||||
"align_time": False, # Order book is not fixed, so it can't be align to a shared fixed frequency calendar
|
||||
},
|
||||
},
|
||||
)
|
||||
# self.stocks_list = ["SH600519"]
|
||||
self.stocks_list = ["SZ000725"]
|
||||
|
||||
def test_basic(self):
|
||||
# NOTE: this data contains a lot of zeros in $askX and $bidX
|
||||
df = D.features(
|
||||
self.stocks_list,
|
||||
fields=["$ask1", "$ask2", "$bid1", "$bid2"],
|
||||
freq="ticks",
|
||||
start_time="20201230",
|
||||
end_time="20210101",
|
||||
)
|
||||
print(df)
|
||||
|
||||
def test_basic_without_time(self):
|
||||
df = D.features(self.stocks_list, fields=["$ask1"], freq="ticks")
|
||||
print(df)
|
||||
|
||||
def test_basic01(self):
|
||||
df = D.features(
|
||||
self.stocks_list,
|
||||
fields=["TResample($ask1, '1min', 'last')"],
|
||||
freq="ticks",
|
||||
start_time="20201230",
|
||||
end_time="20210101",
|
||||
)
|
||||
print(df)
|
||||
|
||||
def test_basic02(self):
|
||||
df = D.features(
|
||||
self.stocks_list,
|
||||
fields=["$function_code"],
|
||||
freq="transaction",
|
||||
start_time="20201230",
|
||||
end_time="20210101",
|
||||
)
|
||||
print(df)
|
||||
|
||||
def test_basic03(self):
|
||||
df = D.features(
|
||||
self.stocks_list,
|
||||
fields=["$function_code"],
|
||||
freq="order",
|
||||
start_time="20201230",
|
||||
end_time="20210101",
|
||||
)
|
||||
print(df)
|
||||
|
||||
# Here are some popular expressions for high-frequency
|
||||
# 1) some shared expression
|
||||
expr_sum_buy_ask_1 = "(TResample($ask1, '1min', 'last') + TResample($bid1, '1min', 'last'))"
|
||||
total_volume = (
|
||||
"TResample("
|
||||
+ "+".join([f"${name}{i}" for i in range(1, 11) for name in ["asize", "bsize"]])
|
||||
+ ", '1min', 'sum')"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def total_func(name, method):
|
||||
return "TResample(" + "+".join([f"${name}{i}" for i in range(1, 11)]) + ",'1min', '{}')".format(method)
|
||||
|
||||
def test_exp_01(self):
|
||||
exprs = []
|
||||
names = []
|
||||
for name in ["asize", "bsize"]:
|
||||
for i in range(1, 11):
|
||||
exprs.append(f"TResample(${name}{i}, '1min', 'mean') / ({self.total_volume})")
|
||||
names.append(f"v_{name}_{i}")
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="ticks")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
# 2) some often used papers;
|
||||
def test_exp_02(self):
|
||||
spread_func = (
|
||||
lambda index: f"2 * TResample($ask{index} - $bid{index}, '1min', 'last') / {self.expr_sum_buy_ask_1}"
|
||||
)
|
||||
mid_func = (
|
||||
lambda index: f"2 * TResample(($ask{index} + $bid{index})/2, '1min', 'last') / {self.expr_sum_buy_ask_1}"
|
||||
)
|
||||
|
||||
exprs = []
|
||||
names = []
|
||||
for i in range(1, 11):
|
||||
exprs.extend([spread_func(i), mid_func(i)])
|
||||
names.extend([f"p_spread_{i}", f"p_mid_{i}"])
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="ticks")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
def test_exp_03(self):
|
||||
expr3_func1 = (
|
||||
lambda name, index_left, index_right: f"2 * TResample(Abs(${name}{index_left} - ${name}{index_right}), '1min', 'last') / {self.expr_sum_buy_ask_1}"
|
||||
)
|
||||
for name in ["ask", "bid"]:
|
||||
for i in range(1, 10):
|
||||
exprs = [expr3_func1(name, i + 1, i)]
|
||||
names = [f"p_diff_{name}_{i}_{i+1}"]
|
||||
exprs.extend([expr3_func1("ask", 10, 1), expr3_func1("bid", 1, 10)])
|
||||
names.extend(["p_diff_ask_10_1", "p_diff_bid_1_10"])
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="ticks")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
def test_exp_04(self):
|
||||
exprs = []
|
||||
names = []
|
||||
for name in ["asize", "bsize"]:
|
||||
exprs.append(f"(({ self.total_func(name, 'mean')}) / 10) / {self.total_volume}")
|
||||
names.append(f"v_avg_{name}")
|
||||
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="ticks")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
def test_exp_05(self):
|
||||
exprs = [
|
||||
f"2 * Sub({ self.total_func('ask', 'last')}, {self.total_func('bid', 'last')})/{self.expr_sum_buy_ask_1}",
|
||||
f"Sub({ self.total_func('asize', 'mean')}, {self.total_func('bsize', 'mean')})/{self.total_volume}",
|
||||
]
|
||||
names = ["p_accspread", "v_accspread"]
|
||||
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="ticks")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
# (p|v)_diff_(ask|bid|asize|bsize)_(time_interval)
|
||||
def test_exp_06(self):
|
||||
t = 3
|
||||
expr6_price_func = (
|
||||
lambda name, index, method: f'2 * (TResample(${name}{index}, "{t}s", "{method}") - Ref(TResample(${name}{index}, "{t}s", "{method}"), 1)) / {t}'
|
||||
)
|
||||
exprs = []
|
||||
names = []
|
||||
for i in range(1, 11):
|
||||
for name in ["bid", "ask"]:
|
||||
exprs.append(
|
||||
f"TResample({expr6_price_func(name, i, 'last')}, '1min', 'mean') / {self.expr_sum_buy_ask_1}"
|
||||
)
|
||||
names.append(f"p_diff_{name}{i}_{t}s")
|
||||
|
||||
for i in range(1, 11):
|
||||
for name in ["asize", "bsize"]:
|
||||
exprs.append(f"TResample({expr6_price_func(name, i, 'mean')}, '1min', 'mean') / {self.total_volume}")
|
||||
names.append(f"v_diff_{name}{i}_{t}s")
|
||||
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="ticks")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
# TODOs:
|
||||
# Following expressions may be implemented in the future
|
||||
# expr7_2 = lambda funccode, bsflag, time_interval: \
|
||||
# "TResample(TRolling(TEq(@transaction.function_code, {}) & TEq(@transaction.bs_flag ,{}), '{}s', 'sum') / \
|
||||
# TRolling(@transaction.function_code, '{}s', 'count') , '1min', 'mean')".format(ord(funccode), bsflag,time_interval,time_interval)
|
||||
# create_dataset(7, "SH600000", [expr7_2("C")] + [expr7(funccode, ordercode) for funccode in ['B','S'] for ordercode in ['0','1']])
|
||||
# create_dataset(7, ["SH600000"], [expr7_2("C", 48)] )
|
||||
|
||||
@staticmethod
|
||||
def expr7_init(funccode, ordercode, time_interval):
|
||||
# NOTE: based on on order frequency (i.e. freq="order")
|
||||
return f"Rolling(Eq($function_code, {ord(funccode)}) & Eq($order_kind ,{ord(ordercode)}), '{time_interval}s', 'sum') / Rolling($function_code, '{time_interval}s', 'count')"
|
||||
|
||||
# (la|lb|ma|mb|ca|cb)_intensity_(time_interval)
|
||||
def test_exp_07_1(self):
|
||||
# NOTE: based on transaction frequency (i.e. freq="transaction")
|
||||
expr7_3 = (
|
||||
lambda funccode, code, time_interval: f"TResample(Rolling(Eq($function_code, {ord(funccode)}) & {code}($ask_order, $bid_order) , '{time_interval}s', 'sum') / Rolling($function_code, '{time_interval}s', 'count') , '1min', 'mean')"
|
||||
)
|
||||
|
||||
exprs = [expr7_3("C", "Gt", "3"), expr7_3("C", "Lt", "3")]
|
||||
names = ["ca_intensity_3s", "cb_intensity_3s"]
|
||||
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="transaction")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
trans_dict = {"B": "a", "S": "b", "0": "l", "1": "m"}
|
||||
|
||||
def test_exp_07_2(self):
|
||||
# NOTE: based on on order frequency
|
||||
expr7 = (
|
||||
lambda funccode, ordercode, time_interval: f"TResample({self.expr7_init(funccode, ordercode, time_interval)}, '1min', 'mean')"
|
||||
)
|
||||
|
||||
exprs = []
|
||||
names = []
|
||||
for funccode in ["B", "S"]:
|
||||
for ordercode in ["0", "1"]:
|
||||
exprs.append(expr7(funccode, ordercode, "3"))
|
||||
names.append(self.trans_dict[ordercode] + self.trans_dict[funccode] + "_intensity_3s")
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="transaction")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
@staticmethod
|
||||
def expr7_3_init(funccode, code, time_interval):
|
||||
# NOTE: It depends on transaction frequency
|
||||
return f"Rolling(Eq($function_code, {ord(funccode)}) & {code}($ask_order, $bid_order) , '{time_interval}s', 'sum') / Rolling($function_code, '{time_interval}s', 'count')"
|
||||
|
||||
# (la|lb|ma|mb|ca|cb)_relative_intensity_(time_interval_small)_(time_interval_big)
|
||||
def test_exp_08_1(self):
|
||||
expr8_1 = (
|
||||
lambda funccode, ordercode, time_interval_short, time_interval_long: f"TResample(Gt({self.expr7_init(funccode, ordercode, time_interval_short)},{self.expr7_init(funccode, ordercode, time_interval_long)}), '1min', 'mean')"
|
||||
)
|
||||
|
||||
exprs = []
|
||||
names = []
|
||||
for funccode in ["B", "S"]:
|
||||
for ordercode in ["0", "1"]:
|
||||
exprs.append(expr8_1(funccode, ordercode, "10", "900"))
|
||||
names.append(self.trans_dict[ordercode] + self.trans_dict[funccode] + "_relative_intensity_10s_900s")
|
||||
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="order")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
def test_exp_08_2(self):
|
||||
# NOTE: It depends on transaction frequency
|
||||
expr8_2 = (
|
||||
lambda funccode, ordercode, time_interval_short, time_interval_long: f"TResample(Gt({self.expr7_3_init(funccode, ordercode, time_interval_short)},{self.expr7_3_init(funccode, ordercode, time_interval_long)}), '1min', 'mean')"
|
||||
)
|
||||
|
||||
exprs = [expr8_2("C", "Gt", "10", "900"), expr8_2("C", "Lt", "10", "900")]
|
||||
names = ["ca_relative_intensity_10s_900s", "cb_relative_intensity_10s_900s"]
|
||||
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="transaction")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
## v9(la|lb|ma|mb|ca|cb)_diff_intensity_(time_interval1)_(time_interval2)
|
||||
# 1) calculating the original data
|
||||
# 2) Resample data to 3s and calculate the changing rate
|
||||
# 3) Resample data to 1min
|
||||
|
||||
def test_exp_09_trans(self):
|
||||
exprs = [
|
||||
f'TResample(Div(Sub(TResample({self.expr7_3_init("C", "Gt", "3")}, "3s", "last"), Ref(TResample({self.expr7_3_init("C", "Gt", "3")}, "3s","last"), 1)), 3), "1min", "mean")',
|
||||
f'TResample(Div(Sub(TResample({self.expr7_3_init("C", "Lt", "3")}, "3s", "last"), Ref(TResample({self.expr7_3_init("C", "Lt", "3")}, "3s","last"), 1)), 3), "1min", "mean")',
|
||||
]
|
||||
names = ["ca_diff_intensity_3s_3s", "cb_diff_intensity_3s_3s"]
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="transaction")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
def test_exp_09_order(self):
|
||||
exprs = []
|
||||
names = []
|
||||
for funccode in ["B", "S"]:
|
||||
for ordercode in ["0", "1"]:
|
||||
exprs.append(
|
||||
f'TResample(Div(Sub(TResample({self.expr7_init(funccode, ordercode, "3")}, "3s", "last"), Ref(TResample({self.expr7_init(funccode, ordercode, "3")},"3s", "last"), 1)), 3) ,"1min", "mean")'
|
||||
)
|
||||
names.append(self.trans_dict[ordercode] + self.trans_dict[funccode] + "_diff_intensity_3s_3s")
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="order")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
def test_exp_10(self):
|
||||
exprs = []
|
||||
names = []
|
||||
for i in [5, 10, 30, 60]:
|
||||
exprs.append(
|
||||
f'TResample(Ref(TResample($ask1 + $bid1, "1s", "ffill"), {-i}) / TResample($ask1 + $bid1, "1s", "ffill") - 1, "1min", "mean" )'
|
||||
)
|
||||
names.append(f"lag_{i}_change_rate" for i in [5, 10, 30, 60])
|
||||
df = D.features(self.stocks_list, fields=exprs, freq="ticks")
|
||||
df.columns = names
|
||||
print(df)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -186,7 +186,7 @@ def gen_and_save_md_table(metrics, dataset):
|
||||
# read yaml, remove seed kwargs of model, and then save file in the temp_dir
|
||||
def gen_yaml_file_without_seed_kwargs(yaml_path, temp_dir):
|
||||
with open(yaml_path, "r") as fp:
|
||||
config = yaml.load(fp)
|
||||
config = yaml.safe_load(fp)
|
||||
try:
|
||||
del config["task"]["model"]["kwargs"]["seed"]
|
||||
except KeyError:
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
# Licensed under the MIT License.
|
||||
from pathlib import Path
|
||||
|
||||
__version__ = "0.8.1"
|
||||
__version__ = "0.8.2"
|
||||
__version__bak = __version__ # This version is backup for QlibConfig.reset_qlib_version
|
||||
import os
|
||||
from typing import Union
|
||||
@@ -12,7 +12,6 @@ import platform
|
||||
import subprocess
|
||||
from .log import get_module_logger
|
||||
|
||||
|
||||
# init qlib
|
||||
def init(default_conf="client", **kwargs):
|
||||
"""
|
||||
@@ -63,7 +62,7 @@ def init(default_conf="client", **kwargs):
|
||||
else:
|
||||
logger.warning(f"auto_path is False, please make sure {mount_path} is mounted")
|
||||
elif uri_type == C.NFS_URI:
|
||||
_mount_nfs_uri(provider_uri, mount_path, C["auto_mount"])
|
||||
_mount_nfs_uri(provider_uri, C.dpm.get_data_uri(_freq), C["auto_mount"])
|
||||
else:
|
||||
raise NotImplementedError(f"This type of URI is not supported")
|
||||
|
||||
@@ -96,7 +95,7 @@ def _mount_nfs_uri(provider_uri, mount_path, auto_mount: bool = False):
|
||||
sys_type = platform.system()
|
||||
if "win" in sys_type.lower():
|
||||
# system: window
|
||||
exec_result = os.popen("mount -o anon %s %s" % (provider_uri, mount_path + ":"))
|
||||
exec_result = os.popen(f"mount -o anon {provider_uri} {mount_path}")
|
||||
result = exec_result.read()
|
||||
if "85" in result:
|
||||
LOG.warning(f"{provider_uri} on Windows:{mount_path} is already mounted")
|
||||
|
||||
@@ -19,7 +19,7 @@ import logging
|
||||
import platform
|
||||
import multiprocessing
|
||||
from pathlib import Path
|
||||
from typing import Optional, Union
|
||||
from typing import Callable, Optional, Union
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from qlib.constant import REG_CN, REG_US
|
||||
@@ -40,7 +40,7 @@ class Config:
|
||||
if attr in self.__dict__["_config"]:
|
||||
return self.__dict__["_config"][attr]
|
||||
|
||||
raise AttributeError(f"No such {attr} in self._config")
|
||||
raise AttributeError(f"No such `{attr}` in self._config")
|
||||
|
||||
def get(self, key, default=None):
|
||||
return self.__dict__["_config"].get(key, default)
|
||||
@@ -112,6 +112,8 @@ _default_config = {
|
||||
"calendar_cache": None,
|
||||
# for simple dataset cache
|
||||
"local_cache_path": None,
|
||||
# kernels can be a fixed value or a callable function lie `def (freq: str) -> int`
|
||||
# If the kernels are arctic_kernels, `min(NUM_USABLE_CPU, 30)` may be a good value
|
||||
"kernels": NUM_USABLE_CPU,
|
||||
# pickle.dump protocol version
|
||||
"dump_protocol_version": PROTOCOL_VERSION,
|
||||
@@ -121,11 +123,10 @@ _default_config = {
|
||||
"joblib_backend": "multiprocessing",
|
||||
"default_disk_cache": 1, # 0:skip/1:use
|
||||
"mem_cache_size_limit": 500,
|
||||
"mem_cache_limit_type": "length",
|
||||
# memory cache expire second, only in used 'DatasetURICache' and 'client D.calendar'
|
||||
# default 1 hour
|
||||
"mem_cache_expire": 60 * 60,
|
||||
# memory cache space limit, default 5GB, only in used client
|
||||
"mem_cache_space_limit": 1024 * 1024 * 1024 * 5,
|
||||
# cache dir name
|
||||
"dataset_cache_dir_name": "dataset_cache",
|
||||
"features_cache_dir_name": "features_cache",
|
||||
@@ -215,8 +216,9 @@ MODE_CONF = {
|
||||
"provider_uri": "~/.qlib/qlib_data/cn_data",
|
||||
# cache
|
||||
# Using parameter 'remote' to announce the client is using server_cache, and the writing access will be disabled.
|
||||
"expression_cache": DISK_EXPRESSION_CACHE,
|
||||
"dataset_cache": DISK_DATASET_CACHE,
|
||||
# Disable cache by default. Avoid introduce advanced features for beginners
|
||||
"expression_cache": None,
|
||||
"dataset_cache": None,
|
||||
# SimpleDatasetCache directory
|
||||
"local_cache_path": Path("~/.cache/qlib_simple_cache").expanduser().resolve(),
|
||||
"calendar_cache": None,
|
||||
@@ -269,11 +271,19 @@ class QlibConfig(Config):
|
||||
self._registered = False
|
||||
|
||||
class DataPathManager:
|
||||
def __init__(
|
||||
self,
|
||||
provider_uri: Union[str, Path, dict],
|
||||
mount_path: Union[str, Path, dict],
|
||||
):
|
||||
"""
|
||||
Motivation:
|
||||
- get the right path (e.g. data uri) for accessing data based on given information(e.g. provider_uri, mount_path and frequency)
|
||||
- some helper functions to process uri.
|
||||
"""
|
||||
|
||||
def __init__(self, provider_uri: Union[str, Path, dict], mount_path: Union[str, Path, dict]):
|
||||
|
||||
"""
|
||||
The relation of `provider_uri` and `mount_path`
|
||||
- `mount_path` is used only if provider_uri is an NFS path
|
||||
- otherwise, provider_uri will be used for accessing data
|
||||
"""
|
||||
self.provider_uri = provider_uri
|
||||
self.mount_path = mount_path
|
||||
|
||||
@@ -304,6 +314,9 @@ class QlibConfig(Config):
|
||||
return QlibConfig.LOCAL_URI
|
||||
|
||||
def get_data_uri(self, freq: Optional[Union[str, Freq]] = None) -> Path:
|
||||
"""
|
||||
please refer DataPathManager's __init__ and class doc
|
||||
"""
|
||||
if freq is not None:
|
||||
freq = str(freq) # converting Freq to string
|
||||
if freq is None or freq not in self.provider_uri:
|
||||
@@ -314,7 +327,8 @@ class QlibConfig(Config):
|
||||
elif self.get_uri_type(_provider_uri) == QlibConfig.NFS_URI:
|
||||
if "win" in platform.system().lower():
|
||||
# windows, mount_path is the drive
|
||||
return Path(f"{self.mount_path[freq]}:\\")
|
||||
_path = str(self.mount_path[freq])
|
||||
return Path(f"{_path}:\\") if ":" not in _path else Path(_path)
|
||||
return Path(self.mount_path[freq])
|
||||
else:
|
||||
raise NotImplementedError(f"This type of uri is not supported")
|
||||
@@ -351,9 +365,7 @@ class QlibConfig(Config):
|
||||
for _freq in _provider_uri.keys():
|
||||
# mount_path
|
||||
_mount_path[_freq] = (
|
||||
_mount_path[_freq]
|
||||
if _mount_path[_freq] is None
|
||||
else str(Path(_mount_path[_freq]).expanduser().resolve())
|
||||
_mount_path[_freq] if _mount_path[_freq] is None else str(Path(_mount_path[_freq]).expanduser())
|
||||
)
|
||||
self["provider_uri"] = _provider_uri
|
||||
self["mount_path"] = _mount_path
|
||||
@@ -452,6 +464,12 @@ class QlibConfig(Config):
|
||||
# Due to a bug? that converting __version__ to _QlibConfig__version__bak
|
||||
# Using __version__bak instead of __version__
|
||||
|
||||
def get_kernels(self, freq: str):
|
||||
"""get number of processors given frequency"""
|
||||
if isinstance(self["kernels"], Callable):
|
||||
return self["kernels"](freq)
|
||||
return self["kernels"]
|
||||
|
||||
@property
|
||||
def registered(self):
|
||||
return self._registered
|
||||
|
||||
55
qlib/contrib/data/data.py
Normal file
55
qlib/contrib/data/data.py
Normal file
@@ -0,0 +1,55 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
# We remove arctic from core framework of Qlib to contrib due to
|
||||
# - Arctic has very strict limitation on pandas and numpy version
|
||||
# - https://github.com/man-group/arctic/pull/908
|
||||
# - pip fail to computing the right version number!!!!
|
||||
# - Maybe we can solve this problem by poetry
|
||||
|
||||
# FIXME: So if you want to use arctic-based provider, please install arctic manually
|
||||
# `pip install arctic` may not be enough.
|
||||
from arctic import Arctic
|
||||
import pandas as pd
|
||||
import pymongo
|
||||
|
||||
from qlib.data.data import FeatureProvider
|
||||
|
||||
|
||||
class ArcticFeatureProvider(FeatureProvider):
|
||||
def __init__(
|
||||
self, uri="127.0.0.1", retry_time=0, market_transaction_time_list=[("09:15", "11:30"), ("13:00", "15:00")]
|
||||
):
|
||||
super().__init__()
|
||||
self.uri = uri
|
||||
# TODO:
|
||||
# retry connecting if error occurs
|
||||
# does it real matters?
|
||||
self.retry_time = retry_time
|
||||
# NOTE: this is especially important for TResample operator
|
||||
self.market_transaction_time_list = market_transaction_time_list
|
||||
|
||||
def feature(self, instrument, field, start_index, end_index, freq):
|
||||
field = str(field)[1:]
|
||||
with pymongo.MongoClient(self.uri) as client:
|
||||
# TODO: this will result in frequently connecting the server and performance issue
|
||||
arctic = Arctic(client)
|
||||
|
||||
if freq not in arctic.list_libraries():
|
||||
raise ValueError("lib {} not in arctic".format(freq))
|
||||
|
||||
if instrument not in arctic[freq].list_symbols():
|
||||
# instruments does not exist
|
||||
return pd.Series()
|
||||
else:
|
||||
df = arctic[freq].read(instrument, columns=[field], chunk_range=(start_index, end_index))
|
||||
s = df[field]
|
||||
|
||||
if not s.empty:
|
||||
s = pd.concat(
|
||||
[
|
||||
s.between_time(time_tuple[0], time_tuple[1])
|
||||
for time_tuple in self.market_transaction_time_list
|
||||
]
|
||||
)
|
||||
return s
|
||||
@@ -27,7 +27,6 @@ from ...workflow import R
|
||||
|
||||
class DNNModelPytorch(Model):
|
||||
"""DNN Model
|
||||
|
||||
Parameters
|
||||
----------
|
||||
input_dim : int
|
||||
@@ -202,7 +201,7 @@ class DNNModelPytorch(Model):
|
||||
y_val_auto = torch.from_numpy(y_valid.values).float().to(self.device)
|
||||
w_val_auto = torch.from_numpy(w_valid.values).float().to(self.device)
|
||||
|
||||
for step in range(self.max_steps):
|
||||
for step in range(1, self.max_steps + 1):
|
||||
if stop_steps >= self.early_stop_rounds:
|
||||
if verbose:
|
||||
self.logger.info("\tearly stop")
|
||||
@@ -226,7 +225,7 @@ class DNNModelPytorch(Model):
|
||||
# validation
|
||||
train_loss += loss.val
|
||||
# for evert `eval_steps` steps or at the last steps, we will evaluate the model.
|
||||
if step % self.eval_steps == 0 or step + 1 == self.max_steps:
|
||||
if step % self.eval_steps == 0 or step == self.max_steps:
|
||||
stop_steps += 1
|
||||
train_loss /= self.eval_steps
|
||||
|
||||
|
||||
@@ -150,7 +150,7 @@ class Expression(abc.ABC):
|
||||
args = str(self), instrument, start_index, end_index, freq
|
||||
if args in H["f"]:
|
||||
return H["f"][args]
|
||||
if start_index is None or end_index is None or start_index > end_index:
|
||||
if start_index is not None and end_index is not None and start_index > end_index:
|
||||
raise ValueError("Invalid index range: {} {}".format(start_index, end_index))
|
||||
try:
|
||||
series = self._load_internal(instrument, start_index, end_index, freq)
|
||||
|
||||
@@ -147,6 +147,7 @@ class MemCache:
|
||||
"""
|
||||
|
||||
size_limit = C.mem_cache_size_limit if mem_cache_size_limit is None else mem_cache_size_limit
|
||||
limit_type = C.mem_cache_limit_type if limit_type is None else limit_type
|
||||
|
||||
if limit_type == "length":
|
||||
klass = MemCacheLengthUnit
|
||||
@@ -1198,7 +1199,4 @@ class MemoryCalendarCache(CalendarCache):
|
||||
return result
|
||||
|
||||
|
||||
# MemCache sizeof
|
||||
HZ = MemCache(C.mem_cache_space_limit, limit_type="sizeof")
|
||||
# MemCache length
|
||||
H = MemCache(limit_type="length")
|
||||
H = MemCache()
|
||||
|
||||
@@ -5,8 +5,10 @@
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import os
|
||||
import re
|
||||
import abc
|
||||
import time
|
||||
import copy
|
||||
import queue
|
||||
import bisect
|
||||
@@ -38,11 +40,17 @@ from ..utils import (
|
||||
normalize_cache_fields,
|
||||
code_to_fname,
|
||||
set_log_with_config,
|
||||
time_to_slc_point,
|
||||
)
|
||||
from ..utils.paral import ParallelExt
|
||||
|
||||
|
||||
class ProviderBackendMixin:
|
||||
"""
|
||||
This helper class tries to make the provider based on storage backend more convenient
|
||||
It is not necessary to inherent this class if that provider don't rely on the backend storage
|
||||
"""
|
||||
|
||||
def get_default_backend(self):
|
||||
backend = {}
|
||||
provider_name: str = re.findall("[A-Z][^A-Z]*", self.__class__.__name__)[-2]
|
||||
@@ -59,15 +67,12 @@ class ProviderBackendMixin:
|
||||
return init_instance_by_config(backend)
|
||||
|
||||
|
||||
class CalendarProvider(abc.ABC, ProviderBackendMixin):
|
||||
class CalendarProvider(abc.ABC):
|
||||
"""Calendar provider base class
|
||||
|
||||
Provide calendar data.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.backend = kwargs.get("backend", {})
|
||||
|
||||
def calendar(self, start_time=None, end_time=None, freq="day", future=False):
|
||||
"""Get calendar of certain market in given time range.
|
||||
|
||||
@@ -194,15 +199,12 @@ class CalendarProvider(abc.ABC, ProviderBackendMixin):
|
||||
raise NotImplementedError("Subclass of CalendarProvider must implement `load_calendar` method")
|
||||
|
||||
|
||||
class InstrumentProvider(abc.ABC, ProviderBackendMixin):
|
||||
class InstrumentProvider(abc.ABC):
|
||||
"""Instrument provider base class
|
||||
|
||||
Provide instrument data.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.backend = kwargs.get("backend", {})
|
||||
|
||||
@staticmethod
|
||||
def instruments(market: Union[List, str] = "all", filter_pipe: Union[List, None] = None):
|
||||
"""Get the general config dictionary for a base market adding several dynamic filters.
|
||||
@@ -304,15 +306,12 @@ class InstrumentProvider(abc.ABC, ProviderBackendMixin):
|
||||
raise ValueError(f"Unknown instrument type {inst}")
|
||||
|
||||
|
||||
class FeatureProvider(abc.ABC, ProviderBackendMixin):
|
||||
class FeatureProvider(abc.ABC):
|
||||
"""Feature provider class
|
||||
|
||||
Provide feature data.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.backend = kwargs.get("backend", {})
|
||||
|
||||
@abc.abstractmethod
|
||||
def feature(self, instrument, field, start_time, end_time, freq):
|
||||
"""Get feature data.
|
||||
@@ -365,9 +364,13 @@ class ExpressionProvider(abc.ABC):
|
||||
return expression
|
||||
|
||||
@abc.abstractmethod
|
||||
def expression(self, instrument, field, start_time=None, end_time=None, freq="day"):
|
||||
def expression(self, instrument, field, start_time=None, end_time=None, freq="day") -> pd.Series:
|
||||
"""Get Expression data.
|
||||
|
||||
The responsibility of `expression`
|
||||
- parse the `field` and `load` the according data.
|
||||
- When loading the data, it should handle the time dependency of the data. `get_expression_instance` is commonly used in this method
|
||||
|
||||
Parameters
|
||||
----------
|
||||
instrument : str
|
||||
@@ -385,6 +388,11 @@ class ExpressionProvider(abc.ABC):
|
||||
-------
|
||||
pd.Series
|
||||
data of a certain expression
|
||||
|
||||
The data has two types of format
|
||||
1) expression with datetime index
|
||||
2) expression with integer index
|
||||
- because the datetime is not as good as
|
||||
"""
|
||||
raise NotImplementedError("Subclass of ExpressionProvider must implement `Expression` method")
|
||||
|
||||
@@ -500,7 +508,7 @@ class DatasetProvider(abc.ABC):
|
||||
"""
|
||||
normalize_column_names = normalize_cache_fields(column_names)
|
||||
# One process for one task, so that the memory will be freed quicker.
|
||||
workers = max(min(C.kernels, len(instruments_d)), 1)
|
||||
workers = max(min(C.get_kernels(freq), len(instruments_d)), 1)
|
||||
|
||||
# create iterator
|
||||
if isinstance(instruments_d, dict):
|
||||
@@ -513,7 +521,7 @@ class DatasetProvider(abc.ABC):
|
||||
for inst, spans in it:
|
||||
inst_l.append(inst)
|
||||
task_l.append(
|
||||
delayed(DatasetProvider.expression_calculator)(
|
||||
delayed(DatasetProvider.inst_calculator)(
|
||||
inst, start_time, end_time, freq, normalize_column_names, spans, C, inst_processors
|
||||
)
|
||||
)
|
||||
@@ -536,17 +544,17 @@ class DatasetProvider(abc.ABC):
|
||||
data = DiskDatasetCache.cache_to_origin_data(data, column_names)
|
||||
else:
|
||||
data = pd.DataFrame(
|
||||
index=pd.MultiIndex.from_arrays([[], []], names=("instrument", "datetime")), columns=column_names
|
||||
index=pd.MultiIndex.from_arrays([[], []], names=("instrument", "datetime")),
|
||||
columns=column_names,
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def expression_calculator(
|
||||
inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[]
|
||||
):
|
||||
def inst_calculator(inst, start_time, end_time, freq, column_names, spans=None, g_config=None, inst_processors=[]):
|
||||
"""
|
||||
Calculate the expressions for one instrument, return a df result.
|
||||
Calculate the expressions for **one** instrument, return a df result.
|
||||
If the expression has been calculated before, load from cache.
|
||||
|
||||
return value: A data frame with index 'datetime' and other data columns.
|
||||
@@ -566,11 +574,13 @@ class DatasetProvider(abc.ABC):
|
||||
obj[field] = ExpressionD.expression(inst, field, start_time, end_time, freq)
|
||||
|
||||
data = pd.DataFrame(obj)
|
||||
_calendar = Cal.calendar(freq=freq)
|
||||
data.index = _calendar[data.index.values.astype(int)]
|
||||
if not data.empty and not np.issubdtype(data.index.dtype, np.dtype("M")):
|
||||
# If the underlaying provides the data not in datatime formmat, we'll convert it into datetime format
|
||||
_calendar = Cal.calendar(freq=freq)
|
||||
data.index = _calendar[data.index.values.astype(int)]
|
||||
data.index.names = ["datetime"]
|
||||
|
||||
if spans is not None:
|
||||
if not data.empty and spans is not None:
|
||||
mask = np.zeros(len(data), dtype=bool)
|
||||
for begin, end in spans:
|
||||
mask |= (data.index >= begin) & (data.index <= end)
|
||||
@@ -583,15 +593,16 @@ class DatasetProvider(abc.ABC):
|
||||
return data
|
||||
|
||||
|
||||
class LocalCalendarProvider(CalendarProvider):
|
||||
class LocalCalendarProvider(CalendarProvider, ProviderBackendMixin):
|
||||
"""Local calendar data provider class
|
||||
|
||||
Provide calendar data from local data source.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super(LocalCalendarProvider, self).__init__(**kwargs)
|
||||
self.remote = kwargs.get("remote", False)
|
||||
def __init__(self, remote=False, backend={}):
|
||||
super().__init__()
|
||||
self.remote = remote
|
||||
self.backend = backend
|
||||
|
||||
def load_calendar(self, freq, future):
|
||||
"""Load original calendar timestamp from file.
|
||||
@@ -623,12 +634,16 @@ class LocalCalendarProvider(CalendarProvider):
|
||||
return [pd.Timestamp(x) for x in backend_obj]
|
||||
|
||||
|
||||
class LocalInstrumentProvider(InstrumentProvider):
|
||||
class LocalInstrumentProvider(InstrumentProvider, ProviderBackendMixin):
|
||||
"""Local instrument data provider class
|
||||
|
||||
Provide instrument data from local data source.
|
||||
"""
|
||||
|
||||
def __init__(self, backend={}) -> None:
|
||||
super().__init__()
|
||||
self.backend = backend
|
||||
|
||||
def _load_instruments(self, market, freq):
|
||||
return self.backend_obj(market=market, freq=freq).data
|
||||
|
||||
@@ -667,15 +682,16 @@ class LocalInstrumentProvider(InstrumentProvider):
|
||||
return _instruments_filtered
|
||||
|
||||
|
||||
class LocalFeatureProvider(FeatureProvider):
|
||||
class LocalFeatureProvider(FeatureProvider, ProviderBackendMixin):
|
||||
"""Local feature data provider class
|
||||
|
||||
Provide feature data from local data source.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
super(LocalFeatureProvider, self).__init__(**kwargs)
|
||||
self.remote = kwargs.get("remote", False)
|
||||
def __init__(self, remote=False, backend={}):
|
||||
super().__init__()
|
||||
self.remote = remote
|
||||
self.backend = backend
|
||||
|
||||
def feature(self, instrument, field, start_index, end_index, freq):
|
||||
# validate
|
||||
@@ -690,14 +706,27 @@ class LocalExpressionProvider(ExpressionProvider):
|
||||
Provide expression data from local data source.
|
||||
"""
|
||||
|
||||
def __init__(self, time2idx=True):
|
||||
super().__init__()
|
||||
self.time2idx = time2idx
|
||||
|
||||
def expression(self, instrument, field, start_time=None, end_time=None, freq="day"):
|
||||
expression = self.get_expression_instance(field)
|
||||
start_time = pd.Timestamp(start_time)
|
||||
end_time = pd.Timestamp(end_time)
|
||||
_, _, start_index, end_index = Cal.locate_index(start_time, end_time, freq=freq, future=False)
|
||||
lft_etd, rght_etd = expression.get_extended_window_size()
|
||||
start_time = time_to_slc_point(start_time)
|
||||
end_time = time_to_slc_point(end_time)
|
||||
|
||||
# Two kinds of queries are supported
|
||||
# - Index-based expression: this may save a lot of memory because the datetime index is not saved on the disk
|
||||
# - Data with datetime index expression: this will make it more convenient to integrating with some existing databases
|
||||
if self.time2idx:
|
||||
_, _, start_index, end_index = Cal.locate_index(start_time, end_time, freq=freq, future=False)
|
||||
lft_etd, rght_etd = expression.get_extended_window_size()
|
||||
query_start, query_end = max(0, start_index - lft_etd), end_index + rght_etd
|
||||
else:
|
||||
start_index, end_index = query_start, query_end = start_time, end_time
|
||||
|
||||
try:
|
||||
series = expression.load(instrument, max(0, start_index - lft_etd), end_index + rght_etd, freq)
|
||||
series = expression.load(instrument, query_start, query_end, freq)
|
||||
except Exception as e:
|
||||
get_module_logger("data").debug(
|
||||
f"Loading expression error: "
|
||||
@@ -726,8 +755,18 @@ class LocalDatasetProvider(DatasetProvider):
|
||||
Provide dataset data from local data source.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
pass
|
||||
def __init__(self, align_time: bool = True):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
align_time : bool
|
||||
Will we align the time to calendar
|
||||
the frequency is flexible in some dataset and can't be aligned.
|
||||
For the data with fixed frequency with a shared calendar, the align data to the calendar will provides following benefits
|
||||
- Align queries to the same parameters, so the cache can be shared.
|
||||
"""
|
||||
super().__init__()
|
||||
self.align_time = align_time
|
||||
|
||||
def dataset(
|
||||
self,
|
||||
@@ -740,14 +779,16 @@ class LocalDatasetProvider(DatasetProvider):
|
||||
):
|
||||
instruments_d = self.get_instruments_d(instruments, freq)
|
||||
column_names = self.get_column_names(fields)
|
||||
cal = Cal.calendar(start_time, end_time, freq)
|
||||
if len(cal) == 0:
|
||||
return pd.DataFrame(
|
||||
index=pd.MultiIndex.from_arrays([[], []], names=("instrument", "datetime")), columns=column_names
|
||||
)
|
||||
start_time = cal[0]
|
||||
end_time = cal[-1]
|
||||
|
||||
if self.align_time:
|
||||
# NOTE: if the frequency is a fixed value.
|
||||
# align the data to fixed calendar point
|
||||
cal = Cal.calendar(start_time, end_time, freq)
|
||||
if len(cal) == 0:
|
||||
return pd.DataFrame(
|
||||
index=pd.MultiIndex.from_arrays([[], []], names=("instrument", "datetime")), columns=column_names
|
||||
)
|
||||
start_time = cal[0]
|
||||
end_time = cal[-1]
|
||||
data = self.dataset_processor(
|
||||
instruments_d, column_names, start_time, end_time, freq, inst_processors=inst_processors
|
||||
)
|
||||
|
||||
@@ -226,7 +226,7 @@ class StaticDataLoader(DataLoader, Serializable):
|
||||
|
||||
include_attr = ["_config"]
|
||||
|
||||
def __init__(self, config: Union[dict, str], join="outer"):
|
||||
def __init__(self, config: Union[dict, str, pd.DataFrame], join="outer"):
|
||||
"""
|
||||
Parameters
|
||||
----------
|
||||
@@ -269,6 +269,8 @@ class StaticDataLoader(DataLoader, Serializable):
|
||||
elif isinstance(self._config, (str, Path)):
|
||||
with Path(self._config).open("rb") as f:
|
||||
self._data = pickle.load(f)
|
||||
elif isinstance(self._config, pd.DataFrame):
|
||||
self._data = self._config
|
||||
|
||||
|
||||
class DataLoaderDH(DataLoader):
|
||||
|
||||
@@ -27,6 +27,12 @@ except ImportError:
|
||||
"#### Do not import qlib package in the repository directory in case of importing qlib from . without compiling #####"
|
||||
)
|
||||
raise
|
||||
except ValueError as e:
|
||||
print("!!!!!!!! A error occurs when importing operators implemented based on Cython.!!!!!!!!")
|
||||
print("!!!!!!!! They will be disabled. Please Upgrade your numpy to enable them !!!!!!!!")
|
||||
# We catch this error because some platform can't upgrade there package (e.g. Kaggle)
|
||||
# https://www.kaggle.com/general/293387
|
||||
# https://www.kaggle.com/product-feedback/98562
|
||||
|
||||
|
||||
np.seterr(invalid="ignore")
|
||||
@@ -721,9 +727,9 @@ class Rolling(ExpressionOps):
|
||||
# NOTE: remove all null check,
|
||||
# now it's user's responsibility to decide whether use features in null days
|
||||
# isnull = series.isnull() # NOTE: isnull = NaN, inf is not null
|
||||
if self.N == 0:
|
||||
if isinstance(self.N, int) and self.N == 0:
|
||||
series = getattr(series.expanding(min_periods=1), self.func)()
|
||||
elif 0 < self.N < 1:
|
||||
elif isinstance(self.N, int) and 0 < self.N < 1:
|
||||
series = series.ewm(alpha=self.N, min_periods=1).mean()
|
||||
else:
|
||||
series = getattr(series.rolling(self.N, min_periods=1), self.func)()
|
||||
@@ -1380,6 +1386,7 @@ class PairRolling(ExpressionOps):
|
||||
"""
|
||||
|
||||
def __init__(self, feature_left, feature_right, N, func):
|
||||
# TODO: in what case will a const be passed into `__init__` as `feature_left` or `feature_right`
|
||||
self.feature_left = feature_left
|
||||
self.feature_right = feature_right
|
||||
self.N = N
|
||||
@@ -1389,8 +1396,19 @@ class PairRolling(ExpressionOps):
|
||||
return "{}({},{},{})".format(type(self).__name__, self.feature_left, self.feature_right, self.N)
|
||||
|
||||
def _load_internal(self, instrument, start_index, end_index, freq):
|
||||
series_left = self.feature_left.load(instrument, start_index, end_index, freq)
|
||||
series_right = self.feature_right.load(instrument, start_index, end_index, freq)
|
||||
assert any(
|
||||
[isinstance(self.feature_left, Expression), self.feature_right, Expression]
|
||||
), "at least one of two inputs is Expression instance"
|
||||
|
||||
if isinstance(self.feature_left, Expression):
|
||||
series_left = self.feature_left.load(instrument, start_index, end_index, freq)
|
||||
else:
|
||||
series_left = self.feature_left # numeric value
|
||||
if isinstance(self.feature_right, Expression):
|
||||
series_right = self.feature_right.load(instrument, start_index, end_index, freq)
|
||||
else:
|
||||
series_right = self.feature_right
|
||||
|
||||
if self.N == 0:
|
||||
series = getattr(series_left.expanding(min_periods=1), self.func)(series_right)
|
||||
else:
|
||||
@@ -1400,21 +1418,33 @@ class PairRolling(ExpressionOps):
|
||||
def get_longest_back_rolling(self):
|
||||
if self.N == 0:
|
||||
return np.inf
|
||||
return (
|
||||
max(self.feature_left.get_longest_back_rolling(), self.feature_right.get_longest_back_rolling())
|
||||
+ self.N
|
||||
- 1
|
||||
)
|
||||
if isinstance(self.feature_left, Expression):
|
||||
left_br = self.feature_left.get_longest_back_rolling()
|
||||
else:
|
||||
left_br = 0
|
||||
|
||||
if isinstance(self.feature_right, Expression):
|
||||
right_br = self.feature_right.get_longest_back_rolling()
|
||||
else:
|
||||
right_br = 0
|
||||
return max(left_br, right_br)
|
||||
|
||||
def get_extended_window_size(self):
|
||||
ll, lr = self.feature_left.get_extended_window_size()
|
||||
rl, rr = self.feature_right.get_extended_window_size()
|
||||
if self.N == 0:
|
||||
get_module_logger(self.__class__.__name__).warning(
|
||||
"The PairRolling(ATTR, 0) will not be accurately calculated"
|
||||
)
|
||||
return -np.inf, max(lr, rr)
|
||||
else:
|
||||
if isinstance(self.feature_left, Expression):
|
||||
ll, lr = self.feature_left.get_extended_window_size()
|
||||
else:
|
||||
ll, lr = 0, 0
|
||||
|
||||
if isinstance(self.feature_right, Expression):
|
||||
rl, rr = self.feature_right.get_extended_window_size()
|
||||
else:
|
||||
rl, rr = 0, 0
|
||||
return max(ll, rl) + self.N - 1, max(lr, rr)
|
||||
|
||||
|
||||
@@ -1474,7 +1504,50 @@ class Cov(PairRolling):
|
||||
super(Cov, self).__init__(feature_left, feature_right, N, "cov")
|
||||
|
||||
|
||||
#################### Operator which only support data with time index ####################
|
||||
# Convention
|
||||
# - The name of the operators in this section will start with "T"
|
||||
|
||||
|
||||
class TResample(ElemOperator):
|
||||
def __init__(self, feature, freq, func):
|
||||
"""
|
||||
Resampling the data to target frequency.
|
||||
The resample function of pandas is used.
|
||||
- the timestamp will be at the start of the time span after resample.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
feature : Expression
|
||||
An expression for calculating the feature
|
||||
freq : str
|
||||
It will be passed into the resample method for resampling basedn on given frequency
|
||||
func : method
|
||||
The method to get the resampled values
|
||||
Some expression are high frequently used
|
||||
"""
|
||||
self.feature = feature
|
||||
self.freq = freq
|
||||
self.func = func
|
||||
|
||||
def __str__(self):
|
||||
return "{}({},{})".format(type(self).__name__, self.feature, self.freq)
|
||||
|
||||
def _load_internal(self, instrument, start_index, end_index, freq):
|
||||
series = self.feature.load(instrument, start_index, end_index, freq)
|
||||
|
||||
if series.empty:
|
||||
return series
|
||||
else:
|
||||
if self.func == "sum":
|
||||
return getattr(series.resample(self.freq), self.func)(min_count=1)
|
||||
else:
|
||||
return getattr(series.resample(self.freq), self.func)()
|
||||
|
||||
|
||||
TOpsList = [TResample]
|
||||
OpsList = [
|
||||
Rolling,
|
||||
Ref,
|
||||
Max,
|
||||
Min,
|
||||
@@ -1521,7 +1594,7 @@ OpsList = [
|
||||
IdxMin,
|
||||
If,
|
||||
Feature,
|
||||
]
|
||||
] + [TResample]
|
||||
|
||||
|
||||
class OpsWrapper:
|
||||
|
||||
@@ -34,7 +34,11 @@ class FileStorageMixin:
|
||||
|
||||
@property
|
||||
def dpm(self):
|
||||
return C.dpm if getattr(self, "_provider_uri", None) is None else C.DataPathManager(self._provider_uri, None)
|
||||
return (
|
||||
C.dpm
|
||||
if getattr(self, "_provider_uri", None) is None
|
||||
else C.DataPathManager(self._provider_uri, C.mount_path)
|
||||
)
|
||||
|
||||
@property
|
||||
def support_freq(self) -> List[str]:
|
||||
|
||||
@@ -167,9 +167,14 @@ def parse_field(field):
|
||||
# - $close -> Feature("close")
|
||||
# - $close5 -> Feature("close5")
|
||||
# - $open+$close -> Feature("open")+Feature("close")
|
||||
# TODO: this maybe used in the feature if we want to support the computation of different frequency data
|
||||
# - $close@5min -> Feature("close", "5min")
|
||||
|
||||
if not isinstance(field, str):
|
||||
field = str(field)
|
||||
return re.sub(r"\$(\w+)", r'Feature("\1")', re.sub(r"(\w+\s*)\(", r"Operators.\1(", field))
|
||||
for pattern, new in [(r"\$(\w+)", rf'Feature("\1")'), (r"(\w+\s*)\(", r"Operators.\1(")]: # Features # Operators
|
||||
field = re.sub(pattern, new, field)
|
||||
return field
|
||||
|
||||
|
||||
def get_module_by_module_path(module_path: Union[str, ModuleType]):
|
||||
|
||||
121
scripts/data_collector/future_calendar_collector.py
Normal file
121
scripts/data_collector/future_calendar_collector.py
Normal file
@@ -0,0 +1,121 @@
|
||||
# Copyright (c) Microsoft Corporation.
|
||||
# Licensed under the MIT License.
|
||||
|
||||
import abc
|
||||
import importlib
|
||||
from pathlib import Path
|
||||
from typing import Union, Iterable, List
|
||||
|
||||
import fire
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
# pip install baostock
|
||||
import baostock as bs
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class CollectorFutureCalendar:
|
||||
calendar_format = "%Y-%m-%d"
|
||||
|
||||
def __init__(self, qlib_dir: Union[str, Path], start_date: str = None, end_date: str = None):
|
||||
"""
|
||||
|
||||
Parameters
|
||||
----------
|
||||
qlib_dir:
|
||||
qlib data directory
|
||||
start_date
|
||||
start date
|
||||
end_date
|
||||
end date
|
||||
"""
|
||||
self.qlib_dir = Path(qlib_dir).expanduser().absolute()
|
||||
self.calendar_path = self.qlib_dir.joinpath("calendars/day.txt")
|
||||
self.future_path = self.qlib_dir.joinpath("calendars/day_future.txt")
|
||||
self._calendar_list = self.calendar_list
|
||||
_latest_date = self._calendar_list[-1]
|
||||
self.start_date = _latest_date if start_date is None else pd.Timestamp(start_date)
|
||||
self.end_date = _latest_date + pd.Timedelta(days=365 * 2) if end_date is None else pd.Timestamp(end_date)
|
||||
|
||||
@property
|
||||
def calendar_list(self) -> List[pd.Timestamp]:
|
||||
# load old calendar
|
||||
if not self.calendar_path.exists():
|
||||
raise ValueError(f"calendar does not exist: {self.calendar_path}")
|
||||
calendar_df = pd.read_csv(self.calendar_path, header=None)
|
||||
calendar_df.columns = ["date"]
|
||||
calendar_df["date"] = pd.to_datetime(calendar_df["date"])
|
||||
return calendar_df["date"].to_list()
|
||||
|
||||
def _format_datetime(self, datetime_d: [str, pd.Timestamp]):
|
||||
datetime_d = pd.Timestamp(datetime_d)
|
||||
return datetime_d.strftime(self.calendar_format)
|
||||
|
||||
def write_calendar(self, calendar: Iterable):
|
||||
calendars_list = list(map(lambda x: self._format_datetime(x), sorted(set(self.calendar_list + calendar))))
|
||||
np.savetxt(self.future_path, calendars_list, fmt="%s", encoding="utf-8")
|
||||
|
||||
@abc.abstractmethod
|
||||
def collector(self) -> Iterable[pd.Timestamp]:
|
||||
"""
|
||||
|
||||
Returns
|
||||
-------
|
||||
|
||||
"""
|
||||
raise NotImplementedError(f"Please implement the `collector` method")
|
||||
|
||||
|
||||
class CollectorFutureCalendarCN(CollectorFutureCalendar):
|
||||
def collector(self) -> Iterable[pd.Timestamp]:
|
||||
lg = bs.login()
|
||||
if lg.error_code != "0":
|
||||
raise ValueError(f"login respond error_msg: {lg.error_msg}")
|
||||
rs = bs.query_trade_dates(
|
||||
start_date=self._format_datetime(self.start_date), end_date=self._format_datetime(self.end_date)
|
||||
)
|
||||
if rs.error_code != "0":
|
||||
raise ValueError(f"query_trade_dates respond error_msg: {rs.error_msg}")
|
||||
data_list = []
|
||||
while (rs.error_code == "0") & rs.next():
|
||||
data_list.append(rs.get_row_data())
|
||||
calendar = pd.DataFrame(data_list, columns=rs.fields)
|
||||
calendar["is_trading_day"] = calendar["is_trading_day"].astype(int)
|
||||
return pd.to_datetime(calendar[calendar["is_trading_day"] == 1]["calendar_date"]).to_list()
|
||||
|
||||
|
||||
class CollectorFutureCalendarUS(CollectorFutureCalendar):
|
||||
def collector(self) -> Iterable[pd.Timestamp]:
|
||||
# TODO: US future calendar
|
||||
raise ValueError("Us calendar is not supported")
|
||||
|
||||
|
||||
def run(qlib_dir: Union[str, Path], region: str = "cn", start_date: str = None, end_date: str = None):
|
||||
"""Collect future calendar(day)
|
||||
|
||||
Parameters
|
||||
----------
|
||||
qlib_dir:
|
||||
qlib data directory
|
||||
region:
|
||||
cn/CN or us/US
|
||||
start_date
|
||||
start date
|
||||
end_date
|
||||
end date
|
||||
|
||||
Examples
|
||||
-------
|
||||
# get cn future calendar
|
||||
$ python future_calendar_collector.py --qlib_data_1d_dir <user data dir> --region cn
|
||||
"""
|
||||
logger.info(f"collector future calendar: region={region}")
|
||||
_cur_module = importlib.import_module("future_calendar_collector")
|
||||
_class = getattr(_cur_module, f"CollectorFutureCalendar{region.upper()}")
|
||||
collector = _class(qlib_dir=qlib_dir, start_date=start_date, end_date=end_date)
|
||||
collector.write_calendar(collector.collector())
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
fire.Fire(run)
|
||||
@@ -19,7 +19,7 @@ class TestElementOperator(TestOperatorData):
|
||||
"Abs($change)",
|
||||
]
|
||||
columns = ["change", "abs"]
|
||||
self.data = DatasetProvider.expression_calculator(
|
||||
self.data = DatasetProvider.inst_calculator(
|
||||
self.inst, self.start_time, self.end_time, freq, expressions, self.spans, C, []
|
||||
)
|
||||
self.data.columns = columns
|
||||
|
||||
Reference in New Issue
Block a user