Обработка ошибок в Akka Kafka Producer
Я использую реактивный-кафка-ядро 0.10.1 (нацеливание на Кафку 0.9.х). Похоже, что актер-производитель Кафки останавливается всякий раз, когда возникает ошибка в функции обратного вызова. Есть ли способ настроить это поведение? Наш вариант использования - попытаться восстановить и отправить сообщения.
private def processElement(element: ProducerMessage[K, V]) = {
val record = richProducer.props.partitionizer(element.value) match {
case Some(partitionId) => new ProducerRecord(richProducer.props.topic, partitionId, element.key, element.value)
case None => new ProducerRecord(richProducer.props.topic, element.key, element.value)
}
richProducer.producer.send(record, new Callback {
override def onCompletion(metadata: RecordMetadata, exception: Exception) = {
if (exception != null) {
handleError(exception)
}
}
})
()} private def handleError(ex: Throwable) = {
log.error(ex, "Stopping Kafka subscriber due to fatal error.")
stop()
}