Удалить столбец (столбцы) в кадре данных spark csv

У меня есть датафрейм, к которому я делаю объединение всех его полей.

После объединения он становится другим фреймом данных, и, наконец, я записываю его вывод в файл csv, разделенный на две колонки. Один из столбцов присутствует в первом кадре данных, который я не хочу включать в окончательный вывод.

Вот мой код:

val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
      .select($"LineItem_organizationId", $"LineItem_lineItemId",
       when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
       when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
       when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
       .filter(!$"FFAction".contains("D"))

Здесь я объединяю и создаю другой фрейм данных:

val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.map(c => col(c)): _*).as("concatenated"))     

Это то, что я пытался

dfMainOutputFinal
  .drop("DataPartition")
  .write
  .partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("header","true")
  .option("encoding", "\ufeff")
  .option("codec", "gzip")
  .save("path to csv")

Теперь я не хочу столбец DataPartition в моем выводе.

Я делаю раздел на основе DataPartition, поэтому я не получаю, а потому что DataPartition присутствует в основном фрейме данных, я получаю его в выходных данных.

ВОПРОС 1: Как можно игнорировать столбцы из Dataframe

ВОПРОС 2: Есть ли способ добавить "\ufeff" в выходной файл csv перед записью моих фактических данных, чтобы мой формат кодирования стал UTF-8-BOM.

Согласно предложенному ответу

Это то, что я пытался

 val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))

Но получая ошибку ниже

<console>:238: error: value fieldNames is not a member of Seq[org.apache.spark.sql.types.StructField]
               val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))

Ниже вопрос, если мне нужно удалить два столбца в окончательном выводе

  val dfMainOutputFinal = dfMainOutput.select($"DataPartition","PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition","PartitionYear").map(c => col(c)): _*).as("concatenated"))

2 ответа

Решение

Вопрос 1:

Столбцы, которые вы используете в df.write.partitionBy() не будет добавлен в окончательный CSV-файл. Они автоматически игнорируются, поскольку данные закодированы в файловой структуре. Однако, если вы имеете в виду, чтобы удалить его из concat_ws (и тем самым из файла), это можно сделать с небольшим изменением:

concat_ws("|^|", 
  dfMainOutput.schema.fieldNames
    .filter(_ != "DataPartition")
    .map(c => col(c)): _*).as("concatenated"))

Здесь столбец DataPartition отфильтровывается перед объединением.

Вопрос 2:

Искра, похоже, не поддерживает UTF-8 BOM и это, кажется, вызывает проблемы при чтении в файлах с форматом. Я не могу придумать простого способа добавить байты спецификации в каждый CSV-файл, кроме написания скрипта для добавления их после завершения Spark. Моя рекомендация будет просто использовать обычный UTF-8 форматирование.

dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("header", "true")
  .option("encoding", "UTF-8")
  .option("codec", "gzip")
  .save("path to csv")

Кроме того, согласно стандарту Unicode, спецификация не рекомендуется.

... Использование спецификации не требуется и не рекомендуется для UTF-8, но может встречаться в ситуациях, когда данные UTF-8 преобразуются из других форм кодирования, в которых используется спецификация, или когда спецификация используется в качестве сигнатуры UTF-8.,

ВОПРОС 1: Как можно игнорировать столбцы из Dataframe

Ans:

val df = sc.parallelize(List(Person(1,2,3), Person(4,5,6))).toDF("age", "height", "weight")

df.columns
df.show()



+---+------+------+
|age|height|weight|
+---+------+------+
|  1|     2|     3|
|  4|     5|     6|
+---+------+------+


val df_new=df.select("age", "height")
    df_new.columns
    df_new.show()

+---+------+
|age|height|
+---+------+
|  1|     2|
|  4|     5|
+---+------+

df: org.apache.spark.sql.DataFrame = [age: int, height: int ... 1 more field]
df_new: org.apache.spark.sql.DataFrame = [age: int, height: int]

ВОПРОС 2: Есть ли способ добавить "\ufeff" в выходной файл csv перед записью моих фактических данных, чтобы мой формат кодирования стал UTF-8-BOM.

Ans:

 String path= "/data/vaquarkhan/input/unicode.csv";

 String outputPath = "file:/data/vaquarkhan/output/output.csv";
    getSparkSession()
      .read()
      .option("inferSchema", "true")
      .option("header", "true")
      .option("encoding", "UTF-8")
      .csv(path)
      .write()
      .mode(SaveMode.Overwrite)
      .csv(outputPath);
}
Другие вопросы по тегам