Данные перепутаны при попытке передать их в arangodb

Я пытаюсь передать ок. 10 ГБ данных json (твиты в моем случае) в коллекцию в arangodb. Я также пытаюсь использовать для этого joblib:

from ArangoConn import ArangoConn
import Userdata as U

import encodings
from joblib import Parallel,delayed
import json
from glob import glob
import time


def progress(total, prog, start, stri = ""):
    if(prog == 0):
        print("")
        prog = 1;
    perc = prog / total
    diff = time.time() - start
    rem = (diff / prog) * (total - prog)
    bar = ""
    for i in range(0,int(perc*20)):
        bar = bar + "|"
    for i in range(int(perc*20),20):
        bar = bar + " "
    print("\r"+"progress: " + "[" + bar + "] " + str(prog) + " of " + 
    str(total) + ": {0:.1f}% ".format(perc * 100) + "- " + 
    time.strftime("%H:%M:%S", time.gmtime(rem)) + " " + stri, end="")

def processfile(filepath):
    file = open(filepath,encoding='utf-8')
    s = file.read()
    file.close()
    data = json.loads(s)
    Parallel(n_jobs=12, verbose=0, backend="threading"
    (map(delayed(ArangoConn.createDocFromObject), data))

files = glob(U.path+'/*.json')
i = 1
j = len(files)
starttime = time.time()
for f in files:
    progress(j,i,starttime,f)
    i = i+1
    processfile(f)

а также

from pyArango.connection import Connection
import Userdata as U
import time


class ArangoConn:
    def __init__(self,server,user,pw,db,collectionname):
        self.server = server
        self.user = user
        self.pw = pw
        self.db = db
        self.collectionname = collectionname
        self.connection = None
        self.dbHandle = self.connect()
        if not self.dbHandle.hasCollection(name=self.collectionname):
            coll = self.dbHandle.createCollection(name=collectionname)
        else:
            coll = self.dbHandle.collections[collectionname]
        self.collection = coll

    def db_createDocFromObject(self, obj):
        data = obj.__dict__()
        doc = self.collection.createDocument()
        for key,value in data.items():
            doc[key] = value

        doc._key= str(int(round(time.time() * 1000)))
        doc.save()

    def connect(self):
        self.connection = Connection(arangoURL=self.server + ":8529",         
        username=self.user, password=self.pw)

        if not self.connection.hasDatabase(self.db):
            db = self.connection.createDatabase(name=self.db)
        else:
            db = self.connection.databases.get(self.db)
        return db

    def disconnect(self):
        self.connection.disconnectSession()


    def getAllData(self):

        docs = []
        for doc in self.collection.fetchAll():
            docs.append(self.doc_to_result(doc))
        return docs


    def addData(self,obj):
            self.db_createDocFromObject(obj)

    def search(self,collection,search,prop):
        docs = []
        aql = """FOR q IN """+collection+""" FILTER q."""+prop+""" LIKE 
            "%"""+search+"""%" RETURN q"""
        results = self.dbHandle.AQLQuery(aql, rawResults=False, batchSize=1)
        for doc in results:
            docs.append(self.doc_to_result(doc))
        return docs


    def doc_to_result(self,arangodoc):
        modstore = arangodoc.getStore()
        modstore["_key"] = arangodoc._key
        return modstore

    def db_createDocFromJson(self,json):

        for d in json:
            doc = self.collection.createDocument()
            for key,value in d.items():
                doc[key] = value
            doc._key = str(int(round(time.time() * 1000)))
            doc.save()



    @staticmethod
    def createDocFromObject(obj):
        c = ArangoConn(U.url, U.user, U.pw, U.db, U.collection)
        data = obj
        doc = c.collection.createDocument()
        for key, value in data.items():
            doc[key] = value
        doc._key = doc["id"]
        doc.save()
        c.connection.disconnectSession()

Это вроде как работает. Моя проблема в том, что данные, которые попадают в базу данных, как-то перепутаны.

Скриншот

Как вы можете видеть на скриншоте, "id" и "id_str" не совпадают - как и должно быть.

что я исследовал до сих пор:

  • Я думал, что в некоторых моментах ключи по умолчанию в данных могут "конфликтовать" из-за многопоточности, поэтому я установил ключ к идентификатору твита.

  • Я пытался сделать это без нескольких потоков. нить не кажется проблемой

  • Я посмотрел на данные, которые я посылаю в базу данных... все вроде бы нормально

Но как только я общаюсь с БД, данные смешиваются.

Мой профессор подумал, что, возможно, что-то в pyarango не является потокобезопасным, и это портит данные, но я так не думаю, так как многопоточность, похоже, не является проблемой.

У меня нет идей, откуда это поведение может появиться... Есть идеи?

1 ответ

Решение

На скриншоте показаны следующие значения:

id     : 892886691937214500
id_str : 892886691937214465

Похоже, что где-то вдоль пути значение преобразуется в двойное значение IEEE754, которое не может безопасно представлять последнее значение. Таким образом, есть потенциальная потеря точности из-за преобразования.

Быстрый пример в файле node.js (JavaScript использует двойные значения IEEE754 для любых числовых значений, больших 0xffffffff), показывает, что это, вероятно, является причиной проблемы:

$ node
> 892886691937214500
892886691937214500
> 892886691937214465
892886691937214500

Таким образом, вопрос в том, где происходит преобразование. Можете ли вы проверить, правильно ли клиентская программа Python отправляет ожидаемые значения в ArangoDB или уже отправляет преобразованные / усеченные значения?

Как правило, любое целое число, превышающее 0x7fffffffffffffff, будет усечено при сохранении в ArangoDB или преобразовано в двойное значение IEEE754. Этого можно избежать, храня числовые значения внутри строки, но, конечно, сравнение двух числовых строк даст другие результаты, чем сравнение двух чисел (например, "10" < "9" против 10 > 9).

Другие вопросы по тегам