Как вы возвращаете будущее, содержащее список сообщений после того, как все доступные сообщения были использованы из темы Кафки?
Я, вероятно, упускаю суть потребителя Kafka, но я хочу сделать следующее:
Потребитель подписывается на тему, захватывает все сообщения в теме и возвращает будущее со списком всех этих сообщений.
Код, который я написал, чтобы попытаться выполнить это
val sink = Sink.fold[List[KafkaMessage], KafkaMessage](List[KafkaMessage]()) { (list, kafkaMessage) =>
list :+ kafkaMessage
}
def consume(topic: String) =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.map { message =>
logger.info(s"Consuming ${message.record.value}")
KafkaMessage(Some(message.record.key()), Some(message.record.value()))
}
.buffer(bufferSize, overflowStrategy)
.runWith(sink)
Будущее никогда не возвращается, хотя, оно потребляет необходимые сообщения и затем продолжает опросить тему повторно. Есть ли способ вернуть будущее, а затем закрыть потребителя?
1 ответ
Поскольку Kafka предназначен для потоковой передачи данных, не существует такой вещи, как "все сообщения", поскольку новые данные могут быть добавлены к теме в любой момент.
Я думаю, есть две возможные вещи, которые вы могли бы сделать:
- проверь сколько записей вернул последний
poll
и прекратить или - вам нужно получить "текущий конец журнала" через
endOffsets
и сравните это со смещением последней записи на разделы. Если оба совпадения, вы можете вернуться.
Первый подход проще, но может иметь недостаток, заключающийся в том, что он не так надежен, как второй. Теоретически, опрос может вернуть ноль записей, даже если записи доступны (даже если изменения не очень велики, что это происходит).
Не уверен, как выразить это условие завершения в Scala (поскольку я не очень знаком со Scala).