Как сравнить два набора данных?
Я запускаю приложение spark, которое считывает данные из нескольких таблиц улья (IP-адреса) и сравнивает каждый элемент (IP-адрес) в наборе данных со всеми другими элементами (IP-адресами) из других наборов данных. Конечный результат будет примерно таким:
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| ip_address|dataset1|dataset2 |dataset3 |dataset4 |dataset5 |dataset6| date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx| 1 | 1| 0| 0| 0| 0 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 0| 0| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 0 | 0| 1| 0| 0| 1 |2017-11-06|
| xx.xx.xx.xx.xx| 1 | 1| 0| 1| 0| 0 |2017-11-06|
---------------------------------------------------------------------------------------------------
Для сравнения я конвертирую dataframes
в результате hiveContext.sql("query")
заявление в Fastutil
объекты. Как это:
val df= hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
Затем я использую iterator
перебирать каждую коллекцию и записывать строки в файл, используя FileWriter
,
val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext){
val p = dfIterator.next().toString
//logic
}
Я запускаю приложение с --num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g
Процесс длится около 18-19 часов в общей сложности около 4-5 миллионов записей с ежедневными сравнениями один на один.
Однако, когда я проверил Application Master UI, я заметил, что после первоначального преобразования dataframes
в fastutil collection objects
сделано (это займет всего несколько минут после запуска задания). Я вижу count
а также collect
операторы, используемые в коде, создающем новые задания, пока преобразование не выполнено. После этого новые задания не запускаются при выполнении сравнения.
Что это значит? Значит ли это, что распределенная обработка вообще не происходит?
Я понимаю, что объекты коллекции не рассматриваются как RDD, может
это будет причиной этого?Как spark выполняет мою программу без использования назначенных ресурсов?
Любая помощь будет оценена, спасибо!
1 ответ
После строки:
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())
особенно эта часть вышеуказанной строки:
df.map(r => r(0).toString).collect()
который collect
это самое главное, что работа Spark никогда не выполняется dfBuffer
(которая является обычной локальной структурой данных JVM).
Значит ли это, что распределенная обработка вообще не происходит?
Правильный. collect
переносит все данные в одну JVM, где работает драйвер (и именно поэтому вам не следует делать это, пока... вы не знаете, что делаете и какие проблемы это может вызвать).
Я думаю, что выше отвечает на все остальные вопросы.
Возможное решение вашей проблемы сравнения двух наборов данных (в Spark и распределенном режиме) было бы join
набор данных с эталонным набором данных и count
сравнить, не изменилось ли количество записей.