Сельдерей зависает при доступе к базе данных ZEO (состояние гонки?)

проблема

Сельдерея зависает от выполнения задачи при использовании пакета, который обращается к серверу ZEO. Однако, если бы я получил доступ к серверу напрямую tasks.pyНет проблем вообще.

Фон

У меня есть программа, которая читает и пишет в файл ZODB. Поскольку я хочу, чтобы несколько пользователей могли одновременно получать доступ к этой базе данных и изменять ее, я управляю ею с помощью сервера ZEO, что должно обеспечить ее безопасность для нескольких процессов и потоков. Я определяю базу данных в модуле моей программы:

from ZEO import ClientStorage
from ZODB.DB import DB

addr = 'localhost', 8090
storage = ClientStorage.ClientStorage(addr, wait=False)
db = DB(storage)

SSCCE

Я, очевидно, пытаюсь выполнить более сложные операции, но давайте предположим, что мне нужны только ключи корневого объекта или его дочерних элементов. Я могу представить проблему в этом контексте.

я создаю dummy_package с вышеуказанным кодом в модуле, databases.pyи пустой модуль, предназначенный для доступа к базе данных:

# main.py

def get_keys(dict_like):
    return dict_like.keys()

Если я не пытаюсь получить доступ к базе данных с dummy_packageЯ могу импортировать базу данных и получить доступ к root без проблем:

# tasks.py
from dummy_package import databases

@task()
def simple_task():

    connection = databases.db.open()
    keys = connection.root().keys()
    connection.close(); databases.db.close()
    return keys  # Works perfectly

Тем не менее, пытаясь передать соединение или ребенка root заставляет задачу зависать бесконечно.

@task()
def simple_task():
    connection = databases.db.open()
    root = connection.root()
    ret = main.get_keys(root)  # Hangs indefinitely
    ...

Если это имеет какое-либо значение, к этим задачам Сельдерея обращается Джанго.

Вопрос

Итак, прежде всего, что здесь происходит? Есть ли какое-то состояние гонки, вызванное таким образом доступом к серверу ZEO?

Я мог бы сделать все, чтобы доступ к базе данных отвечал за Селери, но это сделало бы уродливый код. Кроме того, это разрушило бы способность моей программы функционировать как отдельная программа. Разве невозможно взаимодействовать с ZEO в рамках процедуры, называемой работником Celery?

2 ответа

Решение

Я не совсем понимаю, что происходит, но я думаю, что тупик как-то связан с тем, что Celery использует multiprocessing по умолчанию для параллелизма. Переход на использование Eventlet для задач, которым требуется доступ к ZEO-серверу, решил мою проблему.

Мой процесс

  1. Запустите работника, который использует Eventlet, и тот, который использует стандартные multiproccesing,

    celery имя очереди по умолчанию ( по историческим причинам), поэтому рабочий Eventlet должен обработать эту очередь:

    $ celery worker --concurrency=500 --pool=eventlet --loglevel=debug \ 
                     -Q celery                --hostname eventlet_worker
    $ celery worker  --loglevel=debug \
                     -Q multiprocessing_queue --hostname multiprocessing_worker
    
  2. Маршрутные задачи, которые требуют стандарта multiprocessing в соответствующую очередь. Все остальные будут направлены на celery очередь (управляемая Eventlet) по умолчанию. (Если использовать Django, это идет в settings.py):

    CELERY_ROUTES = {'project.tasks.ex_task': {'queue': 'multiprocessing_queue'}}
    

Не сохраняйте открытое соединение или его корневой объект как глобальный.

Вам нужно соединение для каждого потока; просто потому, что ZEO делает возможным доступ к нескольким потокам, похоже, что вы используете что-то, что не является локальным для потока (например, глобальный уровень на уровне модуля в database.py).

Сохраните базу данных как глобальную, но вызывайте db.open() во время каждой задачи. См. http://zodb.readthedocs.org/en/latest/api.html

Другие вопросы по тегам