Приемник потока данных Redis Spring завершается преждевременно

Я использую данные Spring Redis для потребления из потока Redis, используя приемник реактивного потока для прослушивания работы группы потребителей, но заметил, что поток Flux иногда преждевременно закрывается и больше не прослушивает новые сообщения, а поток завершается преждевременно.

Код

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder() 
            .build();
 StreamReceiver.create(reactiveConnFactory, options)
            .receiveAutoAck("CONSUMER_GRP", "CONSUMER_ID_1"), StreamOffset.create(
                        "CONSUMER_STREAM",
                        ReadOffset.lastConsumed()))
            .doOnNext(msg -> LOG.info("Got [{}] message from stream", msg))
            .flatMap(msg -> Mono.fromRunnable(() -> process("reactive", msg))
                  .subscribeOn(streamConsumerExecutor))
            .onErrorResume(t -> Flux.empty())
            .doOnCancel(() -> LOG.info("Consumer Stream was cancelled"))
            .doOnComplete(() -> {
               LOG.info("Consumer Stream Completed");
            })
            .doOnTerminate(() -> {
               LOG.info("Consumer Stream terminated");
            }) 
            .subscribe();


После некоторого времени чтения сообщений из потока получаем журнал, что "поток потребителя завершен"

версия: 2.2.0.RELEASE

Это ошибка или мне что-то не хватает, может ли кто-нибудь помочь?

ОБНОВИТЬ

Похоже, что для команд redis истекает время ожидания, поскольку я получаю исключение RedisCommandTimeoutException, есть ли способ повторить процесс потоковой передачи при таких ошибках, а не отменить его. Также выяснилось, что это происходит в операции XREADGROUP, хотя при запуске через nodejs redis-cli, выдающая ту же команду, работала нормально?

0 ответов

Другие вопросы по тегам