Ошибка исключения в потоке EReader под управлением ibapi
Я использую Python ib-api для получения рыночных данных в реальном времени от Interactive Brokers. Он может предоставить данные, которые я ожидал, но заканчивается "необработанным исключением в потоке EReader".
from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import pandas as pd
from ibapi.ticktype import TickTypeEnum`
`DEFAULT_PRICE_DATA_ID = 1001`
`FINISHED = object()
STARTED = object()
TIME_OUT = object()`
class finishableQueue(object):
def __init__(self, queue_to_finish):
self._queue = queue_to_finish
self.status = STARTED
def get(self, timeout):
contents_of_queue=[]
finished=False
while not finished:
try:
current_element = self._queue.get(timeout=timeout)
if current_element is FINISHED:
finished = True
self.status = FINISHED
else:
contents_of_queue.append(current_element)
except queue.Empty:
finished = True
self.status = TIME_OUT
return contents_of_queue
def timed_out(self):
return self.status is TIME_OUT
class TestWrapper(EWrapper):
def __init__(self):
self._my_price_data_dict = {}
def get_error(self, timeout=5):
if self.is_error():
try:
return self._my_errors.get(timeout=timeout)
except queue.Empty:
return None
return None
def is_error(self):
an_error_if=not self._my_errors.empty()
return an_error_if
def init_error(self):
error_queue=queue.Queue()
self._my_errors = error_queue
def error(self, id, errorCode, errorString):
## Overriden method
errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
self._my_errors.put(errormsg)
def init_ibprices(self, tickerid):
ibprice_data_queue = self._my_price_data_dict[tickerid] = queue.Queue()
return ibprice_data_queue
def tickPrice(self, reqId, tickType, price, attrib):
tickdata = (TickTypeEnum.to_str(tickType), price)
price_data_dict = self._my_price_data_dict
if reqId not in price_data_dict.keys():
self.init_ibprices(reqId)
price_data_dict[reqId].put(tickdata)
class TestClient(EClient):
def __init__(self, wrapper):
EClient.__init__(self, wrapper)
def error(self, reqId, errorCode, errorString):
print("Error: ", reqId, " ", errorCode, " ", errorString)
def getIBrealtimedata(self, ibcontract, tickerid=DEFAULT_PRICE_DATA_ID):
ib_data_queue = finishableQueue(self.init_ibprices(tickerid))
self.reqMktData(
tickerid,
ibcontract,
"",
False,
False,
[]
)
MAX_WAIT_SECONDS = 5
print("Getting data from the server... could take %d seconds to complete " % MAX_WAIT_SECONDS)
price_data = ib_data_queue.get(timeout = MAX_WAIT_SECONDS)
while self.wrapper.is_error():
print(self.get_error())
if ib_data_queue.timed_out():
print("Exceeded maximum wait for wrapper to confirm finished - seems to be normal behaviour")
self.cancelMktData(tickerid)
return price_data
class TestApp(TestWrapper, TestClient):
def __init__(self, ipaddress, portid, clientid):
TestWrapper.__init__(self)
TestClient.__init__(self, wrapper=self)
self.connect(ipaddress, portid, clientid)
thread = Thread(target = self.run)
thread.start()
setattr(self, "_thread", thread)
self.init_error()
def main(slist):
app = TestApp("127.0.0.1", 7497, 1)
for i in slist:
ibcontract = IBcontract()
ibcontract.secType = "STK"
ibcontract.symbol = i
ibcontract.exchange ="SEHK"
Lastprice = app.getIBrealtimedata(ibcontract)
df = pd.DataFrame(Lastprice)
print(ibcontract.symbol, df.head())
app.disconnect()
if __name__ == "__main__":
seclist = [700,2318,5,12]
main(seclist)
Вот сообщения об ошибках:
необработанное исключение в трассировке потока EReader (последний вызов был последним): файл "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\reader.py", строка 34, > в run data = self.conn.recvMsg() Файл "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", строка>99, в файле recvMsg buf = self._recvAllMsg() Файл "D: \ Anaconda3 \ envs \ myweb \" lib \ site-packages \ ibapi \ connection.py ", строка>119, в _recvAllMsg buf = self.socket.recv(4096) OSError: [WinError 10038] Была предпринята операция над чем-то, что> не является сокетом
2 ответа
Запускается отдельный поток для чтения входящих сообщений из сокета:
thread = Thread(target = self.run)
thread.start()
Но этот поток никогда не останавливается и все еще работает, когда вы вызываете функцию disnect(). В результате он пытается получить доступ к объекту сокета, который теперь является None, вызывая ошибку. Попробуйте остановить поток EReader перед отключением, установив done=True
,
В качестве примечания: поскольку эта ошибка происходит в самом конце программы при отключении, она не должна мешать получению ожидаемых данных.
Обходной путь, чтобы избежать предупреждения.
Реализуйте это в своем подклассе EClient/EWrapper :
- Создайте функцию отключения сокета:
import socket, time
[...]
def _socketShutdown(self):
self.conn.lock.acquire()
try:
if self.conn.socket is not None:
self.conn.socket.shutdown(socket.SHUT_WR)
finally:
self.conn.lock.release()
- Используйте его перед закрытием соединения:
self._socketShutdown()
time.sleep(1)
self.disconnect()