Spark структурированный поток: текущая партия отстает
Это кажется очень простой реализацией, но похоже, что есть некоторые проблемы.
Это задание считывает смещения (данные событий пользовательского интерфейса) из темы kafka, выполняет некоторую агрегацию и записывает ее в базу данных Aerospike.
В случае высокого трафика я начинаю видеть эту проблему, когда задание работает нормально, но новые данные не вставляются. Глядя на логи я вижу это ПРЕДУПРЕЖДАЮЩИЕ сообщения:
Текущая партия отстает. Интервал запуска составляет 30000 миллисекунд, но затрачено 43491 миллисекунд.
Пару раз задание возобновляет запись данных, но я вижу, что число отсчетов низкое, что указывает на некоторую потерю данных.
Вот код:
Dataset<Row> stream = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaBootstrapServersString)
.option("subscribe", newTopic)
.option("startingOffsets", "latest")
.option("enable.auto.commit", false)
.option("failOnDataLoss", false)
.load();
StreamingQuery query = stream
.writeStream()
.option("startingOffsets", "earliest")
.outputMode(OutputMode.Append())
.foreach(sink)
.trigger(Trigger.ProcessingTime(triggerInterval))
.queryName(queryName)
.start();
1 ответ
Возможно, вам придется иметь дело с maxOffsetsPerTrigger
настроить общее количество входных записей на пакет. В противном случае задержка в вашем приложении может привести к большему количеству записей в пакете, следовательно, он замедлит следующий пакет, в свою очередь, приведет к большему количеству задержек в следующих пакетах.
Пожалуйста, обратитесь к ссылке ниже для более подробной информации о конфигурации Kafka.
https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html