Spark структурированный поток: текущая партия отстает

Это кажется очень простой реализацией, но похоже, что есть некоторые проблемы.

Это задание считывает смещения (данные событий пользовательского интерфейса) из темы kafka, выполняет некоторую агрегацию и записывает ее в базу данных Aerospike.

В случае высокого трафика я начинаю видеть эту проблему, когда задание работает нормально, но новые данные не вставляются. Глядя на логи я вижу это ПРЕДУПРЕЖДАЮЩИЕ сообщения:

Текущая партия отстает. Интервал запуска составляет 30000 миллисекунд, но затрачено 43491 миллисекунд.

Пару раз задание возобновляет запись данных, но я вижу, что число отсчетов низкое, что указывает на некоторую потерю данных.

Вот код:

Dataset<Row> stream = sparkSession.readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
          .option("subscribe", newTopic)
          .option("startingOffsets", "latest")
          .option("enable.auto.commit", false)
          .option("failOnDataLoss", false)
          .load();
StreamingQuery query = stream
        .writeStream()
        .option("startingOffsets", "earliest")
        .outputMode(OutputMode.Append())
        .foreach(sink)
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .queryName(queryName)
        .start();

1 ответ

Возможно, вам придется иметь дело с maxOffsetsPerTrigger настроить общее количество входных записей на пакет. В противном случае задержка в вашем приложении может привести к большему количеству записей в пакете, следовательно, он замедлит следующий пакет, в свою очередь, приведет к большему количеству задержек в следующих пакетах.

Пожалуйста, обратитесь к ссылке ниже для более подробной информации о конфигурации Kafka.

https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html

Другие вопросы по тегам