Apache Spark (структурированная потоковая передача): поддержка контрольной точки S3
Из документации по структурированной потоковой передаче "Искра":
"Это местоположение контрольной точки должно быть путем в файловой системе, совместимой с HDFS, и может быть установлено в качестве опции в DataStreamWriter
при запуске запроса."
И, конечно же, установка контрольной точки на путь s3 создает:
17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020
at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652)
at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194)
at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:106)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1305)
at org.apache.hadoop.hdfs.DistributedFileSystem$22.doCall(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430)
at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51)
at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262)
at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133)
at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:637)
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook
Пара вопросов здесь:
- Почему s3 не поддерживается в качестве контрольной точки dir (обычная потоковая передача искры поддерживает это)? Что делает файловую систему "совместимой с HDFS"?
- Я использую HDFS эмпирически (так как кластеры могут постоянно подниматься или опускаться) и использую s3 в качестве места для сохранения всех данных - каковы будут рекомендации для хранения данных контрольных точек для структурированных потоковых данных в такой конфигурации?
5 ответов
Что делает FSFS HDFS "совместимой"? это файловая система с поведением, указанным в спецификации Hadoop FS. Здесь рассматривается различие между хранилищем объектов и FS, причем ключевой момент заключается в том, что "в конечном итоге согласованные хранилища объектов без добавления или O(1) атомарных переименований не соответствуют"
В частности, для S3
- Это не согласуется: после создания нового блоба команда list часто не показывает его. То же самое для удалений.
- Когда BLOB-объект перезаписывается или удаляется, удаление может занять некоторое время.
- Rename() осуществляется путем копирования, а затем удалить
Запустите потоковые контрольные точки, сохранив все в определенном месте, а затем переименовав его в каталог контрольных точек. Это делает время для контрольной точки пропорциональным времени для копирования данных в S3, что составляет ~6-10 МБ / с.
Текущий бит потокового кода не подходит для s3
А пока, сделай одно из
- контрольная точка в HDFS, а затем скопировать результаты
- контрольная точка для небольшого количества EBS, выделенного и подключенного к вашему кластеру
- контрольной точки на S3, но между контрольными точками имеется большой разрыв, так что время на контрольную точку не приводит к потере потокового приложения.
Если вы используете EMR, вы можете заплатить больше за согласованный динамический S3 на базе БД, что обеспечивает вам лучшую согласованность. Но время копирования остается прежним, поэтому контрольные точки будут такими же медленными
Это известная проблема: https://issues.apache.org/jira/browse/SPARK-19407
Должно быть исправлено в следующем выпуске. Вы можете установить файловую систему по умолчанию на s3, используя --conf spark.hadoop.fs.defaultFS=s3
как обходной путь.
Эта проблема исправлена в https://issues.apache.org/jira/browse/SPARK-19407.
Однако контрольная точка структурированного потока не работает хорошо в S3 из-за отсутствия возможной согласованности в S3. Не рекомендуется использовать S3 для контрольных точек https://issues.apache.org/jira/browse/SPARK-19013.
Майкл Армбурст сказал, что это не будет исправлено в Spark, и решение состоит в том, чтобы дождаться внедрения S3guard. S3Guard когда-нибудь далеко.
Да, если вы используете Spark Structured Streaming v3.x или выше. Сначала создайте
SparkSession
и добавить в его контекст конфиги S3.
val sparkSession = SparkSession
.builder()
.master(sparkMasterUrl)
.appName(appName)
.getOrCreate()
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", s3EndPoint)
Позже добавим
checkpointLocation
config с именем сегмента S3, в котором должны храниться контрольные точки.
val streamingQuery = streamingDF.writeStream
.option("checkpointLocation", "/s3://bucketName/checkpointDir")
.foreachBatch{(batchDF: DataFrame, batchId: Long) =>
// Transform and write batchDF
}.start()
streamingQuery.awaitTermination()
Вы можете использовать s3 для контрольной точки, но вы должны включить EMRFS, чтобы обеспечить согласованность s3.