Сельдерей не может найти работника

Я использую сельдерей с такой конфигурацией

default_exchange = Exchange('default', type='direct')
latest_exchange = Exchange('latest', type='direct')

shared_celery_config = {
    'BROKER_URL': 'redis://localhost:6379/0',
    'CELERY_RESULT_BACKEND': 'redis://localhost:6379/0',
    'CELERY_DEFAULT_EXCHANGE': 'default',
    'CELERY_DEFAULT_ROUTING_KEY': 'default',
    'CELERY_DEFAULT_QUEUE': 'default',
    'CELERY_QUEUES': (
        Queue('default', default_exchange, routing_key='default'),
        Queue('latest', latest_exchange, routing_key='latest'),
    ),
}

celery = Celery('tasks', config_source=shared_celery_config)

Но когда я создаю задание, никто из рабочих не потребляет его, и ничего не происходит. Я начинаю работников с: celery worker -A tasks --loglevel=debug --hostname=worker1, Я вижу их из ps aux | grep celery вывод, но при выполнении какой-то команды, чтобы получить статистику вроде celery -A tasks status Я получаю следующее сообщение об ошибке Error: No nodes replied within time constraint., Поэтому все задачи находятся в состоянии ОЖИДАНИЯ. Я считаю, что это какая-то неправильная конфигурация, но не могу понять, что не так и как отладить такую ​​вещь. Любой совет будет очень полезным

2 ответа

Проблема была решена. У меня есть веб-приложение, которое использует Gevent и в tasks модуль у меня есть импорт из другого модуля, который имеет monkey.patch_all(), Каким-то образом это мешает бильярду и, следовательно, работнику запускать пул, что приводит к этим последствиям. Во всяком случае, вообще не используйте gevent, imho.

Если вы используете очереди, убедитесь, что хотя бы один из ваших дагов назначен рабочему, то естьAIRFLOW__OPERATORS__DEFAULT_QUEUEстоимость работникаdocker-compose.yamlфайл должен содержатьdefault_args['queue']значение файла dag python.

Другие вопросы по тегам