Как загрузить большой CSV-файл, проверить каждую строку и обработать данные

Я ищу, чтобы проверить каждую строку файла CSV более 600 миллионов строк и до 30 столбцов (решение должно обрабатывать несколько больших файлов CSV этого диапазона).

Столбцы могут быть текстом, датами или суммами. CSV должен быть проверен с 40 правилами, некоторые правила будут проверять правильность amout, некоторые из них будут проверять даты (формат) и т.д...

Результаты каждого правила проверки должны быть сохранены и будут отображены позже.

Как только данные проверены, будет применен второй этап правил проверки, основанный на этом времени на сумме, усредняет... результаты также каждого правила должны быть сохранены.

Я использую Spark для загрузки файла. С

session.read().format("com.databricks.spark.csv").option("delimiter",
         "|").option("header", "false").csv(csvPath)

Или же

session.read().option("header", "true").text(csvPath);

Для перебора каждой строки я вижу, что есть два варианта:

  • использование dataset.map( row -> { something });"Что-то" должно проверять каждую строку и сохранять результат где-то

Но так как блок "что-то" будет выполняться в исполнителях, я не вижу, как вернуть его в драйвер или сохранить его где-то, откуда его можно извлечь из процесса Driver.

  • Второй вариант заключается в использовании dataset.collect: но это вызовет внеочередную память, так как все данные будут загружены в драйвер. Мы могли бы использовать метод "взять", а затем удалить подмножество из набора данных (с фильтром) и повторить операцию, но мне не нравится этот метод

Мне было интересно, если кто-то может предложить мне надежный метод для решения этой проблемы. В основном сохраните Spark для второго этапа правил проверки и используйте Spark или другой framwrok для загрузки файла, а также для выполнения и создания первого набора правил проверки

Заранее спасибо за помощь

2 ответа

Вы можете просто добавить столбцы с результатами проверки к вашему исходному фрейму данных и использовать набор пользовательских UDF-правил для фактической проверки, что-то вроде этого:

    object Rules {
      val rule1UDF = udf(
        (col1: String, col2: String) => {
         // your validation code goes here
         true // the result of validation
      }
    }
    // ...
    val nonAggregatedChecksDf = df
       .withColumn("rule1_result", Rules.rule1UDF("col1", "col2"))
       .withColumn("rule2_result", Rules.rule2UDF("col1", "col3"))
       .select("id", "rule1_result", "rule2_result", <all the columns relevant for the aggregation checks>)

    val aggregatedChecksDf = nonAggregatedChecksDf
       .agg(<...>)
       .withColumn("rule3_result", Rules.rule3UDF("sum1", "avg2"))
       .withColumn("rule4_result", Rules.rule4UDF("count1", "count3"))
       .select("id", "rule1_result", "rule2_result", "rule3_result", "rule4_result")

Второй вариант - использовать dataset.collect.

Я бы посоветовал не делать этого, а выбрать ключевое поле в исходном кадре данных, а также все столбцы результатов проверки и сохранить их в виде столбцов в виде паркета.

aggregatedChecksDf
    .select("id", "rule1_result", "rule2_result", "rule3_result", "rule4_result")
    .write
    .mode(saveMode)
    .parquet(path)

Это будет намного быстрее, поскольку записи выполняются всеми исполнителями параллельно, а драйвер не становится узким местом. Это также, скорее всего, поможет избежать проблем с OOM, поскольку использование памяти распределяется по всем исполнителям.

Вы можете использовать SparkSession прочитайте файл CSV, а затем разделите данные на столбцы и обработайте данные в пакетном режиме. Например, вы записываете данные во внешнюю БД, которая не требует большой обработки.

dataFrame
    .write
    .mode(saveMode)
    .option("batchsize", 100)
    .jdbc(url, "tablename", new java.util.Properties())

Если ваша бизнес-логика требует от вас обработки каждой строки набора данных / кадра данных, вы можете использовать df.map(), Если ваша логика может работать сразу на нескольких RDD, вы можете пойти с df.mapPartition()Задачи с высокими накладными расходами на запись выполняются лучше с mapPartition чем с map преобразование.

Рассмотрим случай инициализации базы данных. Если мы используем map() или же foreach()количество раз, которое нам нужно будет инициализировать, будет равно количеству элементов в СДР. Тогда как, если мы используем mapPartitions(), количество раз, которое нам нужно будет инициализировать, будет равно количеству разделов

Другие вопросы по тегам