Плохая производительность с оконной функцией в потоковой работе
Я использую Spark 2.0.2, Kafka 0.10.1 и интеграцию spark-streaming-kafka-0-8. Я хочу сделать следующее:
Я извлекаю функции в потоковом задании из соединений NetFlow и затем применяю записи к модели k-средних. Некоторые функции являются простыми, которые рассчитываются непосредственно из записи. Но у меня также есть более сложные функции, которые зависят от записей из указанного временного окна раньше. Они подсчитывают, сколько подключений за последнюю секунду было к тому же хосту или сервису, что и текущий. Я решил использовать для этого оконные функции SQL.
Поэтому я строю спецификации окон:
val hostCountWindow = Window.partitionBy("plainrecord.ip_dst").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
val serviceCountWindow = Window.partitionBy("service").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
И функция, которая вызывается для извлечения этих функций в каждом пакете:
def extractTrafficFeatures(dataset: Dataset[Row]) = {
dataset
.withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
.withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
}
И используйте эту функцию следующим образом
stream.map(...).map(...).foreachRDD { rdd =>
val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
...
}
Проблема в том, что это имеет очень плохую производительность. Пакету требуется от 1 до 3 секунд для средней скорости ввода менее 100 записей в секунду. Я предполагаю, что это происходит от разбиения, которое производит много перетасовки?
Я пытался использовать RDD API и countByValueAndWindow()
, Это кажется намного быстрее, но код выглядит намного лучше и чище с API DataFrame.
Есть ли лучший способ рассчитать эти функции на потоковых данных? Или я что-то здесь не так делаю?
1 ответ
Относительно низкой производительности следует ожидать здесь. Ваш код должен перемешивать и сортировать данные дважды, один раз для:
Window
.partitionBy("plainrecord.ip_dst")
.orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
и один раз для:
Window
.partitionBy("service")
.orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
Это окажет огромное влияние на время выполнения, и если это жесткие требования, вы не сможете добиться гораздо большего.