Spark - GraphX - масштабирование подключенных компонентов
Я пытаюсь использовать подключенные компоненты, но у меня проблема с масштабированием. Мой вот что у меня есть -
// get vertices
val vertices = stage_2.flatMap(x => GraphUtil.getVertices(x)).cache
// get edges
val edges = stage_2.map(x => GraphUtil.getEdges(x)).filter(_ != null).flatMap(x => x).cache
// create graph
val identityGraph = Graph(vertices, edges)
// get connected components
val cc = identityGraph.connectedComponents.vertices
Где, GraphUtil имеет вспомогательные функции для возврата вершин и ребер. На данный момент у моего графа ~1 миллион узлов и ~2 миллиона ребер (кстати, ожидается, что он вырастет до ~100 миллионов узлов). Мой график довольно слабо связан - поэтому я ожидаю множество маленьких графиков.
Когда я запускаю выше, я продолжаю получать java.lang.OutOfMemoryError: Java heap space
, Я пробовал с executor-memory 32g
и запуск кластера из 15 узлов с 45g в качестве размера контейнера пряжи.
Вот деталь исключения:
16/10/26 10:32:26 ERROR util.Utils: uncaught error in thread SparkListenerBus, stopping SparkContext
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.<init>(String.java:203)
at java.lang.StringBuilder.toString(StringBuilder.java:405)
at com.fasterxml.jackson.core.util.TextBuffer.contentsAsString(TextBuffer.java:360)
at com.fasterxml.jackson.core.io.SegmentedStringWriter.getAndClear(SegmentedStringWriter.java:98)
at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:2216)
at org.json4s.jackson.JsonMethods$class.compact(JsonMethods.scala:32)
at org.json4s.jackson.JsonMethods$.compact(JsonMethods.scala:44)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
at org.apache.spark.scheduler.EventLoggingListener$$anonfun$logEvent$1.apply(EventLoggingListener.scala:146)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:146)
at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:173)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:34)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:37)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(AsynchronousListenerBus.scala:80)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(AsynchronousListenerBus.scala:65)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(AsynchronousListenerBus.scala:64)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
at org.apache.spark.util.AsynchronousListenerBus$$anon$1.run(AsynchronousListenerBus.scala:63)
Кроме того, я получаю множество следующих журналов:
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 320 is 263 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 321 is 268 bytes
16/10/26 10:30:32 INFO spark.MapOutputTrackerMaster: Size of output statuses for shuffle 322 is 264 bytes
Мой вопрос: кто-нибудь пробовал подключенные компоненты в таком масштабе? Если да, что я делаю не так?
2 ответа
Как я писал выше в комментариях, я реализовал подключенный компонент, используя карту / уменьшение на Spark. Вы можете найти более подробную информацию здесь - https://www.linkedin.com/pulse/connected-component-using-map-reduce-apache-spark-shirish-kumar и исходный код в соответствии с лицензией MIT здесь - https://github.com/kwartile/connected-component.
Алгоритм связанного компонента не очень хорошо масштабируется, и его производительность во многом зависит от топологии вашего графа. Разреженность ваших краев не означает, что у вас есть маленькие компоненты. Длинная цепочка ребер очень разрежена (число ребер = количество вершин - 1), но алгоритм грубой силы, реализованный в GraphX, не будет очень эффективным (см. Источник cc и pregel).
Вот что вы можете попробовать (отсортировано, только код):
- Проверьте свои вершины и ребра в паркете (на диске), затем снова загрузите их, чтобы построить график. Кэширование иногда просто не сокращает его, когда ваш план выполнения становится слишком большим.
- Преобразуйте ваш график таким образом, чтобы результат алгоритма не изменился. Например, вы можете видеть в коде, что алгоритм распространяет информацию в обоих направлениях (как и должно быть, по умолчанию). Поэтому, если у вас есть несколько ребер, соединяющих одни и те же две вершины, отфильтруйте их от своего графа, к которому вы применяете алгоритм.
- Оптимизируйте код GraphX самостоятельно (это действительно довольно просто), используя либо общую оптимизацию, экономящую память (т. Е. Контрольные точки на диске при каждой итерации, чтобы избежать OOM), либо специфическую для домена оптимизацию (аналогично пункту 2)
Если вы можете оставить GraphX (который становится несколько устаревшим), вы можете рассмотреть GraphFrames ( пакет, блог). Я никогда не пробовал, поэтому я не знаю, есть ли у него CC.
Я уверен, что вы можете найти другие возможности среди пакетов spark, но, возможно, вам даже захочется использовать что-то за пределами Spark. Но это выходит за рамки вопроса.
Удачи!