1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-07-01 01:51:18 +08:00

volume limit update

This commit is contained in:
wangwenxi.handsome
2021-08-01 16:03:08 +00:00
parent 5c2ddac7f0
commit 0f2d85d098
4 changed files with 132 additions and 123 deletions

View File

@@ -5,30 +5,7 @@ from qlib.data.ops import ElemOperator, PairOperator
from qlib.config import C
from qlib.data.cache import H
from qlib.data.data import Cal
def get_calendar_day(freq="day", future=False):
"""Load High-Freq Calendar Date Using Memcache.
Parameters
----------
freq : str
frequency of read calendar file.
future : bool
whether including future trading day.
Returns
-------
_calendar:
array of date.
"""
flag = f"{freq}_future_{future}_day"
if flag in H["c"]:
_calendar = H["c"][flag]
else:
_calendar = np.array(list(map(lambda x: pd.Timestamp(x.date()), Cal.load_calendar(freq, future))))
H["c"][flag] = _calendar
return _calendar
from qlib.contrib.ops.high_freq import get_calendar_day
class DayLast(ElemOperator):

View File

@@ -1,6 +1,7 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import annotations
from collections import defaultdict
from typing import TYPE_CHECKING
if TYPE_CHECKING:
@@ -68,30 +69,37 @@ class Exchange:
<the expression for sell stock limitation>)
`False` value indicates the stock is tradable
`True` value indicates the stock is limited and not tradable
:param volume_threshold: Union[
:param volume_threshold: Union[
Dict[
"all": Union(str, List[str], Tuple[str]),
"buy": Union(str, List[str], Tuple[str]),
"sell": Union(str, List[str], Tuple[str]),
"all": ("cum" or "current", limit_str),
"buy": ("cum" or "current", limit_str),
"sell":("cum" or "current", limit_str),
],
Union(str, List[str], Tuple[str],
]
1) str means one volume limit. In another words, each volume limit is a string.
There are two kinds of string to represent limit.
- the first kind of string is qlib data expression but it must starts with "$".
such as "$askV1", "$bidV1 * 0.8"
- the second kind of string is composed of special fields. Currently we only
supports #market and #dealed. #market is market volume so far that day.
!!!Note that if you use the #market field, you must register the DayCumsum operator
in qlib.contrib.ops.high_freq when initial the qlib. #dealed is dealed order num so far that day.
such as "0.8 * #market - #dealed", "0.6 * #market"
2) "all" means the volume limits are both of buying and selling.
("cum" or "current", limit_str),
]
1) ("cum" or "current", limit_str) denotes a single volume limit.
- limit_str is qlib data expression which is allowed to define your own Operator.
Please refer to qlib/contrib/ops/high_freq.py, here are any custom operator for high frequency,
such as DayCumsum. !!!NOTE: if you want you use the custom operator, you need to
register it in qlib_init.
- "cum" means that this is a cumulative value over time, such as cumulative market volume.
So when it is used as a volume limit, it is necessary to subtract the dealed amount.
- "current" means that this is a real-time value and will not accumulate over time,
so it can be directly used as a capacity limit.
e.g. ("cum", "0.2 * DayCumsum($volume, '9:45', '14:45')"), ("current", "$bidV1")
2) "all" means the volume limits are both buying and selling.
"buy" means the volume limits of buying. "sell" means the volume limits of selling.
Different volume limits will be aggregated with min(). If volume_threshold is only
Union(str, List[str], Tuple[str]) instead of a dict, the volume limits are for
both by deault.
3) e.g. {"all": ("#market * 0.2 - #dealed"), "buy": ("$askV1"), "sell": ("$bidV1")}
("cum" or "current", limit_str) instead of a dict, the volume limits are for
both by deault. In other words, it is same as {"all": ("cum" or "current", limit_str)}.
3) e.g. "volume_threshold": {
"all": ("cum", "0.2 * DayCumsum($volume, '9:45', '14:45')"),
"buy": ("current", "$askV1"),
"sell": ("current", "$bidV1"),
}
:param open_cost: cost rate for open, default 0.0015
:param close_cost: cost rate for close, default 0.0025
@@ -277,9 +285,9 @@ class Exchange:
-------
fields: set
the fields need to get from qlib.
buy_vol_limit: List[str]
buy_vol_limit: List[Tuple[str]]
all volume limits of buying.
sell_vol_limit: List[str]
sell_vol_limit: List[Tuple[str]]
all volume limits of selling.
Raises
@@ -293,27 +301,19 @@ class Exchange:
fields = set()
buy_vol_limit = []
sell_vol_limit = []
if isinstance(volume_threshold, (str, tuple, list)):
if isinstance(volume_threshold, tuple):
volume_threshold = {"all": volume_threshold}
assert type(volume_threshold) == dict
for key in volume_threshold:
vol_limits = volume_threshold[key]
if isinstance(vol_limits, str):
vol_limits = [vol_limits]
for vol_lt in vol_limits:
# the str is qlib data expression when the first character is "$".
if vol_lt[0] == "$":
fields.add(vol_lt)
# the str is composed of special_fields
elif "#market" in vol_lt:
fields.add("DayCumsum($volume)")
else:
raise ValueError(f"volume limit string must be qlib expression or special_fields")
vol_limit = volume_threshold[key]
assert type(vol_limit) == tuple
fields.add(vol_limit[1])
if key in ("buy", "all"):
buy_vol_limit.append(vol_lt)
if key in ("sell", "all"):
sell_vol_limit.append(vol_lt)
if key in ("buy", "all"):
buy_vol_limit.append(vol_limit)
if key in ("sell", "all"):
sell_vol_limit.append(vol_limit)
return buy_vol_limit, sell_vol_limit, fields
@@ -366,7 +366,11 @@ class Exchange:
return True
def deal_order(
self, order, trade_account: Account = None, position: BasePosition = None, deal_order_num: dict = None
self,
order,
trade_account: Account = None,
position: BasePosition = None,
dealed_order_amount: defaultdict = defaultdict(float),
):
"""
Deal order when the actual transaction
@@ -376,7 +380,7 @@ class Exchange:
:param order: Deal the order.
:param trade_account: Trade account to be updated after dealing the order.
:param position: position to be updated after dealing the order.
:param deal_order_num: the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}}
:param dealed_order_amount: the dealed order amount dict with the format of {stock_id: float}
:return: trade_val, trade_cost, trade_price
"""
# check order first.
@@ -391,7 +395,7 @@ class Exchange:
trade_price = self.get_deal_price(order.stock_id, order.start_time, order.end_time, order.direction)
# NOTE: order will be changed in this function
trade_val, trade_cost = self._calc_trade_info_by_order(
order, trade_account.current if trade_account else position, deal_order_num
order, trade_account.current if trade_account else position, dealed_order_amount
)
if order.deal_amount > 1e-5:
# If the order can only be deal 0 amount. Nothing to be updated
@@ -655,64 +659,59 @@ class Exchange:
return (deal_amount * factor + 0.1) // self.trade_unit * self.trade_unit / factor
return deal_amount
def _get_amount_by_volume(self, order: Order, deal_order_num: dict) -> int:
"""parse the capacity limit string and return the actual number of orders that can be executed.
def _get_amount_by_volume(self, order: Order, dealed_order_amount: dict) -> int:
"""parse the capacity limit string and return the actual amount of orders that can be executed.
Parameters
----------
order : Order
the order to be executed.
deal_order_num : dict
the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}}
dealed_order_amount : dict
:param dealed_order_amount: the dealed order amount dict with the format of {stock_id: float}
Returns
-------
int
the actual number of orders that can be executed, due to the volume limit.
"""
the actual amount of orders that can be executed, due to the volume limit.
"""
if order.direction == Order.BUY:
vol_limit = self.buy_vol_limit
deal_order_num = deal_order_num["buy"]
elif order.direction == Order.SELL:
vol_limit = self.sell_vol_limit
deal_order_num = deal_order_num["sell"]
if vol_limit is None:
return order.deal_amount
vol_limit_num = []
for limit in vol_limit:
assert isinstance(limit, str)
if limit[0] == "$":
assert isinstance(limit, tuple)
if limit[0] == "current":
vol_limit_num.append(
str(
self.quote.get_data(
order.stock_id,
order.start_time,
order.end_time,
fields=limit,
method=ts_data_last,
)
)
)
else:
if "#market in limit":
market_limit = self.quote.get_data(
self.quote.get_data(
order.stock_id,
order.start_time,
order.end_time,
fields="DayCumsum($volume)",
fields=limit[1],
method=ts_data_last,
)
limit_tmp = limit.replace("#market", f"{market_limit}")
if "#dealed in limit":
limit_tmp = limit_tmp.replace("#dealed", f"{deal_order_num[order.stock_id]}")
vol_limit_num.append(limit_tmp)
vol_limit_num = min([eval(i) for i in vol_limit_num])
)
elif limit[0] == "cum":
vol_limit_num.append(
self.quote.get_data(
order.stock_id,
order.start_time,
order.end_time,
fields=limit[1],
method=ts_data_last,
)
- dealed_order_amount[order.stock_id]
)
else:
raise ValueError(f"{limit[0]} is not supported")
vol_limit_num = min(vol_limit_num)
return max(min(vol_limit_num, order.deal_amount), 0)
def _calc_trade_info_by_order(self, order, position: Position, deal_order_num):
def _calc_trade_info_by_order(self, order, position: Position, dealed_order_amount):
"""
Calculation of trade info
@@ -720,7 +719,7 @@ class Exchange:
:param order:
:param position: Position
:param deal_order_num: the dealed order num dict with the format of {"buy":{stock_id: int}, "sell":{stock_id: int}}
:param dealed_order_amount: the dealed order amount dict with the format of {stock_id: float}
:return: trade_val, trade_cost
"""
@@ -744,7 +743,7 @@ class Exchange:
# We choose to sell all
order.deal_amount = order.amount
order.deal_amount = self._get_amount_by_volume(order, deal_order_num)
order.deal_amount = self._get_amount_by_volume(order, dealed_order_amount)
trade_val = order.deal_amount * trade_price
trade_cost = max(trade_val * self.close_cost, self.min_cost)
elif order.direction == Order.BUY:
@@ -764,7 +763,7 @@ class Exchange:
# Unknown amount of money. Just round the amount
order.deal_amount = self.round_amount_by_trade_unit(order.amount, order.factor)
order.deal_amount = self._get_amount_by_volume(order, deal_order_num)
order.deal_amount = self._get_amount_by_volume(order, dealed_order_amount)
trade_val = order.deal_amount * trade_price
trade_cost = max(trade_val * self.open_cost, self.min_cost)
else:

