Как переносить потоки данных через несколько интервалов в Spark Streaming
Я использую Apache Spark Streaming 1.6.1 для написания Java-приложения, которое объединяет два потока данных Key/Value и записывает вывод в HDFS. Два потока данных содержат K/V-строки и периодически поступают в Spark из HDFS с помощью textFileStream().
Два потока данных не синхронизированы, что означает, что некоторые ключи, которые находятся в потоке 1 в момент времени t0, могут появиться в потоке 2 в момент времени t1 или наоборот. Следовательно, моя цель - объединить два потока и вычислить "оставшиеся" ключи, которые следует учитывать для операции соединения в следующих интервалах пакета.
Чтобы лучше это понять, рассмотрим следующий алгоритм:
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
Я попытался реализовать этот алгоритм с Spark Streaming безуспешно. Первоначально я создаю два пустых потока для оставшихся ключей таким образом (это только один поток, но код для генерации второго потока аналогичен):
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String> () {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
Позже этот пустой поток объединяется (т. Е. Union()) с stream1, и, наконец, после объединения я добавляю оставшиеся ключи из stream1 и вызываю window(). То же самое происходит с stream2.
Проблема состоит в том, что операции, которые генерируют left_keys_s1 и left_keys_s2, являются преобразованиями без действий, что означает, что Spark не создает потоковый граф RDD и, следовательно, они никогда не выполняются. Сейчас я получаю соединение, которое выводит только записи, ключи которых находятся в stream1 и stream2 за один и тот же промежуток времени.
Ребята, есть ли у вас какие-либо предложения по правильной реализации с помощью Spark?
Спасибо марко
1 ответ
Должна быть предусмотрена возможность переноса значений из одного пакета в следующий, сохраняя ссылку на СДР, где мы храним эти значения.
Не пытайтесь объединить потоки, используя queueDStream
вместо этого объявите изменяемую ссылку RDD, которая может обновляться через каждый интервал потоковой передачи.
Это пример:
В этой потоковой работе мы начнем с выполнения RDD 100
целые числа. Каждый интервал, 10
случайные числа генерируются и вычитаются для этих начальных 100 целых чисел. Этот процесс продолжается до тех пор, пока начальный RDD с 100 элементами не станет пустым. В этом примере показано, как переносить элементы из одного интервала в следующий.
import scala.util.Random
import org.apache.spark.streaming.dstream._
val ssc = new StreamingContext(sparkContext, Seconds(2))
var targetInts:RDD[Int] = sc.parallelize(0 until 100)
var loops = 0
// we create an rdd of functions that generate random data.
// evaluating this RDD at each interval will generate new random data points.
val randomDataRdd = sc.parallelize(1 to 10).map(_ => () => Random.nextInt(100))
val dstream = new ConstantInputDStream(ssc, randomDataRdd)
// create values from the random func rdd
dataDStream.foreachRDD{rdd =>
loops += 1
targetInts = targetInts.subtract(rdd)
if (targetInts.isEmpty) {println(loops); ssc.stop(false)}
}
ssc.start()
Запуск этого примера и построение графиков loops
против targetInts.count
дает следующий график:
Я надеюсь, что это дает вам достаточное руководство для реализации полного варианта использования.