1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00

fix cn_index collector (#780)

This commit is contained in:
Pengrong Zhu
2021-12-26 14:12:48 +08:00
committed by GitHub
parent 2840570dd3
commit a0f49fe2e7
2 changed files with 102 additions and 66 deletions

View File

@@ -6,13 +6,12 @@ import abc
import sys
import importlib
from io import BytesIO
from typing import List
from typing import List, Iterable
from pathlib import Path
import fire
import requests
import pandas as pd
from lxml import etree
from loguru import logger
CUR_DIR = Path(__file__).resolve().parent
@@ -22,12 +21,10 @@ from data_collector.index import IndexBase
from data_collector.utils import get_calendar_list, get_trading_date_by_shift, deco_retry
NEW_COMPANIES_URL = "http://www.csindex.com.cn/uploads/file/autofile/cons/{index_code}cons.xls"
NEW_COMPANIES_URL = "https://csi-web-dev.oss-cn-shanghai-finance-1-pub.aliyuncs.com/static/html/csindex/public/uploads/file/autofile/cons/{index_code}cons.xls"
# INDEX_CHANGES_URL = "http://www.csindex.com.cn/zh-CN/search/total?key=%E5%85%B3%E4%BA%8E%E8%B0%83%E6%95%B4%E6%B2%AA%E6%B7%B1300%E5%92%8C%E4%B8%AD%E8%AF%81%E9%A6%99%E6%B8%AF100%E7%AD%89%E6%8C%87%E6%95%B0%E6%A0%B7%E6%9C%AC%E8%82%A1%E7%9A%84%E5%85%AC%E5%91%8A"
# 2020-11-27 Announcement title change
INDEX_CHANGES_URL = "http://www.csindex.com.cn/zh-CN/search/total?key=%E5%85%B3%E4%BA%8E%E8%B0%83%E6%95%B4%E6%B2%AA%E6%B7%B1300%E5%92%8C%E4%B8%AD%E8%AF%81%E9%A6%99%E6%B8%AF100%E7%AD%89"
INDEX_CHANGES_URL = "https://www.csindex.com.cn/csindex-home/search/search-content?lang=cn&searchInput=%E5%85%B3%E4%BA%8E%E8%B0%83%E6%95%B4%E6%B2%AA%E6%B7%B1300%E5%92%8C%E4%B8%AD%E8%AF%81%E9%A6%99%E6%B8%AF100%E7%AD%89%E6%8C%87%E6%95%B0%E6%A0%B7%E6%9C%AC&pageNum={page_num}&pageSize={page_size}&sortField=date&dateRange=all&contentType=announcement"
REQ_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36 Edg/91.0.864.48"
@@ -55,7 +52,11 @@ class CSIIndex(IndexBase):
-------
calendar list
"""
return get_calendar_list(bench_code=self.index_name.upper())
_calendar = getattr(self, "_calendar_list", None)
if not _calendar:
_calendar = get_calendar_list(bench_code=self.index_name.upper())
setattr(self, "_calendar_list", _calendar)
return _calendar
@property
def new_companies_url(self) -> str:
@@ -135,7 +136,8 @@ class CSIIndex(IndexBase):
res = []
for _url in self._get_change_notices_url():
_df = self._read_change_from_url(_url)
res.append(_df)
if not _df.empty:
res.append(_df)
logger.info("get companies changes finish")
return pd.concat(res, sort=False)
@@ -155,6 +157,56 @@ class CSIIndex(IndexBase):
symbol = f"{int(symbol):06}"
return f"SH{symbol}" if symbol.startswith("60") else f"SZ{symbol}"
def _parse_excel(self, excel_url: str, add_date: pd.Timestamp, remove_date: pd.Timestamp) -> pd.DataFrame:
    """Download the change workbook and extract this index's added/removed symbols.

    The workbook is fetched with ``retry_request`` (404 is excluded from the
    retried statuses), parsed with ``pd.read_excel(..., sheet_name=None)`` and
    the raw bytes are then stored in ``self.cache_dir``.

    Parameters
    ----------
    excel_url : str
        URL of the workbook; the text after its last ``.`` is reused as the
        extension of the cached file.
    add_date : pd.Timestamp
        date assigned to rows taken from the "调入" sheet; also used in the
        cache file name.
    remove_date : pd.Timestamp
        date assigned to rows taken from the "调出" sheet.

    Returns
    -------
    pd.DataFrame
        columns: ``self.SYMBOL_FIELD_NAME``, ``"type"`` (``self.ADD`` or
        ``self.REMOVE``) and ``self.DATE_FIELD_NAME``.

    Raises
    ------
    KeyError
        if the workbook lacks the "调入"/"调出" sheet or the expected columns.
    Exception
        whatever ``pd.read_excel`` raises for unreadable content is propagated.
    """
    content = retry_request(excel_url, exclude_status=[404]).content
    df_map = pd.read_excel(BytesIO(content), sheet_name=None)
    # The cache file is written only after the workbook parsed successfully.
    with self.cache_dir.joinpath(
        f"{self.index_name.lower()}_changes_{add_date.strftime('%Y%m%d')}.{excel_url.split('.')[-1]}"
    ).open("wb") as fp:
        fp.write(content)
    tmp = []
    for _s_name, _type, _date in [("调入", self.ADD, add_date), ("调出", self.REMOVE, remove_date)]:
        sheet = df_map[_s_name]
        # Keep only the rows belonging to this index, then normalize the codes.
        # Series.map replaces DataFrame.applymap, which is deprecated in pandas >= 2.1.
        symbols = sheet.loc[sheet["指数代码"] == self.index_code, "证券代码"].map(self.normalize_symbol)
        # Build a fresh frame so the columns are not assigned onto a slice of the sheet.
        _df = pd.DataFrame({self.SYMBOL_FIELD_NAME: symbols})
        _df["type"] = _type
        _df[self.DATE_FIELD_NAME] = _date
        tmp.append(_df)
    return pd.concat(tmp)
def _parse_table(self, content: str, add_date: pd.Timestamp, remove_date: pd.Timestamp) -> pd.DataFrame:
    """Extract added/removed symbols from an HTML table embedded in ``content``.

    Only tables with exactly four columns are considered; among those, the one
    at zero-based position ``self.html_table_index`` is used. Data rows start
    at the third row: column 0 holds the removed symbols and column 2 the
    added symbols. The parsed result is also written as CSV into
    ``self.cache_dir``.

    Parameters
    ----------
    content : str
        HTML text containing the tables (``bytes`` is accepted as well).
    add_date : pd.Timestamp
        date assigned to added symbols; also used in the cache file name.
    remove_date : pd.Timestamp
        date assigned to removed symbols.

    Returns
    -------
    pd.DataFrame
        columns: ``self.SYMBOL_FIELD_NAME``, ``"type"`` and
        ``self.DATE_FIELD_NAME``; an empty DataFrame when no table qualifies.
    """
    from io import BytesIO, StringIO

    # Passing literal HTML to pd.read_html is deprecated; hand it a file-like object.
    if isinstance(content, str):
        source = StringIO(content)
    elif isinstance(content, (bytes, bytearray)):
        source = BytesIO(content)
    else:
        source = content

    df = pd.DataFrame()
    _tmp_count = 0
    for _df in pd.read_html(source):
        if _df.shape[-1] != 4:
            continue
        _tmp_count += 1
        # Skip four-column tables until the configured position is reached.
        if self.html_table_index + 1 > _tmp_count:
            continue
        tmp = []
        for _s, _type, _date in [
            (_df.iloc[2:, 0], self.REMOVE, remove_date),
            (_df.iloc[2:, 2], self.ADD, add_date),
        ]:
            _tmp_df = pd.DataFrame({self.SYMBOL_FIELD_NAME: _s.map(self.normalize_symbol)})
            _tmp_df["type"] = _type
            _tmp_df[self.DATE_FIELD_NAME] = _date
            tmp.append(_tmp_df)
        df = pd.concat(tmp)
        cache_path = self.cache_dir.joinpath(
            f"{self.index_name.lower()}_changes_{add_date.strftime('%Y%m%d')}.csv"
        ).resolve()
        df.to_csv(str(cache_path))
        # Only the first qualifying table is used.
        break
    return df
def _read_change_from_url(self, url: str) -> pd.DataFrame:
"""read change from url
@@ -174,75 +226,60 @@ class CSIIndex(IndexBase):
date: pd.Timestamp
type: str, value from ["add", "remove"]
"""
resp = retry_request(url)
_text = resp.text
resp = retry_request(url).json()["data"]
title = resp["title"]
if not title.startswith("关于"):
return pd.DataFrame()
if "沪深300" not in title:
return pd.DataFrame()
logger.info(f"load index data from https://www.csindex.com.cn/#/about/newsDetail?id={url.split('id=')[-1]}")
_text = resp["content"]
date_list = re.findall(r"(\d{4}).*?年.*?(\d+).*?月.*?(\d+).*?日", _text)
if len(date_list) >= 2:
add_date = pd.Timestamp("-".join(date_list[0]))
else:
_date = pd.Timestamp("-".join(re.findall(r"(\d{4}).*?年.*?(\d+).*?月", _text)[0]))
add_date = get_trading_date_by_shift(self.calendar_list, _date, shift=0)
if "盘后" in _text or "市后" in _text:
add_date = get_trading_date_by_shift(self.calendar_list, add_date, shift=1)
remove_date = get_trading_date_by_shift(self.calendar_list, add_date, shift=-1)
logger.info(f"get {add_date} changes")
try:
excel_url = re.findall('.*href="(.*?xls.*?)".*', _text)[0]
content = retry_request(f"http://www.csindex.com.cn{excel_url}", exclude_status=[404]).content
_io = BytesIO(content)
df_map = pd.read_excel(_io, sheet_name=None)
with self.cache_dir.joinpath(
f"{self.index_name.lower()}_changes_{add_date.strftime('%Y%m%d')}.{excel_url.split('.')[-1]}"
).open("wb") as fp:
fp.write(content)
tmp = []
for _s_name, _type, _date in [("调入", self.ADD, add_date), ("调出", self.REMOVE, remove_date)]:
_df = df_map[_s_name]
_df = _df.loc[_df["指数代码"] == self.index_code, ["证券代码"]]
_df = _df.applymap(self.normalize_symbol)
_df.columns = [self.SYMBOL_FIELD_NAME]
_df["type"] = _type
_df[self.DATE_FIELD_NAME] = _date
tmp.append(_df)
df = pd.concat(tmp)
except Exception as e:
df = None
_tmp_count = 0
for _df in pd.read_html(resp.content):
if _df.shape[-1] != 4:
continue
_tmp_count += 1
if self.html_table_index + 1 > _tmp_count:
continue
tmp = []
for _s, _type, _date in [
(_df.iloc[2:, 0], self.REMOVE, remove_date),
(_df.iloc[2:, 2], self.ADD, add_date),
]:
_tmp_df = pd.DataFrame()
_tmp_df[self.SYMBOL_FIELD_NAME] = _s.map(self.normalize_symbol)
_tmp_df["type"] = _type
_tmp_df[self.DATE_FIELD_NAME] = _date
tmp.append(_tmp_df)
df = pd.concat(tmp)
df.to_csv(
str(
self.cache_dir.joinpath(
f"{self.index_name.lower()}_changes_{add_date.strftime('%Y%m%d')}.csv"
).resolve()
)
)
break
excel_url = None
if resp.get("enclosureList", []):
excel_url = resp["enclosureList"][0]["fileUrl"]
else:
excel_url_list = re.findall('.*href="(.*?xls.*?)".*', _text)
if excel_url_list:
excel_url = excel_url_list[0]
if not excel_url.startswith("http"):
excel_url = excel_url if excel_url.startswith("/") else "/" + excel_url
excel_url = f"http://www.csindex.com.cn{excel_url}"
if excel_url:
logger.info(f"get {add_date} changes from excel, title={title}, excel_url={excel_url}")
try:
df = self._parse_excel(excel_url, add_date, remove_date)
except ValueError:
logger.warning(f"error downloading file: {excel_url}, will parse the table from the content")
df = self._parse_table(_text, add_date, remove_date)
else:
logger.info(f"get {add_date} changes from url content, title={title}")
df = self._parse_table(_text, add_date, remove_date)
return df
def _get_change_notices_url(self) -> Iterable[str]:
    """Yield the detail-query URL of every announcement returned by the search API.

    ``self.changes_url`` is queried twice: a small first request is used only
    to read the ``total`` number of results, and a second request asks for all
    of them in a single page.

    Returns
    -------
    Iterable[str]
        a generator of URLs, one per item in the ``data`` list of the response.
    """
    page_num = 1
    probe_page_size = 5
    probe = retry_request(self.changes_url.format(page_size=probe_page_size, page_num=page_num)).json()
    total = probe["total"]
    if not total:
        # Nothing matched the search; avoid issuing a request with page_size=0.
        return
    data = retry_request(self.changes_url.format(page_size=total, page_num=page_num)).json()
    for item in data["data"]:
        yield f"https://www.csindex.com.cn/csindex-home/announcement/queryAnnouncementById?id={item['id']}"
def get_new_companies(self) -> pd.DataFrame:
"""
@@ -270,7 +307,7 @@ class CSIIndex(IndexBase):
df = df.iloc[:, [0, 4]]
df.columns = [self.END_DATE_FIELD, self.SYMBOL_FIELD_NAME]
df[self.SYMBOL_FIELD_NAME] = df[self.SYMBOL_FIELD_NAME].map(self.normalize_symbol)
df[self.END_DATE_FIELD] = pd.to_datetime(df[self.END_DATE_FIELD])
df[self.END_DATE_FIELD] = pd.to_datetime(df[self.END_DATE_FIELD].astype(str))
df[self.START_DATE_FIELD] = self.bench_start_date
logger.info("end of get new companies.")
return df
@@ -287,7 +324,7 @@ class CSI300(CSIIndex):
@property
def html_table_index(self):
return 0
return 1
class CSI100(CSIIndex):
@@ -301,7 +338,7 @@ class CSI100(CSIIndex):
@property
def html_table_index(self):
return 1
return 2
def get_instruments(

View File

@@ -1,4 +1,3 @@
logure
fire
requests
pandas