Как написать паркет в минио из искры?
У нас есть код, который создает и использует локальную искру и записывает паркетные файлы в S3. Он работает как с Amazon S3, так и с IBM Cloud Object Storage. Но когда я поднимаю контейнер minIO и указываю туда код, он выходит из строя с такой ошибкой:
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:226)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:944)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:944)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:380)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:269)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:829)
at com.ibm.fhir.jbatch.bulkdata.export.common.SparkParquetWriter.writeParquet(SparkParquetWriter.java:102)
at com.ibm.fhir.bulkcommon.SparkParquetWriterTest.testWriteCOSviaHMAC(SparkParquetWriterTest.java:87)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.privateRun(TestRunner.java:648)
at org.testng.TestRunner.run(TestRunner.java:505)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
at org.testng.SuiteRunner.run(SuiteRunner.java:364)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
at org.testng.TestNG.runSuites(TestNG.java:1049)
at org.testng.TestNG.run(TestNG.java:1017)
at org.testng.remote.AbstractRemoteTestNG.run(AbstractRemoteTestNG.java:115)
at org.testng.remote.RemoteTestNG.initAndRun(RemoteTestNG.java:251)
at org.testng.remote.RemoteTestNG.main(RemoteTestNG.java:77)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, 192.168.0.107, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:291)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:205)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: saving output test.parquet/part-00000-4f28f646-e457-4a0c-a4fd-3c3242180b72-c000-attempt_20200730040459_0001_m_000000_1.snappy.parquet com.amazonaws.services.s3.model.AmazonS3Exception: Object-prefix is already an object, please choose a different object-prefix name. (Service: Amazon S3; Status Code: 400; Error Code: XMinioParentIsObject; Request ID: 1626792118789256; S3 Extended Request ID: 4f62aa42-7879-4a61-bb0f-4720577a9dd5), S3 Extended Request ID: 4f62aa42-7879-4a61-bb0f-4720577a9dd5
at com.ibm.stocator.fs.cos.COSOutputStream.close(COSOutputStream.java:196)
at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
at org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:64)
at org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:685)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:122)
at org.apache.parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:165)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.close(ParquetOutputWriter.scala:42)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:58)
at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:75)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:275)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:281)
... 9 more
Насколько я могу судить, spark/hadoop/stocator записывают пустой объект с именем test.parquet
а затем написать паркетный объект, который логически находится "ниже" (имитация иерархии файловой системы). К сожалению, разработчики minIO категорически категорически возражают против того, чтобы это не поддерживалось, поскольку minIO поддерживается файловой системой, и они сопоставляют свои ключи с реальными путями файловой системы (так что пустойtest.parquet
объект каталога мешает им успешно создать каталог с тем же именем, в котором можно разместить разделы).
Итак, есть ли другой способ записать parquet в minIO через Spark?! Есть ли способ сказать ему не создавать этот пустой файл (или заставить его иметь/
в конце, что, я думаю, может сработать на minio?)
Пример фрагмента кода для записи:
SparkSession spark = SparkSession.builder()
.appName("parquetWriter")
.master("local[*]")
.config("spark.ui.enabled", false)
.config("fs.cos.impl", "com.ibm.stocator.fs.ObjectStoreFileSystem")
.config("fs.stocator.scheme.list", "cos")
.config("fs.stocator.cos.impl", "com.ibm.stocator.fs.cos.COSAPIClient")
.config("fs.stocator.cos.scheme", "cos")
.config("fs.cos.service.endpoint", ENDPOINT)
.config("fs.cos.service.access.key", ACCESS_KEY)
.config("fs.cos.service.secret.key", SECRET_KEY)
.getOrCreate();
List<String> json = Collections.singletonList("{\"a\":1}");
String itemName = "cos://mybucket.service/test.parquet/";
Dataset<String> jDataset = spark.createDataset(json, Encoders.STRING());
Dataset<?> jsonDF = spark.read().json(jDataset);
jsonDF.coalesce(1).write().mode("append").parquet(itemName);