Передача потокового RDD Spark в Neo4j -Scala

Мне нужно установить соединение между Spark Streaming и графической базой данных Neo4j. СДР имеют тип ((есть, я),(есть, привет)(sam, счастлив)....). Мне нужно установить грань между каждой парой слов в Neo4j.

В документации Spark Streaming я нашел

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

to the push to the data to an external database.

Я делаю это в Скала. Я немного запутался в том, что делать? Я нашел AnormCypher и Neo4jScala обертку. Могу ли я использовать их для выполнения работы? Если так, как я могу это сделать? Если нет, то есть ли лучшие альтернативы?

Спасибо вам всем....

2 ответа

Решение

Я провел эксперимент с AnormCypher

Как это:

implicit val connection = Neo4jREST.setServer("localhost", 7474, "/db/data/")
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(FILE, 4).cache()
val count = logData
  .flatMap( _.split(" "))
  .map( w =>
    Cypher("CREATE(:Word {text:{text}})")
      .on( "text" -> w ).execute()
   ).filter( _ ).count()

Neo4j 2.2.x обладает отличной производительностью одновременной записи, которую вы можете использовать в Spark. Таким образом, чем больше параллельных потоков вы можете написать в Neo4j, тем лучше. Если вы можете выполнять пакетные операторы партиями от 100 до 1000 для каждого запроса, то даже лучше.

Взгляните на MazeRunner ( http://www.kennybastani.com/2014/11/using-apache-spark-and-neo4j-for-big.html), так как он даст вам некоторые идеи.

Другие вопросы по тегам