Контрольная точка S3 со структурированной потоковой передачей

Я попробовал предложения, приведенные в Apache Spark (Структурированная потоковая передача): поддержка S3 Checkpoint

Я все еще сталкиваюсь с этой проблемой. Ниже приведена ошибка, которую я получаю

17/07/06 17:04:56 WARN FileSystem: "s3n" is a deprecated filesystem 
name. Use "hdfs://s3n/" instead.
Exception in thread "main" java.lang.IllegalArgumentException: 
java.net.UnknownHostException: s3n

У меня есть что-то вроде этого как часть моего кода

SparkSession spark = SparkSession
    .builder()
    .master("local[*]")
    .config("spark.hadoop.fs.defaultFS","s3")
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
    .config("spark.hadoop.fs.s3n.awsAccessKeyId","<my-key>")
    .config("spark.hadoop.fs.s3n.awsSecretAccessKey","<my-secret-key>")
    .appName("My Spark App")
    .getOrCreate();

и затем каталог контрольных точек используется следующим образом:

StreamingQuery line = topicValue.writeStream()
   .option("checkpointLocation","s3n://<my-bucket>/checkpointLocation/")

Любая помощь приветствуется. Заранее спасибо!

1 ответ

Решение

Для поддержки контрольных точек S3 в структурированном потоке вы можете попробовать следующий способ:

SparkSession spark = SparkSession
    .builder()
    .master("local[*]")
    .appName("My Spark App")
    .getOrCreate();

spark.sparkContext.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "<my-key>")
spark.sparkContext.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "<my-secret-key>")

и тогда каталог контрольных точек может быть таким:

StreamingQuery line = topicValue.writeStream()
   .option("checkpointLocation","s3n://<my-bucket>/checkpointLocation/")

Надеюсь, это поможет!

Другие вопросы по тегам