Как переносить потоки данных через несколько интервалов в Spark Streaming

Я использую Apache Spark Streaming 1.6.1 для написания Java-приложения, которое объединяет два потока данных Key/Value и записывает вывод в HDFS. Два потока данных содержат K/V-строки и периодически поступают в Spark из HDFS с помощью textFileStream().

Два потока данных не синхронизированы, что означает, что некоторые ключи, которые находятся в потоке 1 в момент времени t0, могут появиться в потоке 2 в момент времени t1 или наоборот. Следовательно, моя цель - объединить два потока и вычислить "оставшиеся" ключи, которые следует учитывать для операции соединения в следующих интервалах пакета.

Чтобы лучше это понять, рассмотрим следующий алгоритм:

variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0

operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)

Я попытался реализовать этот алгоритм с Spark Streaming безуспешно. Первоначально я создаю два пустых потока для оставшихся ключей таким образом (это только один поток, но код для генерации второго потока аналогичен):

JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String> () {
                                 @Override
                                 public scala.Tuple2<String, String> call(String s) {
                                   return new scala.Tuple2(s, s);
                                 }
                               });

Позже этот пустой поток объединяется (т. Е. Union()) с stream1, и, наконец, после объединения я добавляю оставшиеся ключи из stream1 и вызываю window(). То же самое происходит с stream2.

Проблема состоит в том, что операции, которые генерируют left_keys_s1 и left_keys_s2, являются преобразованиями без действий, что означает, что Spark не создает потоковый граф RDD и, следовательно, они никогда не выполняются. Сейчас я получаю соединение, которое выводит только записи, ключи которых находятся в stream1 и stream2 за один и тот же промежуток времени.

Ребята, есть ли у вас какие-либо предложения по правильной реализации с помощью Spark?

Спасибо марко

1 ответ

Решение

Должна быть предусмотрена возможность переноса значений из одного пакета в следующий, сохраняя ссылку на СДР, где мы храним эти значения.

Не пытайтесь объединить потоки, используя queueDStreamвместо этого объявите изменяемую ссылку RDD, которая может обновляться через каждый интервал потоковой передачи.

Это пример:

В этой потоковой работе мы начнем с выполнения RDD 100 целые числа. Каждый интервал, 10 случайные числа генерируются и вычитаются для этих начальных 100 целых чисел. Этот процесс продолжается до тех пор, пока начальный RDD с 100 элементами не станет пустым. В этом примере показано, как переносить элементы из одного интервала в следующий.

  import scala.util.Random
  import org.apache.spark.streaming.dstream._

  val ssc = new StreamingContext(sparkContext, Seconds(2))

  var targetInts:RDD[Int] = sc.parallelize(0 until 100)

  var loops = 0

  // we create an rdd of functions that generate random data. 
  // evaluating this RDD at each interval will generate new random data points.
  val randomDataRdd = sc.parallelize(1 to 10).map(_ => () => Random.nextInt(100))

  val dstream = new ConstantInputDStream(ssc, randomDataRdd)

  // create values from the random func rdd

  dataDStream.foreachRDD{rdd => 
                        loops += 1
                        targetInts = targetInts.subtract(rdd)
                        if (targetInts.isEmpty) {println(loops); ssc.stop(false)}
                       }


  ssc.start()

Запуск этого примера и построение графиков loops против targetInts.count дает следующий график:

Удаление 100 дюймов путем генерации случайных чисел

Я надеюсь, что это дает вам достаточное руководство для реализации полного варианта использования.

Другие вопросы по тегам