Передача потокового RDD Spark в Neo4j -Scala
Мне нужно установить соединение между Spark Streaming и графической базой данных Neo4j. СДР имеют тип ((есть, я),(есть, привет)(sam, счастлив)....). Мне нужно установить грань между каждой парой слов в Neo4j.
В документации Spark Streaming я нашел
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
to the push to the data to an external database.
Я делаю это в Скала. Я немного запутался в том, что делать? Я нашел AnormCypher и Neo4jScala обертку. Могу ли я использовать их для выполнения работы? Если так, как я могу это сделать? Если нет, то есть ли лучшие альтернативы?
Спасибо вам всем....
2 ответа
Я провел эксперимент с AnormCypher
Как это:
implicit val connection = Neo4jREST.setServer("localhost", 7474, "/db/data/")
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(FILE, 4).cache()
val count = logData
.flatMap( _.split(" "))
.map( w =>
Cypher("CREATE(:Word {text:{text}})")
.on( "text" -> w ).execute()
).filter( _ ).count()
Neo4j 2.2.x обладает отличной производительностью одновременной записи, которую вы можете использовать в Spark. Таким образом, чем больше параллельных потоков вы можете написать в Neo4j, тем лучше. Если вы можете выполнять пакетные операторы партиями от 100 до 1000 для каждого запроса, то даже лучше.
Взгляните на MazeRunner ( http://www.kennybastani.com/2014/11/using-apache-spark-and-neo4j-for-big.html), так как он даст вам некоторые идеи.