Функция Networkx не работает в Pyspark

Я пытаюсь сделать так, чтобы графовая функция networkx могла работать в распределенной среде pyspark, но я понятия не имею, как преобразовать ее в RDD или другую выполняемую функцию networkx в распределенной среде spark.

Вот часть моего кода на Python для реализации алгоритма CPM(Clique Percolation Method). Чтобы связать каждую клику с ребрами в графе сообщества, я пишу вложенный цикл для последовательного добавления ребер в клики.

graph = nx.Graph()
cliques = [frozenset(c) for c in nx.find_cliques(G) if len(c) >= k]
graph.add_nodes_from(cliques)

# Find the clique that each node belongs to
member = defaultdict(list)
for clique in cliques:
    for node in clique:
        member[node].append(clique)

# Check the adjacent cliques for each clique
clqALL = lambda cliques: (clique for clique in cliques)
clqADJ = lambda clique: (add_graph_edges(graph, k, clique, adj_clique)
                         for adj_clique in get_adj_cliques(clique, member))
rdd = sc.parallelize(cliques).flatMap(clqALL).flatMap(clqADJ)
rdd.collect()

def add_graph_edges(graph, k, clique, adj_clique):
if clique.intersection(adj_clique).__len__() >= (k - 1):
    graph.add_edge(clique, adj_clique)

def get_adj_cliques(clique, member):
    adjacent_cliques = set()
    for n in clique:
        for adj_clq in member[n]:
            if clique != adj_clq:
                adjacent_cliques.add(adj_clq)
    return adjacent_cliques

Вышеприведенный код выполнялся нормально, однако в графе не было добавлено никаких ребер.

Почему graph.add_edge() не работает?

0 ответов

Другие вопросы по тегам