Spark Streaming указывает начальные и конечные смещения

У меня есть сценарий, в котором я хочу повторно обработать определенный пакет данных, поступающих из Kafka, используя Spark DStreams.

скажем, я хочу обработать следующие партии данных.

Тема-Раздел1-{1000,2000} Тема-Раздел2-{500-600}

Ниже приведен фрагмент кода, где я могу указать начальные смещения.

val inputDStream = KafkaUtils.createDirectStream[String,String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Assign[String, String](
      topic-partition-list,
      kafkaProps,
      starting-offset-ranges))

Но я хочу знать, в любом случае, я могу также указать конечные смещения, как в случае пакетного режима структурированной потоковой передачи.

По сути, он должен обработать этот небольшой пакет и остановить рабочий процесс.

Примечание: я не хочу использовать структурированную потоковую передачу для этого варианта использования. Хотите использовать только DStreams.

1 ответ

Решение

Нашел способ сделать это.

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
Другие вопросы по тегам