Есть ли хороший способ присоединиться к потоку в искре с пеленальным столом?
Наша среда Spark: DataBricks 4.2 (включает Apache Spark 2.3.1, Scala 2.11)
Чего мы пытаемся достичь: мы хотим обогатить потоковые данные некоторыми справочными данными, которые регулярно обновляются. Обогащение осуществляется путем объединения потока со справочными данными.
Что мы реализовали: Мы реализовали две спарк-работы (jars): Первая - это обновление таблицы Spark TEST_TABLE каждый час (назовем ее "справочные данные") с использованием.write.mode(SaveMode.Overwrite).saveAsTable("TEST_TABLE") И после этого вызывается spark.catalog.refreshTable("TEST_TABLE")
Второе задание (назовем это потоковыми данными) - использование структурированной потоковой передачи Spark для потокового чтения некоторых данных, объединение их с помощью DataFrame.transform() с таблицей TEST_TABLE и запись их в другую систему. Мы читаем справочные данные, используя spark.read.table("TEST_TABLE") в функции, вызываемой.transform (), поэтому мы получаем самые последние значения в таблице. К сожалению, второе приложение вылетает каждый раз, когда первое приложение обновляет таблицу. Следующее сообщение отображается в выходных данных Log4j:
18/08/23 10:34:40 WARN TaskSetManager: Lost task 0.0 in stage 547.0 (TID 5599, 10.139.64.9, executor 0): java.io.FileNotFoundException: dbfs:/user/hive/warehouse/code.db/TEST_TABLE/ part-00000-tid-5184425276562097398-25a0e542-41e4-416f-bae8-469899a72c21-36-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readFile(FileScanRDD.scala:203)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$createNextIterator(FileScanRDD.scala:377)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:295)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:291)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748
Мы также пытались сделать кеш недействительным до того, как прочитали таблицу, но это снизило производительность, и приложение все равно рухнуло. Мы подозреваем, что корневым курсом является ленивая оценка эталонного набора данных (который все еще "указывает" на старые данные, которых больше нет).
Есть ли у вас какие-либо предложения о том, что мы могли бы сделать, чтобы предотвратить эту проблему, или как лучше всего объединить поток с динамическими справочными данными?
2 ответа
Присоединиться к справочной информации; не кэшируйте его, это гарантирует, что вы перейдете к источнику. Найдите данные последней версии, которые обозначены первичным ключом + счетчиком, где этот счетчик наиболее близок или равен счетчику, поддерживаемому в потоковом приложении. Каждый час пишите, добавляйте все данные ref, все еще текущие, снова, но с увеличенным счетчиком; т.е. новая версия. Используйте паркет здесь.
Вместо того, чтобы присоединиться к столу и потоку. Вы можете воспользоваться новой функцией, доступной в версии 2.3.1, то есть объединением двух потоков данных. Создайте поток вместо таблицы с водяным знаком.
Watermarks: Watermarking in Structured Streaming is a way to limit state in all
stateful streaming operations by specifying how much late data to consider.
Specifically, a watermark is a moving threshold in event-time that trails behind the
maximum event-time seen by the query in the processed data. The trailing gap (aka
watermark delay) defines how long should the engine wait for late data to arrive and
is specified in the query using withWatermark.