Как найти количество вершин, которые достижимы из данной вершины в Spark GraphX
Я хочу узнать количество достижимых вершин из данной вершины в ориентированном графе (см. Изображение ниже), например, для id=0L, так как он соединяется с 1L и 2L, 1L соединяется с 3L, 2L соединяется с 4L, следовательно, вывод должен быть 4. Ниже приведены данные отношения графика:
edgeid from to distance
0 0 1 10.0
1 0 2 5.0
2 1 2 2.0
3 1 3 1.0
4 2 1 3.0
5 2 3 9.0
6 2 4 2.0
7 3 4 4.0
8 4 0 7.0
9 4 3 5.0
Мне удалось настроить график, но я не уверен, как использовать graph.edges.filter для получения выходных данных
val vertexRDD: RDD[(Long, (Double))] = sc.parallelize(vertexArray)
val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
val graph: Graph[(Double), Int] = Graph(vertexRDD, edgeRDD)
1 ответ
В вашем примере все вершины связаны с направленным путем, поэтому каждая вершина должна иметь значение 4.
Но если вы удалите соединение 4-> 0 (id = 8), конечно, будет другое число.
Поскольку ваша проблема основана на (рекурсивном) обходе графа параллельно, API Graphx Pregel, вероятно, является лучшим подходом.
Прегелевый вызов принимает 3 функции
vprog
инициализировать каждую вершину сообщением (в вашем случае пустымList[VertexId]
)sendMsg
шаг обновления, который применяется к каждой итерации (в вашем случае накопление соседнегоVertexId
и возвращаяIterator
с сообщениями для отправки на следующую итерациюmergeMsg
объединить два сообщения (2List[VertexId]
с в 1)
В коде это будет выглядеть так:
def vprog(id: VertexId, orig: List[VertexId], newly: List[VertexId]) : List[VertexId] = newly
def mergeMsg(a: List[VertexId], b: List[VertexId]) : List[VertexId] = (a ++ b).distinct
def sendMsg(trip: EdgeTriplet[List[VertexId],Double]) : Iterator[(VertexId, List[VertexId])] = {
val recursivelyConnectedNeighbors = (trip.dstId :: trip.dstAttr).filterNot(_ == trip.srcId)
if (trip.srcAttr.intersect(recursivelyConnectedNeighbors).length != recursivelyConnectedNeighbors.length)
Iterator((trip.srcId, recursivelyConnectedNeighbors))
else
Iterator.empty
}
val initList = List.empty[VertexId]
val result = graph
.mapVertices((_,_) => initList)
.pregel(
initialMsg = initList,
activeDirection = EdgeDirection.Out
)(vprog, sendMsg, mergeMsg)
.mapVertices((_, neighbors) => neighbors.length)
result.vertices.toDF("vertex", "value").show()
Выход:
+------+-----+
|vertex|value|
+------+-----+
| 0| 4|
| 1| 3|
| 2| 3|
| 3| 1|
| 4| 1|
+------+-----+
Обязательно поэкспериментируйте с spark.graphx.pregel.checkpointInterval
если вы получаете обход больших графиков OoM (или настройка maxIterations
в pregel init)