Как заблокировать переменную в задаче сельдерея?

У меня есть задача сельдерея, которая искажает некоторые переменные. Это прекрасно работает, если я установил одного рабочего из сельдерея, но когда я использую параллелизм, все это испортилось. Как я могу заблокировать критическую секцию, где переменная искажена?

inb4: using Python 3.6, Redis both as broker and result backed. threading.Lock doesn't help in here.

1 ответ

Решение

Пока сельдерей работает на нескольких рабочих (процессах), блокировка потоков не поможет, потому что она работает внутри одного процесса. Кроме того, многопоточная блокировка полезна, когда вы контролируете весь процесс, а при использовании сельдерея нет никакого способа добиться этого.

Это означает, что сельдерей требует распределенной блокировки. Для django я всегда использую django-cache, как здесь: здесь. Если вам нужны более общие блокировки, особенно на основе Redis, для работы с любым приложением python вы можете использовать sherlock.

Я знаю, что это вопрос с 2+ годами, но сейчас я настраиваю свои конфиги сельдерея и я подошел к этой теме.

Я использую python 2.7 с Django 1.11 и celery 4 на Linux-машине. Я использую rabbitmq в качестве брокера.

Мои конфигурации подразумевают, что сельдерей работает как демон и сельдерей, чтобы обрабатывать запланированные задачи.

Итак, с выделенной очередью для данной задачи вы можете настроить эту очередь с рабочим (процессом) с concurrency=1 (подпроцессы).

Это решение решает проблемы параллелизма для сельдерея для выполнения задачи, но в вашем коде, если вы запускаете задачу без сельдерея, он не будет соблюдать принципы параллелизма.

Пример кода:

CELERY_TASK_QUEUES = (
   Queue('celery_periodic', default_exchange, routing_key='celery_periodic'),
   Queue('celery_task_1', celery_task_1_exchange, routing_key='celery_task_1'),
)

default_exchange            = Exchange('celery_periodic', type='direct')
celery_task_1_exchange      = Exchange('celery_task_1', type='direct')

CELERY_BEAT_SCHEDULE = {
   'celery-task-1': {
       'task':     'tasks.celery_task_1',
       'schedule': timedelta(minutes=15),
       'queue':    'celery_task_1'
   },
}

и, наконец, в /etc/default/celeryd (документы здесь: https://docs.celeryproject.org/en/latest/userguide/daemonizing.html):

CELERYD_NODES="worker1 worker2"
CELERYD_OPTS="--concurrency=1 --time-limit=600 -Q:worker1 celery_periodic -Q:worker2 celery_task_1"

--concurrency N означает, что у вас будет ровно N рабочих подпроцессов для вашего рабочего экземпляра (это означает, что рабочий экземпляр может обрабатывать N одновременных задач) (отсюда: /questions/40250584/celerydconcurrency-valyuta-i-avtomasshtab/40250597#40250597).

Подробнее читайте здесь: https://docs.celeryproject.org/en/stable/userguide/workers.html

BR, Эдуардо

Другие вопросы по тегам