Сельдерей не может найти работника
Я использую сельдерей с такой конфигурацией
default_exchange = Exchange('default', type='direct')
latest_exchange = Exchange('latest', type='direct')
shared_celery_config = {
'BROKER_URL': 'redis://localhost:6379/0',
'CELERY_RESULT_BACKEND': 'redis://localhost:6379/0',
'CELERY_DEFAULT_EXCHANGE': 'default',
'CELERY_DEFAULT_ROUTING_KEY': 'default',
'CELERY_DEFAULT_QUEUE': 'default',
'CELERY_QUEUES': (
Queue('default', default_exchange, routing_key='default'),
Queue('latest', latest_exchange, routing_key='latest'),
),
}
celery = Celery('tasks', config_source=shared_celery_config)
Но когда я создаю задание, никто из рабочих не потребляет его, и ничего не происходит. Я начинаю работников с: celery worker -A tasks --loglevel=debug --hostname=worker1
, Я вижу их из ps aux | grep celery
вывод, но при выполнении какой-то команды, чтобы получить статистику вроде celery -A tasks status
Я получаю следующее сообщение об ошибке Error: No nodes replied within time constraint.
, Поэтому все задачи находятся в состоянии ОЖИДАНИЯ. Я считаю, что это какая-то неправильная конфигурация, но не могу понять, что не так и как отладить такую вещь. Любой совет будет очень полезным
2 ответа
Проблема была решена. У меня есть веб-приложение, которое использует Gevent и в tasks
модуль у меня есть импорт из другого модуля, который имеет monkey.patch_all()
, Каким-то образом это мешает бильярду и, следовательно, работнику запускать пул, что приводит к этим последствиям. Во всяком случае, вообще не используйте gevent, imho.
Если вы используете очереди, убедитесь, что хотя бы один из ваших дагов назначен рабочему, то естьAIRFLOW__OPERATORS__DEFAULT_QUEUE
стоимость работникаdocker-compose.yaml
файл должен содержатьdefault_args['queue']
значение файла dag python.