1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-06 04:20:57 +08:00

Merge remote-tracking branch 'upstream/high-freq-execution' into high-freq-execution

This commit is contained in:
Yuchen Fang
2021-03-19 14:11:08 +08:00
27 changed files with 1401 additions and 107 deletions

View File

@@ -0,0 +1,28 @@
# High-Frequency Dataset
This dataset is an example for RL high frequency trading.
## Get High-Frequency Data
Get high-frequency data by running the following command:
```bash
python workflow.py get_data
```
## Dump & Reload & Reinitialize the Dataset
The High-Frequency Dataset is implemented as `qlib.data.dataset.DatasetH` in `workflow.py`. `DatasetH` is a subclass of [`qlib.utils.serial.Serializable`](https://qlib.readthedocs.io/en/latest/advanced/serial.html), whose state can be dumped to or loaded from disk in `pickle` format.
### About Reinitialization
After reloading `Dataset` from disk, `Qlib` also supports reinitializing the dataset. This means that users can reset some states of `Dataset` or `DataHandler`, such as `instruments`, `start_time`, `end_time` and `segments`, and generate new data according to the new states.
The example is given in `workflow.py`, users can run the code as follows.
### Run the Code
Run the example by running the following command:
```bash
python workflow.py dump_and_load_dataset
```

View File

@@ -0,0 +1,174 @@
from qlib.data.dataset.handler import DataHandler, DataHandlerLP
from qlib.data.dataset.processor import Processor
from qlib.utils import get_cls_kwargs
from qlib.log import TimeInspector
class HighFreqHandler(DataHandlerLP):
    """Data handler that builds normalized 1-minute price/volume feature expressions.

    The handler loads data through ``QlibDataLoader`` at ``"1min"`` frequency
    using the expressions returned by :meth:`get_feature_config`, and injects
    the fit window into every configured processor.

    Parameters
    ----------
    instruments : str
        instrument pool passed to the parent handler.
    start_time, end_time :
        time range passed to the parent handler.
    infer_processors, learn_processors : list of dict, optional
        processor configs; ``None`` (the default) means no processors.
    fit_start_time, fit_end_time :
        fit window written into each processor's ``kwargs``.
    drop_raw : bool
        forwarded to the parent handler.
    """

    def __init__(
        self,
        instruments="csi300",
        start_time=None,
        end_time=None,
        infer_processors=None,
        learn_processors=None,
        fit_start_time=None,
        fit_end_time=None,
        drop_raw=True,
    ):
        def check_transform_proc(proc_l):
            """Return copies of the processor configs with the fit window added to ``kwargs``."""
            new_l = []
            for p in proc_l or []:
                # Work on a copy so the caller's (possibly shared, class-level)
                # config dictionaries are never modified.
                proc_conf = dict(p)
                proc_conf["kwargs"] = {
                    **(p.get("kwargs") or {}),
                    "fit_start_time": fit_start_time,
                    "fit_end_time": fit_end_time,
                }
                new_l.append(proc_conf)
            return new_l

        infer_processors = check_transform_proc(infer_processors)
        learn_processors = check_transform_proc(learn_processors)

        data_loader = {
            "class": "QlibDataLoader",
            "kwargs": {
                "config": self.get_feature_config(),
                "swap_level": False,
                "freq": "1min",
            },
        }
        super().__init__(
            instruments=instruments,
            start_time=start_time,
            end_time=end_time,
            data_loader=data_loader,
            infer_processors=infer_processors,
            learn_processors=learn_processors,
            drop_raw=drop_raw,
        )

    def get_feature_config(self):
        """Build the expression strings and column names for the data loader.

        Returns
        -------
        (fields, names) :
            ``fields`` is the list of expression strings and ``names`` the
            matching column names: five price features, the same five taken
            with ``Ref(..., 240)``, two volume features and a ``date`` column.
        """
        fields = []
        names = []

        template_if = "If(IsNull({1}), {0}, {1})"
        template_paused = "Select(Or(IsNull($paused), Eq($paused, 0.0)), {0})"
        template_fillnan = "BFillNan(FFillNan({0}))"
        # Because there is no vwap field in the yahoo data, a method similar to
        # Simpson integration is used to approximate vwap.
        simpson_vwap = "($open + 2*$high + 2*$low + $close)/6"

        def get_normalized_price_feature(price_field, shift=0):
            """Get normalized price feature ops"""
            if shift == 0:
                template_norm = "Cut({0}/Ref(DayLast({1}), 240), 240, None)"
            else:
                template_norm = "Cut(Ref({0}, " + str(shift) + ")/Ref(DayLast({1}), 240), 240, None)"

            # Filled close of non-paused rows: used both as the fallback when
            # the price is null and as the normalization denominator.
            filled_close = template_fillnan.format(template_paused.format("$close"))
            return template_norm.format(
                template_if.format(filled_close, template_paused.format(price_field)),
                filled_close,
            )

        price_fields = ["$open", "$high", "$low", "$close", simpson_vwap]

        fields += [get_normalized_price_feature(field, 0) for field in price_fields]
        names += ["$open", "$high", "$low", "$close", "$vwap"]

        fields += [get_normalized_price_feature(field, 240) for field in price_fields]
        names += ["$open_1", "$high_1", "$low_1", "$close_1", "$vwap_1"]

        # Volume is zeroed when it is null or when the approximated vwap falls
        # outside [0.999 * low, 1.001 * high].
        clean_volume = "If(IsNull({0}), 0, If(Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2}))), 0, {0}))".format(
            template_paused.format("$volume"),
            template_paused.format(simpson_vwap),
            template_paused.format("$low"),
            template_paused.format("$high"),
        )

        fields += ["Cut({0}/Ref(DayLast(Mean({0}, 7200)), 240), 240, None)".format(clean_volume)]
        names += ["$volume"]

        fields += ["Cut(Ref({0}, 240)/Ref(DayLast(Mean({0}, 7200)), 240), 240, None)".format(clean_volume)]
        names += ["$volume_1"]

        fields += ["Cut({0}, 240, None)".format(template_paused.format("Date($close)"))]
        names += ["date"]

        return fields, names
class HighFreqBacktestHandler(DataHandler):
    """Data handler exposing raw (un-normalized) close, vwap and volume for backtesting.

    Data is loaded through ``QlibDataLoader`` at ``"1min"`` frequency using the
    expressions returned by :meth:`get_feature_config`.
    """

    def __init__(
        self,
        instruments="csi300",
        start_time=None,
        end_time=None,
    ):
        loader_kwargs = {
            "config": self.get_feature_config(),
            "swap_level": False,
            "freq": "1min",
        }
        super().__init__(
            instruments=instruments,
            start_time=start_time,
            end_time=end_time,
            data_loader={"class": "QlibDataLoader", "kwargs": loader_kwargs},
        )

    def get_feature_config(self):
        """Return ``(fields, names)`` for the ``$close0``, ``$vwap0`` and ``$volume0`` columns."""

        def unpaused(expr):
            # Keep only rows where $paused is null or equal to 0.
            return "Select(Or(IsNull($paused), Eq($paused, 0.0)), {0})".format(expr)

        def cut_head(expr):
            return "Cut({0}, 240, None)".format(expr)

        # Because there is no vwap field in the yahoo data, a method similar to
        # Simpson integration is used to approximate vwap.
        approx_vwap = "($open + 2*$high + 2*$low + $close)/6"
        filled_close = "BFillNan(FFillNan({0}))".format(unpaused("$close"))

        close_expr = cut_head(filled_close)

        # vwap falls back to the filled close when it is null.
        vwap_expr = cut_head("If(IsNull({1}), {0}, {1})".format(filled_close, unpaused(approx_vwap)))

        # Volume is zeroed when it is null or when the approximated vwap lies
        # outside [0.999 * low, 1.001 * high].
        volume_expr = (
            "Cut(If(IsNull({0}), 0, If(Or(Gt({1}, Mul(1.001, {3})), Lt({1}, Mul(0.999, {2}))), 0, {0})), 240, None)".format(
                unpaused("$volume"),
                unpaused(approx_vwap),
                unpaused("$low"),
                unpaused("$high"),
            )
        )

        fields = [close_expr, vwap_expr, volume_expr]
        names = ["$close0", "$vwap0", "$volume0"]
        return fields, names

View File

@@ -0,0 +1,190 @@
import numpy as np
import pandas as pd
import importlib
from qlib.data.ops import ElemOperator, PairOperator
from qlib.config import C
from qlib.data.cache import H
from qlib.data.data import Cal
def get_calendar_day(freq="day", future=False):
    """Load the calendar as an array of dates, memoized in the ``H["c"]`` cache.

    Parameters
    ----------
    freq : str
        frequency of the calendar file to read.
    future : bool
        whether to include future trading days.

    Returns
    -------
    _calendar:
        numpy array holding ``x.date()`` for every entry of the loaded calendar.
    """
    cache_key = f"{freq}_future_{future}_day"
    calendar_cache = H["c"]
    if cache_key in calendar_cache:
        return calendar_cache[cache_key]

    days = np.array([stamp.date() for stamp in Cal.load_calendar(freq, future)])
    calendar_cache[cache_key] = days
    return days
class DayLast(ElemOperator):
    """DayLast Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        a series in which every value is replaced by the last value of its day
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        day_calendar = get_calendar_day(freq=freq)
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        # The series index is used as positions into the day calendar, giving
        # one date per row to group on.
        day_of_row = day_calendar[loaded.index]
        return loaded.groupby(day_of_row).transform("last")
class FFillNan(ElemOperator):
    """FFillNan Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        the feature with NaN values forward-filled
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        series = self.feature.load(instrument, start_index, end_index, freq)
        # Series.ffill() is the supported spelling of the deprecated
        # fillna(method="ffill") and produces the same result.
        return series.ffill()
class BFillNan(ElemOperator):
    """BFillNan Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        the feature with NaN values backward-filled
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        series = self.feature.load(instrument, start_index, end_index, freq)
        # Series.bfill() is the supported spelling of the deprecated
        # fillna(method="bfill") and produces the same result.
        return series.bfill()
class Date(ElemOperator):
    """Date Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        a series whose values are the calendar dates looked up by feature.index
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        day_calendar = get_calendar_day(freq=freq)
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        row_index = loaded.index
        # Only the index of the loaded feature is used; its values are discarded.
        return pd.Series(day_calendar[row_index], index=row_index)
class Select(PairOperator):
    """Select Operator

    Parameters
    ----------
    feature_left : Expression
        feature instance, the selection condition
    feature_right : Expression
        feature instance, the values to select from

    Returns
    ----------
    feature:
        the rows of feature_right selected by the condition feature_left
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        load_args = (instrument, start_index, end_index, freq)
        mask = self.feature_left.load(*load_args)
        values = self.feature_right.load(*load_args)
        return values.loc[mask]
class IsNull(ElemOperator):
    """IsNull Operator

    Parameters
    ----------
    feature : Expression
        feature instance

    Returns
    ----------
    feature:
        a boolean series that is True where the feature is NaN
    """

    def _load_internal(self, instrument, start_index, end_index, freq):
        loaded = self.feature.load(instrument, start_index, end_index, freq)
        null_mask = loaded.isnull()
        return null_mask
class Cut(ElemOperator):
    """Cut Operator

    Parameters
    ----------
    feature : Expression
        feature instance
    l : int
        l > 0, delete the first l elements of feature (default is None, which means 0)
    r : int
        r < 0, delete the last -r elements of feature (default is None, which means 0)

    Returns
    ----------
    feature:
        A series with the first l and last -r elements deleted from the feature.
        Note: It is deleted from the raw data, not the sliced data

    Raises
    ------
    ValueError
        if ``l`` is given and not positive, or ``r`` is given and not negative.
    """

    def __init__(self, feature, l=None, r=None):
        # Validate before touching any state so a rejected operator is never
        # left half-initialized.
        if (l is not None and l <= 0) or (r is not None and r >= 0):
            raise ValueError(f"Cut operator l should be > 0 and r should be < 0, got l={l!r}, r={r!r}")
        self.l = l
        self.r = r
        super(Cut, self).__init__(feature)

    def _load_internal(self, instrument, start_index, end_index, freq):
        series = self.feature.load(instrument, start_index, end_index, freq)
        # A None bound leaves that side of the slice open.
        return series.iloc[self.l : self.r]

    def get_extended_window_size(self):
        """Return the wrapped feature's (left, right) window extended by the cut sizes."""
        ll = 0 if self.l is None else self.l
        rr = 0 if self.r is None else abs(self.r)
        lft_etd, rght_etd = self.feature.get_extended_window_size()
        lft_etd = lft_etd + ll
        rght_etd = rght_etd + rr
        return lft_etd, rght_etd

View File

@@ -0,0 +1,72 @@
import numpy as np
import pandas as pd
from qlib.data.dataset.processor import Processor
from qlib.data.dataset.utils import fetch_df_by_index
class HighFreqNorm(Processor):
    """Robust normalization of the high-frequency features, reshaped to wide rows.

    Columns 0-9 are treated as the "price" group and columns 10-11 as the
    "volume" group (volume is passed through ``log1p`` first). Each group is
    centered by its median and scaled by ``1.4826 * MAD`` learned in ``fit``.

    Parameters
    ----------
    fit_start_time, fit_end_time :
        bounds of the ``datetime`` slice used to learn the statistics.
    """

    def __init__(self, fit_start_time, fit_end_time):
        self.fit_start_time = fit_start_time
        self.fit_end_time = fit_end_time

    def fit(self, df_features):
        """Learn per-group median, scale and normalized min/max on the fit window."""
        fit_window = slice(self.fit_start_time, self.fit_end_time)
        fetch_df = fetch_df_by_index(df_features, fit_window, level="datetime")
        del df_features
        raw_values = fetch_df.values

        self.feature_med = {}
        self.feature_std = {}
        self.feature_vmax = {}
        self.feature_vmin = {}
        for group, cols in (("price", slice(0, 10)), ("volume", slice(10, 12))):
            block = raw_values[:, cols].astype(np.float32)
            if group == "volume":
                block = np.log1p(block)

            median = np.nanmedian(block)
            self.feature_med[group] = median
            centered = block - median

            # Median absolute deviation scaled by 1.4826; the epsilon avoids a
            # zero divisor.
            scale = np.nanmedian(np.absolute(centered)) * 1.4826 + 1e-12
            self.feature_std[group] = scale
            standardized = centered / scale

            self.feature_vmax[group] = np.nanmax(standardized)
            self.feature_vmin[group] = np.nanmin(standardized)

    def __call__(self, df_features):
        """Normalize ``df_features`` and return one wide row per (instrument, date).

        The ``date`` column is moved into the index of ``df_features`` in place.
        """
        df_features.set_index("date", append=True, drop=True, inplace=True)
        df_values = df_features.values

        for group, cols in (("price", slice(0, 10)), ("volume", slice(10, 12))):
            block = df_values[:, cols]  # basic slice: a view into df_values
            if group == "volume":
                block[:] = np.log1p(block)
            block -= self.feature_med[group]
            block /= self.feature_std[group]

            # All four masks are taken before any value is rewritten.
            above_soft = block > 3.0
            above_hard = block > 3.5
            below_soft = block < -3.0
            below_hard = block < -3.5

            # Values beyond +/-3 are compressed linearly using the extrema
            # seen in fit(), then anything still beyond +/-3.5 is clamped.
            block[above_soft] = 3.0 + (block[above_soft] - 3.0) / (self.feature_vmax[group] - 3) * 0.5
            block[above_hard] = 3.5
            block[below_soft] = -3.0 - (block[below_soft] + 3.0) / (self.feature_vmin[group] + 3) * 0.5
            block[below_hard] = -3.5

        # After dropping the minute-level "datetime" level, the remaining
        # levels are renamed so the date level is called "datetime".
        day_index = df_features.index.droplevel("datetime").drop_duplicates()
        day_index = day_index.set_names(["instrument", "datetime"])

        # Reshape is specifically for adapting to RL high-freq executor
        current_cols = [0, 1, 2, 3, 4, 10]
        lagged_cols = [5, 6, 7, 8, 9, 11]
        feat = df_values[:, current_cols].reshape(-1, 6 * 240)
        feat_1 = df_values[:, lagged_cols].reshape(-1, 6 * 240)

        wide = pd.DataFrame(
            data=np.concatenate((feat, feat_1), axis=1),
            index=day_index,
            columns=[f"FEATURE_{i}" for i in range(12 * 240)],
        )
        return wide.sort_index()

View File

@@ -0,0 +1,217 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import sys
import fire
from pathlib import Path
import qlib
import pickle
import numpy as np
import pandas as pd
from qlib.config import REG_CN, HIGH_FREQ_CONFIG
from qlib.contrib.model.gbdt import LGBModel
from qlib.contrib.data.handler import Alpha158
from qlib.contrib.strategy.strategy import TopkDropoutStrategy
from qlib.contrib.evaluate import (
backtest as normal_backtest,
risk_analysis,
)
from qlib.utils import init_instance_by_config, exists_qlib_data
from qlib.data.dataset.handler import DataHandlerLP
from qlib.data.ops import Operators
from qlib.data.data import Cal
from qlib.tests.data import GetData
from highfreq_ops import get_calendar_day, DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut
class HighfreqWorkflow(object):
    """Example workflow that builds, dumps, reloads and exports the high-frequency datasets.

    The class-level configuration describes two ``DatasetH`` instances: a
    normalized feature dataset (``task["dataset"]``) and a raw backtest
    dataset (``task["dataset_backtest"]``).
    """

    # Custom expression operators registered at qlib.init; the expression
    # cache is disabled.
    SPEC_CONF = {"custom_ops": [DayLast, FFillNan, BFillNan, Date, Select, IsNull, Cut], "expression_cache": None}

    MARKET = "all"
    BENCHMARK = "SH000300"

    start_time = "2020-09-15 00:00:00"
    end_time = "2021-01-18 16:00:00"
    train_end_time = "2020-11-30 16:00:00"
    test_start_time = "2020-12-01 00:00:00"

    DATA_HANDLER_CONFIG0 = {
        "start_time": start_time,
        "end_time": end_time,
        "fit_start_time": start_time,
        "fit_end_time": train_end_time,
        "instruments": MARKET,
        "infer_processors": [{"class": "HighFreqNorm", "module_path": "highfreq_processor", "kwargs": {}}],
    }
    DATA_HANDLER_CONFIG1 = {
        "start_time": start_time,
        "end_time": end_time,
        "instruments": MARKET,
    }

    task = {
        "dataset": {
            "class": "DatasetH",
            "module_path": "qlib.data.dataset",
            "kwargs": {
                "handler": {
                    "class": "HighFreqHandler",
                    "module_path": "highfreq_handler",
                    "kwargs": DATA_HANDLER_CONFIG0,
                },
                "segments": {
                    "train": (start_time, train_end_time),
                    "test": (
                        test_start_time,
                        end_time,
                    ),
                },
            },
        },
        "dataset_backtest": {
            "class": "DatasetH",
            "module_path": "qlib.data.dataset",
            "kwargs": {
                "handler": {
                    "class": "HighFreqBacktestHandler",
                    "module_path": "highfreq_handler",
                    "kwargs": DATA_HANDLER_CONFIG1,
                },
                "segments": {
                    "train": (start_time, train_end_time),
                    "test": (
                        test_start_time,
                        end_time,
                    ),
                },
            },
        },
    }

    def _init_qlib(self):
        """initialize qlib"""
        # use yahoo_cn_1min data
        QLIB_INIT_CONFIG = {**HIGH_FREQ_CONFIG, **self.SPEC_CONF}
        provider_uri = QLIB_INIT_CONFIG.get("provider_uri")
        if not exists_qlib_data(provider_uri):
            print(f"Qlib data is not found in {provider_uri}")
            GetData().qlib_data(target_dir=provider_uri, interval="1min", region=REG_CN)
        qlib.init(**QLIB_INIT_CONFIG)

    def _prepare_calender_cache(self):
        """preload the calendar for cache"""
        # This code used the copy-on-write feature of Linux to avoid calculating the calendar multiple times in the subprocess
        # This code may accelerate, but may be not useful on Windows and Mac Os
        Cal.calendar(freq="1min")
        get_calendar_day(freq="1min")

    @staticmethod
    def _dump_by_instrument(frame, out_dir, suffix):
        """Pickle each instrument's rows of ``frame`` to ``<out_dir>/<instrument><suffix>``."""
        out_dir = Path(out_dir)
        # exist_ok avoids the check-then-create race of a separate exists() test.
        out_dir.mkdir(parents=True, exist_ok=True)
        for instrument, instrument_frame in frame.groupby("instrument"):
            instrument_frame.to_pickle(out_dir / f"{instrument}{suffix}")

    def get_data(self):
        """use dataset to get highreq data"""
        self._init_qlib()
        self._prepare_calender_cache()

        dataset = init_instance_by_config(self.task["dataset"])
        xtrain, xtest = dataset.prepare(["train", "test"])
        print(xtrain, xtest)

        dataset_backtest = init_instance_by_config(self.task["dataset_backtest"])
        backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"])
        print(backtest_train, backtest_test)

    def dump_and_load_dataset(self):
        """dump and load dataset state on disk"""
        self._init_qlib()
        self._prepare_calender_cache()

        dataset = init_instance_by_config(self.task["dataset"])
        dataset_backtest = init_instance_by_config(self.task["dataset_backtest"])

        ##=============dump dataset=============
        dataset.to_pickle(path="dataset.pkl")
        dataset_backtest.to_pickle(path="dataset_backtest.pkl")

        del dataset, dataset_backtest

        ##=============reload dataset=============
        # The pickles loaded here are the ones written a few lines above.
        with open("dataset.pkl", "rb") as file_dataset:
            dataset = pickle.load(file_dataset)

        with open("dataset_backtest.pkl", "rb") as file_dataset_backtest:
            dataset_backtest = pickle.load(file_dataset_backtest)

        self._prepare_calender_cache()

        ##=============reinit dataset=============
        dataset.init(
            handler_kwargs={
                "init_type": DataHandlerLP.IT_LS,
                "start_time": "2021-01-19 00:00:00",
                "end_time": "2021-01-25 16:00:00",
            },
            segment_kwargs={
                "test": (
                    "2021-01-19 00:00:00",
                    "2021-01-25 16:00:00",
                ),
            },
        )
        dataset_backtest.init(
            handler_kwargs={
                "start_time": "2021-01-19 00:00:00",
                "end_time": "2021-01-25 16:00:00",
            },
            segment_kwargs={
                "test": (
                    "2021-01-19 00:00:00",
                    "2021-01-25 16:00:00",
                ),
            },
        )

        ##=============get data=============
        xtest = dataset.prepare(["test"])
        backtest_test = dataset_backtest.prepare(["test"])

        print(xtest, backtest_test)

    def get_high_freq_data(self, data_path):
        """Export per-instrument pickles of the normalized features and the backtest data.

        Parameters
        ----------
        data_path : str
            output root; features are written to ``<data_path>/normed_feature/<instrument>.pkl``
            and backtest data to ``<data_path>/backtest/<instrument>.pkl.backtest``.
        """
        self._init_qlib()
        self._prepare_calender_cache()

        dataset = init_instance_by_config(self.task["dataset"])
        xtrain, xtest = dataset.prepare(["train", "test"])
        normed_feature = pd.concat([xtrain, xtest]).sort_index()
        self._dump_by_instrument(normed_feature, Path(data_path) / "normed_feature", ".pkl")

        dataset_backtest = init_instance_by_config(self.task["dataset_backtest"])
        backtest_train, backtest_test = dataset_backtest.prepare(["train", "test"])
        backtest = pd.concat([backtest_train, backtest_test]).sort_index()
        # Append the calendar date of each row (taken from the second index
        # level) as an extra index level named "date".
        backtest["date"] = backtest.index.map(lambda x: x[1].date())
        backtest.set_index("date", append=True, drop=True, inplace=True)
        self._dump_by_instrument(backtest, Path(data_path) / "backtest", ".pkl.backtest")
if __name__ == "__main__":
    if len(sys.argv) > 1:
        # Sub-commands such as `python workflow.py get_data` or
        # `python workflow.py dump_and_load_dataset` are dispatched through fire.
        fire.Fire(HighfreqWorkflow)
    else:
        # With no arguments, export the features and backtest data to the
        # default location.
        data_path = "../data/"
        workflow = HighfreqWorkflow()
        workflow.get_high_freq_data(data_path)

View File

@@ -4,6 +4,91 @@ This is the experiment code for our AAAI 2021 paper "[Universal Trading for Orde
## Abstract
As a fundamental problem in algorithmic trading, order execution aims at fulfilling a specific trading order, either liquidation or acquirement, for a given instrument. Towards effective execution strategy, recent years have witnessed the shift from the analytical view with model-based market assumptions to model-free perspective, i.e., reinforcement learning, due to its nature of sequential decision optimization. However, the noisy and yet imperfect market information that can be leveraged by the policy has made it quite challenging to build up sample efficient reinforcement learning methods to achieve effective order execution. In this paper, we propose a novel universal trading policy optimization framework to bridge the gap between the noisy yet imperfect market states and the optimal action sequences for order execution. Particularly, this framework leverages a policy distillation method that can better guide the learning of the common policy towards practically optimal execution by an oracle teacher with perfect information to approximate the optimal trading strategy. The extensive experiments have shown significant improvements of our method over various strong baselines, with reasonable trading actions.
## Environment Dependencies
### Dependencies
```
gym==0.17.3
torch==1.6.0
numba==0.51.2
numpy==1.19.1
pandas==1.1.3
tqdm==4.50.2
tianshou==0.3.0.post1
env==0.1.0
PyYAML==5.4.1
redis==3.5.3
```
### Environment Variable
`EXP_PATH` Absolute path to your config folder, we give folder `exp` as an example.
`OUTPUT_DIR` Absolute path to your log folder.
## Data Processing
For feature processing, we take the Yahoo dataset as an example, which can be processed with the `qlib/examples/highfreq/workflow.py` file. If you need to change your data storage path, change the `data_path` in `workflow.py`, and then run the following.
```
python workflow.py
```
For order generation, if you have changed the `data_path` in `workflow.py`, change the `data_path` in `order_gen.py` to match, and then run the following.
```
python order_gen.py
```
## Training and backtest
### Config file
A config file is needed to start our project; we provide `PPO`, `OPDS` and `OPD` as examples in the folder `exp/example`. If you want to use our given configs, make sure the `data_path` you set before matches the paths in the config file.
### Baseline method
To run a method, you can do the following.
```
python main.py --config={config_path}
```
Here `{config_path}` is the path of your `config.yml` relative to `EXP_PATH`.
If you need to run our given method such as PPO method, you can do the following.
```
python main.py --config=example/PPO/config.yml
```
### OPD method
The OPD method is a multi-step method. First, run OPDT, which serves as the teacher in the OPD method.
```
python main.py --config=example/OPDT/config.yml
```
After training, find the `policy_best` file in your OPDT log file and copy it to `trade` file for backtest. Also you can change `policy_path` in the `example/OPDT_b/config.yml` to your `policy_best` file. Then run the backtest method.
```
python main.py --config=example/OPDT_b/config.yml
```
Then process the features from the teacher. Remember to change `log_path` if you have changed `log_dir` in `OPDT_b/config.yml`.
```
python teacher_feature.py
```
and finally start our OPD method.
```
python main.py --config=example/OPD/config.yml
```
### Citation
You are more than welcome to cite our paper:
```
@@ -13,4 +98,4 @@ You are more than welcome to cite our paper:
booktitle={Proceedings of the AAAI Conference on Artificial Intelligence},
year={2021}
}
```
```

View File

@@ -0,0 +1,76 @@
seed: 42
task: train
log_dir: example/OPD
buffer_size: 80000
io_conf:
test_sampler: TestSampler
train_sampler: Sampler
test_logger: DFLogger
resources:
num_cpus: 24
num_gpus: 1
device: cuda
train_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/train/
valid_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/valid/
test_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/test/
env_conf:
name: StockEnv_Acc
max_step_num: 237
limit: 10
time_interval: 30
interval_num: 8
features:
- name: raw
type: range
loc: ../data/normed_feature/
size: 180
- name: teacher_action
type: interval
size: 1
loc: ../data/feature/teacher/
obs:
name: RuleTeacher
config: {}
action:
name: Static_Action
config:
action_num: 5
action_map: [0, 0.25, 0.5, 0.75, 1]
reward:
VP_Penalty_small_vec:
penalty: 100
coefficient: 1
policy_conf:
name: PPO_sup
config:
discount_factor: 1.
max_grad_norm: 100.
reward_normalization: False
eps_clip: 0.3
value_clip: True
vf_coef: 1.
gae_lambda: 1.
vf_clip_para: 0.3
sup_coef: 0.01
network_conf:
name: OPD
config:
hidden_size: 64
out_shape: 5
fc_size: 32
cnn_shape: [30, 6]
optim:
lr: 1e-4
batch_size: 1024
max_epoch: 30
step_per_epoch: 20
collect_per_step: 10000
repeat_per_collect: 5
early_stopping: 5
weight_decay: 0.

View File

@@ -0,0 +1,71 @@
seed: 42
task: train
log_dir: example/OPDS
buffer_size: 80000
io_conf:
test_sampler: TestSampler
train_sampler: Sampler
test_logger: DFLogger
resources:
num_cpus: 24
num_gpus: 1
device: cuda
train_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/train/
valid_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/valid/
test_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/test/
env_conf:
name: StockEnv_Acc
max_step_num: 237
limit: 10
time_interval: 30
interval_num: 8
features:
- name: raw
type: range
loc: ../data/normed_feature/
size: 180
obs:
name: TeacherObs
config: {}
action:
name: Static_Action
config:
action_num: 5
action_map: [0, 0.25, 0.5, 0.75, 1]
reward:
VP_Penalty_small_vec:
penalty: 100
coefficient: 1
policy_conf:
name: PPO
config:
discount_factor: 1.
max_grad_norm: 100.
reward_normalization: False
eps_clip: 0.3
value_clip: True
vf_coef: 1.
gae_lambda: 1.
vf_clip_para: 0.3
network_conf:
name: PPO
config:
hidden_size: 64
out_shape: 5
fc_size: 32
cnn_shape: [30, 6]
optim:
lr: 1e-4
batch_size: 1024
max_epoch: 30
step_per_epoch: 20
collect_per_step: 10000
repeat_per_collect: 5
early_stopping: 5
weight_decay: 0.

View File

@@ -0,0 +1,71 @@
seed: 42
task: train
log_dir: example/OPDT
buffer_size: 80000
io_conf:
test_sampler: TestSampler
train_sampler: Sampler
test_logger: DFLogger
resources:
num_cpus: 24
num_gpus: 1
device: cuda
train_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/train/
valid_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/valid/
test_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/test/
env_conf:
name: StockEnv_Acc
max_step_num: 237
limit: 10
time_interval: 30
interval_num: 8
features:
- name: raw
type: range
loc: ../data/normed_feature/
size: 180
obs:
name: TeacherObs
config: {}
action:
name: Static_Action
config:
action_num: 5
action_map: [0, 0.25, 0.5, 0.75, 1]
reward:
VP_Penalty_small_vec:
penalty: 100
coefficient: 1
policy_conf:
name: PPO
config:
discount_factor: 1.
max_grad_norm: 100.
reward_normalization: False
eps_clip: 0.3
value_clip: True
vf_coef: 1.
gae_lambda: 1.
vf_clip_para: 0.3
network_conf:
name: Teacher
config:
hidden_size: 64
out_shape: 5
fc_size: 32
cnn_shape: [30, 6]
optim:
lr: 1e-4
batch_size: 1024
max_epoch: 30
step_per_epoch: 20
collect_per_step: 10000
repeat_per_collect: 5
early_stopping: 5
weight_decay: 0.

View File

@@ -0,0 +1,76 @@
seed: 42
task: eval
log_dir: example/OPDT_b
buffer_size: 80000
io_conf:
test_sampler: TestSampler
train_sampler: Sampler
test_logger: DFLogger
resources:
num_cpus: 24
num_gpus: 1
device: cuda
train_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/train/
valid_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/valid/
test_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/all/
env_conf:
name: StockEnv_Acc
max_step_num: 237
limit: 10
time_interval: 30
interval_num: 8
features:
- name: raw
type: range
loc: ../data/normed_feature/
size: 180
obs:
name: TeacherObs
config: {}
action:
name: Static_Action
config:
action_num: 5
action_map: [0, 0.25, 0.5, 0.75, 1]
reward:
VP_Penalty_small_vec:
penalty: 100
coefficient: 1
policy_path: policy_best
policy_conf:
name: PPO
config:
discount_factor: 1.
max_grad_norm: 100.
reward_normalization: False
eps_clip: 0.3
value_clip: True
vf_coef: 1.
gae_lambda: 1.
vf_clip_para: 0.3
network_conf:
name: Teacher
config:
hidden_size: 64
out_shape: 5
fc_size: 32
cnn_shape: [30, 6]
optim:
lr: 1e-4
batch_size: 1024
max_epoch: 30
step_per_epoch: 20
collect_per_step: 10000
repeat_per_collect: 5
early_stopping: 5
weight_decay: 0.
search:
optim.weight_decay:
type: choice
value: [0.]

View File

@@ -0,0 +1,70 @@
seed: 42
task: train
log_dir: example/PPO
buffer_size: 80000
io_conf:
test_sampler: TestSampler
train_sampler: Sampler
test_logger: DFLogger
resources:
num_cpus: 24
num_gpus: 1
device: cuda
train_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/train/
valid_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/valid/
test_paths:
raw_dir: ../data/backtest/
order_dir: ../data/order/test/
env_conf:
name: StockEnv_Acc
max_step_num: 237
limit: 10
time_interval: 30
interval_num: 8
features:
- name: raw
type: range
loc: ../data/normed_feature/
size: 180
obs:
name: TeacherObs
config: {}
action:
name: Static_Action
config:
action_num: 5
action_map: [0, 0.25, 0.5, 0.75, 1]
reward:
PPO_Reward:
coefficient: 1
policy_conf:
name: PPO
config:
discount_factor: 1.
max_grad_norm: 100.
reward_normalization: False
eps_clip: 0.3
value_clip: True
vf_coef: 1.
gae_lambda: 1.
vf_clip_para: 0.3
network_conf:
name: PPO
config:
hidden_size: 64
out_shape: 5
fc_size: 32
cnn_shape: [30, 6]
optim:
lr: 1e-4
batch_size: 1024
max_epoch: 30
step_per_epoch: 20
collect_per_step: 10000
repeat_per_collect: 5
early_stopping: 5
weight_decay: 0.

View File

@@ -87,7 +87,7 @@ class DFLogger(object):
df_cache[ins] = (
[],
[],
len(pd.read_pickle(order_dir + ins + ".pkl.target")),
(pd.read_pickle(order_dir + ins + ".pkl.target")['amount'] != 0).sum(),
)
df_cache[ins][0].append(df)
df_cache[ins][1].append(res)

View File

@@ -0,0 +1,59 @@
import numpy as np
import pandas as pd
import os
import time
import datetime
from joblib import Parallel, delayed
# Root data directory; order pickles are written underneath it.
data_path = "../data/"
# Directory holding the per-instrument backtest pickles that orders are
# generated from.
in_dir = os.path.join(data_path, "backtest/")


### create order folders ####
def generate_order(df, start, end):
    """Generate one order per (date, instrument) from intraday backtest data.

    Parameters
    ----------
    df : pd.DataFrame
        frame with a ``date`` index level and ``$volume0`` / ``$vwap0`` columns;
        the index is expected to be (instrument, datetime, date), since levels
        2 and 0 are used as the (date, instrument) grouping keys.
    start, end : int
        positional range ``[start, end)`` of rows taken from every day.

    Returns
    -------
    pd.DataFrame
        indexed by (date, instrument) with the remaining averaged columns plus
        ``amount`` and ``order_type`` (always 0).
    """
    # Keep rows start..end-1 of every day; take() prepends the group key,
    # which is dropped again to restore the original index levels.
    df = df.groupby("date").take(range(start, end)).droplevel(level=0)

    order = df.groupby(level=(2, 0)).mean().dropna()
    # NOTE(review): a single lognormal factor is drawn per call, so every
    # order produced by one call shares the same volume ratio - confirm that
    # this is intended rather than one draw per order.
    order["amount"] = np.random.lognormal(-3.28, 1.14) * order["$volume0"]
    order["order_type"] = 0
    order = order.drop(columns=["$volume0", "$vwap0"])
    return order
def w_order(f, start, end):
    """Generate the orders for one backtest file and write the train/valid/test/all splits.

    Parameters
    ----------
    f : str
        file name inside ``in_dir``; its last 9 characters are stripped and
        ``.target`` is appended to form the output file name.
    start, end : int
        per-day positional row range forwarded to ``generate_order``.

    Returns
    -------
    int
        always 0.
    """
    df = pd.read_pickle(os.path.join(in_dir, f))
    order = generate_order(df, start, end)

    # Normalize the date level to timestamps so the split boundaries compare
    # correctly whether the level holds dates, timestamps or strings.
    order_dates = pd.to_datetime(order.index.get_level_values(0))
    valid_start = pd.Timestamp("2020-12-01")
    test_start = pd.Timestamp("2021-01-01")

    splits = {
        "train": order[order_dates < valid_start],
        "valid": order[(order_dates >= valid_start) & (order_dates < test_start)],
        "test": order[order_dates >= test_start],
        # "all" holds every generated order, not only the test split.
        "all": order,
    }

    target_name = f[:-9] + ".target"
    for split_name, split_orders in splits.items():
        if len(split_orders) == 0:
            continue
        split_dir = os.path.join(data_path, "order", split_name)
        # exist_ok keeps concurrent workers from failing when they create the
        # same directory at the same time.
        os.makedirs(split_dir, exist_ok=True)
        split_orders.to_pickle(os.path.join(split_dir, target_name))
    return 0
# Fan out one order-generation job per file in the backtest directory, using
# per-day rows 0..238.
backtest_files = os.listdir(in_dir)
order_jobs = (delayed(w_order)(file_name, 0, 239) for file_name in backtest_files)
res = Parallel(n_jobs=64)(order_jobs)
# Every job returns 0, so the printed sum is 0 when all jobs complete.
print(sum(res))

View File

@@ -0,0 +1,24 @@
import pandas as pd
import os
# Output location for the teacher-action features.
data_path = '../data/'
feature_path = os.path.join(data_path, 'feature/teacher/')
if not os.path.exists(feature_path):
    os.makedirs(feature_path)

# Test logs of the OPDT_b run, located under the OUTPUT_DIR environment variable.
log_file = os.path.join(os.environ.get('OUTPUT_DIR'), 'example/OPDT_b/0/test/')
files = os.listdir(log_file)
for f in files:
    if not f.endswith(".log"):
        continue

    log_df = pd.read_pickle(log_file + f)
    # Each entry of index level 1 is indexable; its second element is
    # appended to the index as a "datetime" level.
    log_df['datetime'] = log_df.index.get_level_values(1).map(lambda key: key[1])
    log_df = log_df.set_index('datetime', append=True, drop=True)

    actions = log_df['action'].reset_index(level=1, drop=True)
    # Keep only the time of day in the last index level so that unstack()
    # spreads it across columns.
    actions.index = actions.index.map(lambda key: (key[0], key[1], key[2].time()))

    # Keep every 30th column, double the values, and store them as integers
    # with missing entries set to 0.
    sampled = actions.unstack().iloc[:, ::30] * 2
    teacher_action = sampled.fillna(0).astype("int")
    teacher_action.to_pickle(feature_path + f[:-4] + '.pkl')

View File

@@ -17,7 +17,7 @@ from qlib.contrib.evaluate import (
from qlib.utils import exists_qlib_data, init_instance_by_config, flatten_dict
from qlib.workflow import R
from qlib.workflow.record_temp import SignalRecord, PortAnaRecord
from qlib.tests.data import GetData
if __name__ == "__main__":
@@ -25,9 +25,6 @@ if __name__ == "__main__":
provider_uri = "~/.qlib/qlib_data/cn_data" # target_dir
if not exists_qlib_data(provider_uri):
print(f"Qlib data is not found in {provider_uri}")
sys.path.append(str(Path(__file__).resolve().parent.parent.joinpath("scripts")))
from get_data import GetData
GetData().qlib_data(target_dir=provider_uri, region=REG_CN)
qlib.init(provider_uri=provider_uri, region=REG_CN)