Сельдерей зависает при доступе к базе данных ZEO (состояние гонки?)
проблема
Сельдерея зависает от выполнения задачи при использовании пакета, который обращается к серверу ZEO. Однако, если бы я получил доступ к серверу напрямую tasks.py
Нет проблем вообще.
Фон
У меня есть программа, которая читает и пишет в файл ZODB. Поскольку я хочу, чтобы несколько пользователей могли одновременно получать доступ к этой базе данных и изменять ее, я управляю ею с помощью сервера ZEO, что должно обеспечить ее безопасность для нескольких процессов и потоков. Я определяю базу данных в модуле моей программы:
from ZEO import ClientStorage
from ZODB.DB import DB
addr = 'localhost', 8090
storage = ClientStorage.ClientStorage(addr, wait=False)
db = DB(storage)
SSCCE
Я, очевидно, пытаюсь выполнить более сложные операции, но давайте предположим, что мне нужны только ключи корневого объекта или его дочерних элементов. Я могу представить проблему в этом контексте.
я создаю dummy_package
с вышеуказанным кодом в модуле, databases.py
и пустой модуль, предназначенный для доступа к базе данных:
# main.py
def get_keys(dict_like):
return dict_like.keys()
Если я не пытаюсь получить доступ к базе данных с dummy_package
Я могу импортировать базу данных и получить доступ к root без проблем:
# tasks.py
from dummy_package import databases
@task()
def simple_task():
connection = databases.db.open()
keys = connection.root().keys()
connection.close(); databases.db.close()
return keys # Works perfectly
Тем не менее, пытаясь передать соединение или ребенка root
заставляет задачу зависать бесконечно.
@task()
def simple_task():
connection = databases.db.open()
root = connection.root()
ret = main.get_keys(root) # Hangs indefinitely
...
Если это имеет какое-либо значение, к этим задачам Сельдерея обращается Джанго.
Вопрос
Итак, прежде всего, что здесь происходит? Есть ли какое-то состояние гонки, вызванное таким образом доступом к серверу ZEO?
Я мог бы сделать все, чтобы доступ к базе данных отвечал за Селери, но это сделало бы уродливый код. Кроме того, это разрушило бы способность моей программы функционировать как отдельная программа. Разве невозможно взаимодействовать с ZEO в рамках процедуры, называемой работником Celery?
2 ответа
Я не совсем понимаю, что происходит, но я думаю, что тупик как-то связан с тем, что Celery использует multiprocessing
по умолчанию для параллелизма. Переход на использование Eventlet для задач, которым требуется доступ к ZEO-серверу, решил мою проблему.
Мой процесс
Запустите работника, который использует Eventlet, и тот, который использует стандартные
multiproccesing
,celery
имя очереди по умолчанию ( по историческим причинам), поэтому рабочий Eventlet должен обработать эту очередь:$ celery worker --concurrency=500 --pool=eventlet --loglevel=debug \ -Q celery --hostname eventlet_worker $ celery worker --loglevel=debug \ -Q multiprocessing_queue --hostname multiprocessing_worker
Маршрутные задачи, которые требуют стандарта
multiprocessing
в соответствующую очередь. Все остальные будут направлены наcelery
очередь (управляемая Eventlet) по умолчанию. (Если использовать Django, это идет вsettings.py
):CELERY_ROUTES = {'project.tasks.ex_task': {'queue': 'multiprocessing_queue'}}
Не сохраняйте открытое соединение или его корневой объект как глобальный.
Вам нужно соединение для каждого потока; просто потому, что ZEO делает возможным доступ к нескольким потокам, похоже, что вы используете что-то, что не является локальным для потока (например, глобальный уровень на уровне модуля в database.py).
Сохраните базу данных как глобальную, но вызывайте db.open() во время каждой задачи. См. http://zodb.readthedocs.org/en/latest/api.html