Сельдерей пытается завершить работу, подняв SystemExit в сигнале task_postrun, но всегда зависает и основной процесс никогда не завершается
Я пытаюсь завершить основной процесс сельдерея с помощью raisin SystemExit() в сигнале task_postrun. Сигнал срабатывает очень хорошо, и возникает исключение, но рабочий никогда не выходит полностью и просто зависает там.
КАК Я ДЕЛАЮ ЭТУ РАБОТУ?
Я забыл где-то настройки?
Ниже приведен код, который я использую для работника (worker.py):
from celery import Celery
from celery import signals
app = Celery('tasks',
set_as_current = True,
broker='amqp://guest@localhost//',
backend="mongodb://localhost//",
)
app.config_from_object({
"CELERYD_MAX_TASKS_PER_CHILD": 1,
"CELERYD_POOL": "solo",
"CELERY_SEND_EVENTS": True,
"CELERYD_CONCURRENCY": 1,
"CELERYD_PREFETCH_MULTIPLIER": 1,
})
def shutdown_worker(**kwargs):
print("SHUTTING DOWN WORKER (RAISING SystemExit)")
raise SystemExit()
import tasks
signals.task_postrun.connect(shutdown_worker)
print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")
и ниже приведен код для tasks.py:
from celery import Celery,task
from celery.signals import task_postrun
import time
from celery.task import Task
class Blah(Task):
track_started = True
acks_late = False
def run(config, kwargs):
time.sleep(5)
return "SUCCESS FROM BLAH"
def get_app():
celery = Celery('tasks',
broker='amqp://guest@localhost//',
backend="mongodb://localhost//"
)
return celery
Чтобы проверить это, первым делом я запускаю рабочий код (python worker.py
), тогда я ставлю задачу в очередь так:
>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))
Вывод, который я вижу от работника, таков:
STARTING WORKER
-------------- celery@hostname v3.0.9 (Chiastic Slide)
---- **** -----
--- * *** * -- [Configuration]
-- * - **** --- . broker: amqp://guest@localhost:5672//
- ** ---------- . app: tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events: ON
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery: exchange:celery(direct) binding:celery
--- ***** -----
[2012-11-06 15:59:16,761: WARNING/MainProcess] celery@hostname has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)
Я ожидал, что код Python вернется из звонка app.worker_main()
а затем распечатать WORKER EXITED
и затем для процесса, чтобы завершиться полностью. Этого никогда не происходит, и рабочий процесс должен быть kill -KILL {PID}
Он не может использовать другие задачи.
Я думаю, что мой главный вопрос будет:
КАК ПОЛУЧИТЬ ВОЗВРАТ КОДА app.worker_main()
?
Я хотел бы иметь возможность полностью перезапустить рабочий процесс (после завершения процесса) X
количество заданий выполнено
ОБНОВЛЕНИЕ Я разобрался, на чем висит рабочий - рабочий (WorkController
) висит на вызове self.stop
после того, как ловит SystemExit
исключение.
1 ответ
Ненавижу, когда я отвечаю на свой вопрос.
Anywhoo, он блокировал вызов соединения на Mediator
компонент внутри WorkController
(звонки stop()
на Mediator
компонент, внутри стоп, это join
с).
Я избавился от Mediator
компонент, отключив все ограничения скорости (это должно быть по умолчанию, но это не по какой-то причине).
Вы можете отключить все ограничения скорости с помощью настройки:
CELERY_DISABLE_ALL_RATE_LIMITS: True
Надеюсь, что это поможет кому-то еще в будущем.
мир