Обработка ошибок потребителей в Symfony Messenger / RabbitMQ

Я использую новые компоненты Symfony Messenger 4.1 и RabbitMQ 3.6.10-1 для постановки в очередь и асинхронной отправки уведомлений по электронной почте и SMS из моего веб-приложения Symfony 4.1. Конфигурация моего мессенджера (messenger.yaml) выглядит так:

framework:
    messenger:
        transports:
            amqp: '%env(MESSENGER_TRANSPORT_DSN_NOTIFICATIONS)%'

        routing:
            'App\NotificationBundle\Entity\NotificationQueueEntry': amqp

Когда новое уведомление должно быть отправлено, я ставлю его в очередь следующим образом:

use Symfony\Component\Messenger\MessageBusInterface;
// ...
$notificationQueueEntry = new NotificationQueueEntry();
// [Set notification details such as recipients, subject, and message]
$this->messageBus->dispatch($notificationQueueEntry);

Затем я запускаю потребителя в командной строке следующим образом:

$ bin/console messenger:consume-messages

Я реализовал SendNotificationHandler сервис, где происходит фактическая доставка. Конфигурация сервиса:

App\NotificationBundle\MessageHandler\SendNotificationHandler:
    arguments:
        - '@App\NotificationBundle\Service\NotificationQueueService'
    tags: [ messenger.message_handler ]

И класс:

class SendNotificationHandler
{
    public function __invoke(NotificationQueueEntry $entry): void
    {
        $this->notificationQueueService->sendNotification($entry);
    }
}

До этого момента все работает гладко и уведомления доставляются.

Теперь мой вопрос: может случиться так, что электронная почта или SMS не могут быть доставлены из-за (временного) сбоя сети. В таком случае я хотел бы, чтобы моя система повторила доставку через указанное количество времени, вплоть до указанного максимального количества попыток. Как можно достичь этого?

Я читал об обмене Dead Letter, однако я не смог найти документацию или пример того, как интегрировать это с компонентом Symfony Messenger.

1 ответ

Что вам нужно сделать, это сказать RabbitMQ, что сообщение отклонено, а не подтверждено. По умолчанию мессенджер позаботится об этом внутри AmqpReceiver. Как вы можете видеть, если вы бросите исключение, которое реализует RejectMessageExceptionInterface внутри вашего обработчика сообщение будет автоматически отклонено для вас.

Вы также можете "смоделировать" это поведение с помощью специального промежуточного программного обеспечения. Я создал нечто подобное в небольшом демонстрационном приложении. Механизм состоит из промежуточного программного обеспечения, которое оборачивает (сериализованное) исходное сообщение в новое RetryMessage и отправляет его через пользовательскую шину сообщений в другую очередь, используемую в качестве обмена мертвыми буквами. Затем обработчик этого сообщения распакует RetryMessage (получает исходное сообщение и десериализует его) и передает его по шине по умолчанию:

Увидеть:

Это базовая настройка, которая отклоняет сообщение и позволяет мгновенно его использовать (!). Возможно, вы захотите добавить дополнительную информацию, такую ​​как заголовки для меток времени, при задержке потребления, чтобы улучшить это. Для этого вам следует написать собственный приемник, промежуточное программное обеспечение и / или обработчик.

Другие вопросы по тегам