Предварительная обработка данных с Apache Spark и Scala

Я довольно новичок в spark и scala, и поэтому у меня есть несколько вопросов, касающихся предварительной обработки данных с помощью spark и работы с rdds. Я работаю над небольшим проектом и хочу внедрить систему машинного обучения с помощью spark. Я думаю, что работать с алгоритмами можно, но у меня проблемы с предварительной обработкой данных. У меня есть набор данных с 30 столбцами и около миллиона строк. Но для простоты предположим, что у меня есть следующий набор данных (csv-файл):

columnA, columnB, column_txt, label
1      , a      , abc       , 0
2      ,        , abc       , 0
3      , b      , abc       , 1
4      , b      , abc       , 1
5      , a      , abc       , 0
6      ,        , abc       , 0
7      , c      , abc       , 1
8      , a      , abc       , 1
9      , b      , abc       , 1
10     , c      , abc       , 0

После загрузки данных в спарк я хочу сделать следующие шаги:

  1. Удалите все столбцы, заканчивающиеся на "_txt"
  2. Отфильтруйте все строки, где columnB пусто (это я уже понял)
  3. Удалить те столбцы, которые имеют более 9 уровней (здесь columnA)

Поэтому у меня проблемы с проблемами 1. и 3. Я знаю, что не могу удалить столбцы, поэтому мне нужно создать новый rdd, но как мне это сделать без определенных столбцов? На данный момент я загружаю CSV-файл без заголовка в спарк, но для моих задач мне нужно. Рекомендуется ли загружать заголовок в отдельный rdd? Но как я могу взаимодействовать с этим устройством, чтобы найти нужные столбцы? Извините, я знаю много вопросов, но я все еще в начале и пытаюсь учиться. Спасибо и всего наилучшего, Крис

1 ответ

Решение

Предполагая, что фрейм данных загружен с заголовками и структура является плоской:

val df = sqlContext.
    read.
    format("com.databricks.spark.csv").
    option("header", "true").
    load("data.csv")

как то так должно работать:

import org.apache.spark.sql.DataFrame

def moreThan9(df: DataFrame, col: String) = {
    df.agg(countDistinct(col)).first()(0) match {
        case x: Long => x > 9L
        case _ => false
    }
}

val newDf = df.
    schema. //  Extract schema
    toArray. // Convert to array
    map(_.name). // Map to names
    foldLeft(df)((df: DataFrame, col: String) => {
        if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df
    })

Если он загружен без заголовка, то вы можете сделать то же самое, используя сопоставление автоматически назначенных с действительными.

Другие вопросы по тегам