Как переименовать выходной файл фрейма искровых данных в AWS в spark SCALA
Я сохраняю свой вывод данных в формате spark в виде csv-файла в scala с разделами. Вот как я это делаю в Zeppelin.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
import org.apache.spark.{ SparkConf, SparkContext }
import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract
val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val rdd = sc.textFile("s3://trfsmallfffile/FinancialLineItem/MAIN")
val header = rdd.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)
val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)
val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val rdd1 = sc.textFile("s3://trfsmallfffile/FinancialLineItem/INCR")
val header1 = rdd1.filter(_.contains("LineItem.organizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("LineItem.organizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)
import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("LineItem_organizationId", "LineItem_lineItemId").orderBy($"TimeStamp".cast(LongType).desc)
val latestForEachKey = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank", "TimeStamp")
val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition").as("DataPartition"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"FinancialConceptLocalId_1".isNotNull, $"FinancialConceptLocalId_1").otherwise($"FinancialConceptLocalId").as("FinancialConceptLocalId"),
when($"FinancialConceptGlobalId_1".isNotNull, $"FinancialConceptGlobalId_1").otherwise($"FinancialConceptGlobalId").as("FinancialConceptGlobalId"),
when($"FinancialConceptCodeGlobalSecondaryId_1".isNotNull, $"FinancialConceptCodeGlobalSecondaryId_1").otherwise($"FinancialConceptCodeGlobalSecondaryId").as("FinancialConceptCodeGlobalSecondaryId"),
when($"FFAction_1".isNotNull, $"FFAction_1").otherwise($"FFAction|!|").as("FFAction|!|"))
.filter(!$"FFAction|!|".contains("D|!|"))
val dfMainOutputFinal = dfMainOutput.na.fill("").select($"DataPartition",$"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))
val headerColumn = dataHeader.columns.toSeq
val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)
val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "|^|null", "")).withColumnRenamed("concatenated", header)
dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("nullValue", "")
.option("delimiter", "\t")
.option("quote", "\u0000")
.option("header", "true")
.option("codec", "gzip")
.save("s3://trfsmallfffile/FinancialLineItem/output")
val FFRowCount =dfMainOutputFinalWithoutNull.groupBy("DataPartition","StatementTypeCode").count
FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
.option("rootTag", "FFFileType")
.option("rowTag", "FFPhysicalFile")
.save("s3://trfsmallfffile/FinancialLineItem/Descr")
Теперь файлы сохраняются в структуре разделенных папок, что и ожидается.
Теперь мое требование - переименовать весь файл детали и сохранить его в одном каталоге. Имя файла будет таким же, как имя структуры папок.
Например, у меня есть один файл, сохраненный в folder/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz
Теперь я хочу, чтобы мое имя файла было
Japan.1971.1.txt.gz
Japan.1971.2.txt.gz
Я сделал это в Java Map-Reduce после того, как моя работа была завершена, затем я читал файловую систему HDFS, а затем переместил ее в другое место с переименованным именем файла.
Но как сделать это в файловой системе AWS S3 в искровой SCALA .
Насколько я знаю, нет прямого способа переименовать имя выходного файла фрейма искровых данных.
Но есть реализация, которая может быть выполнена в самой работе, используя MultipleOutputs
как saveAsHadoopFile, но как это сделать?
Я ищу пример кода в Scala
Это как если бы после завершения работы нам нужно было прочитать файл с s3, развернуть его и переместить в другое место.
2 ответа
val tempOutPath = "mediamath.dir"
headerDf.union(outDf)
.repartition(1)
.write
.mode(SaveMode.Overwrite)
.format("text")
.option("codec", "gzip")
.save(tempOutPath)
import org.apache.hadoop.fs._
val sc = spark.sparkContext
val fs = FileSystem.get(sc.hadoopConfiguration)
val file = fs.globStatus(new Path("mediamath.dir/part*.gz"))(0).getPath.getName
fs.rename(new Path("mediamath.dir/" + file), new Path(<aws-s3-path>))
Вот мой фрагмент кода, пожалуйста, посмотрите, поможет ли это вам.
AFAIK, если вы хотите переименовать файл / объект непосредственно в S3 bucket, это невозможно.
Вы можете достичь
rename = copy to target + delete source
Сначала давайте извлечем имя файла из источника
def prepareNewFilename(oldFilename: String) = {
val pattern = raw".*/DataPartition=%s/PartitionYear=%s/part-%s.*\.%s"
.format("([A-Za-z]+)", "([0-9]+)", "([0-9]+)", "([a-z]+)")
.r
val pattern(country, year, part, extn) = oldFilename
"%s.%s.%s.%s.%s".format(country, year, part, "txt", extn)
}
val oldFilename = "folder/DataPartition=Japan/PartitionYear=1971/part-00001-87a61115-92c9-4926-a803-b46315e55a08.c000.csv.gz"
val newFilename = prepareNewFilename(oldFilename)
//newFilename: String = Japan.1971.00001.txt.gz
Код для переименования файла / объекта в S3 в корзине
import com.amazonaws.AmazonServiceException
import com.amazonaws.services.s3.AmazonS3ClientBuilder
val s3 = AmazonS3ClientBuilder.defaultClient()
try {
s3.copyObject(sourceBkt, oldFilename, targetBkt, newFilename)
s3.deleteObject(sourceBkt, oldFilename)
} catch {
case e: AmazonServiceException =>
System.err.println(e.getErrorMessage)
System.exit(1)
}