Ошибка торнадо: невозможно переключиться на другой поток

В GET-обработчик торнадо веб-приложения поступает запрос. От GET функция, а blocking_task функция называется. это blocking_task функция имеет @run_on_executor декоратор.

Но это выполнение не удается. Не могли бы вы помочь в этом. Кажется, что мотор db не может выполнить поток.

import time
from concurrent.futures import ThreadPoolExecutor
from tornado import gen, web
from tornado.concurrent import run_on_executor
from tornado.ioloop import IOLoop
import argparse
from common.config import APIConfig
import sys
import os
import motor

parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config-file", dest='config_file',
                    help="Config file location")
args = parser.parse_args()
CONF = APIConfig().parse(args.config_file)

client = motor.MotorClient(CONF.mongo_url)
db = client[CONF.mongo_dbname]
class Handler(web.RequestHandler):

    executor = ThreadPoolExecutor(10)

    def initialize(self):
        """ Prepares the database for the entire class """
        self.db = self.settings["db"]

    @gen.coroutine
    def get(self):
        self.blocking_task()

    @run_on_executor
    def blocking_task(self):
        mongo_dict = self.db.test_cases.find_one({"name": "Ping"})


if __name__ == "__main__":
    app = web.Application([
        (r"/", Handler),
    ],
        db=db,
        debug=CONF.api_debug_on,
    )
    app.listen(8888)
    IOLoop.current().start()




> ERROR:tornado.application:Exception in callback <functools.partial
> object at 0x7f72dfbe48e8> Traceback (most recent call last):   File
> "/usr/local/lib/python2.7/dist-packages/tornado-4.3-py2.7-linux-x86_64.egg/tornado/ioloop.py",
> line 600, in _run_callback
>     ret = callback()   File "/usr/local/lib/python2.7/dist-packages/tornado-4.3-py2.7-linux-x86_64.egg/tornado/stack_context.py",
> line 275, in null_wrapper
>     return fn(*args, **kwargs)   File "/usr/local/lib/python2.7/dist-packages/motor-0.5-py2.7.egg/motor/frameworks/tornado.py",
> line 231, in callback
>     child_gr.switch(future.result()) error: cannot switch to a different thread

Не могли бы вы помочь в этом.

3 ответа

Решение

Наконец, следующий код работает, спасибо @kwarunek Также добавлены параметры в функцию обратного вызова.

import time
from concurrent.futures import ThreadPoolExecutor
from tornado import gen, web
from tornado.concurrent import run_on_executor
from tornado.ioloop import IOLoop
import argparse
from common.config import APIConfig
import sys
import os
import motor

parser = argparse.ArgumentParser()
parser.add_argument("-c", "--config-file", dest='config_file',
                    help="Config file location")
args = parser.parse_args()
CONF = APIConfig().parse(args.config_file)

client = motor.MotorClient(CONF.mongo_url)
db = client[CONF.mongo_dbname]

class Handler(web.RequestHandler):

    executor = ThreadPoolExecutor(10)

    def initialize(self):
        """ Prepares the database for the entire class """
        self.db = self.settings["db"]

    @gen.coroutine
    def get(self):
        self.blocking_task("Ping", "Void-R")


    @run_on_executor
    def blocking_task(self, name, status):
        IOLoop.instance().add_callback(callback=lambda: self.some_update(name, status))

    @gen.coroutine
    def some_update(self, name, status):
        mongo_dict = yield self.db.test_cases.find_one({"name": name})
        self.db.test_cases.update({ "name": name }, { "$set": { "status" : status } } )


if __name__ == "__main__":
    app = web.Application([
        (r"/", Handler),
    ],
        db=db,
        debug=CONF.api_debug_on,
    )
    app.listen(8888)
    IOLoop.current().start()

Из документов

IOLoop и executor, которые будут использоваться, определяются атрибутами io_loop и executor self. Чтобы использовать разные атрибуты, передайте аргументы ключевых слов декоратору

Вы должны предоставить init threadpoolexecutor:

import time
from concurrent.futures import ThreadPoolExecutor
from tornado import gen, web
from tornado.concurrent import run_on_executor
from tornado.ioloop import IOLoop


class Handler(web.RequestHandler):

    executor = ThreadPoolExecutor(10)

    @gen.coroutine
    def get(self):
        self.blocking_task()

    @run_on_executor
    def blocking_task(self):
        time.sleep(10)


if __name__ == "__main__":
    app = web.Application([
        (r"/", Handler),
    ])
    app.listen(8888)
    IOLoop.current().start()

По умолчанию run_on_executor поиск резьбы в executor атрибут, если вы не передаете другой явно, например

_thread_pool = ThreadPoolExecutor(10)

@run_on_executor(executor='_thread_pool')
def blocking_task(self):
    pass

редактировать

По сути, IOLoop должен использоваться в однопоточном окружении (вы можете запустить отдельный IOLoop для каждого потока, но это не ваш случай). Для связи с IOLoop вы должны использовать add_callback, единственную поточно- ориентированную функцию.

Вы можете использовать как:

@run_on_executor
def blocking_task(self):
    IOLoop.instance().add_callback(some_update)

@gen.coroutine
def some_update():
    db.test_cases.update({ "name": "abc" }, { "$set": { "status" : "xyz" } } )

Но вы действительно нуждаетесь в потоке вообще. Какова цель отдельного потока, если вы планируете обновление на главном потоке IOLoop.

Motor - это неблокирующая библиотека, предназначенная для использования с IOLoop нить. Вы бы использовали ThreadPoolExecutor с блокирующей библиотекой, такой как PyMongo, но вы не должны использовать другие потоки с Motor.

Вместо этого вы должны вызывать методы Motor с помощью yield непосредственно:

@gen.coroutine
def get(self):
    yield self.non_blocking_task()

@gen.coroutine
def non_blocking_task(self):
    motor_dict = yield self.db.test_cases.find_one({"name": "Ping"})

Также обратите внимание, что если вы используете @run_on_executor с блокирующей библиотекой, такой как PyMongo, декоратор делает блокирующие функции неблокирующими, поэтому декорированная функция должна вызываться с помощью yield,

Другие вопросы по тегам