Сельдерей пытается завершить работу, подняв SystemExit в сигнале task_postrun, но всегда зависает и основной процесс никогда не завершается

Я пытаюсь завершить основной процесс сельдерея с помощью raisin SystemExit() в сигнале task_postrun. Сигнал срабатывает очень хорошо, и возникает исключение, но рабочий никогда не выходит полностью и просто зависает там.

КАК Я ДЕЛАЮ ЭТУ РАБОТУ?

Я забыл где-то настройки?

Ниже приведен код, который я использую для работника (worker.py):

from celery import Celery
from celery import signals

app = Celery('tasks',
    set_as_current = True,
    broker='amqp://guest@localhost//',
    backend="mongodb://localhost//",
)

app.config_from_object({
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_POOL": "solo",
    "CELERY_SEND_EVENTS": True,
    "CELERYD_CONCURRENCY": 1,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
})

def shutdown_worker(**kwargs):
    print("SHUTTING DOWN WORKER (RAISING SystemExit)")
    raise SystemExit()

import tasks

signals.task_postrun.connect(shutdown_worker)

print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")

и ниже приведен код для tasks.py:

from celery import Celery,task
from celery.signals import task_postrun
import time

from celery.task import Task

class Blah(Task):
    track_started = True
    acks_late = False

    def run(config, kwargs):
        time.sleep(5)
        return "SUCCESS FROM BLAH"

def get_app():
    celery = Celery('tasks',
        broker='amqp://guest@localhost//',
        backend="mongodb://localhost//"
    )
    return celery

Чтобы проверить это, первым делом я запускаю рабочий код (python worker.py), тогда я ставлю задачу в очередь так:

>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))

Вывод, который я вижу от работника, таков:

STARTING WORKER

 -------------- celery@hostname v3.0.9 (Chiastic Slide)
---- **** ----- 
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events:      ON
- ** ---------- 
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** ----- 

[2012-11-06 15:59:16,761: WARNING/MainProcess] celery@hostname has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)

Я ожидал, что код Python вернется из звонка app.worker_main() а затем распечатать WORKER EXITED и затем для процесса, чтобы завершиться полностью. Этого никогда не происходит, и рабочий процесс должен быть kill -KILL {PID} Он не может использовать другие задачи.

Я думаю, что мой главный вопрос будет:

КАК ПОЛУЧИТЬ ВОЗВРАТ КОДА app.worker_main()?

Я хотел бы иметь возможность полностью перезапустить рабочий процесс (после завершения процесса) X количество заданий выполнено

ОБНОВЛЕНИЕ Я разобрался, на чем висит рабочий - рабочий (WorkController) висит на вызове self.stop после того, как ловит SystemExit исключение.

1 ответ

Решение

Ненавижу, когда я отвечаю на свой вопрос.

Anywhoo, он блокировал вызов соединения на Mediator компонент внутри WorkController (звонки stop() на Mediator компонент, внутри стоп, это joinс).

Я избавился от Mediator компонент, отключив все ограничения скорости (это должно быть по умолчанию, но это не по какой-то причине).

Вы можете отключить все ограничения скорости с помощью настройки:

CELERY_DISABLE_ALL_RATE_LIMITS: True

Надеюсь, что это поможет кому-то еще в будущем.

мир

Другие вопросы по тегам