View File

@@ -114,6 +114,10 @@ class BaseExecutor:
if common_infra is None:
get_module_logger("BaseExecutor").warning(f"`common_infra` is not set for {self}")
# record deal order amount in one day
self.dealed_order_amount = defaultdict(float)
self.deal_day = None
def reset_common_infra(self, common_infra):
"""
reset infrastructure for trading
@@ -467,10 +471,6 @@ class SimulatorExecutor(BaseExecutor):
self.trade_type = trade_type
# record deal order num in one day
self.deal_order_num = {"buy": defaultdict(int), "sell": defaultdict(int)}
self.deal_day = None
def _get_order_iterator(self, trade_decision: BaseTradeDecision) -> List[Order]:
"""
@@ -500,21 +500,14 @@ class SimulatorExecutor(BaseExecutor):
raise NotImplementedError(f"This type of input is not supported")
return order_it
def _update_order_num(self, order):
"""update date and dealed order num in the day."""
def _update_dealed_order_amount(self, order):
"""update date and dealed order amount in the day."""
now_deal_day = order.start_time.floor(freq="D")
if self.deal_day is None:
now_deal_day = self.trade_calendar.get_step_time()[0].floor(freq="D")
if self.deal_day is None or now_deal_day > self.deal_day:
self.dealed_order_amount = defaultdict(float)
self.deal_day = now_deal_day
if now_deal_day > self.deal_day:
self.deal_order_num = {"buy": defaultdict(int), "sell": defaultdict(int)}
self.deal_day = now_deal_day
if order.direction == Order.BUY:
self.deal_order_num["buy"][order.stock_id] += order.deal_amount
elif order.direction == Order.SELL:
self.deal_order_num["sell"][order.stock_id] += order.deal_amount
else:
raise NotImplementedError(f"order type {order.type} error")
self.dealed_order_amount[order.stock_id] += order.deal_amount
def _collect_data(self, trade_decision: BaseTradeDecision, level: int = 0):
@@ -527,10 +520,10 @@ class SimulatorExecutor(BaseExecutor):
trade_val, trade_cost, trade_price = self.trade_exchange.deal_order(
order,
trade_account=self.trade_account,
deal_order_num=self.deal_order_num,
dealed_order_amount=self.dealed_order_amount,
)
execute_result.append((order, trade_val, trade_cost, trade_price))
self._update_order_num(order)
self._update_dealed_order_amount(order)
if self.verbose:
print(
"[I {:%Y-%m-%d %H:%M:%S}]: {} {}, price {:.2f}, amount {}, deal_amount {}, factor {}, value {:.2f}, cash {:.2f}.".format(

View File

@@ -3,6 +3,7 @@
from pathlib import Path
import numpy as np
import pandas as pd
from datetime import datetime
import qlib
from qlib.data import D
@@ -12,7 +13,9 @@ from qlib.data.ops import ElemOperator
def get_calendar_day(freq="1min", future=False):
"""Load High-Freq Calendar Date Using Memcache.
"""
Load High-Freq Calendar Date Using Memcache.
!!!NOTE: Loading the calendar is quite slow. So loading calendar before start multiprocessing will make it faster.
Parameters
----------
@@ -36,20 +39,57 @@ def get_calendar_day(freq="1min", future=False):
class DayCumsum(ElemOperator):
"""DayLast Operator
"""DayCumsum Operator during start time and end time.
Parameters
----------
feature : Expression
feature instance
start : str
the start time of backtest in one day.
!!!NOTE: "9:30" means the time period of (9:30, 9:31) is in transaction.
end : str
the end time of backtest in one day.
!!!NOTE: "14:59" means the time period of (14:59, 15:00) is in transaction,
but (15:00, 15:01) is not.
So start="9:30" and end="14:59" means trading all day.
Returns
----------
feature:
a series of that each value equals the last value of its day
a series of that each value equals the cumsum value during start time and end time.
Otherwise, the value is zero.
"""
def __init__(self, feature, start: str = "9:30", end: str = "14:59"):
self.feature = feature
self.start = datetime.strptime(start, "%H:%M")
self.end = datetime.strptime(end, "%H:%M")
self.morning_open = datetime.strptime("9:30", "%H:%M")
self.morning_close = datetime.strptime("11:30", "%H:%M")
self.noon_open = datetime.strptime("13:00", "%H:%M")
self.noon_close = datetime.strptime("15:00", "%H:%M")
self.start_id = self.time_to_index(self.start)
self.end_id = self.time_to_index(self.end)
def time_to_index(self, t):
if t >= self.morning_open and t < self.morning_close:
return int((t - self.morning_open).total_seconds() / 60)
elif t >= self.noon_open and t < self.noon_close:
return int((t - self.noon_open).total_seconds() / 60) + 120
else:
raise ValueError(f"{t} is not the opening time of the stock market")
def period_cusum(self, df):
assert len(df) == 240
df.iloc[0 : self.start_id] = 0
df = df.cumsum()
df.iloc[self.end_id + 1 : 240] = 0
return df
def _load_internal(self, instrument, start_index, end_index, freq):
_calendar = get_calendar_day(freq=freq)
series = self.feature.load(instrument, start_index, end_index, freq)
return series.groupby(_calendar[series.index]).cumsum()
return series.groupby(_calendar[series.index]).transform(self.period_cusum)