Ошибка в структурированной потоковой передаче Spark с источником файла и приемником файла

Моя команда сейчас вступает в сферу структурированного потокового вещания. Я относительно новичок в структурированном потоке.

У меня есть требование с

Источник - CSV
Раковина - JSON

Подробности Env:

Кластер: Искра 2.2.1
Язык программирования: Scala
Инструмент для сборки: Gradle

Объем:

Я реализовал этот простой код

val schema = StructType(
    Array(StructField("customer_id", StringType),
        StructField("name", StringType),
        StructField("pid", StringType),
        StructField("product_name", StringType)))

val fileData = spark.readStream
    .option("header", "true")
    .schema(schema)
    .csv(args(0))

Затем я применяю простую агрегацию как

// The actual business logic is more complex than this
val customerCount = fileData.groupBy("customer_id").count()

Наконец, напишите в JSON

val query = customerCount
    .writeStream
    .format("json")
    .option("path", "src/main/resources/output/myjson_dir")
    .option("checkpointLocation", "src/main/resources/chkpoint_dir")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()

Вопросы:

  1. Это работает, как и ожидалось, когда я использую .format("console"), Но это исключение, когда я использую .format("json") -

Исключение в потоке "main" org.apache.spark.sql.AnalysisException: добавление режима вывода не поддерживается при потоковых агрегатах потоковых DataFrames/DataSets без водяного знака;; Aggregate [customer_id#0], [customer_id#0, count(1) AS count#18L] +- Источник данных StreamingRelation (org.apache.spark.sql.SparkSession@4b56b031,csv,List(),Some(StructType(StructField(customer_id,StringType,true), StructField(имя,StringType,true), StructField(product_id,StringType,true), StructField(product_name,StringType,true))),List(), Нет, Карта (заголовок -> true, путь -> /Users/Underwood/Documents/workspace/Spark_Streaming_Examples/src/main/resources/input), Нет), FileSource[/Users/Underwood/Documents/workspace/Spark_Streaming_Examples/src/main/resources/input], [customer_id#0, имя № 1, идентификатор продукта № 2, имя продукта № 3, дата № 4] в org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$ катализатор анализ $UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)

Я пробовал это другие комбинации outputMode = "update" а также outputMode = "complete", Но эти ошибки также выбрасывают. Почему это так? Это ожидаемое поведение? Как мне записать вывод в приемник JSON?

  1. Вышеуказанное исключение говорит об использовании водяных знаков. AFAIK, водяные знаки используются с полем Timestamp, но в моих входных данных у меня нет поля timestamp или date. Пожалуйста, дайте мне знать, если я ошибаюсь здесь. Как добавление водяного знака будет иметь значение здесь?

  2. Моей следующей попыткой было написать собственный ForEachSink. Я ссылался на этот пост. Но это мне тоже не помогло. Проблема была в том, что я получал 200 каталогов с 0-байтовым файлом в каждом.

  3. Как выбрать не группировать по столбцам в окончательном выводе? В простой пакетной обработке я обычно достигаю этого, объединяя агрегированный DF с исходным DF и выбирая необходимые строки. Но структурированному потоковому вещанию, похоже, не нравится этот подход. Вот мой пример кода

    val customerCount = fileData.groupBy("customer_id").count()
    val finalDF = fileData.join(customerCount, Seq("customer_id"))
        .select("customer_id", "count", "product_name" )
    

Пожалуйста, дайте мне знать, если я пропустил какие-либо детали.

1 ответ

Прочитайте официальную документацию Spark Structured Streaming, связанную с водяными знаками.

В основном, когда вы агрегируете, вы должны установить outputMode = "complete"потому что не имеет смысла добавлять новые данные, не сохраняя в памяти ранее выполненную обработку (например, количество слов).

Из-за этого вы должны указать, используя водяной знак или оконную функцию, когда программа должна начать новую агрегацию и когда данные будут слишком запоздалыми.

Если у вас нет столбца с отметкой времени, вы можете создать его, используя now() функция, и это будет время обработки.

Если есть что-то неясное или есть вопросы, прокомментируйте, и я обновлю свой ответ.

Другие вопросы по тегам