java.io.FileNotFoundException: не найдено, потому что://mybucket.myservicename/checkpoint/offsets
Я пытаюсь использовать Spark Structured Streaming 2.3 для чтения данных из Kafka (IBM Message Hub) и сохранения их в IBM Cloud Object Storage в 1.1 IBM Analytics Engine Cluster.
После создания кластера вставьте в него ssh:
$ ssh clsadmin@myclusterid.bi.services.eu-gb.bluemix.net
Создать jaas.conf
файл, необходимый для подключения Spark к Message Hub:
$ cat << EOF > jaas.conf
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="kafka"
username="<<MY_MESSAGEHUB_USERNAME>>"
password="<<MY_MESSAGEHUB_PASSWORD>>";
};
EOF
Это создаст файл jaas.conf
в /home/wce/clsadmin
каталог в кластере.
Создайте служебный скрипт для запуска оболочки spark (сейчас у нас только один исполнитель):
$ cat << EOF > start_spark.sh
spark-shell --master local[1] \
--files jaas.conf \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" \
--num-executors 1 --executor-cores 1
EOF
$ chmod +x start_spark.sh
Запустите сеанс spark, используя служебный скрипт:
$ ./start_spark.sh
Теперь внутри оболочки искры прочитайте поток Kafka (Message Hub). Убедитесь, что вы измените kafka.bootstrap.servers
в соответствии с вашими учетными данными:
val df = spark.readStream.
format("kafka").
option("kafka.bootstrap.servers", "kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093").
option("subscribe", "transactions_load").
option("kafka.security.protocol", "SASL_SSL").
option("kafka.sasl.mechanism", "PLAIN").
option("kafka.ssl.protocol", "TLSv1.2").
option("kafka.ssl.enabled.protocols", "TLSv1.2").
load()
Мы можем проверить, что наше соединение работает нормально:
df.writeStream.format("console").start()
Через некоторое время вы должны увидеть некоторые данные, напечатанные на консоли, например,
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+--------------------+-----------------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+--------------------+--------------------+-----------------+---------+------+--------------------+-------------+
|[35 34 30 33 36 3...|[7B 22 49 6E 76 6...|transactions_load| 7| 84874|2018-08-22 15:42:...| 0|
|[35 34 30 33 36 3...|[7B 22 49 6E 76 6...|transactions_load| 7| 84875|2018-08-22 15:42:...| 0|
|[35 34 30 38 33 3...|[7B 22 49 6E 76 6...|transactions_load| 7| 84876|2018-08-22 15:42:...| 0|
...
Настройте сеанс spark, чтобы он мог обращаться к экземпляру COS:
val accessKey = "MY_COS_ACCESS_KEY"
val secretKey = "MY_COS_SECRET_KEY"
val bucketName = "streamingdata"
// arbitrary name for refering to the cos settings from this code
val serviceName = "myservicename"
sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.access.key", accessKey)
sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.secret.key", secretKey)
sc.hadoopConfiguration.set(s"fs.cos.${serviceName}.endpoint", "s3.eu-geo.objectstorage.service.networklayer.com")
Мы можем проверить настройку COS, написав фиктивный файл:
import spark.implicits._
val data = sc.parallelize(Array(1,2,3,4,5))
data.toDF.write.format("csv").save(s"cos://${bucketName}.${serviceName}/data.txt")
spark.read.csv(s"cos://${bucketName}.${serviceName}/data.txt").collect()
Приведенный выше тест должен вывести что-то вроде следующего, если чтение и запись в COS прошла успешно:
res7: Array[org.apache.spark.sql.Row] = Array([1], [2], [3], [4], [5])
Теперь попробуйте записать потоковый фрейм данных в COS:
df.
writeStream.
format("parquet").
option("checkpointLocation", s"cos://${bucketName}.${serviceName}/checkpoint").
option("path", s"cos://${bucketName}.${serviceName}/data").
start()
Для меня это не так с:
scala> 18/08/22 15:43:06 WARN COSAPIClient: file status checkpoint/offsets returned 404
18/08/22 15:43:06 ERROR MicroBatchExecution: Query [id = 78c8c4af-f21d-457d-b5a7-56559e180634, runId = 50e8759e-0293-4fab-9b73-dd4811423b37] terminated with error
java.io.FileNotFoundException: Not found cos://streamingdata.myservicename/checkpoint/offsets
at com.ibm.stocator.fs.cos.COSAPIClient.getFileStatus(COSAPIClient.java:628)
at com.ibm.stocator.fs.ObjectStoreFileSystem.getFileStatus(ObjectStoreFileSystem.java:486)
at com.ibm.stocator.fs.ObjectStoreFileSystem.listStatus(ObjectStoreFileSystem.java:360)
at com.ibm.stocator.fs.ObjectStoreFileSystem.listStatus(ObjectStoreFileSystem.java:336)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileSystemManager.list(HDFSMetadataLog.scala:412)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:180)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:124)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Это проблема со Stocator или Spark Structured Streaming?
1 ответ
Переход на S3AFileSystem, похоже, решил проблему:
sc.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
sc.hadoopConfiguration.set("fs.s3a.access.key", accessKey)
sc.hadoopConfiguration.set("fs.s3a.secret.key", secretKey)
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.eu-geo.objectstorage.service.networklayer.com")
val s3Url = s"s3a://${bucketName}/"
...
df.
writeStream.
format("parquet").
option("checkpointLocation", s"${s3Url}/checkpoint").
option("path", s"${s3Url}/data").
start()
Похоже, что эта проблема с драйвером Stocator.
ОБНОВЛЕНИЕ 23 августа 2018 года: эта проблема была исправлена в Stocator v1.0.24, но Stocator в IBM Analytics Engine еще не был обновлен до этой версии.