Обработка записей Hive в программе Spark Driver
В моем случае у меня есть таблица улья, которая содержит 100 тысяч записей. Каждая запись представляет файл необработанных данных, который должен быть обработан. Обработка каждого файла необработанных данных генерирует CSV-файл, размер которого будет варьироваться от 10 до 500 МБ. В конечном итоге эти CSV-файлы затем заполняются в таблицу HIve как отдельный процесс. В моем корпоративном кластере все еще не рекомендуется создавать огромное количество данных в формате hdf. Поэтому я предпочитаю объединить эти два отдельных процесса в один процесс, чтобы они обрабатывали, скажем, 5000 записей на 5000 записей.
Мой вопрос:-
Учитывая, что мой rdd относится ко всей таблице кустов, как выполнить шаг обработки необработанных данных для каждых 5000 записей? (что-то похожее на цикл for с 5000 записей каждый раз)
1 ответ
Один из способов сделать это - использовать скользящую способность RDD. Вы можете найти это в пакете mllib apache spark. Вот как вы можете использовать это. Предположим, что у нас есть RDD с 1000 элементов
val rdd = sc.parallelize(1 to 1000)
import org.apache.spark.mllib.rdd._
val newRdd = RDDFunctions.fromRDD(rdd)
// sliding by 10 (instead use 5000 or what you need)
val rddSlidedBy10 = newRdd.sliding(10, 10)
Результат будет выглядеть так
Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33, 34, 35, 36, 37, 38, 39, 40), Array(41, 42, 43, 44, 45, 46, 47, 48, 49, 50), Array(51, 52, 53, 54, 55, 56, 57, 58, 59, 60), Array(61, 62, 63, 64, 65, 66, 67, 68, 69, 70), Array(71, 72, 73, 74, 75, 76, 77, 78, 79, 80)
Вы можете использовать foreach для массива и обрабатывать необработанные данные в CSV.