Используйте Messenger, чтобы прочитать сообщение в очереди, не отправленное с Messenger
Я пытаюсь прочитать сообщение в очереди (в RabbitMQ), которое не было отправлено с Symfony Messenger. Похоже, что Messenger добавляет несколько заголовков, таких как
headers:
type: App\Message\Transaction
но при чтении внешних сообщений этот заголовок не существует.
Итак, есть ли способ сообщить Messenger, что каждое сообщение в очереди A должно рассматриваться как тип сообщения Transaction
?
Что у меня сегодня есть:
framework:
messenger:
transports:
# Uncomment the following line to enable a transport named "amqp"
amqp:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: messages
type: direct
queue:
name: queue_messages
routing:
# Route your messages to the transports
'App\Message\Transaction': amqp
и что я хотел бы добавить что-то вроде:
routing:
# Route your messages to the transports
amqp: 'App\Message\Transaction'
1 ответ
Райан Уивер ответил на аналогичный вопрос о слабости Symfony:
Вам понадобится специальный сериализатор для мессенджера, если сообщения не происходят из мессенджера:)
1) Вы создаете настраиваемую сериализацию (реализует SerializerInterface из Messenger) и настраиваете ее в конфигурации Messenger
2) Каким-то образом в этом сериализаторе вы берете JSON и превращаете его в какой-то объект "сообщения", который есть в вашем коде. Как вы это сделаете, зависит только от вас - вам нужно каким-то образом посмотреть на ваш JSON и выяснить, к какому классу сообщений он должен быть привязан. Затем вы можете создать этот объект вручную и заполнить данные или использовать сериализатор Symfony. Оберните это в Конверт прежде, чем возвратить это
3) Поскольку ваш сериализатор теперь возвращает объект "сообщение", если какой-то вид, Messenger использует свою обычную логику, чтобы найти обработчики для этого сообщения и выполнить их
Я сделал быструю реализацию для своих нужд, чтобы вы соответствовали вашей бизнес-логике:
1 - Создать Serializer
который реализует SerializerInterface
:
// I keeped the default serializer, and just override his decode method.
/**
* {@inheritdoc}
*/
public function decode(array $encodedEnvelope): Envelope
{
if (empty($encodedEnvelope['body']) || empty($encodedEnvelope['headers'])) {
throw new InvalidArgumentException('Encoded envelope should have at least a "body" and some "headers".');
}
if (empty($encodedEnvelope['headers']['action'])) {
throw new InvalidArgumentException('Encoded envelope does not have an "action" header.');
}
// Call a factory to return the Message Class associate with the action
if (!$messageClass = $this->messageFactory->getMessageClass($encodedEnvelope['headers']['action'])) {
throw new InvalidArgumentException(sprintf('"%s" is not a valid action.', $encodedEnvelope['headers']['action']));
}
// ... keep the default Serializer logic
return new Envelope($message, ...$stamps);
}
2 - Получить право Message
используя фабрику:
class MessageFactory
{
/**
* @param string $action
* @return string|null
*/
public function getMessageClass(string $action)
{
switch($action){
case ActionConstants::POST_MESSAGE :
return PostMessage::class ;
default:
return null;
}
}
}
3) Настройте новый настраиваемый сериализатор для мессенджера:
framework:
messenger:
serializer: 'app.my_custom_serializer'
Я попытаюсь пойти немного дальше и найду способ "соединить" очередь напрямую, сообщу вам.