Удалить столбец (столбцы) в кадре данных spark csv
У меня есть датафрейм, к которому я делаю объединение всех его полей.
После объединения он становится другим фреймом данных, и, наконец, я записываю его вывод в файл csv, разделенный на две колонки. Один из столбцов присутствует в первом кадре данных, который я не хочу включать в окончательный вывод.
Вот мой код:
val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
.filter(!$"FFAction".contains("D"))
Здесь я объединяю и создаю другой фрейм данных:
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.map(c => col(c)): _*).as("concatenated"))
Это то, что я пытался
dfMainOutputFinal
.drop("DataPartition")
.write
.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("header","true")
.option("encoding", "\ufeff")
.option("codec", "gzip")
.save("path to csv")
Теперь я не хочу столбец DataPartition в моем выводе.
Я делаю раздел на основе DataPartition, поэтому я не получаю, а потому что DataPartition присутствует в основном фрейме данных, я получаю его в выходных данных.
ВОПРОС 1: Как можно игнорировать столбцы из Dataframe
ВОПРОС 2: Есть ли способ добавить "\ufeff"
в выходной файл csv перед записью моих фактических данных, чтобы мой формат кодирования стал UTF-8-BOM.
Согласно предложенному ответу
Это то, что я пытался
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))
Но получая ошибку ниже
<console>:238: error: value fieldNames is not a member of Seq[org.apache.spark.sql.types.StructField]
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))
Ниже вопрос, если мне нужно удалить два столбца в окончательном выводе
val dfMainOutputFinal = dfMainOutput.select($"DataPartition","PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition","PartitionYear").map(c => col(c)): _*).as("concatenated"))
2 ответа
Вопрос 1:
Столбцы, которые вы используете в df.write.partitionBy()
не будет добавлен в окончательный CSV-файл. Они автоматически игнорируются, поскольку данные закодированы в файловой структуре. Однако, если вы имеете в виду, чтобы удалить его из concat_ws
(и тем самым из файла), это можно сделать с небольшим изменением:
concat_ws("|^|",
dfMainOutput.schema.fieldNames
.filter(_ != "DataPartition")
.map(c => col(c)): _*).as("concatenated"))
Здесь столбец DataPartition отфильтровывается перед объединением.
Вопрос 2:
Искра, похоже, не поддерживает UTF-8 BOM
и это, кажется, вызывает проблемы при чтении в файлах с форматом. Я не могу придумать простого способа добавить байты спецификации в каждый CSV-файл, кроме написания скрипта для добавления их после завершения Spark. Моя рекомендация будет просто использовать обычный UTF-8
форматирование.
dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("header", "true")
.option("encoding", "UTF-8")
.option("codec", "gzip")
.save("path to csv")
Кроме того, согласно стандарту Unicode, спецификация не рекомендуется.
... Использование спецификации не требуется и не рекомендуется для UTF-8, но может встречаться в ситуациях, когда данные UTF-8 преобразуются из других форм кодирования, в которых используется спецификация, или когда спецификация используется в качестве сигнатуры UTF-8.,
ВОПРОС 1: Как можно игнорировать столбцы из Dataframe
Ans:
val df = sc.parallelize(List(Person(1,2,3), Person(4,5,6))).toDF("age", "height", "weight")
df.columns
df.show()
+---+------+------+
|age|height|weight|
+---+------+------+
| 1| 2| 3|
| 4| 5| 6|
+---+------+------+
val df_new=df.select("age", "height")
df_new.columns
df_new.show()
+---+------+
|age|height|
+---+------+
| 1| 2|
| 4| 5|
+---+------+
df: org.apache.spark.sql.DataFrame = [age: int, height: int ... 1 more field]
df_new: org.apache.spark.sql.DataFrame = [age: int, height: int]
ВОПРОС 2: Есть ли способ добавить "\ufeff" в выходной файл csv перед записью моих фактических данных, чтобы мой формат кодирования стал UTF-8-BOM.
Ans:
String path= "/data/vaquarkhan/input/unicode.csv";
String outputPath = "file:/data/vaquarkhan/output/output.csv";
getSparkSession()
.read()
.option("inferSchema", "true")
.option("header", "true")
.option("encoding", "UTF-8")
.csv(path)
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath);
}