Reactive-Kafka: как приостановить работу потребителя в случае исключения и повторить запрос по требованию
Я уже задавал этот вопрос в группах Google, но пока не получил ответа. Так что постить это здесь для другой аудитории.
Мы используем Reactive-Kafka для нашего приложения. У нас есть сценарий, описанный ниже, где мы хотим прекратить отправку сообщений потребителю, если при обработке сообщения возникло исключение. Сообщение следует повторить по истечении установленного времени или по явному запросу со стороны потребителя. С нашим текущим подходом, скажем, если база данных потребителя в течение некоторого времени не работает, он все равно будет пытаться читать из kafka и обрабатывать сообщения, но обработка завершится неудачно из-за проблем с БД. Это будет держать приложение занятым без необходимости. Вместо этого мы хотим приостановить прием сообщения получателем на оговоренное время (скажем, подождать 30 минут, чтобы повторить попытку). Мы не уверены, как справиться с этим делом.
Можно ли сделать то же самое? Я что-то пропустил?
Вот пример кода, взятого из реактивной кафки:
Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset).recover {
case ex => {
/**
* HOW TO DO ????????
* On exception, I would like to tell stream to stop sending messages and pause the consumer and try again within stipulated time
* or on demand from the last committed offset
*/
throw ex
}
}
}
.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) { (batch, elem) =>
batch.updated(elem)
}
.mapAsync(3)(_.commitScaladsl())
.runWith(Sink.ignore)
2 ответа
E сть recoverWithRetries
комбинатор для этого. Для справки смотрите этот ответ и документы.
Вы можете извлечь свой источник
val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset)
а затем сделать
src
.recoverWithRetries(attempts = -1, {case e: MyDatabaseException =>
logger.error(e)
src.delay(30.minutes, DelayOverflowStrategy.backpressure)})
...
(Попытка с попытками =-1 означает повторную попытку бесконечно)
Обратите внимание, что вам, вероятно, понадобится отобразить материализованное значение src
от akka.kafka.scaladsl.Consumer.Control
в akka.NotUsed
для того, чтобы сослаться на это в recoverWithRetries
:
val src = Consumer.committableSource(consumerSettings, Subscriptions.topics("topic1"))
.mapAsync(1) { msg =>
Future {
/**
* Unreliable consumer, for e.g. saving to DB might not be successful due to DB is down
*/
}.map(_ => msg.committableOffset)
.mapMaterializedValue(_ => akka.NotUsed)