RabbitMQ + Reactor, отправляющий ACK после передачи сообщения?
Я пытаюсь выяснить, является ли Project-Reactor приемлемым вариантом для моего варианта использования.
Мне нужна высокая согласованность в моем приложении, я хочу:
- Получить сообщение от RabbitMQ
- Сделать некоторые манипуляции / преобразование
- Поместите сообщение в (другую) очередь RabbitMQ
- Отправить ACK для исходного сообщения
С обычной библиотекой Java RabbitMQ это выглядело бы примерно так:
var connectionFactory = new ConnectionFactory();
var connection = connectionFactory.newConnection();
var channel = connection.createChannel();
channel.txSelect();
channel.queueDeclare("queue.A", true, false, false, null);
channel.queueDeclare("queue.B", true, false, false, null);
channel.basicConsume("queue.A", false, (tag, delivery) ->
{
String data = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("Received: " + data);
channel.basicPublish("", "queue.B", null, data.getBytes(StandardCharsets.UTF_8));
channel.txCommit();
System.out.println("Sending ACK");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
channel.txCommit();
}, tag -> System.err.println("Tag " + tag + " failed"));
Это приятно печатает
Received: DATA
Sending ACK
Используя Reactor-RabbitMQ, я пришел к следующему:
var options = new ReceiverOptions();
var receiver = RabbitFlux.createReceiver(options);
var sender = RabbitFlux.createSender();
sender.declare(QueueSpecification.queue("queue.A")).block();
sender.declare(QueueSpecification.queue("queue.B")).block();
sender
.send(receiver.consumeManualAck("queue.A")
.doOnError(ex -> ex.printStackTrace())
.doOnEach(s ->
{
s.get().ack();
print("ACK");
})
.map(ad ->
{
print("MAP");
return new OutboundMessage("", "queue.B", ad.getBody());
}))
.doOnEach(v ->
{
print("SUB");
})
.subscribe();
Тем не менее, это печатает
ACK
MAP
Это не правильный порядок, очевидно, потому что я делаю doOnEach
до map
, Но AcknowledgableDelivery
больше не доступен после того, как я сопоставил его OutboundMessage
для отправителя.
Обратите внимание, что SUB
никогда не печатается. Наверное, потому что send
подписка является непрерывной и никогда не достигает Mono<Void>
прекращено состояние.
Мои знания Reactor не так обширны, поэтому я чувствую, что мне не хватает чего-то, что я мог бы использовать здесь. Можно ли сделать это аккуратно с Reactor или я должен забыть об этом?
Сноска: я использую var
потому что я в Java 11, и операторы print как способ количественно определить порядок, в котором выполняются вещи.
1 ответ
Я не думаю, что это можно подтвердить внутри метода отправки. Вместо этого вы можете попробовать:
receiver.consumeManualAck("queue.A")
.flatMap(ad -> {
Mono<OutboundMessage> outboundMessageMono =
Mono.just(new OutboundMessage("", "queue.B", ad.getBody()));
return sender.send(outboundMessageMono)
.doFinally(signalType -> {
if (signalType == SignalType.ON_COMPLETE) {
ad.ack();
} else {
ad.nack(false);
}
});
});