Получение ожидающих сообщений с помощью Redis Streams и Spring Data

Я использую Redis Streams в своем приложении Spring Boot. В планировщике я хочу регулярно получать все ожидающие сообщения и проверять, как долго они уже обрабатываются, и при необходимости повторно запускать их.

Моя проблема в том, что теперь я могу получать ожидающие сообщения, но я не уверен, как получить полезную нагрузку.

Мой первый подход использовал pending а также rangeоперации. Обратной стороной является то, что totalDeliveryCount не увеличивается сrange - поэтому я не могу использовать метод диапазона

val pendingMessages = stringRedisTemplate.opsForStream<String, Any>().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
    if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
            return@filter true
    } else {
        ...
        return@filter false
    }
}.map { //map from PendingMessage::class to a MapRecord with the content
    val map = stringRedisTemplate.opsForStream().range(redisStreamName, Range.just(it.idAsString)) // does not increase totalDeliveryCount !!!
    if (map != null && map.size > 0) { 
        return@map map[0]
    } else {
        return@map null
    }
}.filterNotNull().toList()

Мой второй подход использовал pending а также readоперации. Для операции чтения я могу указать смещение с текущим идентификатором. Проблема в том, что мне возвращаются только те идентификаторы, которые выше указанного.

val pendingMessages = stringRedisTemplate.opsForStream().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
    if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
            return@filter true
    } else {
        ...
        return@filter false
    }
}.map { //map from PendingMessage::class to a MapRecord with the content
    val map = stringRedisTemplate.opsForStream<String, Any>()
            .read(it.consumer, StreamReadOptions.empty().count(1),
                    StreamOffset.create(redisStreamName, ReadOffset.from(it.id)))
    if (map != null && map.size > 0 && map[0].id.value == it.idAsString) { // map[0].id.value == it.idAsString does not match
        return@map map[0]
    } else {
        return@map null
    }
}.filterNotNull().toList()

Итак, когда я использую ReadOffset.from('1234-0') Я не получаю сообщение с 1234-0но все после этого сообщения. Есть ли способ получить точное сообщение, а также соблюстиtotalDeliveryCount а также elapsedTimeSinceLastDelivery статистика?

Я использую spring-data-redis 2.3.1.RELEASE

1 ответ

Решение

Сейчас я использую следующий обходной путь, который подходит в большинстве случаев:

return if (id.sequence > 0) {
            "${id.timestamp}-${id.sequence - 1}"
        } else {
            "${id.timestamp - 1}-99999"
        }

Он основан на том факте, что за мс вставляется не более 99999 сообщений.

Другие вопросы по тегам