Плохая производительность с оконной функцией в потоковой работе

Я использую Spark 2.0.2, Kafka 0.10.1 и интеграцию spark-streaming-kafka-0-8. Я хочу сделать следующее:

Я извлекаю функции в потоковом задании из соединений NetFlow и затем применяю записи к модели k-средних. Некоторые функции являются простыми, которые рассчитываются непосредственно из записи. Но у меня также есть более сложные функции, которые зависят от записей из указанного временного окна раньше. Они подсчитывают, сколько подключений за последнюю секунду было к тому же хосту или сервису, что и текущий. Я решил использовать для этого оконные функции SQL.

Поэтому я строю спецификации окон:

val hostCountWindow = Window.partitionBy("plainrecord.ip_dst").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)
val serviceCountWindow = Window.partitionBy("service").orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

И функция, которая вызывается для извлечения этих функций в каждом пакете:

def extractTrafficFeatures(dataset: Dataset[Row]) = {
  dataset
    .withColumn("host_count", count(dataset("plainrecord.ip_dst")).over(hostCountWindow))
    .withColumn("srv_count", count(dataset("service")).over(serviceCountWindow))
}

И используйте эту функцию следующим образом

stream.map(...).map(...).foreachRDD { rdd =>
  val dataframe = rdd.toDF(featureHeaders: _*).transform(extractTrafficFeatures(_))
  ...
}

Проблема в том, что это имеет очень плохую производительность. Пакету требуется от 1 до 3 секунд для средней скорости ввода менее 100 записей в секунду. Я предполагаю, что это происходит от разбиения, которое производит много перетасовки?

Я пытался использовать RDD API и countByValueAndWindow(), Это кажется намного быстрее, но код выглядит намного лучше и чище с API DataFrame.

Есть ли лучший способ рассчитать эти функции на потоковых данных? Или я что-то здесь не так делаю?

1 ответ

Относительно низкой производительности следует ожидать здесь. Ваш код должен перемешивать и сортировать данные дважды, один раз для:

Window
  .partitionBy("plainrecord.ip_dst")
  .orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

и один раз для:

Window
  .partitionBy("service")
  .orderBy(desc("timestamp")).rangeBetween(-1L, 0L)

Это окажет огромное влияние на время выполнения, и если это жесткие требования, вы не сможете добиться гораздо большего.

Другие вопросы по тегам