Apache Spark flatMap сложность времени

Я пытался найти способ подсчитать, сколько раз наборы строк встречаются в базе данных транзакций (реализация алгоритма Apriori в распределенном режиме). Код у меня в настоящее время выглядит следующим образом:

val cand_br = sc.broadcast(cand)
transactions.flatMap(trans => freq(trans, cand_br.value))
            .reduceByKey(_ + _)            
}

def freq(trans: Set[String], cand: Array[Set[String]]) : Array[(Set[String],Int)] = {
    var res = ArrayBuffer[(Set[String],Int)]()
    for (c <- cand) {
        if (c.subsetOf(trans)) {
            res += ((c,1))
        }
    }
    return res.toArray
}

транзакции начинаются как RDD[Set[String]]и я пытаюсь преобразовать его в RDD[(K, V), с K каждый элемент в cand и V количество вхождений каждого элемента в cand в списке транзакций.

При просмотре производительности в пользовательском интерфейсе этап flatMap быстро занимает около 3 минут, в то время как для остальных требуется менее 1 мс.

transactions.count() ~= 88000 а также cand.length ~= 24000 для идеи данных, с которыми я имею дело. Я пробовал разные способы сохранения данных, но я вполне уверен, что это алгоритмическая проблема, с которой я столкнулся.

Есть ли более оптимальное решение для решения этой подзадачи?

PS: я довольно плохо знаком с фреймворком Scala / Spark, поэтому в этом коде могут быть странные конструкции

1 ответ

Решение

Вероятно, правильный вопрос, который нужно задать в этом случае: "Какова временная сложность этого алгоритма". Я думаю, что это очень не связано с операцией FlatMap в Spark.

Грубый анализ O-сложности

Дано 2 коллекции наборов размера m а также nэтот алгоритм считает, сколько элементов одной коллекции является подмножеством элементов другой коллекции, поэтому выглядит как сложность m x n, Если заглянуть на один уровень глубже, мы также увидим, что subsetOf является линейным числом элементов подмножества. x subSet y == x forAll yтак что на самом деле сложность m x n x s где s является количеством проверяемых подмножеств.

Другими словами, это flatMap У операции много работы.

Идти Параллельно

Теперь, возвращаясь к Spark, мы также можем заметить, что этот алгоритм смущающе параллелен, и мы можем использовать возможности Spark в своих интересах.

Чтобы сравнить некоторые подходы, я загрузил набор данных "retail" [1] и запустил алгоритм на val cand = transactions.filter(_.size<4).collect, Размер данных является близким соседом вопроса:

  • Transactions.count = 88162
  • Cand.size = 15451

Некоторые сравнительные прогоны в локальном режиме:

  • Вайнилла: 1,2 минуты
  • Увеличение transactions партиции до # ядер (8): 33 сек

Я также попробовал альтернативную реализацию, используя cartesian вместо flatmap:

 transactions.cartesian(candRDD).map{case (tx,cd) => (cd,if  (cd.subsetOf(tx)) 1 else 0)}.reduceByKey(_ + _).collect

Но это привело к гораздо более длинным пробегам, как видно в верхних 2 строках интерфейса Spark (декартовой и декартовой с большим числом разделов): 2,5 мин.

Учитывая, что у меня есть только 8 логических ядер, выход выше, что не помогает.

Санитарные проверки:

Есть ли добавленная "сложность времени Spark FlatMap"? Вероятно, некоторые, так как это включает сериализацию замыканий и распаковку коллекций, но незначительно по сравнению с выполняемой функцией.

Давайте посмотрим, сможем ли мы сделать лучше: я реализовал тот же алгоритм, используя простой scala:

val resLocal = reduByKey(transLocal.flatMap(trans => freq(trans, cand)))

Где reduceByKey операция является наивной реализацией, взятой из [2]. Время выполнения: 3,67 секунды. Искры дают вам параллелизм из коробки. Это подразумевается полностью последовательно и, следовательно, занимает больше времени для завершения.

Последняя проверка работоспособности: тривиальная операция с плоской картой:

transactions.flatMap(trans => Seq((trans, 1))).reduceByKey( _ + _).collect

Время выполнения: 0,88 с

Выводы:

Spark покупает вам параллелизм и кластеризацию, и этот алгоритм может воспользоваться этим. Используйте больше ядер и разделите входные данные соответственно. Там нет ничего плохого в flatmap, Приз за сложность времени достается функции внутри него.

Другие вопросы по тегам