Реактивное программирование с Reactor и RabbitMQ

Недавно я написал демонстрационную программу для запуска реактивного программирования с комбинацией Reactor и RabbitMQ. Это мой демонстрационный код:

public class FluxWithRabbitMQDemo {

private static final String QUEUE = "demo_thong";

private final reactor.rabbitmq.Sender sender;
private final Receiver receiver;

public FluxWithRabbitMQDemo() {
    this.sender = ReactorRabbitMq.createSender();
    this.receiver = ReactorRabbitMq.createReceiver();
}

public void run(int count) {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.useNio();
    SenderOptions senderOptions =  new SenderOptions()
            .connectionFactory(connectionFactory)
            .resourceCreationScheduler(Schedulers.elastic());

    reactor.rabbitmq.Sender sender = ReactorRabbitMq.createSender(senderOptions);

    Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));
    Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
    queueDeclaration.thenMany(messages).subscribe(m->System.out.println("Get message "+ new String(m.getBody())));



    Flux<OutboundMessageResult> dataStream = sender.sendWithPublishConfirms(Flux.range(1, count)
            .filter(m -> !m.equals(10))
            .parallel()
            .runOn(Schedulers.parallel())
            .doOnNext(i->System.out.println("Message  " + i + " run on thread "+Thread.currentThread().getId()))
            .map(i -> new OutboundMessage("", QUEUE, ("Message " + i).getBytes())));

    sender.declareQueue(QueueSpecification.queue(QUEUE))
            .thenMany(dataStream)
            .doOnError(e -> System.out.println("Send failed"+ e))
            .subscribe(m->{
                if (m!= null){
                    System.out.println("Sent successfully message "+new String(m.getOutboundMessage().getBody()));
                }
            });

    try {
        Thread.sleep(20000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

}

public static void main(String[] args) throws Exception {
    int count = 20;
    FluxWithRabbitMQDemo sender = new FluxWithRabbitMQDemo();
    sender.run(count);
}

} Я ожидал, что после того, как Flux выпустит элемент, Отправитель должен отправить его в RabbitMQ, а после получения RabbitMQ Получатель должен получить его. Но все произошло последовательно, и это результат, который я получил

Message  3 run on thread 25
Message  4 run on thread 26
Message  8 run on thread 26
Message  13 run on thread 26
Message  17 run on thread 26
Message  2 run on thread 24
Message  1 run on thread 23
Message  6 run on thread 24
Message  5 run on thread 23
Message  9 run on thread 23
Message  14 run on thread 23
Message  18 run on thread 23
Message  11 run on thread 24
Message  15 run on thread 24
Message  19 run on thread 24
Message  7 run on thread 25
Message  12 run on thread 25
Message  16 run on thread 25
Message  20 run on thread 25
Sent successfully message Message 3
Sent successfully message Message 1
Sent successfully message Message 2
Sent successfully message Message 4
Sent successfully message Message 5
Sent successfully message Message 6
Sent successfully message Message 8
Sent successfully message Message 9
Sent successfully message Message 11
Sent successfully message Message 13
Sent successfully message Message 14
Sent successfully message Message 15
Sent successfully message Message 17
Sent successfully message Message 18
Sent successfully message Message 19
Sent successfully message Message 7
Sent successfully message Message 12
Sent successfully message Message 16
Sent successfully message Message 20
Get message Message 3
Get message Message 1
Get message Message 2
Get message Message 4
Get message Message 5
Get message Message 6
Get message Message 8
Get message Message 9
Get message Message 11
Get message Message 13
Get message Message 14
Get message Message 15
Get message Message 17
Get message Message 18
Get message Message 19
Get message Message 7
Get message Message 12
Get message Message 16
Get message Message 20

Я не знаю, что делать с моим кодом для достижения ожидаемых результатов. Кто-нибудь может мне помочь? Спасибо за продвижение!!!

1 ответ

Решение

Сообщения генерируются слишком быстро. Чтобы увидеть чередование, в dataStream добавлять

.doOnNext(i->Thread.sleep(10))

Другие вопросы по тегам