Оптимизация Spark UDF для вставок Graph Database (Neo4j)

Это первая проблема, которую я публикую, поэтому извиняюсь, если мне не хватает информации и посредственного форматирования. Я могу обновить, если требуется.

Я постараюсь добавить как можно больше деталей. У меня не очень оптимизированный Spark Job, который преобразует данные RDBMS в узлы графа и отношения в Neo4j.

Сделать это. Вот шаги, за которыми я следую:

  1. создайте денормализованный фрейм данных "data" с помощью spark sql и join.
  2. Строка foreach в "data" запускает функцию graphInsert, которая выполняет следующие действия:

    а. читать содержимое строки
    б. сформулировать запрос шифрования neo4j (мы используем команду Merge, чтобы у нас был только один Город, например, Чикаго, созданный в Neo4j, когда Чикаго будет присутствовать в нескольких строках в таблице RDBMS)
    с. подключиться к neo4j
    д. выполнить запрос
    е. отключиться от neo4j

Вот список проблем, с которыми я сталкиваюсь.

  1. Вставки медленные.

Я знаю, что запрос слияния выполняется медленнее, чем создание, но есть ли другой способ сделать это вместо подключения и отключения для каждой записи? Это был мой первый набросок кода, и, возможно, я изо всех сил пытаюсь использовать одно соединение для вставки из нескольких потоков на разных рабочих узлах искры. Следовательно, подключение и отключение для каждой записи.

  1. Работа не масштабируема. Работает нормально только с 1 ядром. Как только я запускаю работу с 2-мя искровыми ядрами, я неожиданно получаю 2 города с одинаковыми именами, даже когда выполняю запросы на слияние. Например, есть 2 города Чикаго, которые нарушают использование Merge. Я предполагаю, что функции слияния что-то вроде "Создать, если не существует".

Я не знаю, если моя реализация неверна в neo4j части или искры. Если кто-нибудь может направить меня к какой-либо документации, которая поможет мне реализовать это в более широком масштабе, это будет полезно, поскольку у меня есть большой искровой кластер, который мне нужно использовать с полным потенциалом для этой работы.

Если вам интересно смотреть на код, а не на алгоритм. Вот реализация graphInsert в scala:

class GraphInsert extends Serializable{
   var case_attributes = new Array[String](4)
   var city_attributes = new Array[String](2)
   var location_attributes = new Array[String](20)
   var incident_attributes = new Array[String](20)
   val prop = new Properties()
   prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
   // properties Neo4j
   val url_neo4j = prop.getProperty("url_neo4j")
   val neo4j_user = prop.getProperty("neo4j_user")
   val neo4j_password = prop.getProperty("neo4j_password")


   def graphInsert(data : Row){  
      val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0)  + ":'" +data(11) + "'," +case_attributes(1)  + ":'" +data(13)  + "'," +case_attributes(2)  + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0)  + ":" +data(0) + "," +incident_attributes(1)  + ":" +data(2)  + "," +incident_attributes(2)  + ":'" +data(3) +  "'," +incident_attributes(3)  + ":'" +data(8)+  "'," +incident_attributes(4)  + ":" +data(5) +  "," +incident_attributes(5)  + ":'" +data(4) +  "'," +incident_attributes(6)  + ":'" +data(6) +  "'," +incident_attributes(7)  + ":'" +data(1) +  "'," +incident_attributes(8)  + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0)  + ":" +data(9) + "," +location_attributes(1)  + ":" +data(10)  + "," +location_attributes(2)  + ":'" +data(19) +  "'," +location_attributes(3)  + ":'" +data(20)+  "'," +location_attributes(4)  + ":" +data(18) +  "," +location_attributes(5)  + ":" +data(21) +  "," +location_attributes(6)  + ":'" +data(17) +  "'," +location_attributes(7)  + ":" +data(22) +  "," +location_attributes(8)  + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
              println(query)
              try{
                      var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
                          var stmt = con.createStatement()
                          var rs = stmt.executeQuery(query)
                          con.close()
              }catch{
              case ex: SQLException =>{
                  println(ex.getMessage)
              }
              }
  } 

def operations(sqlContext: SQLContext){
    ....
    #Get 'data' before this step
    city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
    case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
    location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
    incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()

    data.foreach(graphInsert)

}

object GraphObject {
  def main(args: Array[String]) {  
      val conf = new SparkConf()
        .setAppName("GraphNeo4j")
        .setMaster("xyz")
        .set("spark.cores.max","2")
        .set("spark.executor.memory","10g")

      Logger.getLogger("org").setLevel(Level.ERROR)
      Logger.getLogger("akka").setLevel(Level.ERROR)
      val sc = new SparkContext(conf)
      val sqlContext = new SQLContext(sc)
      val graph = new GraphInsert()
      graph.operations(sqlContext)

  }
}

2 ответа

Решение

Все, что вы пишете внутри замыкания, т.е. это должно быть выполнено в Worker, распространяется. Вы можете прочитать больше об этом здесь: http://spark.apache.org/docs/latest/programming-guide.html

И, как вы увеличиваете количество ядер, я думаю, что это не должно влиять на приложение, потому что, если вы не укажете это! тогда он принимает жадный подход! Я надеюсь, что этот документ поможет.

Я сделал улучшение процесса, но ничто не могло сделать это так быстро, как команда LOAD в Cypher. Надеюсь, это поможет кому-то, хотя: использовать foreachPartition вместо foreach дает значительный выигрыш при выполнении такого процесса. Также добавление периодического коммита с использованием шифра.

Другие вопросы по тегам