add_callback с пулом потоков, но исключение: "Не удается написать () после окончания ()."
Я использую пул потоков при использовании Торнадо, чтобы сделать некоторую работу. Это код:
общий / thread_pool.py
import tornado.ioloop
class Worker(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self._queue = queue
def run(self):
logging.info('Worker start')
while True:
content = self._queue.get()
if isinstance(content, str) and content == 'quit':
break
#content: (func, args, on_complete)
func = content[0]
args = content[1]
on_complete = content[2]
resp = func(args)
tornado.ioloop.IOLoop.instance().add_callback(lambda: on_complete(resp))
#i dont know is correct to call this
#self._queue.task_done()
logging.info('Worker stop')
class WorkerPool(object):
_workers = []
def __init__(self, num):
self._queue = Queue.Queue()
self._size = num
def start(self):
logging.info('WorkerPool start %d' % self._size)
for _ in range(self._size):
worker = Worker(self._queue)
worker.start()
self._workers.append(worker)
def stop(self):
for worker in self._workers:
self._queue.put('quit')
for worker in self._workers:
worker.join()
logging.info('WorkerPool stopd')
def append(self, content):
self._queue.put(content)
gateway.py
import tornado.ioloop
import tornado.web
from common import thread_pool
workers = None
class MainServerHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
start_time = time.time()
method = 'get'
content = (self.handle, (method, self.request, start_time), self.on_complete)
workers.append(content)
@tornado.web.asynchronous
def post(self):
start_time = time.time()
method = 'post'
content = (self.handle, (method, self.request, start_time), self.on_complete)
workers.append(content)
def handle(self, args):
method, request, start_time = args
#for test, just return
return 'test test'
def on_complete(self, res):
logging.debug('on_complete')
self.write(res)
self.finish()
return
def main(argv):
global workers
workers = thread_pool.WorkerPool(conf_mgr.thread_num)
workers.start()
application = tornado.web.Application([(r"/", MainServerHandler)])
application.listen(8888)
tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
main(sys.argv[1:])
Когда я делаю много одновременных запросов, я получаю эту ошибку:
ERROR: 2014-09-15 18:04:03: ioloop.py:435 * 140500107065056 Exception in callback <tornado.stack_context._StackContextWrapper object at 0x7fc8b4d6b9f0>
Traceback (most recent call last):
File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../third-party/tornado-2.4.1/tornado/ioloop.py", line 421, in _run_callback
callback()
File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../common/thread_pool.py", line 39, in <lambda>
tornado.ioloop.IOLoop.instance().add_callback(lambda: on_complete(resp))
File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/gateway.py", line 92, in on_complete
self.write(res)
File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../third-party/tornado-2.4.1/tornado/web.py", line 489, in write
raise RuntimeError("Cannot write() after finish(). May be caused "
RuntimeError: Cannot write() after finish(). May be caused by using async operations without the @asynchronous decorator.
Но я не звонил write
после finish
, Я также использую @asynchronous
декоратор. В то же время в логах я вижу, что write
/finish
вызывается тем же потоком.
1 ответ
Проблема заключается в том, как вы добавляете обратный вызов в цикл ввода-вывода. Добавьте это так:
tornado.ioloop.IOLoop.instance().add_callback(on_complete, resp)
И ошибки уйдут.
Вы видите это странное поведение, потому что, когда вы используете лямбда-функцию, вы создаете замыкание в локальной области действия функции, и переменные, используемые в этом замыкании, связываются в точке, где выполняется лямбда, а не при ее создании., Рассмотрим этот пример:
funcs = []
def func(a):
print a
for i in range(5):
funcs.append(lambda: func(i))
for f in funcs:
f()
Выход:
4
4
4
4
4
Поскольку ваш рабочий метод выполняется в цикле while, on_complete
заканчивается переопределением несколько раз, что также меняет значение on_complete
внутри лямбды Это означает, что если один рабочий поток устанавливает on_complete
для обработчика A, но затем получает другую задачу и устанавливает on_complete
для обработчика B до установки обратного вызова для работающего обработчика A оба обратных вызова заканчиваются тем, что запускают обработчик B on_complete
,
Если вы действительно хотите использовать лямбду, вы также можете избежать этого, связывая on_complete
в локальном объеме лямбда:
tornado.ioloop.IOLoop.instance().add_callback(lambda on_complete=on_complete: on_complete(resp))
Но просто добавить функцию и ее аргумент напрямую намного приятнее.