Мост RabbitMQ слушателя к Flux
У меня есть приложение Reactive Spring Boot, получающее сообщения от RabbitMQ и сохраняющее их в репозитории (MongoDB):
@RabbitListener(...)
public void processMessage(Message message) {
repository.persist(message).subscribe();
}
Предполагая, что за короткий период времени поступит несколько сообщений, этот код может привести к исчерпанию настроенного пула соединений в базе данных. Если бы я получил сообщения вFlux
, Я мог бы concatMap()
их в базу данных или вставьте в ведра из n документов.
Вот почему я попытался реализовать мост данного слушателя RabbitMQ к самоуправляемому Flux:
@Component
public class QueueListenerController {
private final MyMongoRepository repository;
private final FluxProcessor<Message, Message> fluxProcessor;
private final FluxSink<Message> fluxSink;
public QueueListenerController(MyMongoRepository repository) {
this.repository = repository;
this.fluxProcessor = DirectProcessor.<Message>create().serialize();
this.fluxSink = fluxProcessor.sink();
}
@PostConstruct
private void postConstruct() {
fluxProcessor.concatMap(repository::persist)
.subscribe();
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "my-queue", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "amq.direct", durable = "true", autoDelete = "false")
))
public void processMessage(Message message) {
fluxSink.next(message);
}
}
Это работает локально и в течение определенного периода времени, но через некоторое время (я ожидаю 12-24 часа) он перестает сохранять сообщения в базе данных, поэтому я совершенно уверен, что делаю что-то не так.
Каким будет правильный способ преобразования входящих сообщений RabbitMQ в Flux
сообщений?