Обработка графиков становится все более медленной на Titan + DynamoDB (локальный), так как добавляется больше вершин / ребер
Я работаю с Titan 1.0, используя локальную реализацию AWS DynamoDB в качестве бэкэнда хранилища на машине 16 ГБ. Мой вариант использования включает в себя создание графиков, периодически содержащих вершины и ребра, порядка 120К. Каждый раз, когда я генерирую новый граф в памяти, я проверяю граф, хранящийся в БД, и либо (i) добавляю несуществующие вершины / ребра, либо (ii) обновляю свойства, если они уже существуют (существование определяется меткой и атрибут "Значение"). Обратите внимание, что свойство "Значение" проиндексировано. Транзакции совершаются партиями по 500 вершин.
Проблема: я нахожу, что этот процесс замедляется каждый раз, когда я обрабатываю новый график (1-й график завершился за 45 минут с пустой базой данных изначально, 2-й занял 2, 5 часа, 3-й - через 3, 5 часа, 4-й - через 6 часов, 5-й - через 10 часов и т. Д.). Фактически, при обработке данного графика, он довольно быстр во время запуска, но постепенно становится медленнее (начальные пакеты занимают 2-4 секунды, а затем увеличиваются до 100 секунд для того же размера пакета в 500 узлов; я также иногда вижу, что занимает 1000-2000 секунд для партии). Это только время обработки (см. Подход ниже); коммит всегда занимает от 8 до 10 секунд. Я настроил размер кучи jvm на 10G, и я заметил, что когда приложение работает, оно со временем израсходует все.
Вопрос: можно ли ожидать такого поведения? Мне кажется, что-то здесь не так (либо в моем конфиге / подходе?). Любая помощь или предложения будут с благодарностью.
Подход:
- Начиная с корневого узла графа в памяти, я извлекаю все дочерние узлы и поддерживаю очередь
Для каждого дочернего узла я проверяю, существует ли он в БД, иначе создаю новый узел и обновляю некоторые свойства.
Vertex dbVertex = dbgraph.traversal().V() .has(currentVertexInMem.label(), "Value", (String) currentVertexInMem.value("Value")) .tryNext() .orElseGet(() -> createVertex(dbgraph, currentVertexInMem)); if (dbVertex != null) { // Update Properties updateVertexProperties(dbgraph, currentVertexInMem, dbVertex); } // Add edge if necessary if (parentDBVertex != null) { GraphTraversal<Vertex, Edge> edgeIt = graph.traversal().V(parentDBVertex).outE() .has("EdgeProperty1", eProperty1) // eProperty1 is String input parameter .has("EdgeProperty2", eProperty2); // eProperty2 is Long input parameter Boolean doCreateEdge = true; Edge e = null; while (edgeIt.hasNext()) { e = edgeIt.next(); if (e.inVertex().equals(dbVertex)) { doCreateEdge = false; break; } if (doCreateEdge) { e = parentDBVertex.addEdge("EdgeLabel", dbVertex, "EdgeProperty1", eProperty1, "EdgeProperty2", eProperty2); } e = null; it = null; } ... if ((processedVertexCount.get() % 500 == 0) || processedVertexCount.get() == verticesToProcess.get()) { graph.tx().commit(); }
Создать функцию:
public static Vertex createVertex(Graph graph, Vertex clientVertex) {
Vertex newVertex = null;
switch (clientVertex.label()) {
case "Label 1":
newVertex = graph.addVertex(T.label, clientVertex.label(), "Value",
clientVertex.value("Value"),
"Property1-1", clientVertex.value("Property1-1"),
"Property1-2", clientVertex.value("Property1-2"));
break;
case "Label 2":
newVertex = graph.addVertex(T.label, clientVertex.label(), "Value",
clientVertex.value("Value"), "Property2-1",
clientVertex.value("Property2-1"),
"Property2-2", clientVertex.value("Property2-2"));
break;
default:
newVertex = graph.addVertex(T.label, clientVertex.label(), "Value",
clientVertex.value("Value"));
break;
}
return newVertex;
}
Схема Def: (Показаны некоторые индексы)
Замечания:
"EdgeLabel" = Constants.EdgeLabels.Uses
"EdgeProperty1" = Constants.EdgePropertyKeys.EndpointId
"EdgeProperty2" = Constants.EdgePropertyKeys.Timestamp
public void createSchema() {
// Create Schema
TitanManagement mgmt = dbgraph.openManagement();
mgmt.set("cache.db-cache",true);
// Vertex Properties
PropertyKey value = mgmt.getPropertyKey(Constants.VertexPropertyKeys.Value);
if (value == null) {
value = mgmt.makePropertyKey(Constants.VertexPropertyKeys.Value).dataType(String.class).make();
mgmt.buildIndex(Constants.GraphIndexes.ByValue, Vertex.class).addKey(value).buildCompositeIndex(); // INDEX
}
PropertyKey shapeSet = mgmt.getPropertyKey(Constants.VertexPropertyKeys.ShapeSet);
if (shapeSet == null) {
shapeSet = mgmt.makePropertyKey(Constants.VertexPropertyKeys.ShapeSet).dataType(String.class).cardinality(Cardinality.SET).make();
mgmt.buildIndex(Constants.GraphIndexes.ByShape, Vertex.class).addKey(shapeSet).buildCompositeIndex();
}
...
// Edge Labels and Properties
EdgeLabel uses = mgmt.getEdgeLabel(Constants.EdgeLabels.Uses);
if (uses == null) {
uses = mgmt.makeEdgeLabel(Constants.EdgeLabels.Uses).multiplicity(Multiplicity.MULTI).make();
PropertyKey timestampE = mgmt.getPropertyKey(Constants.EdgePropertyKeys.Timestamp);
if (timestampE == null) {
timestampE = mgmt.makePropertyKey(Constants.EdgePropertyKeys.Timestamp).dataType(Long.class).make();
}
PropertyKey endpointIDE = mgmt.getPropertyKey(Constants.EdgePropertyKeys.EndpointId);
if (endpointIDE == null) {
endpointIDE = mgmt.makePropertyKey(Constants.EdgePropertyKeys.EndpointId).dataType(String.class).make();
}
// Indexes
mgmt.buildEdgeIndex(uses, Constants.EdgeIndexes.ByEndpointIDAndTimestamp, Direction.BOTH, endpointIDE,
timestampE);
}
mgmt.commit();
}
1 ответ
Ожидаемое вами поведение. Сегодня DynamoDB Local - это инструмент тестирования, построенный на SQLite. Если вам нужно поддерживать высокий TPS для больших и периодических загрузок данных, я рекомендую вам использовать сервис DynamoDB.