Используйте Messenger, чтобы прочитать сообщение в очереди, не отправленное с Messenger

Я пытаюсь прочитать сообщение в очереди (в RabbitMQ), которое не было отправлено с Symfony Messenger. Похоже, что Messenger добавляет несколько заголовков, таких как

headers: 
    type: App\Message\Transaction

но при чтении внешних сообщений этот заголовок не существует.

Итак, есть ли способ сообщить Messenger, что каждое сообщение в очереди A должно рассматриваться как тип сообщения Transaction?

Что у меня сегодня есть:

framework:
    messenger:
        transports:
            # Uncomment the following line to enable a transport named "amqp"
            amqp:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
                options:
                    exchange:
                        name: messages
                        type: direct
                    queue:
                        name: queue_messages

        routing:
            # Route your messages to the transports
             'App\Message\Transaction': amqp

и что я хотел бы добавить что-то вроде:

        routing:
            # Route your messages to the transports
             amqp: 'App\Message\Transaction'

1 ответ

Решение

Райан Уивер ответил на аналогичный вопрос о слабости Symfony:

Вам понадобится специальный сериализатор для мессенджера, если сообщения не происходят из мессенджера:)

1) Вы создаете настраиваемую сериализацию (реализует SerializerInterface из Messenger) и настраиваете ее в конфигурации Messenger

2) Каким-то образом в этом сериализаторе вы берете JSON и превращаете его в какой-то объект "сообщения", который есть в вашем коде. Как вы это сделаете, зависит только от вас - вам нужно каким-то образом посмотреть на ваш JSON и выяснить, к какому классу сообщений он должен быть привязан. Затем вы можете создать этот объект вручную и заполнить данные или использовать сериализатор Symfony. Оберните это в Конверт прежде, чем возвратить это

3) Поскольку ваш сериализатор теперь возвращает объект "сообщение", если какой-то вид, Messenger использует свою обычную логику, чтобы найти обработчики для этого сообщения и выполнить их


Я сделал быструю реализацию для своих нужд, чтобы вы соответствовали вашей бизнес-логике:

1 - Создать Serializer который реализует SerializerInterface:


   // I keeped the default serializer, and just override his decode method.

   /**
     * {@inheritdoc}
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
            throw new InvalidArgumentException('Encoded envelope should have at least a "body" and some "headers".');
        }

        if (empty($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException('Encoded envelope does not have an "action" header.');
        }

        // Call a factory to return the Message Class associate with the action
        if (!$messageClass = $this->messageFactory->getMessageClass($encodedEnvelope['headers']['action'])) {
            throw new InvalidArgumentException(sprintf('"%s" is not a valid action.', $encodedEnvelope['headers']['action']));
        }

        // ... keep the default Serializer logic

        return new Envelope($message, ...$stamps);
    }

2 - Получить право Message используя фабрику:

class MessageFactory
{
    /**
     * @param string $action
     * @return string|null
     */
    public function getMessageClass(string $action)
    {
        switch($action){
            case ActionConstants::POST_MESSAGE :
                return PostMessage::class ;
            default:
                return null;
        }
    }
}

3) Настройте новый настраиваемый сериализатор для мессенджера:

framework:
  messenger:
    serializer: 'app.my_custom_serializer'

Я попытаюсь пойти немного дальше и найду способ "соединить" очередь напрямую, сообщу вам.

Другие вопросы по тегам