Торнадо Периодический обратный вызов и операции с сокетами внутри обратного вызова

Я пытаюсь сделать неблокирующее веб-приложение, которое использует Tornado. Это приложение использует PeriodicCallback в качестве планировщика для сбора данных с новостных сайтов:

for nc_uuid in self.LIVE_NEWSCOLLECTORS.keys():
            self.LIVE_NEWSCOLLECTORS[nc_uuid].agreggator,ioloop=args
            period=int(self.LIVE_NEWSCOLLECTORS[nc_uuid].period)*60
            if self.timer is not None: period = int(self.timer)

            #self.scheduler.add_job(func=self.LIVE_NEWSCOLLECTORS[nc_uuid].getNews,args=[self.source,i],trigger='interval',seconds=10,id=nc_uuid)
            task = tornado.ioloop.PeriodicCallback(lambda:self.LIVE_NEWSCOLLECTORS[nc_uuid].getNews(self.source,i),1000*10,ioloop)
            task.start()

'getData', который вызывает как обратный вызов, имеет асинхронный http-запрос, который анализирует и отправляет данные в TCPServer для анализа, вызывая метод process_responce:

@gen.coroutine 
def process_response(self,*args,**kwargs):
buf = {'sentence':str('text here')}
data_string = json.dumps(buf)
s.send(data_string)
while True:
    try:
         data = s.recv(100000)
         if not data:
                print "connection closed"
                s.close()
                break
         else:
             print "Received %d bytes: '%s'" % (len(data), data)
            # s.close()
             break
    except socket.error, e:
          if e.args[0] == errno.EWOULDBLOCK:
               print 'error',errno.EWOULDBLOCK
               time.sleep(1)           # short delay, no tight loops
          else:
               print e
               break
    i+=1

Внутри process_response я использую базовый пример для неблокирующих операций с сокетами. Process_response показывает что-то вроде этого: ошибка 10035 ошибка 10035 Получено 75 байт: '{"mode": 1, "ключевое слово": "\u0435\u0432\u0440\u043e", "предложение": "текст здесь"}'

Это выглядит нормальным поведением. Но при получении данных основной IOLoop блокируется! Если бы я спросил веб-сервер, он не вернул бы мои anydata до тех пор, пока не закончится периодическая задача обратного вызова... Где моя ошибка?

1 ответ

time.sleep() является блокирующей функцией и никогда не должен использоваться в неблокирующем коде. использование yield gen.sleep() вместо.

Также рассмотрите возможность использования tornado.iostream.IOStream вместо операций с необработанными сокетами.

Другие вопросы по тегам