Поддерживается ли XClaim / претензия в данных Redis Spring - ReactiveRedisOperations.opsForStream()

Чтобы создать надежную очередь сообщений с использованием потоков redis, я использую зависимость spring-boot-starter-data-redis-reactive и lettuce для обработки сообщений из потока redis. Хотя я могу добавлять, читать, подтверждать и удалять сообщения через API, доступный вReactiveRedisOperations.opsForStream() в виде группы потребителей я не смог найти API для запроса ожидающего сообщения, которое не подтверждается в течение 5 минут, хотя оно доступно в this.reactiveRedisConnectionFactory .getReactiveConnection() .streamCommands() .xClaim(). Но я не хочу иметь шаблонный код для управления исключениями, сериализацией и т. Д. Есть ли способ запросить сообщение с помощьюReactiveRedisOperations.opsForStream()

https://docs.spring.io/spring-data/redis/docs/current/api/org/springframework/data/redis/core/ReactiveStreamOperations.html

1 ответ

Решение

Без redis весенних данных, используя клиентскую библиотеку салата напрямую, я могу получить ожидающее сообщение, а также запросить сообщение, как показано ниже

public Flux<PendingMessage> getPendingMessages(PollMessage pollMessage, String queueName) {
    Predicate<PendingMessage> poisonMessage = pendingMessage -> (pendingMessage.getTotalDeliveryCount()<=maxRetries);
    Predicate<PendingMessage> nackMessage = pendingMessage -> (pendingMessage.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofMillis(ackTimeout)) > 0 );

    return statefulRedisClusterConnection.reactive()
        .xpending(queueName, pollMessage.getConsumerGroupName(), Range.unbounded(), Limit.from(1000))
        .collectList()
        .map((it) -> ((PendingMessages)PENDING_MESSAGES_CONVERTER
                .apply(it, pollMessage.getConsumerGroupName()))
                .withinRange(org.springframework.data.domain.Range.unbounded()))
            .flatMapMany(Flux::fromIterable)
            .filter(nackMessage)
            .filter(poisonMessage)
            .limitRequest(pollMessage.getBatchSize());
}

Чтобы запросить сообщение, я снова использовал API, доступный в библиотеке салата.

public Flux<StreamMessage<String, String>> claimMessage(PendingMessage pendingMessage, String queueName, String groupName, String serviceName) {
    return statefulRedisClusterConnection.reactive()
            .xclaim(queueName, Consumer.from(groupName, serviceName), 0, pendingMessage.getIdAsString());
}

На данный момент получение ожидающего сообщения от redis через spring-data имеет проблемы, поэтому я использовал библиотеку салата напрямую, чтобы получить ожидающее сообщение и потребовать его.

https://jira.spring.io/browse/DATAREDIS-1160

Другие вопросы по тегам