Предварительная обработка данных с Apache Spark и Scala
Я довольно новичок в spark и scala, и поэтому у меня есть несколько вопросов, касающихся предварительной обработки данных с помощью spark и работы с rdds. Я работаю над небольшим проектом и хочу внедрить систему машинного обучения с помощью spark. Я думаю, что работать с алгоритмами можно, но у меня проблемы с предварительной обработкой данных. У меня есть набор данных с 30 столбцами и около миллиона строк. Но для простоты предположим, что у меня есть следующий набор данных (csv-файл):
columnA, columnB, column_txt, label
1 , a , abc , 0
2 , , abc , 0
3 , b , abc , 1
4 , b , abc , 1
5 , a , abc , 0
6 , , abc , 0
7 , c , abc , 1
8 , a , abc , 1
9 , b , abc , 1
10 , c , abc , 0
После загрузки данных в спарк я хочу сделать следующие шаги:
- Удалите все столбцы, заканчивающиеся на "_txt"
- Отфильтруйте все строки, где columnB пусто (это я уже понял)
- Удалить те столбцы, которые имеют более 9 уровней (здесь columnA)
Поэтому у меня проблемы с проблемами 1. и 3. Я знаю, что не могу удалить столбцы, поэтому мне нужно создать новый rdd, но как мне это сделать без определенных столбцов? На данный момент я загружаю CSV-файл без заголовка в спарк, но для моих задач мне нужно. Рекомендуется ли загружать заголовок в отдельный rdd? Но как я могу взаимодействовать с этим устройством, чтобы найти нужные столбцы? Извините, я знаю много вопросов, но я все еще в начале и пытаюсь учиться. Спасибо и всего наилучшего, Крис
1 ответ
Предполагая, что фрейм данных загружен с заголовками и структура является плоской:
val df = sqlContext.
read.
format("com.databricks.spark.csv").
option("header", "true").
load("data.csv")
как то так должно работать:
import org.apache.spark.sql.DataFrame
def moreThan9(df: DataFrame, col: String) = {
df.agg(countDistinct(col)).first()(0) match {
case x: Long => x > 9L
case _ => false
}
}
val newDf = df.
schema. // Extract schema
toArray. // Convert to array
map(_.name). // Map to names
foldLeft(df)((df: DataFrame, col: String) => {
if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df
})
Если он загружен без заголовка, то вы можете сделать то же самое, используя сопоставление автоматически назначенных с действительными.