1
0
mirror of https://github.com/microsoft/qlib.git synced 2026-06-06 05:51:17 +08:00
Files
qlib/qlib/data/client.py
2020-11-26 00:55:26 +08:00

103 lines
3.7 KiB
Python

# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
from __future__ import division
from __future__ import print_function
import socketio
import qlib
from ..log import get_module_logger
import pickle
class Client(object):
"""A client class
Provide the connection tool functions for ClientProvider.
"""
def __init__(self, host, port):
super(Client, self).__init__()
self.sio = socketio.Client()
self.server_host = host
self.server_port = port
self.logger = get_module_logger(self.__class__.__name__)
# bind connect/disconnect callbacks
self.sio.on(
"connect",
lambda: self.logger.debug("Connect to server {}".format(self.sio.connection_url)),
)
self.sio.on("disconnect", lambda: self.logger.debug("Disconnect from server!"))
def connect_server(self):
"""Connect to server."""
try:
self.sio.connect("ws://" + self.server_host + ":" + str(self.server_port))
except socketio.exceptions.ConnectionError:
self.logger.error("Cannot connect to server - check your network or server status")
def disconnect(self):
"""Disconnect from server."""
try:
self.sio.eio.disconnect(True)
except Exception as e:
self.logger.error("Cannot disconnect from server : %s" % e)
def send_request(self, request_type, request_content, msg_queue, msg_proc_func=None):
"""Send a certain request to server.
Parameters
----------
request_type : str
type of proposed request, 'calendar'/'instrument'/'feature'.
request_content : dict
records the information of the request.
msg_proc_func : func
the function to process the message when receiving response, should have arg `*args`.
msg_queue: Queue
The queue to pass the messsage after callback.
"""
head_info = {"version": qlib.__version__}
def request_callback(*args):
"""callback_wrapper
:param *args: args[0] is the response content
"""
# args[0] is the response content
self.logger.debug("receive data and enter queue")
msg = dict(args[0])
if msg["detailed_info"] is not None:
if msg["status"] != 0:
self.logger.error(msg["detailed_info"])
else:
self.logger.info(msg["detailed_info"])
if msg["status"] != 0:
ex = ValueError(f"Bad response(status=={msg['status']}), detailed info: {msg['detailed_info']}")
msg_queue.put(ex)
else:
if msg_proc_func is not None:
try:
ret = msg_proc_func(msg["result"])
except Exception as e:
self.logger.exception("Error when processing message.")
ret = e
else:
ret = msg["result"]
msg_queue.put(ret)
self.disconnect()
self.logger.debug("disconnected")
self.logger.debug("try connecting")
self.connect_server()
self.logger.debug("connected")
# The pickle is for passing some parameters with special type(such as
# pd.Timestamp)
request_content = {"head": head_info, "body": pickle.dumps(request_content)}
self.sio.on(request_type + "_response", request_callback)
self.logger.debug("try sending")
self.sio.emit(request_type + "_request", request_content)
self.sio.wait()