Как загрузить большой CSV-файл, проверить каждую строку и обработать данные
Я ищу, чтобы проверить каждую строку файла CSV более 600 миллионов строк и до 30 столбцов (решение должно обрабатывать несколько больших файлов CSV этого диапазона).
Столбцы могут быть текстом, датами или суммами. CSV должен быть проверен с 40 правилами, некоторые правила будут проверять правильность amout, некоторые из них будут проверять даты (формат) и т.д...
Результаты каждого правила проверки должны быть сохранены и будут отображены позже.
Как только данные проверены, будет применен второй этап правил проверки, основанный на этом времени на сумме, усредняет... результаты также каждого правила должны быть сохранены.
Я использую Spark для загрузки файла. С
session.read().format("com.databricks.spark.csv").option("delimiter",
"|").option("header", "false").csv(csvPath)
Или же
session.read().option("header", "true").text(csvPath);
Для перебора каждой строки я вижу, что есть два варианта:
- использование
dataset.map( row -> { something });
"Что-то" должно проверять каждую строку и сохранять результат где-то
Но так как блок "что-то" будет выполняться в исполнителях, я не вижу, как вернуть его в драйвер или сохранить его где-то, откуда его можно извлечь из процесса Driver.
- Второй вариант заключается в использовании
dataset.collect
: но это вызовет внеочередную память, так как все данные будут загружены в драйвер. Мы могли бы использовать метод "взять", а затем удалить подмножество из набора данных (с фильтром) и повторить операцию, но мне не нравится этот метод
Мне было интересно, если кто-то может предложить мне надежный метод для решения этой проблемы. В основном сохраните Spark для второго этапа правил проверки и используйте Spark или другой framwrok для загрузки файла, а также для выполнения и создания первого набора правил проверки
Заранее спасибо за помощь
2 ответа
Вы можете просто добавить столбцы с результатами проверки к вашему исходному фрейму данных и использовать набор пользовательских UDF-правил для фактической проверки, что-то вроде этого:
object Rules {
val rule1UDF = udf(
(col1: String, col2: String) => {
// your validation code goes here
true // the result of validation
}
}
// ...
val nonAggregatedChecksDf = df
.withColumn("rule1_result", Rules.rule1UDF("col1", "col2"))
.withColumn("rule2_result", Rules.rule2UDF("col1", "col3"))
.select("id", "rule1_result", "rule2_result", <all the columns relevant for the aggregation checks>)
val aggregatedChecksDf = nonAggregatedChecksDf
.agg(<...>)
.withColumn("rule3_result", Rules.rule3UDF("sum1", "avg2"))
.withColumn("rule4_result", Rules.rule4UDF("count1", "count3"))
.select("id", "rule1_result", "rule2_result", "rule3_result", "rule4_result")
Второй вариант - использовать dataset.collect.
Я бы посоветовал не делать этого, а выбрать ключевое поле в исходном кадре данных, а также все столбцы результатов проверки и сохранить их в виде столбцов в виде паркета.
aggregatedChecksDf
.select("id", "rule1_result", "rule2_result", "rule3_result", "rule4_result")
.write
.mode(saveMode)
.parquet(path)
Это будет намного быстрее, поскольку записи выполняются всеми исполнителями параллельно, а драйвер не становится узким местом. Это также, скорее всего, поможет избежать проблем с OOM, поскольку использование памяти распределяется по всем исполнителям.
Вы можете использовать SparkSession
прочитайте файл CSV, а затем разделите данные на столбцы и обработайте данные в пакетном режиме. Например, вы записываете данные во внешнюю БД, которая не требует большой обработки.
dataFrame
.write
.mode(saveMode)
.option("batchsize", 100)
.jdbc(url, "tablename", new java.util.Properties())
Если ваша бизнес-логика требует от вас обработки каждой строки набора данных / кадра данных, вы можете использовать df.map()
, Если ваша логика может работать сразу на нескольких RDD, вы можете пойти с df.mapPartition()
Задачи с высокими накладными расходами на запись выполняются лучше с mapPartition
чем с map
преобразование.
Рассмотрим случай инициализации базы данных. Если мы используем map()
или же foreach()
количество раз, которое нам нужно будет инициализировать, будет равно количеству элементов в СДР. Тогда как, если мы используем mapPartitions()
, количество раз, которое нам нужно будет инициализировать, будет равно количеству разделов