Ошибка исключения в потоке EReader под управлением ibapi

Я использую Python ib-api для получения рыночных данных в реальном времени от Interactive Brokers. Он может предоставить данные, которые я ожидал, но заканчивается "необработанным исключением в потоке EReader".

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import pandas as pd
from ibapi.ticktype import TickTypeEnum`

`DEFAULT_PRICE_DATA_ID = 1001`

`FINISHED = object()
STARTED = object()
TIME_OUT = object()`

class finishableQueue(object):

    def __init__(self, queue_to_finish):

        self._queue = queue_to_finish
        self.status = STARTED

    def get(self, timeout):

        contents_of_queue=[]
        finished=False

        while not finished:
            try:
                current_element = self._queue.get(timeout=timeout)
                if current_element is FINISHED:
                    finished = True
                    self.status = FINISHED
                else:
                    contents_of_queue.append(current_element)

            except queue.Empty:
                finished = True
                self.status = TIME_OUT

        return contents_of_queue

    def timed_out(self):
        return self.status is TIME_OUT


class TestWrapper(EWrapper):

    def __init__(self):
        self._my_price_data_dict = {}

    def get_error(self, timeout=5):
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                return None

        return None

    def is_error(self):
        an_error_if=not self._my_errors.empty()
        return an_error_if

    def init_error(self):
        error_queue=queue.Queue()
        self._my_errors = error_queue

    def error(self, id, errorCode, errorString):
        ## Overriden method
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    def init_ibprices(self, tickerid):
        ibprice_data_queue = self._my_price_data_dict[tickerid] = queue.Queue()

        return ibprice_data_queue

    def tickPrice(self, reqId, tickType, price, attrib):
        tickdata = (TickTypeEnum.to_str(tickType), price)

        price_data_dict = self._my_price_data_dict

        if reqId not in price_data_dict.keys():
            self.init_ibprices(reqId)

        price_data_dict[reqId].put(tickdata)


class TestClient(EClient):

    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)

    def error(self, reqId, errorCode, errorString):
        print("Error: ", reqId, " ", errorCode, " ", errorString)

    def getIBrealtimedata(self, ibcontract, tickerid=DEFAULT_PRICE_DATA_ID):
        ib_data_queue = finishableQueue(self.init_ibprices(tickerid))

        self.reqMktData(
            tickerid,
            ibcontract,
            "",
            False,
            False,
            []
        )

        MAX_WAIT_SECONDS = 5
        print("Getting data from the server... could take %d seconds to complete " % MAX_WAIT_SECONDS)

        price_data = ib_data_queue.get(timeout = MAX_WAIT_SECONDS)

        while self.wrapper.is_error():
            print(self.get_error())

        if ib_data_queue.timed_out():
            print("Exceeded maximum wait for wrapper to confirm finished - seems to be normal behaviour")

        self.cancelMktData(tickerid)

        return price_data

class TestApp(TestWrapper, TestClient):
    def __init__(self, ipaddress, portid, clientid):
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)

        self.connect(ipaddress, portid, clientid)

        thread = Thread(target = self.run)
        thread.start()

        setattr(self, "_thread", thread)

        self.init_error()

def main(slist):

    app = TestApp("127.0.0.1", 7497, 1)

    for i in slist:
        ibcontract = IBcontract()
        ibcontract.secType = "STK"
        ibcontract.symbol = i
        ibcontract.exchange ="SEHK"

        Lastprice = app.getIBrealtimedata(ibcontract)

        df = pd.DataFrame(Lastprice)
        print(ibcontract.symbol, df.head())

    app.disconnect()

if __name__ == "__main__":

    seclist = [700,2318,5,12]
    main(seclist)

Вот сообщения об ошибках:

необработанное исключение в трассировке потока EReader (последний вызов был последним): файл "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\reader.py", строка 34, > в run data = self.conn.recvMsg() Файл "D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py", строка>99, в файле recvMsg buf = self._recvAllMsg() Файл "D: \ Anaconda3 \ envs \ myweb \" lib \ site-packages \ ibapi \ connection.py ", строка>119, в _recvAllMsg buf = self.socket.recv(4096) OSError: [WinError 10038] Была предпринята операция над чем-то, что> не является сокетом

2 ответа

Запускается отдельный поток для чтения входящих сообщений из сокета:

thread = Thread(target = self.run)
thread.start()

Но этот поток никогда не останавливается и все еще работает, когда вы вызываете функцию disnect(). В результате он пытается получить доступ к объекту сокета, который теперь является None, вызывая ошибку. Попробуйте остановить поток EReader перед отключением, установив done=True,

В качестве примечания: поскольку эта ошибка происходит в самом конце программы при отключении, она не должна мешать получению ожидаемых данных.

Обходной путь, чтобы избежать предупреждения.

Реализуйте это в своем подклассе EClient/EWrapper :

  1. Создайте функцию отключения сокета:
      import socket, time
[...]

    def _socketShutdown(self):
        self.conn.lock.acquire()
        try:
            if self.conn.socket is not None:
                self.conn.socket.shutdown(socket.SHUT_WR)
        finally:
            self.conn.lock.release()
  1. Используйте его перед закрытием соединения:
          self._socketShutdown()
    time.sleep(1) 
    self.disconnect()
Другие вопросы по тегам