Spring Data Redis Streams (Reactive) - каков правильный способ обработки ошибок?

Я использую Redis Streams с Spring Data Redis 2.2.4. Одна вещь, которую я хочу выяснить, - это правильный способ обработки ошибок.

Для модели без реакции мы устанавливаем ErrorHandler и EancelSubscriptionOnError на StreamReadRequest. Затем мы можем контролировать, какое исключение должно отменить подписку на Streams.

Пример:

        StreamReadRequest<String> streamReadRequest = StreamReadRequest.builder(offset)
            .errorHandler(streamListener.getErrorHandler())
            .cancelOnError(e -> false)
            .consumer(consumer)
            .autoAck(true)
            .build();

Но для реактивной модели я не нахожу API обработки ошибок для StreamReceiver. Когда возникает исключение из нашего обработчика бизнес-сообщений. Подписка на Redis Streams будет отменена, и больше сообщений не будет. Поэтому я должен убедиться, что в моем коде бизнес-логики нет исключений. См. Пример ниже:

    public void init() {

        StreamOffset<String> offset = StreamOffset.create(STREAM_NAME, ReadOffset.lastConsumed());

        receiver.receiveAutoAck(consumer, offset)
            .flatMap(this::onMessage)
            .subscribe();

    }

    private Mono<Long> onMessage(ObjectRecord<String, MyEvent> message) {
        try {
            return redisTemplate.opsForStream().acknowledge(groupName, message);
        } catch (Exception e) {
            return Mono.just(-1L);
        }

    }

0 ответов

Другие вопросы по тегам