Оптимизация Spark UDF для вставок Graph Database (Neo4j)
Это первая проблема, которую я публикую, поэтому извиняюсь, если мне не хватает информации и посредственного форматирования. Я могу обновить, если требуется.
Я постараюсь добавить как можно больше деталей. У меня не очень оптимизированный Spark Job, который преобразует данные RDBMS в узлы графа и отношения в Neo4j.
Сделать это. Вот шаги, за которыми я следую:
- создайте денормализованный фрейм данных "data" с помощью spark sql и join.
Строка foreach в "data" запускает функцию graphInsert, которая выполняет следующие действия:
а. читать содержимое строки
б. сформулировать запрос шифрования neo4j (мы используем команду Merge, чтобы у нас был только один Город, например, Чикаго, созданный в Neo4j, когда Чикаго будет присутствовать в нескольких строках в таблице RDBMS)
с. подключиться к neo4j
д. выполнить запрос
е. отключиться от neo4j
Вот список проблем, с которыми я сталкиваюсь.
- Вставки медленные.
Я знаю, что запрос слияния выполняется медленнее, чем создание, но есть ли другой способ сделать это вместо подключения и отключения для каждой записи? Это был мой первый набросок кода, и, возможно, я изо всех сил пытаюсь использовать одно соединение для вставки из нескольких потоков на разных рабочих узлах искры. Следовательно, подключение и отключение для каждой записи.
- Работа не масштабируема. Работает нормально только с 1 ядром. Как только я запускаю работу с 2-мя искровыми ядрами, я неожиданно получаю 2 города с одинаковыми именами, даже когда выполняю запросы на слияние. Например, есть 2 города Чикаго, которые нарушают использование Merge. Я предполагаю, что функции слияния что-то вроде "Создать, если не существует".
Я не знаю, если моя реализация неверна в neo4j части или искры. Если кто-нибудь может направить меня к какой-либо документации, которая поможет мне реализовать это в более широком масштабе, это будет полезно, поскольку у меня есть большой искровой кластер, который мне нужно использовать с полным потенциалом для этой работы.
Если вам интересно смотреть на код, а не на алгоритм. Вот реализация graphInsert в scala:
class GraphInsert extends Serializable{
var case_attributes = new Array[String](4)
var city_attributes = new Array[String](2)
var location_attributes = new Array[String](20)
var incident_attributes = new Array[String](20)
val prop = new Properties()
prop.load(getClass().getResourceAsStream("/GraphInsertConnection.properties"))
// properties Neo4j
val url_neo4j = prop.getProperty("url_neo4j")
val neo4j_user = prop.getProperty("neo4j_user")
val neo4j_password = prop.getProperty("neo4j_password")
def graphInsert(data : Row){
val query = "MERGE (d:CITY {name:city_attributes(0)})\n" +"MERGE (a:CASE { " + case_attributes(0) + ":'" +data(11) + "'," +case_attributes(1) + ":'" +data(13) + "'," +case_attributes(2) + ":'" +data(14) +"'}) \n" +"MERGE (b:INCIDENT { " + incident_attributes(0) + ":" +data(0) + "," +incident_attributes(1) + ":" +data(2) + "," +incident_attributes(2) + ":'" +data(3) + "'," +incident_attributes(3) + ":'" +data(8)+ "'," +incident_attributes(4) + ":" +data(5) + "," +incident_attributes(5) + ":'" +data(4) + "'," +incident_attributes(6) + ":'" +data(6) + "'," +incident_attributes(7) + ":'" +data(1) + "'," +incident_attributes(8) + ":" +data(7)+"}) \n" +"MERGE (c:LOCATION { " + location_attributes(0) + ":" +data(9) + "," +location_attributes(1) + ":" +data(10) + "," +location_attributes(2) + ":'" +data(19) + "'," +location_attributes(3) + ":'" +data(20)+ "'," +location_attributes(4) + ":" +data(18) + "," +location_attributes(5) + ":" +data(21) + "," +location_attributes(6) + ":'" +data(17) + "'," +location_attributes(7) + ":" +data(22) + "," +location_attributes(8) + ":" +data(23)+"}) \n" +"MERGE (a) - [r1:"+relation_case_incident+"]->(b)-[r2:"+relation_incident_location+"]->(c)-[r3:belongs_to]->(d);"
println(query)
try{
var con = DriverManager.getConnection(url_neo4j, neo4j_user, neo4j_password)
var stmt = con.createStatement()
var rs = stmt.executeQuery(query)
con.close()
}catch{
case ex: SQLException =>{
println(ex.getMessage)
}
}
}
def operations(sqlContext: SQLContext){
....
#Get 'data' before this step
city_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_city").map(x =>x.getString(5)).collect()
case_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_case_number").map(x =>x.getString(5)).collect()
location_attributes = entity_metadata.filter(entity_metadata("source_name") === "tb_location").map(x =>x.getString(5)).collect()
incident_attributes= entity_metadata.filter(entity_metadata("source_name") === "tb_incident").map(x =>x.getString(5)).collect()
data.foreach(graphInsert)
}
object GraphObject {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("GraphNeo4j")
.setMaster("xyz")
.set("spark.cores.max","2")
.set("spark.executor.memory","10g")
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("akka").setLevel(Level.ERROR)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val graph = new GraphInsert()
graph.operations(sqlContext)
}
}
2 ответа
Все, что вы пишете внутри замыкания, т.е. это должно быть выполнено в Worker, распространяется. Вы можете прочитать больше об этом здесь: http://spark.apache.org/docs/latest/programming-guide.html
И, как вы увеличиваете количество ядер, я думаю, что это не должно влиять на приложение, потому что, если вы не укажете это! тогда он принимает жадный подход! Я надеюсь, что этот документ поможет.
Я сделал улучшение процесса, но ничто не могло сделать это так быстро, как команда LOAD в Cypher. Надеюсь, это поможет кому-то, хотя: использовать foreachPartition
вместо foreach
дает значительный выигрыш при выполнении такого процесса. Также добавление периодического коммита с использованием шифра.