Торнадо Периодический обратный вызов и операции с сокетами внутри обратного вызова
Я пытаюсь сделать неблокирующее веб-приложение, которое использует Tornado. Это приложение использует PeriodicCallback в качестве планировщика для сбора данных с новостных сайтов:
for nc_uuid in self.LIVE_NEWSCOLLECTORS.keys():
self.LIVE_NEWSCOLLECTORS[nc_uuid].agreggator,ioloop=args
period=int(self.LIVE_NEWSCOLLECTORS[nc_uuid].period)*60
if self.timer is not None: period = int(self.timer)
#self.scheduler.add_job(func=self.LIVE_NEWSCOLLECTORS[nc_uuid].getNews,args=[self.source,i],trigger='interval',seconds=10,id=nc_uuid)
task = tornado.ioloop.PeriodicCallback(lambda:self.LIVE_NEWSCOLLECTORS[nc_uuid].getNews(self.source,i),1000*10,ioloop)
task.start()
'getData', который вызывает как обратный вызов, имеет асинхронный http-запрос, который анализирует и отправляет данные в TCPServer для анализа, вызывая метод process_responce:
@gen.coroutine
def process_response(self,*args,**kwargs):
buf = {'sentence':str('text here')}
data_string = json.dumps(buf)
s.send(data_string)
while True:
try:
data = s.recv(100000)
if not data:
print "connection closed"
s.close()
break
else:
print "Received %d bytes: '%s'" % (len(data), data)
# s.close()
break
except socket.error, e:
if e.args[0] == errno.EWOULDBLOCK:
print 'error',errno.EWOULDBLOCK
time.sleep(1) # short delay, no tight loops
else:
print e
break
i+=1
Внутри process_response я использую базовый пример для неблокирующих операций с сокетами. Process_response показывает что-то вроде этого: ошибка 10035 ошибка 10035 Получено 75 байт: '{"mode": 1, "ключевое слово": "\u0435\u0432\u0440\u043e", "предложение": "текст здесь"}'
Это выглядит нормальным поведением. Но при получении данных основной IOLoop блокируется! Если бы я спросил веб-сервер, он не вернул бы мои anydata до тех пор, пока не закончится периодическая задача обратного вызова... Где моя ошибка?
1 ответ
time.sleep()
является блокирующей функцией и никогда не должен использоваться в неблокирующем коде. использование yield gen.sleep()
вместо.
Также рассмотрите возможность использования tornado.iostream.IOStream
вместо операций с необработанными сокетами.