add_callback с пулом потоков, но исключение: "Не удается написать () после окончания ()."

Я использую пул потоков при использовании Торнадо, чтобы сделать некоторую работу. Это код:

общий / thread_pool.py

import tornado.ioloop

class Worker(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self._queue = queue

    def run(self):
        logging.info('Worker start')
        while True:
            content = self._queue.get()
            if isinstance(content, str) and content == 'quit':
                break
            #content: (func, args, on_complete)
            func = content[0]
            args = content[1]
            on_complete = content[2]
            resp = func(args)
            tornado.ioloop.IOLoop.instance().add_callback(lambda: on_complete(resp))
            #i dont know is correct to call this
            #self._queue.task_done()
        logging.info('Worker stop')

class WorkerPool(object):
    _workers = []
    def __init__(self, num):
        self._queue = Queue.Queue()
        self._size = num

    def start(self):
        logging.info('WorkerPool start %d' % self._size)
        for _ in range(self._size):
            worker = Worker(self._queue)
            worker.start()
            self._workers.append(worker)

    def stop(self):
        for worker in self._workers:
            self._queue.put('quit') 
        for worker in self._workers:
            worker.join()
        logging.info('WorkerPool stopd')

    def append(self, content):
        self._queue.put(content)

gateway.py

import tornado.ioloop
import tornado.web

from common import thread_pool

workers = None

class MainServerHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        start_time = time.time()
        method = 'get'
        content = (self.handle, (method, self.request, start_time), self.on_complete)
        workers.append(content)

    @tornado.web.asynchronous
    def post(self):
        start_time = time.time()
        method = 'post'
        content = (self.handle, (method, self.request, start_time), self.on_complete)
        workers.append(content)

    def handle(self, args):
        method, request, start_time = args
        #for test, just return
        return 'test test'

    def on_complete(self, res):
        logging.debug('on_complete')
        self.write(res)
        self.finish()
        return        

def main(argv):  
    global workers
    workers = thread_pool.WorkerPool(conf_mgr.thread_num)
    workers.start()

    application = tornado.web.Application([(r"/", MainServerHandler)])
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
    main(sys.argv[1:])

Когда я делаю много одновременных запросов, я получаю эту ошибку:

ERROR: 2014-09-15 18:04:03: ioloop.py:435 * 140500107065056 Exception in callback <tornado.stack_context._StackContextWrapper object at 0x7fc8b4d6b9f0>

  Traceback (most recent call last):
     File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../third-party/tornado-2.4.1/tornado/ioloop.py", line 421, in _run_callback
       callback()
     File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../common/thread_pool.py", line 39, in <lambda>
       tornado.ioloop.IOLoop.instance().add_callback(lambda: on_complete(resp))
     File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/gateway.py", line 92, in on_complete
       self.write(res)
     File "/home/work/nlp_arch/project/ps/se/nlp-arch/gateway/gateway/../third-party/tornado-2.4.1/tornado/web.py", line 489, in write
      raise RuntimeError("Cannot write() after finish().  May be caused "
  RuntimeError: Cannot write() after finish().  May be caused by using async operations without the @asynchronous decorator.

Но я не звонил write после finish, Я также использую @asynchronous декоратор. В то же время в логах я вижу, что write/finish вызывается тем же потоком.

1 ответ

Решение

Проблема заключается в том, как вы добавляете обратный вызов в цикл ввода-вывода. Добавьте это так:

tornado.ioloop.IOLoop.instance().add_callback(on_complete, resp)

И ошибки уйдут.

Вы видите это странное поведение, потому что, когда вы используете лямбда-функцию, вы создаете замыкание в локальной области действия функции, и переменные, используемые в этом замыкании, связываются в точке, где выполняется лямбда, а не при ее создании., Рассмотрим этот пример:

funcs = []
def func(a):
    print a

for i in range(5):
   funcs.append(lambda: func(i))

for f in funcs:
    f()

Выход:

4
4
4
4
4

Поскольку ваш рабочий метод выполняется в цикле while, on_complete заканчивается переопределением несколько раз, что также меняет значение on_complete внутри лямбды Это означает, что если один рабочий поток устанавливает on_complete для обработчика A, но затем получает другую задачу и устанавливает on_complete для обработчика B до установки обратного вызова для работающего обработчика A оба обратных вызова заканчиваются тем, что запускают обработчик B on_complete,

Если вы действительно хотите использовать лямбду, вы также можете избежать этого, связывая on_complete в локальном объеме лямбда:

tornado.ioloop.IOLoop.instance().add_callback(lambda on_complete=on_complete: on_complete(resp))

Но просто добавить функцию и ее аргумент напрямую намного приятнее.

Другие вопросы по тегам