Загрузить Spark RDD в Neo4j в Python
Я работаю над проектом, в котором я использую Spark для обработки данных. Мои данные теперь обрабатываются, и мне нужно загрузить данные в Neo4j. После загрузки в Neo4j я буду использовать это для демонстрации результатов.
Я хотел, чтобы вся реализация была выполнена в программировании на Python. Но я не смог найти ни одной библиотеки или примера в сети. Можете ли вы помочь с ссылками или библиотеками или любым примером.
Мой RDD - это PairedRDD. И в каждом кортеже я должен создавать отношения.
PairedRDD
Key Value
Jack [a,b,c]
Для простоты я преобразовал СДР в
Key value
Jack a
Jack b
Jack c
Тогда я должен создать отношения между
Jack->a
Jack->b
Jack->c
Основываясь на ответе Уильяма, я могу загрузить список напрямую. Но эти данные выбрасывают ошибку шифра.
Я пытался так:
def writeBatch(b):
print("writing batch of " + str(len(b)))
session = driver.session()
session.run('UNWIND {batch} AS elt MERGE (n:user1 {user: elt[0]})', {'batch': b})
session.close()
def write2neo(v):
batch_d.append(v)
for hobby in v[1]:
batch_d.append([v[0],hobby])
global processed
processed += 1
if len(batch) >= 500 or processed >= max:
writeBatch(batch)
batch[:] = []
max = userhobbies.count()
userhobbies.foreach(write2neo)
б список списков. Unwinded elt - это список двух элементов elt[0],elt[1] в качестве ключа и значений.
ошибка
ValueError: Structure signature must be a single byte value
Заранее спасибо.
1 ответ
Вы можете сделать foreach
на вашем RDD, пример:
from neo4j.v1 import GraphDatabase, basic_auth
driver = GraphDatabase.driver("bolt://localhost", auth=basic_auth("",""), encrypted=False)
from pyspark import SparkContext
sc = SparkContext()
dt = sc.parallelize(range(1, 5))
def write2neo(v):
session = driver.session()
session.run("CREATE (n:Node {value: {v} })", {'v': v})
session.close()
dt.foreach(write2neo)
Однако я бы улучшил функцию пакетной записи, но этот простой фрагмент работает для базовой реализации.
ОБНОВЛЕНИЕ С ПРИМЕРОМ ЗАПИСЕЙ ПИСЬМА
sc = SparkContext()
batch = []
max = None
processed = 0
def writeBatch(b):
print("writing batch of " + str(len(b)))
session = driver.session()
session.run('UNWIND {batch} AS elt CREATE (n:Node {v: elt})', {'batch': b})
session.close()
def write2neo(v):
batch.append(v)
global processed
processed += 1
if len(batch) >= 500 or processed >= max:
writeBatch(batch)
batch[:] = []
dt = sc.parallelize(range(1, 2136))
max = dt.count()
dt.foreach(write2neo)
- Какие результаты с
16/09/15 12:25:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
writing batch of 500
writing batch of 500
writing batch of 500
writing batch of 500
writing batch of 135
16/09/15 12:25:47 INFO PythonRunner: Times: total = 279, boot = -103, init = 245, finish = 137
16/09/15 12:25:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1301 bytes result sent to driver
16/09/15 12:25:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 294 ms on localhost (1/1)
16/09/15 12:25:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/09/15 12:25:47 INFO DAGScheduler: ResultStage 1 (foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36) finished in 0.295 s
16/09/15 12:25:47 INFO DAGScheduler: Job 1 finished: foreach at /Users/ikwattro/dev/graphaware/untitled/writeback.py:36, took 0.308263 s