Сельдерей бесконечная задача, которая слушает очередь

У меня есть задача с сельдереем, которая должна выполняться в бесконечном цикле, прослушивая несколько очередей (не связанных с внутренностями из сельдерея) в RabbitMQ. Когда сообщение извлекается из очереди, эта длительная задача отправляет это сообщение для обработки какой-либо другой задачей.

Как правильно реализовать такой вариант использования в Celery?

Я управляю сельдереем с параллелизмом 3 и флагом Ofair.

Мое текущее наблюдение состоит в том, что через несколько дней эта установка прекращает обработку задач из внутренней очереди сельдерея. Похоже, что эти долго выполняющиеся задачи по какой-то причине перезапускаются, и в конечном итоге все 3 рабочих заняты только этим долгосрочным заданием, поэтому не осталось рабочих для обработки задач из очереди сельдерея.

Я думал о некоторой блокировке на основе файлов, чтобы убедиться, что только один работник может получить блокировку и стать этой долговременной задачей, но не уверен, что это хороший вариант, я думаю, что есть более эффективные решения этой проблемы.

def init_couriers_consumers(self):
    logger.info("lock acquired")
    logger.info("TASK ID: {}".format(init_couriers_consumers.request.id))
    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        couriers_consumer_worker = ConsumerWorker(conn)
        couriers_consumer_worker.run()
        couriers_consumer_worker.should_stop = False
        # cache.set('reboot', False)
        self.retry(countdown=2)


class ConsumerWorker(ConsumerMixin):

    def __init__(self, connection):
        self.connection = connection
        self._create_queues()


    def _create_queues(self):
        from courier.models import Courier
        self.queues = []
        logger.info("create_queues")
        for courier in Courier.objects.filter(user__is_active=True):
            logger.info("create_queue for courier: {}".format(courier.user.username))
            self._create_courier_queues(courier.user.username)

    def _create_courier_queues(self, courier_username):
        self.queues.append(QueuesFactory.get_consumer_order_status_queue(courier_username))
        self.queues.append(QueuesFactory.get_consumer_status_queue(courier_username))
        self.queues.append(QueuesFactory.get_consumer_gps_queue(courier_username))

    def get_consumers(self, Consumer, channel):
        logger.info("Subscribing to queues: {}".format(str(self.queues)))
        return [Consumer(queues=self.queues,
                         callbacks=[self.process_message])]

    def process_message(self, body, message):
        logger.info("process message")
        from courier.api.tasks import process_message_task, error_handler_task
        process_message_task.apply_async((message.delivery_info['routing_key'], message.payload), link_error=error_handler_task.s())
        logger.info("after process message")
        message.ack()

    def on_connection_revived(self):
        logger.info("revived")

    def on_consume_ready(self, connection, channel, consumers, **kwargs):
        logger.info("on consumer ready")

    def on_consume_end(self, connection, channel):
        logger.info("on consume end")

    # def on_iteration(self):
    #     if cache.get('reboot'):
    #         logger.info("SHOULD STOP")
    #         self.should_stop = True
    #         release_lock()

Логи после нового старта:

[2016-11-14 15:47:36,652: INFO/MainProcess] Connected to amqp://guest:**@localhost:5672//
[2016-11-14 15:47:36,665: INFO/MainProcess] mingle: searching for neighbors
[2016-11-14 15:47:37,677: INFO/MainProcess] mingle: all alone
[2016-11-14 15:47:37,692: WARNING/MainProcess] celery@ip-178-216-202-251.e24cloud.com ready.
[2016-11-14 15:47:39,686: INFO/MainProcess] Received task: courier.api.consumers.init_couriers_consumers[couriers_consumer]
[2016-11-14 15:47:39,686: INFO/MainProcess] Received task: courier.api.consumers.init_producer_queues[91d7c307-8eed-4966-83ad-8b001e2459e5]
[2016-11-14 15:47:39,687: INFO/Worker-2] lock acquired
[2016-11-14 15:47:39,688: INFO/Worker-2] TASK ID: couriers_consumer
[2016-11-14 15:47:39,692: INFO/Worker-2] create_queues
[2016-11-14 15:47:40,308: INFO/Worker-2] create_queue for courier: courier1
[2016-11-14 15:47:40,322: INFO/Worker-2] revived
[2016-11-14 15:47:40,322: INFO/Worker-2] Connected to amqp://guest:**@localhost:5672//
[2016-11-14 15:47:40,325: INFO/Worker-2] Subscribing to queues: [<unbound Queue from/courier1/order/status -> <unbound Exchange couriers(direct)> -> from/courier1/order/status>, <unbound Queue from/courier1/status -> <unbound Exchange couriers(direct)> -> from/courier1/order/status>, <unbound Queue from/courier1/gps -> <unbound Exchange couriers(direct)> -> from/courier1/gps>]
[2016-11-14 15:47:40,333: INFO/Worker-2] on consumer ready
[2016-11-14 15:47:40,554: INFO/MainProcess] Task courier.api.consumers.init_producer_queues[91d7c307-8eed-4966-83ad-8b001e2459e5] succeeded in 0.864124746993s: None

Но потом через несколько дней я вижу (grep оживил)

[2016-11-13 05:35:09,502: INFO/Worker-1] revived
[2016-11-14 05:58:17,716: INFO/Worker-3] revived
[2016-11-14 12:33:25,774: INFO/Worker-2] revived

что, вероятно, означает, что каждый работник находится в этой длительной задаче, но не уверен, как это состояние происходит.

0 ответов

Другие вопросы по тегам