Spark Streaming указывает начальные и конечные смещения
У меня есть сценарий, в котором я хочу повторно обработать определенный пакет данных, поступающих из Kafka, используя Spark DStreams.
скажем, я хочу обработать следующие партии данных.
Тема-Раздел1-{1000,2000} Тема-Раздел2-{500-600}
Ниже приведен фрагмент кода, где я могу указать начальные смещения.
val inputDStream = KafkaUtils.createDirectStream[String,String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](
topic-partition-list,
kafkaProps,
starting-offset-ranges))
Но я хочу знать, в любом случае, я могу также указать конечные смещения, как в случае пакетного режима структурированной потоковой передачи.
По сути, он должен обработать этот небольшой пакет и остановить рабочий процесс.
Примечание: я не хочу использовать структурированную потоковую передачу для этого варианта использования. Хотите использовать только DStreams.
1 ответ
Решение
Нашел способ сделать это.
val offsetRanges = Array(
// topic, partition, inclusive starting offset, exclusive ending offset
OffsetRange("test", 0, 0, 100),
OffsetRange("test", 1, 0, 100)
)
val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)