Apache Spark flatMap сложность времени
Я пытался найти способ подсчитать, сколько раз наборы строк встречаются в базе данных транзакций (реализация алгоритма Apriori в распределенном режиме). Код у меня в настоящее время выглядит следующим образом:
val cand_br = sc.broadcast(cand)
transactions.flatMap(trans => freq(trans, cand_br.value))
.reduceByKey(_ + _)
}
def freq(trans: Set[String], cand: Array[Set[String]]) : Array[(Set[String],Int)] = {
var res = ArrayBuffer[(Set[String],Int)]()
for (c <- cand) {
if (c.subsetOf(trans)) {
res += ((c,1))
}
}
return res.toArray
}
транзакции начинаются как RDD[Set[String]]
и я пытаюсь преобразовать его в RDD[(K, V)
, с K каждый элемент в cand
и V количество вхождений каждого элемента в cand
в списке транзакций.
При просмотре производительности в пользовательском интерфейсе этап flatMap быстро занимает около 3 минут, в то время как для остальных требуется менее 1 мс.
transactions.count() ~= 88000
а также cand.length ~= 24000
для идеи данных, с которыми я имею дело. Я пробовал разные способы сохранения данных, но я вполне уверен, что это алгоритмическая проблема, с которой я столкнулся.
Есть ли более оптимальное решение для решения этой подзадачи?
PS: я довольно плохо знаком с фреймворком Scala / Spark, поэтому в этом коде могут быть странные конструкции
1 ответ
Вероятно, правильный вопрос, который нужно задать в этом случае: "Какова временная сложность этого алгоритма". Я думаю, что это очень не связано с операцией FlatMap в Spark.
Грубый анализ O-сложности
Дано 2 коллекции наборов размера m
а также n
этот алгоритм считает, сколько элементов одной коллекции является подмножеством элементов другой коллекции, поэтому выглядит как сложность m x n
, Если заглянуть на один уровень глубже, мы также увидим, что subsetOf является линейным числом элементов подмножества. x subSet y
== x forAll y
так что на самом деле сложность m x n x s
где s
является количеством проверяемых подмножеств.
Другими словами, это flatMap
У операции много работы.
Идти Параллельно
Теперь, возвращаясь к Spark, мы также можем заметить, что этот алгоритм смущающе параллелен, и мы можем использовать возможности Spark в своих интересах.
Чтобы сравнить некоторые подходы, я загрузил набор данных "retail" [1] и запустил алгоритм на val cand = transactions.filter(_.size<4).collect
, Размер данных является близким соседом вопроса:
- Transactions.count = 88162
- Cand.size = 15451
Некоторые сравнительные прогоны в локальном режиме:
- Вайнилла: 1,2 минуты
- Увеличение
transactions
партиции до # ядер (8): 33 сек
Я также попробовал альтернативную реализацию, используя cartesian
вместо flatmap
:
transactions.cartesian(candRDD).map{case (tx,cd) => (cd,if (cd.subsetOf(tx)) 1 else 0)}.reduceByKey(_ + _).collect
Но это привело к гораздо более длинным пробегам, как видно в верхних 2 строках интерфейса Spark (декартовой и декартовой с большим числом разделов): 2,5 мин.
Учитывая, что у меня есть только 8 логических ядер, выход выше, что не помогает.
Санитарные проверки:
Есть ли добавленная "сложность времени Spark FlatMap"? Вероятно, некоторые, так как это включает сериализацию замыканий и распаковку коллекций, но незначительно по сравнению с выполняемой функцией.
Давайте посмотрим, сможем ли мы сделать лучше: я реализовал тот же алгоритм, используя простой scala:
val resLocal = reduByKey(transLocal.flatMap(trans => freq(trans, cand)))
Где reduceByKey
операция является наивной реализацией, взятой из [2]. Время выполнения: 3,67 секунды. Искры дают вам параллелизм из коробки. Это подразумевается полностью последовательно и, следовательно, занимает больше времени для завершения.
Последняя проверка работоспособности: тривиальная операция с плоской картой:
transactions.flatMap(trans => Seq((trans, 1))).reduceByKey( _ + _).collect
Время выполнения: 0,88 с
Выводы:
Spark покупает вам параллелизм и кластеризацию, и этот алгоритм может воспользоваться этим. Используйте больше ядер и разделите входные данные соответственно. Там нет ничего плохого в flatmap
, Приз за сложность времени достается функции внутри него.