Проблемы совместного использования переменных в сельдерее

Я использую Python и сельдерей в проекте. В проекте у меня есть два файла:

celeryconfig.py

BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("example",)
CELERYD_CONCURRENCY = 2

и example.py

from celery.task import task
import hashlib

md5 = hashlib.md5()

@task
def getDigest(text):
    print 'Using md5 - ',md5
    md5.update(text)
    return md5.digest()

В файле celeryconfig.py я установил для CELERYD_CONCURRENCY значение 2, что означает, что он будет распределять задачи в моей очереди задач по двум различным процессам.

Из консоли Python я запускаю:

from example import getDigest
getDigest.delay('foo');getDigest.delay('bar')

Это создает две задачи, которые одновременно выполняются двумя работниками. Проблема в том, что, поскольку оба рабочих процесса выполняют свои функции задачи [getDigest ()], они, похоже, используют один и тот же хэш-объект (md5). Вывод celeryd подтверждает это, как вы можете видеть ниже.

[PoolWorker-2] Using md5 -
[PoolWorker-2] <md5 HASH object @ 0x23e6870>
[PoolWorker-1] Using md5 -
[PoolWorker-1] <md5 HASH object @ 0x23e6870>

Для простоты я использую объект md5 из hashlib, но в моем реальном проекте я использую объект, который не может быть доступен и изменен более чем одним процессом. Это, как ожидается, приводит к краху рабочих.

Это поднимает вопрос: как я могу изменить свой код, чтобы заставить рабочие процессы инициализировать и использовать свой собственный (md5) объект? Прямо сейчас они используют один и тот же объект, что приводит к сбою моего приложения. Это возможно?

1 ответ

Решение

Они используют один и тот же объект, потому что вы явно указываете это в своем коде. Создавая объект вне рамок задачи и используя его в задаче, вы предоставляете всем работникам доступ к общему объекту. Это проблема параллелизма, не обязательно проблема сельдерея. Вы можете использовать копию объекта, если он маленький, или использовать собственную стратегию блокировки. В целом, хотя, если объект будет обновляться более чем одним процессом за раз, ему необходимо использовать какую-то синхронизацию, которая выходит за рамки Celery.

Другие вопросы по тегам