Spark Чтение нескольких путей с автоматическим обнаружением разделов

Я пытаюсь прочитать некоторые файлы avro в DataFrame из нескольких путей. Допустим, мой путь "s3a://bucket_name/path/to/file/year=18/month=11/day=01"По этому пути у меня есть еще два раздела, скажем country=XX/region=XX

Я хочу читать несколько дат одновременно, не называя разделов страны и региона. Кроме того, я хочу, чтобы страна и регион были столбцами в этом фрейме данных.

sqlContext.read.format("com.databricks.spark.avro").load("s3a://bucket_name/path/to/file/year=18/month=11/day=01")

Эта строка прекрасно работает, так как я читаю только один путь. Он обнаруживает разделы страны и региона и определяет их схему.

Когда я пытаюсь прочитать несколько дат, скажем

val paths = Seq("s3a://bucket_name/path/to/file/year=18/month=11/day=01", "s3a://bucket_name/path/to/file/year=18/month=11/day=02")

sqlContext.read.format("com.databricks.spark.avro").load(paths:_*)

Я получаю эту ошибку:

    18/12/03 03:13:53 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result insub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
18/12/03 03:13:53 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.
java.lang.AssertionError: assertion failed: Conflicting directory structures detected. Suspicious paths:?
 s3a://bucket_name/path/to/file/year=18/month=11/day=02
s3a://bucket_name/path/to/file/year=18/month=11/day=01

If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.sql.execution.datasources.PartitioningUtils$.parsePartitions(PartitioningUtils.scala:106)
        at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$discoverPartitions(interfaces.scala:621)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionSpec$3.apply(interfaces.scala:526)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionSpec$3.apply(interfaces.scala:525)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.sources.HadoopFsRelation.partitionSpec(interfaces.scala:524)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionColumns$1.apply(interfaces.scala:578)
        at org.apache.spark.sql.sources.HadoopFsRelation$$anonfun$partitionColumns$1.apply(interfaces.scala:578)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.sources.HadoopFsRelation.partitionColumns(interfaces.scala:578)
        at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:637)
        at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635)
        at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:39)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:136)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
        at $iwC$$iwC$$iwC.<init>(<console>:38)
        at $iwC$$iwC.<init>(<console>:40)
        at $iwC.<init>(<console>:42)
        at <init>(<console>:44)
        at .<init>(<console>:48)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064)
        at org.apache.spark.repl.Main$.main(Main.scala:35)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:730)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Очевидно, я не могу использовать basePath, потому что пути не разделяют один. Я также пытаюсь использовать /* в конце каждого пути, это на самом деле работает, но полностью игнорирует разделы страны и региона.

Я могу прочитать путь один за другим и объединить его, но я чувствую, что что-то упустил.

Любая идея, почему он работает только для одного пути и как заставить его работать для нескольких путей?

1 ответ

Старый вопрос, но это то, что я сделал в аналогичной ситуации

spark.read.parquet(paths:_*)
  .withColumn("year", regexp_extract(input_file_name, "year=(.+?)/", 1))
  .withColumn("month", regexp_extract(input_file_name, "month=(.+?)/", 1))
  .withColumn("day", regexp_extract(input_file_name, "day=(.+?)/", 1))

Работает, когда у вас есть статическая структура разделов. Кто может бросить вызов расширению его до динамического (т.е. разобрать произвольную структуру разделов вида "x=y/z=c" и преобразовать ее в столбцы)?

Очень хотелось бы, чтобы все сообщения об ошибках были такими же ясными - If provided paths are partition directories, please set "basePath" in the options of the data source to specify the root directory of the table. If there are multiple root directories, please load them separately and then union them.

Есть ли относительный путь year=18/month=11/day=01 происходит из-за разбиения, или вы просто использовали то же соглашение?

Если первое правильно, то вы должны просто прочитать s3a://bucket_name/path/to/file/и использовать предикаты для фильтрации желаемых дат. Или, может быть, как подсказывает ошибка, вы можете попробовать sqlContext.read.option("basePath","s3a://bucket_name/path/to/file/").format("com.databricks.spark.avro").load(paths:_*)где пути относительны

Если последнее верно, то вы должны запросить каждый в отдельности и применить unionAll на кадрах данных (как следует из сообщения об ошибке). Возможно, обработка года / месяца / дня в качестве столбцов раздела также будет работать в этом случае, даже если вы не использовали partitionBy при записи данных...

Другие вопросы по тегам