Получение ожидающих сообщений с помощью Redis Streams и Spring Data
Я использую Redis Streams в своем приложении Spring Boot. В планировщике я хочу регулярно получать все ожидающие сообщения и проверять, как долго они уже обрабатываются, и при необходимости повторно запускать их.
Моя проблема в том, что теперь я могу получать ожидающие сообщения, но я не уверен, как получить полезную нагрузку.
Мой первый подход использовал pending
а также range
операции. Обратной стороной является то, что totalDeliveryCount не увеличивается сrange
- поэтому я не могу использовать метод диапазона
val pendingMessages = stringRedisTemplate.opsForStream<String, Any>().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
return@filter true
} else {
...
return@filter false
}
}.map { //map from PendingMessage::class to a MapRecord with the content
val map = stringRedisTemplate.opsForStream().range(redisStreamName, Range.just(it.idAsString)) // does not increase totalDeliveryCount !!!
if (map != null && map.size > 0) {
return@map map[0]
} else {
return@map null
}
}.filterNotNull().toList()
Мой второй подход использовал pending
а также read
операции. Для операции чтения я могу указать смещение с текущим идентификатором. Проблема в том, что мне возвращаются только те идентификаторы, которые выше указанного.
val pendingMessages = stringRedisTemplate.opsForStream().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
return@filter true
} else {
...
return@filter false
}
}.map { //map from PendingMessage::class to a MapRecord with the content
val map = stringRedisTemplate.opsForStream<String, Any>()
.read(it.consumer, StreamReadOptions.empty().count(1),
StreamOffset.create(redisStreamName, ReadOffset.from(it.id)))
if (map != null && map.size > 0 && map[0].id.value == it.idAsString) { // map[0].id.value == it.idAsString does not match
return@map map[0]
} else {
return@map null
}
}.filterNotNull().toList()
Итак, когда я использую ReadOffset.from('1234-0')
Я не получаю сообщение с 1234-0
но все после этого сообщения. Есть ли способ получить точное сообщение, а также соблюстиtotalDeliveryCount
а также elapsedTimeSinceLastDelivery
статистика?
Я использую spring-data-redis 2.3.1.RELEASE
1 ответ
Сейчас я использую следующий обходной путь, который подходит в большинстве случаев:
return if (id.sequence > 0) {
"${id.timestamp}-${id.sequence - 1}"
} else {
"${id.timestamp - 1}-99999"
}
Он основан на том факте, что за мс вставляется не более 99999 сообщений.