Prooph, Как мне перенаправить некоторые сообщения через локальную Командную шину и другие с помощью BernardMessageProducer
Некоторое время у меня была командная шина, и я разработал большую часть своего приложения. Теперь их некоторые команды, которые я хочу обрабатывать асинхронно. Я добавил BernardMessageProducer, и все, кажется, работает. Я хотел бы отправлять только команды занавеса на асинхронную шину, а остальные обрабатывать локально.
Возможно ли это и как я могу это сделать. Я уже написал командный маршрутизатор, но после того, как командная шина обработала сообщение.
Спасибо
2 ответа
Из чтения
Если вы хотите настроить шину, которая обрабатывает все сообщения асинхронно, вы можете сделать это, подключив Prooph\ServiceBus\Plugin\MessageProducerPlugin, инициализированный вашим выбранным производителем сообщений, к шине сообщений.
Давайте рассмотрим простой пример с использованием psb-zeromq-продуцента
//app bootstrap
$container = new Container;
$container['config'] = [
'prooph' => [
'zeromq_producer' => [
'dsn' => 'tcp://127.0.0.1:5555', // ZMQ Server Address.
'persistent_id' => 'example', // ZMQ Persistent ID to keep connections alive between requests.
'rpc' => false, // Use as Query Bus.
]
]
];
$factory = \Prooph\ServiceBus\Message\ZeroMQ\Container\ZeroMQMessageProducerFactory;
$zmqProducer = $factory($container);
$commandBus = new \Prooph\ServiceBus\CommandBus();
$messageProducerForwarder = new \Prooph\ServiceBus\Plugin\MessageProducerPlugin($zmqProducer);
$commandBus->utilize($messageProducerForwarder);
$echoText = new ExampleCommand('It works');
$commandBus->dispatch($echoText);
Вы также можете направлять отдельные сообщения производителю сообщений с помощью плагина маршрутизатора сообщений.
Примечание: Prooph\ServiceBus\Plugin\Router\RegexRouter - хороший выбор, если вы хотите обрабатывать все сообщения определенного асинхронного пространства имен.
Я не уверен, как это сделать с помощью prooph (особенно потому, что вы не предоставили никаких примеров кода), но в целом: возможно, хороший подход можно найти в репозитории MessageBus Матиаса Нобака в документации по Command Bus.
Вы можете создать промежуточное программное обеспечение, которое проверяет, например, интерфейс маркера (как в приведенном выше примере):
public function handle($message, callable $next)
{
if ($message instanceof IsHandledAsynchronously) {
// handle the message asynchronously using a message queue
$this->messageQueue->add($message);
} else {
// handle the message synchronously, i.e. right-away
$next($message);
}
}
Тогда нужно просто пометить вашу команду, позволив ей реализовать правильный интерфейс и явно добавив промежуточное ПО в нужное место в командной шине.
Если у вас несколько командных шин, как подсказывает ваш вопрос. Тогда вы, вероятно, захотите иметь какой-нибудь CommandResolver, который соответствует команде, например, по имени класса для соответствующей командной шины. Снова посмотрите документы Матиаса Нобака, особенно раздел Определение карты обработчика команд в том же документе и DelegatesToMessageHandlerInterface