Проблемы совместного использования переменных в сельдерее
Я использую Python и сельдерей в проекте. В проекте у меня есть два файла:
celeryconfig.py
BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_RESULT_BACKEND = "amqp"
CELERY_IMPORTS = ("example",)
CELERYD_CONCURRENCY = 2
и example.py
from celery.task import task
import hashlib
md5 = hashlib.md5()
@task
def getDigest(text):
print 'Using md5 - ',md5
md5.update(text)
return md5.digest()
В файле celeryconfig.py я установил для CELERYD_CONCURRENCY значение 2, что означает, что он будет распределять задачи в моей очереди задач по двум различным процессам.
Из консоли Python я запускаю:
from example import getDigest
getDigest.delay('foo');getDigest.delay('bar')
Это создает две задачи, которые одновременно выполняются двумя работниками. Проблема в том, что, поскольку оба рабочих процесса выполняют свои функции задачи [getDigest ()], они, похоже, используют один и тот же хэш-объект (md5). Вывод celeryd подтверждает это, как вы можете видеть ниже.
[PoolWorker-2] Using md5 -
[PoolWorker-2] <md5 HASH object @ 0x23e6870>
[PoolWorker-1] Using md5 -
[PoolWorker-1] <md5 HASH object @ 0x23e6870>
Для простоты я использую объект md5 из hashlib, но в моем реальном проекте я использую объект, который не может быть доступен и изменен более чем одним процессом. Это, как ожидается, приводит к краху рабочих.
Это поднимает вопрос: как я могу изменить свой код, чтобы заставить рабочие процессы инициализировать и использовать свой собственный (md5) объект? Прямо сейчас они используют один и тот же объект, что приводит к сбою моего приложения. Это возможно?
1 ответ
Они используют один и тот же объект, потому что вы явно указываете это в своем коде. Создавая объект вне рамок задачи и используя его в задаче, вы предоставляете всем работникам доступ к общему объекту. Это проблема параллелизма, не обязательно проблема сельдерея. Вы можете использовать копию объекта, если он маленький, или использовать собственную стратегию блокировки. В целом, хотя, если объект будет обновляться более чем одним процессом за раз, ему необходимо использовать какую-то синхронизацию, которая выходит за рамки Celery.