From a0f49fe2e7c8792afc8fb0272919973dac685cc0 Mon Sep 17 00:00:00 2001
From: Pengrong Zhu
Date: Sun, 26 Dec 2021 14:12:48 +0800
Subject: [PATCH] fix cn_index collector (#780)

---
Reviewer notes (commentary only; text between "---" and the first
"diff --git" line is ignored by git am / git apply):
- The line structure of this patch was restored from a copy whose newlines
  and indentation had been collapsed. Blank lines were placed to satisfy the
  hunk line counts and indentation was rebuilt from Python syntax; run
  `git apply --check` against the parent tree before relying on it.
- _parse_table annotates add_date/remove_date as pd.DataFrame, but calls
  add_date.strftime(...) and _parse_excel annotates the same values as
  pd.Timestamp; the annotation looks like a typo.
- get_changes now skips empty frames, so `res` can be empty;
  pd.concat([]) raises ValueError ("No objects to concatenate").
- The excel path now catches only ValueError where the old code caught
  Exception; e.g. a missing sheet in df_map (KeyError) would no longer fall
  back to the HTML table -- confirm this narrowing is intended.
- The relative-URL fallback still prefixes "http://www.csindex.com.cn"
  while the other endpoints in this patch moved to https.

 scripts/data_collector/cn_index/collector.py | 167 +++++++++++-------
 .../data_collector/cn_index/requirements.txt | 1 -
 2 files changed, 102 insertions(+), 66 deletions(-)

diff --git a/scripts/data_collector/cn_index/collector.py b/scripts/data_collector/cn_index/collector.py
index 39104ed68..6f3eee255 100644
--- a/scripts/data_collector/cn_index/collector.py
+++ b/scripts/data_collector/cn_index/collector.py
@@ -6,13 +6,12 @@ import abc
 import sys
 import importlib
 from io import BytesIO
-from typing import List
+from typing import List, Iterable
 from pathlib import Path
 
 import fire
 import requests
 import pandas as pd
-from lxml import etree
 from loguru import logger
 
 CUR_DIR = Path(__file__).resolve().parent
@@ -22,12 +21,10 @@ from data_collector.index import IndexBase
 from data_collector.utils import get_calendar_list, get_trading_date_by_shift, deco_retry
 
 
-NEW_COMPANIES_URL = "http://www.csindex.com.cn/uploads/file/autofile/cons/{index_code}cons.xls"
+NEW_COMPANIES_URL = "https://csi-web-dev.oss-cn-shanghai-finance-1-pub.aliyuncs.com/static/html/csindex/public/uploads/file/autofile/cons/{index_code}cons.xls"
 
 
-# INDEX_CHANGES_URL = "http://www.csindex.com.cn/zh-CN/search/total?key=%E5%85%B3%E4%BA%8E%E8%B0%83%E6%95%B4%E6%B2%AA%E6%B7%B1300%E5%92%8C%E4%B8%AD%E8%AF%81%E9%A6%99%E6%B8%AF100%E7%AD%89%E6%8C%87%E6%95%B0%E6%A0%B7%E6%9C%AC%E8%82%A1%E7%9A%84%E5%85%AC%E5%91%8A"
-# 2020-11-27 Announcement title change
-INDEX_CHANGES_URL = "http://www.csindex.com.cn/zh-CN/search/total?key=%E5%85%B3%E4%BA%8E%E8%B0%83%E6%95%B4%E6%B2%AA%E6%B7%B1300%E5%92%8C%E4%B8%AD%E8%AF%81%E9%A6%99%E6%B8%AF100%E7%AD%89"
+INDEX_CHANGES_URL = "https://www.csindex.com.cn/csindex-home/search/search-content?lang=cn&searchInput=%E5%85%B3%E4%BA%8E%E8%B0%83%E6%95%B4%E6%B2%AA%E6%B7%B1300%E5%92%8C%E4%B8%AD%E8%AF%81%E9%A6%99%E6%B8%AF100%E7%AD%89%E6%8C%87%E6%95%B0%E6%A0%B7%E6%9C%AC&pageNum={page_num}&pageSize={page_size}&sortField=date&dateRange=all&contentType=announcement"
 
 REQ_HEADERS = {
     "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.101 Safari/537.36 Edg/91.0.864.48"
@@ -55,7 +52,11 @@ class CSIIndex(IndexBase):
         -------
             calendar list
         """
-        return get_calendar_list(bench_code=self.index_name.upper())
+        _calendar = getattr(self, "_calendar_list", None)
+        if not _calendar:
+            _calendar = get_calendar_list(bench_code=self.index_name.upper())
+            setattr(self, "_calendar_list", _calendar)
+        return _calendar
 
     @property
     def new_companies_url(self) -> str:
@@ -135,7 +136,8 @@ class CSIIndex(IndexBase):
         res = []
         for _url in self._get_change_notices_url():
             _df = self._read_change_from_url(_url)
-            res.append(_df)
+            if not _df.empty:
+                res.append(_df)
         logger.info("get companies changes finish")
         return pd.concat(res, sort=False)
 
@@ -155,6 +157,56 @@ class CSIIndex(IndexBase):
         symbol = f"{int(symbol):06}"
         return f"SH{symbol}" if symbol.startswith("60") else f"SZ{symbol}"
 
+    def _parse_excel(self, excel_url: str, add_date: pd.Timestamp, remove_date: pd.Timestamp) -> pd.DataFrame:
+        content = retry_request(excel_url, exclude_status=[404]).content
+        _io = BytesIO(content)
+        df_map = pd.read_excel(_io, sheet_name=None)
+        with self.cache_dir.joinpath(
+            f"{self.index_name.lower()}_changes_{add_date.strftime('%Y%m%d')}.{excel_url.split('.')[-1]}"
+        ).open("wb") as fp:
+            fp.write(content)
+        tmp = []
+        for _s_name, _type, _date in [("调入", self.ADD, add_date), ("调出", self.REMOVE, remove_date)]:
+            _df = df_map[_s_name]
+            _df = _df.loc[_df["指数代码"] == self.index_code, ["证券代码"]]
+            _df = _df.applymap(self.normalize_symbol)
+            _df.columns = [self.SYMBOL_FIELD_NAME]
+            _df["type"] = _type
+            _df[self.DATE_FIELD_NAME] = _date
+            tmp.append(_df)
+        df = pd.concat(tmp)
+        return df
+
+    def _parse_table(self, content: str, add_date: pd.DataFrame, remove_date: pd.DataFrame) -> pd.DataFrame:
+        df = pd.DataFrame()
+        _tmp_count = 0
+        for _df in pd.read_html(content):
+            if _df.shape[-1] != 4:
+                continue
+            _tmp_count += 1
+            if self.html_table_index + 1 > _tmp_count:
+                continue
+            tmp = []
+            for _s, _type, _date in [
+                (_df.iloc[2:, 0], self.REMOVE, remove_date),
+                (_df.iloc[2:, 2], self.ADD, add_date),
+            ]:
+                _tmp_df = pd.DataFrame()
+                _tmp_df[self.SYMBOL_FIELD_NAME] = _s.map(self.normalize_symbol)
+                _tmp_df["type"] = _type
+                _tmp_df[self.DATE_FIELD_NAME] = _date
+                tmp.append(_tmp_df)
+            df = pd.concat(tmp)
+            df.to_csv(
+                str(
+                    self.cache_dir.joinpath(
+                        f"{self.index_name.lower()}_changes_{add_date.strftime('%Y%m%d')}.csv"
+                    ).resolve()
+                )
+            )
+            break
+        return df
+
     def _read_change_from_url(self, url: str) -> pd.DataFrame:
         """read change from url
 
@@ -174,75 +226,60 @@ class CSIIndex(IndexBase):
                     date: pd.Timestamp
                     type: str, value from ["add", "remove"]
         """
-        resp = retry_request(url)
-        _text = resp.text
+        resp = retry_request(url).json()["data"]
+        title = resp["title"]
+        if not title.startswith("关于"):
+            return pd.DataFrame()
+        if "沪深300" not in title:
+            return pd.DataFrame()
+
+        logger.info(f"load index data from https://www.csindex.com.cn/#/about/newsDetail?id={url.split('id=')[-1]}")
+        _text = resp["content"]
         date_list = re.findall(r"(\d{4}).*?年.*?(\d+).*?月.*?(\d+).*?日", _text)
         if len(date_list) >= 2:
             add_date = pd.Timestamp("-".join(date_list[0]))
         else:
             _date = pd.Timestamp("-".join(re.findall(r"(\d{4}).*?年.*?(\d+).*?月", _text)[0]))
             add_date = get_trading_date_by_shift(self.calendar_list, _date, shift=0)
+        if "盘后" in _text or "市后" in _text:
+            add_date = get_trading_date_by_shift(self.calendar_list, add_date, shift=1)
         remove_date = get_trading_date_by_shift(self.calendar_list, add_date, shift=-1)
-        logger.info(f"get {add_date} changes")
-        try:
-            excel_url = re.findall('.*href="(.*?xls.*?)".*', _text)[0]
-            content = retry_request(f"http://www.csindex.com.cn{excel_url}", exclude_status=[404]).content
-            _io = BytesIO(content)
-            df_map = pd.read_excel(_io, sheet_name=None)
-            with self.cache_dir.joinpath(
-                f"{self.index_name.lower()}_changes_{add_date.strftime('%Y%m%d')}.{excel_url.split('.')[-1]}"
-            ).open("wb") as fp:
-                fp.write(content)
-            tmp = []
-            for _s_name, _type, _date in [("调入", self.ADD, add_date), ("调出", self.REMOVE, remove_date)]:
-                _df = df_map[_s_name]
-                _df = _df.loc[_df["指数代码"] == self.index_code, ["证券代码"]]
-                _df = _df.applymap(self.normalize_symbol)
-                _df.columns = [self.SYMBOL_FIELD_NAME]
-                _df["type"] = _type
-                _df[self.DATE_FIELD_NAME] = _date
-                tmp.append(_df)
-            df = pd.concat(tmp)
-        except Exception as e:
-            df = None
-            _tmp_count = 0
-            for _df in pd.read_html(resp.content):
-                if _df.shape[-1] != 4:
-                    continue
-                _tmp_count += 1
-                if self.html_table_index + 1 > _tmp_count:
-                    continue
-                tmp = []
-                for _s, _type, _date in [
-                    (_df.iloc[2:, 0], self.REMOVE, remove_date),
-                    (_df.iloc[2:, 2], self.ADD, add_date),
-                ]:
-                    _tmp_df = pd.DataFrame()
-                    _tmp_df[self.SYMBOL_FIELD_NAME] = _s.map(self.normalize_symbol)
-                    _tmp_df["type"] = _type
-                    _tmp_df[self.DATE_FIELD_NAME] = _date
-                    tmp.append(_tmp_df)
-                df = pd.concat(tmp)
-                df.to_csv(
-                    str(
-                        self.cache_dir.joinpath(
-                            f"{self.index_name.lower()}_changes_{add_date.strftime('%Y%m%d')}.csv"
-                        ).resolve()
-                    )
-                )
-                break
+
+        excel_url = None
+        if resp.get("enclosureList", []):
+            excel_url = resp["enclosureList"][0]["fileUrl"]
+        else:
+            excel_url_list = re.findall('.*href="(.*?xls.*?)".*', _text)
+            if excel_url_list:
+                excel_url = excel_url_list[0]
+                if not excel_url.startswith("http"):
+                    excel_url = excel_url if excel_url.startswith("/") else "/" + excel_url
+                    excel_url = f"http://www.csindex.com.cn{excel_url}"
+        if excel_url:
+            logger.info(f"get {add_date} changes from excel, title={title}, excel_url={excel_url}")
+            try:
+                df = self._parse_excel(excel_url, add_date, remove_date)
+            except ValueError:
+                logger.warning(f"error downloading file: {excel_url}, will parse the table from the content")
+                df = self._parse_table(_text, add_date, remove_date)
+        else:
+            logger.info(f"get {add_date} changes from url content, title={title}")
+            df = self._parse_table(_text, add_date, remove_date)
         return df
 
-    def _get_change_notices_url(self) -> List[str]:
+    def _get_change_notices_url(self) -> Iterable[str]:
         """get change notices url
 
         Returns
         -------
             [url1, url2]
         """
-        resp = retry_request(self.changes_url)
-        html = etree.HTML(resp.text)
-        return html.xpath("//*[@id='itemContainer']//li/a/@href")
+        page_num = 1
+        page_size = 5
+        data = retry_request(self.changes_url.format(page_size=page_size, page_num=page_num)).json()
+        data = retry_request(self.changes_url.format(page_size=data["total"], page_num=page_num)).json()
+        for item in data["data"]:
+            yield f"https://www.csindex.com.cn/csindex-home/announcement/queryAnnouncementById?id={item['id']}"
 
     def get_new_companies(self) -> pd.DataFrame:
         """
@@ -270,7 +307,7 @@ class CSIIndex(IndexBase):
         df = df.iloc[:, [0, 4]]
         df.columns = [self.END_DATE_FIELD, self.SYMBOL_FIELD_NAME]
         df[self.SYMBOL_FIELD_NAME] = df[self.SYMBOL_FIELD_NAME].map(self.normalize_symbol)
-        df[self.END_DATE_FIELD] = pd.to_datetime(df[self.END_DATE_FIELD])
+        df[self.END_DATE_FIELD] = pd.to_datetime(df[self.END_DATE_FIELD].astype(str))
         df[self.START_DATE_FIELD] = self.bench_start_date
         logger.info("end of get new companies.")
         return df
@@ -287,7 +324,7 @@ class CSI300(CSIIndex):
 
     @property
     def html_table_index(self):
-        return 0
+        return 1
 
 
 class CSI100(CSIIndex):
@@ -301,7 +338,7 @@ class CSI100(CSIIndex):
 
     @property
     def html_table_index(self):
-        return 1
+        return 2
 
 
 def get_instruments(
diff --git a/scripts/data_collector/cn_index/requirements.txt b/scripts/data_collector/cn_index/requirements.txt
index 729271038..1d846b504 100644
--- a/scripts/data_collector/cn_index/requirements.txt
+++ b/scripts/data_collector/cn_index/requirements.txt
@@ -1,4 +1,3 @@
-logure
 fire
 requests
 pandas