Приемник потока данных Redis Spring завершается преждевременно
Я использую данные Spring Redis для потребления из потока Redis, используя приемник реактивного потока для прослушивания работы группы потребителей, но заметил, что поток Flux иногда преждевременно закрывается и больше не прослушивает новые сообщения, а поток завершается преждевременно.
Код
StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder()
.build();
StreamReceiver.create(reactiveConnFactory, options)
.receiveAutoAck("CONSUMER_GRP", "CONSUMER_ID_1"), StreamOffset.create(
"CONSUMER_STREAM",
ReadOffset.lastConsumed()))
.doOnNext(msg -> LOG.info("Got [{}] message from stream", msg))
.flatMap(msg -> Mono.fromRunnable(() -> process("reactive", msg))
.subscribeOn(streamConsumerExecutor))
.onErrorResume(t -> Flux.empty())
.doOnCancel(() -> LOG.info("Consumer Stream was cancelled"))
.doOnComplete(() -> {
LOG.info("Consumer Stream Completed");
})
.doOnTerminate(() -> {
LOG.info("Consumer Stream terminated");
})
.subscribe();
После некоторого времени чтения сообщений из потока получаем журнал, что "поток потребителя завершен"
версия: 2.2.0.RELEASE
Это ошибка или мне что-то не хватает, может ли кто-нибудь помочь?
ОБНОВИТЬ
Похоже, что для команд redis истекает время ожидания, поскольку я получаю исключение RedisCommandTimeoutException, есть ли способ повторить процесс потоковой передачи при таких ошибках, а не отменить его. Также выяснилось, что это происходит в операции XREADGROUP, хотя при запуске через nodejs redis-cli, выдающая ту же команду, работала нормально?