Prooph, Как мне перенаправить некоторые сообщения через локальную Командную шину и другие с помощью BernardMessageProducer

Некоторое время у меня была командная шина, и я разработал большую часть своего приложения. Теперь их некоторые команды, которые я хочу обрабатывать асинхронно. Я добавил BernardMessageProducer, и все, кажется, работает. Я хотел бы отправлять только команды занавеса на асинхронную шину, а остальные обрабатывать локально.

Возможно ли это и как я могу это сделать. Я уже написал командный маршрутизатор, но после того, как командная шина обработала сообщение.

Спасибо

2 ответа

Из чтения

Если вы хотите настроить шину, которая обрабатывает все сообщения асинхронно, вы можете сделать это, подключив Prooph\ServiceBus\Plugin\MessageProducerPlugin, инициализированный вашим выбранным производителем сообщений, к шине сообщений.

Давайте рассмотрим простой пример с использованием psb-zeromq-продуцента

//app bootstrap
$container = new Container;
$container['config'] = [
    'prooph' => [
        'zeromq_producer' => [
            'dsn' => 'tcp://127.0.0.1:5555', // ZMQ Server Address.
            'persistent_id' => 'example', // ZMQ Persistent ID to keep connections alive between requests.
            'rpc' => false, // Use as Query Bus.
        ]
    ]
];

$factory = \Prooph\ServiceBus\Message\ZeroMQ\Container\ZeroMQMessageProducerFactory;
$zmqProducer = $factory($container);

$commandBus = new \Prooph\ServiceBus\CommandBus();

$messageProducerForwarder = new \Prooph\ServiceBus\Plugin\MessageProducerPlugin($zmqProducer);

$commandBus->utilize($messageProducerForwarder);

$echoText = new ExampleCommand('It works');
$commandBus->dispatch($echoText);

Вы также можете направлять отдельные сообщения производителю сообщений с помощью плагина маршрутизатора сообщений.

Примечание: Prooph\ServiceBus\Plugin\Router\RegexRouter - хороший выбор, если вы хотите обрабатывать все сообщения определенного асинхронного пространства имен.

Я не уверен, как это сделать с помощью prooph (особенно потому, что вы не предоставили никаких примеров кода), но в целом: возможно, хороший подход можно найти в репозитории MessageBus Матиаса Нобака в документации по Command Bus.

Вы можете создать промежуточное программное обеспечение, которое проверяет, например, интерфейс маркера (как в приведенном выше примере):

public function handle($message, callable $next)
{
    if ($message instanceof IsHandledAsynchronously) {
        // handle the message asynchronously using a message queue
        $this->messageQueue->add($message);
    } else {
        // handle the message synchronously, i.e. right-away
        $next($message);
    }
}

Тогда нужно просто пометить вашу команду, позволив ей реализовать правильный интерфейс и явно добавив промежуточное ПО в нужное место в командной шине.

Если у вас несколько командных шин, как подсказывает ваш вопрос. Тогда вы, вероятно, захотите иметь какой-нибудь CommandResolver, который соответствует команде, например, по имени класса для соответствующей командной шины. Снова посмотрите документы Матиаса Нобака, особенно раздел Определение карты обработчика команд в том же документе и DelegatesToMessageHandlerInterface

Другие вопросы по тегам