Добавление новой вершины в граф в Spark с помощью Scala

Я использую Spark в Scala. Я хочу создать график и динамически обновлять график.

Я сделал это с помощью следующего кода:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object firstgraph {
  def addVertex(
    sc: SparkContext,
    vertexRDD: RDD[(Long(String,Int))],
    name: String,
    age: Int,
    counter:Long): RDD[(Long, (String, Int))] = {
    val newVertexArray = Array((counter, (name, age)))
    val newVertexRdd: RDD[(Long, (String, Int))] = sc.parallelize(newVertexArray)
    newVertexRdd ++ vertexRDD
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("firstgraph")
    val sc = new SparkContext(conf)

    val vertexArray = Array(
      (1L, ("Alice", 28)),
      (2L, ("Bob", 27)),
      (3L, ("Charlie", 65)),
      (4L, ("David", 42)),
      (5L, ("Ed", 55)),
      (6L, ("Fran", 50)))

    val edgeArray = Array(
      Edge(2L, 1L, 7),
      Edge(2L, 4L, 2),
      Edge(3L, 2L, 4),
      Edge(3L, 6L, 3),
      Edge(4L, 1L, 1),
      Edge(5L, 2L, 2),
      Edge(5L, 3L, 8),
      Edge(5L, 6L, 3))

    var vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
    var edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
    var graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
    graph.vertices.filter { case (id, (name, age)) => age > 30 }.collect.foreach {
      case (id, (name, age)) => println(s"$name is $age")
    }
    var x = 0
    var counter = 7L
    var name = ""
    var age = 0
    while (x == 0) {
      println("Enter Name")
      name = Console.readLine
      println("Enter age")
      age = Console.readInt
      vertexRDD = addVertex(sc, vertexRDD, name, age, counter)
      graph = Graph(vertexRDD, edgeRDD)
      graph.vertices.filter { case (id, (name, age)) => age > 30 }.collect.foreach {
        case (id, (name, age)) => println(s"$name is $age")
      }
      counter = counter + 1
      println("want to enter more node press 0 for yes and 1 for no ")
      x = Console.readInt
    }
  }
}

Эта программа добавляет новую вершину в граф, но вычисляет граф снова и снова всякий раз, когда вставляется новая вершина. Я хочу сделать это без пересчета графика.

2 ответа

СДР Apache Spark не предназначены для детальных обновлений. Все операции на СДР касаются изменения всего СДР.

Сначала я бы порекомендовал вам переосмыслить свой подход и попытаться использовать СДР по мере их разработки. Например, многие распространенные алгоритмы предназначены для работы на одной машине. Как Quicksort. Вы не можете реализовать Quicksort на RDD без изменений, меняя местами только два элемента на каждом шаге. Потеря потенциала распределенной системы для параллельного выполнения многих задач. Вместо этого вам нужно изменить дизайн алгоритма, чтобы воспользоваться преимуществами параллелизма.

Это может быть неприменимо к вашему случаю, и вам, возможно, действительно потребуется внести точечные обновления, как, например, в вашем примере. В этом случае вам, вероятно, лучше использовать другой бэкэнд. HBase и Cassandra предназначены для точечных обновлений, как и все остальные базы данных SQL и без SQL. Так же как и Neo4j, если вам нужна графическая база данных.

Но последнее, что нужно проверить перед выходом из Spark - это IndexedRDD. Это своего рода RDD, который предназначен для точечных обновлений. Он родился как часть GraphX, так что он может хорошо подойти для вашего случая.

Пожалуйста, попробуйте следующий код для добавления группы вершин в существующий граф. Вот inputGraph - это мой существующий граф, который предопределен как глобальная переменная и создан ранее с помощью какой-либо другой функции. Этот кусок кода только добавляет вершины к этому. Здесь переменная rdd - это моя коллекция, значение которой конвертируется в Long и используется в качестве идентификатора вершины и добавляется в граф.

def addVertex(rdd: RDD[String], sc: SparkContext, session: String): Long = {
val defaultUser = (0, 0)
rdd.collect().foreach { x =>
  {
    val aVertex: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((x.toLong, (100, 100))))
    gVertices = gVertices.union(aVertex)
  }
}
inputGraph = Graph(gVertices, gEdges, defaultUser)
inputGraph.cache()
gVertices = inputGraph.vertices
gVertices.cache()
val count = gVertices.count
println(count);

return 1;

}

Другие вопросы по тегам