# Copyright (c) Microsoft Corporation. # Licensed under the MIT License. """ NOTE: - This scripts is a demo to import example data import Qlib - !!!!!!!!!!!!!!!TODO!!!!!!!!!!!!!!!!!!!: - Its structure is not well designed and very ugly, your contribution is welcome to make importing dataset easier """ from datetime import date, datetime as dt import os from pathlib import Path import random import shutil import time import traceback from arctic import Arctic, chunkstore import arctic from arctic import Arctic, CHUNK_STORE from arctic.chunkstore.chunkstore import CHUNK_SIZE import fire from joblib import Parallel, delayed, parallel import numpy as np import pandas as pd from pandas import DataFrame from pandas.core.indexes.datetimes import date_range from pymongo.mongo_client import MongoClient DIRNAME = Path(__file__).absolute().resolve().parent # CONFIG N_JOBS = -1 # leaving one kernel free LOG_FILE_PATH = DIRNAME / "log_file" DATA_PATH = DIRNAME / "raw_data" DATABASE_PATH = DIRNAME / "orig_data" DATA_INFO_PATH = DIRNAME / "data_info" DATA_FINISH_INFO_PATH = DIRNAME / "./data_finish_info" DOC_TYPE = ["Tick", "Order", "OrderQueue", "Transaction", "Day", "Minute"] MAX_SIZE = 3000 * 1024 * 1024 * 1024 ALL_STOCK_PATH = DATABASE_PATH / "all.txt" ARCTIC_SRV = "127.0.0.1" def get_library_name(doc_type): if str.lower(doc_type) == str.lower("Tick"): return "ticks" else: return str.lower(doc_type) def is_stock(exchange_place, code): if exchange_place == "SH" and code[0] != "6": return False if exchange_place == "SZ" and code[0] != "0" and code[:2] != "30": return False return True def add_one_stock_daily_data(filepath, type, exchange_place, arc, date): """ exchange_place: "SZ" OR "SH" type: "tick", "orderbook", ... filepath: the path of csv arc: arclink created by a process """ code = os.path.split(filepath)[-1].split(".csv")[0] if exchange_place == "SH" and code[0] != "6": return if exchange_place == "SZ" and code[0] != "0" and code[:2] != "30": return df = pd.read_csv(filepath, encoding="gbk", dtype={"code": str}) code = os.path.split(filepath)[-1].split(".csv")[0] def format_time(day, hms): day = str(day) hms = str(hms) if hms[0] == "1": # >=10, return ( "-".join([day[0:4], day[4:6], day[6:8]]) + " " + ":".join([hms[:2], hms[2:4], hms[4:6] + "." + hms[6:]]) ) else: return ( "-".join([day[0:4], day[4:6], day[6:8]]) + " " + ":".join([hms[:1], hms[1:3], hms[3:5] + "." + hms[5:]]) ) ## Discard the entire row if wrong data timestamp encoutered. timestamp = list(zip(list(df["date"]), list(df["time"]))) error_index_list = [] for index, t in enumerate(timestamp): try: pd.Timestamp(format_time(t[0], t[1])) except Exception: error_index_list.append(index) ## The row number of the error line # to-do: writting to logs if len(error_index_list) > 0: print("error: {}, {}".format(filepath, len(error_index_list))) df = df.drop(error_index_list) timestamp = list(zip(list(df["date"]), list(df["time"]))) ## The cleaned timestamp # generate timestamp pd_timestamp = pd.DatetimeIndex( [pd.Timestamp(format_time(timestamp[i][0], timestamp[i][1])) for i in range(len(df["date"]))] ) df = df.drop(columns=["date", "time", "name", "code", "wind_code"]) # df = pd.DataFrame(data=df.to_dict("list"), index=pd_timestamp) df["date"] = pd.to_datetime(pd_timestamp) df.set_index("date", inplace=True) if str.lower(type) == "orderqueue": ## extract ab1~ab50 df["ab"] = [ ",".join([str(int(row["ab" + str(i + 1)])) for i in range(0, row["ab_items"])]) for timestamp, row in df.iterrows() ] df = df.drop(columns=["ab" + str(i) for i in range(1, 51)]) type = get_library_name(type) # arc.initialize_library(type, lib_type=CHUNK_STORE) lib = arc[type] symbol = "".join([exchange_place, code]) if symbol in lib.list_symbols(): print("update {0}, date={1}".format(symbol, date)) if df.empty == True: return error_index_list lib.update(symbol, df, chunk_size="D") else: print("write {0}, date={1}".format(symbol, date)) lib.write(symbol, df, chunk_size="D") return error_index_list def add_one_stock_daily_data_wrapper(filepath, type, exchange_place, index, date): pid = os.getpid() code = os.path.split(filepath)[-1].split(".csv")[0] arc = Arctic(ARCTIC_SRV) try: if index % 100 == 0: print("index = {}, filepath = {}".format(index, filepath)) error_index_list = add_one_stock_daily_data(filepath, type, exchange_place, arc, date) if error_index_list is not None and len(error_index_list) > 0: f = open(os.path.join(LOG_FILE_PATH, "temp_timestamp_error_{0}_{1}_{2}.txt".format(pid, date, type)), "a+") f.write("{}, {}, {}\n".format(filepath, error_index_list, exchange_place + "_" + code)) f.close() except Exception as e: info = traceback.format_exc() print("error:" + str(e)) f = open(os.path.join(LOG_FILE_PATH, "temp_fail_{0}_{1}_{2}.txt".format(pid, date, type)), "a+") f.write("fail:" + str(filepath) + "\n" + str(e) + "\n" + str(info) + "\n") f.close() finally: arc.reset() def add_data(tick_date, doc_type, stock_name_dict): pid = os.getpid() if doc_type not in DOC_TYPE: print("doc_type not in {}".format(DOC_TYPE)) return try: begin_time = time.time() os.system(f"cp {DATABASE_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} {DATA_PATH}/") os.system( f"tar -xvzf {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} -C {DATA_PATH}/ {tick_date + '_' + doc_type}/SH" ) os.system( f"tar -xvzf {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)} -C {DATA_PATH}/ {tick_date + '_' + doc_type}/SZ" ) os.system(f"chmod 777 {DATA_PATH}") os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}") os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SH") os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SZ") os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SH/{tick_date}") os.system(f"chmod 777 {DATA_PATH}/{tick_date + '_' + doc_type}/SZ/{tick_date}") print("tick_date={}".format(tick_date)) temp_data_path_sh = os.path.join(DATA_PATH, tick_date + "_" + doc_type, "SH", tick_date) temp_data_path_sz = os.path.join(DATA_PATH, tick_date + "_" + doc_type, "SZ", tick_date) is_files_exist = {"sh": os.path.exists(temp_data_path_sh), "sz": os.path.exists(temp_data_path_sz)} sz_files = ( ( set([i.split(".csv")[0] for i in os.listdir(temp_data_path_sz) if i[:2] == "30" or i[0] == "0"]) & set(stock_name_dict["SZ"]) ) if is_files_exist["sz"] else set() ) sz_file_nums = len(sz_files) if is_files_exist["sz"] else 0 sh_files = ( ( set([i.split(".csv")[0] for i in os.listdir(temp_data_path_sh) if i[0] == "6"]) & set(stock_name_dict["SH"]) ) if is_files_exist["sh"] else set() ) sh_file_nums = len(sh_files) if is_files_exist["sh"] else 0 print("sz_file_nums:{}, sh_file_nums:{}".format(sz_file_nums, sh_file_nums)) f = (DATA_INFO_PATH / "data_info_log_{}_{}".format(doc_type, tick_date)).open("w+") f.write("sz:{}, sh:{}, date:{}:".format(sz_file_nums, sh_file_nums, tick_date) + "\n") f.close() if sh_file_nums > 0: # write is not thread-safe, update may be thread-safe Parallel(n_jobs=N_JOBS)( delayed(add_one_stock_daily_data_wrapper)( os.path.join(temp_data_path_sh, name + ".csv"), doc_type, "SH", index, tick_date ) for index, name in enumerate(list(sh_files)) ) if sz_file_nums > 0: # write is not thread-safe, update may be thread-safe Parallel(n_jobs=N_JOBS)( delayed(add_one_stock_daily_data_wrapper)( os.path.join(temp_data_path_sz, name + ".csv"), doc_type, "SZ", index, tick_date ) for index, name in enumerate(list(sz_files)) ) os.system(f"rm -f {DATA_PATH}/{tick_date + '_{}.tar.gz'.format(doc_type)}") os.system(f"rm -rf {DATA_PATH}/{tick_date + '_' + doc_type}") total_time = time.time() - begin_time f = (DATA_FINISH_INFO_PATH / "data_info_finish_log_{}_{}".format(doc_type, tick_date)).open("w+") f.write("finish: date:{}, consume_time:{}, end_time: {}".format(tick_date, total_time, time.time()) + "\n") f.close() except Exception as e: info = traceback.format_exc() print("date error:" + str(e)) f = open(os.path.join(LOG_FILE_PATH, "temp_fail_{0}_{1}_{2}.txt".format(pid, tick_date, doc_type)), "a+") f.write("fail:" + str(tick_date) + "\n" + str(e) + "\n" + str(info) + "\n") f.close() class DSCreator: """Dataset creator""" def clear(self): client = MongoClient(ARCTIC_SRV) client.drop_database("arctic") def initialize_library(self): arc = Arctic(ARCTIC_SRV) for doc_type in DOC_TYPE: arc.initialize_library(get_library_name(doc_type), lib_type=CHUNK_STORE) def _get_empty_folder(self, fp: Path): fp = Path(fp) if fp.exists(): shutil.rmtree(fp) fp.mkdir(parents=True, exist_ok=True) def import_data(self, doc_type_l=["Tick", "Transaction", "Order"]): # clear all the old files for fp in LOG_FILE_PATH, DATA_INFO_PATH, DATA_FINISH_INFO_PATH, DATA_PATH: self._get_empty_folder(fp) arc = Arctic(ARCTIC_SRV) for doc_type in DOC_TYPE: # arc.initialize_library(get_library_name(doc_type), lib_type=CHUNK_STORE) arc.set_quota(get_library_name(doc_type), MAX_SIZE) arc.reset() # doc_type = 'Day' for doc_type in doc_type_l: date_list = list(set([int(path.split("_")[0]) for path in os.listdir(DATABASE_PATH) if doc_type in path])) date_list.sort() date_list = [str(date) for date in date_list] f = open(ALL_STOCK_PATH, "r") stock_name_list = [lines.split("\t")[0] for lines in f.readlines()] f.close() stock_name_dict = { "SH": [stock_name[2:] for stock_name in stock_name_list if "SH" in stock_name], "SZ": [stock_name[2:] for stock_name in stock_name_list if "SZ" in stock_name], } lib_name = get_library_name(doc_type) a = Arctic(ARCTIC_SRV) # a.initialize_library(lib_name, lib_type=CHUNK_STORE) stock_name_exist = a[lib_name].list_symbols() lib = a[lib_name] initialize_count = 0 for stock_name in stock_name_list: if stock_name not in stock_name_exist: initialize_count += 1 # A placeholder for stocks pdf = pd.DataFrame(index=[pd.Timestamp("1900-01-01")]) pdf.index.name = "date" # an col named date is necessary lib.write(stock_name, pdf) print("initialize count: {}".format(initialize_count)) print("tasks: {}".format(date_list)) a.reset() # date_list = [files.split("_")[0] for files in os.listdir("./raw_data_price") if "tar" in files] # print(len(date_list)) date_list = ["20201231"] # for test Parallel(n_jobs=min(2, len(date_list)))( delayed(add_data)(date, doc_type, stock_name_dict) for date in date_list ) if __name__ == "__main__": fire.Fire(DSCreator)