Как сравнить два набора данных?

Я запускаю приложение spark, которое считывает данные из нескольких таблиц улья (IP-адреса) и сравнивает каждый элемент (IP-адрес) в наборе данных со всеми другими элементами (IP-адресами) из других наборов данных. Конечный результат будет примерно таким:

+---------------+--------+---------------+---------------+---------+----------+--------+----------+
|     ip_address|dataset1|dataset2       |dataset3       |dataset4 |dataset5  |dataset6|      date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx|     1  |              1|              0|        0|         0|      0 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              0|              0|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              1|              0|        1|         0|      0 |2017-11-06|
---------------------------------------------------------------------------------------------------

Для сравнения я конвертирую dataframes в результате hiveContext.sql("query") заявление в Fastutil объекты. Как это:

val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

Затем я использую iterator перебирать каждую коллекцию и записывать строки в файл, используя FileWriter,

val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
     val p = dfIterator.next().toString
     //logic
}

Я запускаю приложение с --num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g

Процесс длится около 18-19 часов в общей сложности около 4-5 миллионов записей с ежедневными сравнениями один на один.

Однако, когда я проверил Application Master UI, я заметил, что после первоначального преобразования dataframes в fastutil collection objects сделано (это займет всего несколько минут после запуска задания). Я вижу count а также collect операторы, используемые в коде, создающем новые задания, пока преобразование не выполнено. После этого новые задания не запускаются при выполнении сравнения.

  • Что это значит? Значит ли это, что распределенная обработка вообще не происходит?

  • Я понимаю, что объекты коллекции не рассматриваются как RDD, может
    это будет причиной этого?

  • Как spark выполняет мою программу без использования назначенных ресурсов?

Любая помощь будет оценена, спасибо!

1 ответ

Решение

После строки:

val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

особенно эта часть вышеуказанной строки:

df.map(r => r(0).toString).collect()

который collect это самое главное, что работа Spark никогда не выполняется dfBuffer (которая является обычной локальной структурой данных JVM).

Значит ли это, что распределенная обработка вообще не происходит?

Правильный. collect переносит все данные в одну JVM, где работает драйвер (и именно поэтому вам не следует делать это, пока... вы не знаете, что делаете и какие проблемы это может вызвать).

Я думаю, что выше отвечает на все остальные вопросы.


Возможное решение вашей проблемы сравнения двух наборов данных (в Spark и распределенном режиме) было бы join набор данных с эталонным набором данных и count сравнить, не изменилось ли количество записей.

Другие вопросы по тегам