Ошибка в структурированной потоковой передаче Spark с источником файла и приемником файла
Моя команда сейчас вступает в сферу структурированного потокового вещания. Я относительно новичок в структурированном потоке.
У меня есть требование с
Источник - CSV
Раковина - JSON
Подробности Env:
Кластер: Искра 2.2.1
Язык программирования: Scala
Инструмент для сборки: Gradle
Объем:
Я реализовал этот простой код
val schema = StructType(
Array(StructField("customer_id", StringType),
StructField("name", StringType),
StructField("pid", StringType),
StructField("product_name", StringType)))
val fileData = spark.readStream
.option("header", "true")
.schema(schema)
.csv(args(0))
Затем я применяю простую агрегацию как
// The actual business logic is more complex than this
val customerCount = fileData.groupBy("customer_id").count()
Наконец, напишите в JSON
val query = customerCount
.writeStream
.format("json")
.option("path", "src/main/resources/output/myjson_dir")
.option("checkpointLocation", "src/main/resources/chkpoint_dir")
.trigger(Trigger.ProcessingTime("10 seconds"))
.start()
Вопросы:
- Это работает, как и ожидалось, когда я использую
.format("console")
, Но это исключение, когда я использую.format("json")
-
Исключение в потоке "main" org.apache.spark.sql.AnalysisException: добавление режима вывода не поддерживается при потоковых агрегатах потоковых DataFrames/DataSets без водяного знака;; Aggregate [customer_id#0], [customer_id#0, count(1) AS count#18L] +- Источник данных StreamingRelation (org.apache.spark.sql.SparkSession@4b56b031,csv,List(),Some(StructType(StructField(customer_id,StringType,true), StructField(имя,StringType,true), StructField(product_id,StringType,true), StructField(product_name,StringType,true))),List(), Нет, Карта (заголовок -> true, путь -> /Users/Underwood/Documents/workspace/Spark_Streaming_Examples/src/main/resources/input), Нет), FileSource[/Users/Underwood/Documents/workspace/Spark_Streaming_Examples/src/main/resources/input], [customer_id#0, имя № 1, идентификатор продукта № 2, имя продукта № 3, дата № 4] в org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$ катализатор анализ $UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
Я пробовал это другие комбинации outputMode = "update"
а также outputMode = "complete"
, Но эти ошибки также выбрасывают. Почему это так? Это ожидаемое поведение? Как мне записать вывод в приемник JSON?
Вышеуказанное исключение говорит об использовании водяных знаков. AFAIK, водяные знаки используются с полем Timestamp, но в моих входных данных у меня нет поля timestamp или date. Пожалуйста, дайте мне знать, если я ошибаюсь здесь. Как добавление водяного знака будет иметь значение здесь?
Моей следующей попыткой было написать собственный ForEachSink. Я ссылался на этот пост. Но это мне тоже не помогло. Проблема была в том, что я получал 200 каталогов с 0-байтовым файлом в каждом.
Как выбрать не группировать по столбцам в окончательном выводе? В простой пакетной обработке я обычно достигаю этого, объединяя агрегированный DF с исходным DF и выбирая необходимые строки. Но структурированному потоковому вещанию, похоже, не нравится этот подход. Вот мой пример кода
val customerCount = fileData.groupBy("customer_id").count() val finalDF = fileData.join(customerCount, Seq("customer_id")) .select("customer_id", "count", "product_name" )
Пожалуйста, дайте мне знать, если я пропустил какие-либо детали.
1 ответ
Прочитайте официальную документацию Spark Structured Streaming, связанную с водяными знаками.
В основном, когда вы агрегируете, вы должны установить outputMode = "complete"
потому что не имеет смысла добавлять новые данные, не сохраняя в памяти ранее выполненную обработку (например, количество слов).
Из-за этого вы должны указать, используя водяной знак или оконную функцию, когда программа должна начать новую агрегацию и когда данные будут слишком запоздалыми.
Если у вас нет столбца с отметкой времени, вы можете создать его, используя now()
функция, и это будет время обработки.
Если есть что-то неясное или есть вопросы, прокомментируйте, и я обновлю свой ответ.