Как заблокировать переменную в задаче сельдерея?
У меня есть задача сельдерея, которая искажает некоторые переменные. Это прекрасно работает, если я установил одного рабочего из сельдерея, но когда я использую параллелизм, все это испортилось. Как я могу заблокировать критическую секцию, где переменная искажена?
inb4: using Python 3.6, Redis both as broker and result backed. threading.Lock doesn't help in here.
1 ответ
Пока сельдерей работает на нескольких рабочих (процессах), блокировка потоков не поможет, потому что она работает внутри одного процесса. Кроме того, многопоточная блокировка полезна, когда вы контролируете весь процесс, а при использовании сельдерея нет никакого способа добиться этого.
Это означает, что сельдерей требует распределенной блокировки. Для django я всегда использую django-cache, как здесь: здесь. Если вам нужны более общие блокировки, особенно на основе Redis, для работы с любым приложением python вы можете использовать sherlock.
Я знаю, что это вопрос с 2+ годами, но сейчас я настраиваю свои конфиги сельдерея и я подошел к этой теме.
Я использую python 2.7 с Django 1.11 и celery 4 на Linux-машине. Я использую rabbitmq в качестве брокера.
Мои конфигурации подразумевают, что сельдерей работает как демон и сельдерей, чтобы обрабатывать запланированные задачи.
Итак, с выделенной очередью для данной задачи вы можете настроить эту очередь с рабочим (процессом) с concurrency=1 (подпроцессы).
Это решение решает проблемы параллелизма для сельдерея для выполнения задачи, но в вашем коде, если вы запускаете задачу без сельдерея, он не будет соблюдать принципы параллелизма.
Пример кода:
CELERY_TASK_QUEUES = (
Queue('celery_periodic', default_exchange, routing_key='celery_periodic'),
Queue('celery_task_1', celery_task_1_exchange, routing_key='celery_task_1'),
)
default_exchange = Exchange('celery_periodic', type='direct')
celery_task_1_exchange = Exchange('celery_task_1', type='direct')
CELERY_BEAT_SCHEDULE = {
'celery-task-1': {
'task': 'tasks.celery_task_1',
'schedule': timedelta(minutes=15),
'queue': 'celery_task_1'
},
}
и, наконец, в /etc/default/celeryd (документы здесь: https://docs.celeryproject.org/en/latest/userguide/daemonizing.html):
CELERYD_NODES="worker1 worker2"
CELERYD_OPTS="--concurrency=1 --time-limit=600 -Q:worker1 celery_periodic -Q:worker2 celery_task_1"
--concurrency N означает, что у вас будет ровно N рабочих подпроцессов для вашего рабочего экземпляра (это означает, что рабочий экземпляр может обрабатывать N одновременных задач) (отсюда: /questions/40250584/celerydconcurrency-valyuta-i-avtomasshtab/40250597#40250597).
Подробнее читайте здесь: https://docs.celeryproject.org/en/stable/userguide/workers.html
BR, Эдуардо