Snappydata - sql, помещенный в сервер заданий, не агрегирует значения
Я пытаюсь создать банку для запуска на оболочке snappy-job с потоковой передачей. У меня есть функция агрегации, и она отлично работает в Windows. Но мне нужно иметь таблицу с одним значением для каждого ключа. Основываясь на примере из github a, создайте jar-файл, и теперь у меня проблема с командой put в sql.
Мой код для агрегации:
val resultStream: SchemaDStream = snsc.registerCQ("select publisher, cast(sum(bid)as int) as bidCount from " +
"AggrStream window (duration 1 seconds, slide 1 seconds) group by publisher")
val conf = new ConnectionConfBuilder(snsc.snappySession).build()
resultStream.foreachDataFrame(df => {
df.write.insertInto("windowsAgg")
println("Data received in streaming window")
df.show()
println("Updating table updateTable")
val conn = ConnectionUtil.getConnection(conf)
val result = df.collect()
val stmt = conn.prepareStatement("put into updateTable (publisher, bidCount) values " +
"(?,?+(nvl((select bidCount from updateTable where publisher = ?),0)))")
result.foreach(row => {
println("row" + row)
val publisher = row.getString(0)
println("publisher " + publisher)
val bidCount = row.getInt(1)
println("bidcount : " + bidCount)
stmt.setString(1, publisher)
stmt.setInt(2, bidCount)
stmt.setString(3, publisher)
println("Prepared Statement after bind variables set: " + stmt.toString())
stmt.addBatch()
}
)
stmt.executeBatch()
conn.close()
})
snsc.start()
snsc.awaitTermination()
}
Я должен обновить или вставить в таблицу updateTable, но во время команды обновления текущее значение должно быть добавлено к значению из потока. И сейчас:
Что я вижу, когда я выполняю код:
select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |10
Затем я отправил сообщение Кафке:
1488487984048,publisher333,adv1,web1,geo1,11,c1
и снова выберите из таблицы обновлений:
select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |11
значение Bidcount перезаписывается, а не добавляется. Но когда я выполняю команду put in из оболочки snappy-sql, она работает отлично:
put into updateTable (publisher, bidcount) values ('publisher333',4+
(nvl((select bidCount from updateTable where publisher =
'publisher333'),0)));
1 row inserted/updated/deleted
snappy> select * from updateTable;
PUBLISHER |BIDCOUNT
--------------------------------------------
publisher333 |15
Не могли бы вы помочь мне с этим делом? Может, у кого-то есть другое решение для вставки или обновления значения с использованием snappydata?
Заранее благодарю.
1 ответ
Значение bidCount читается из таблицы tomi_update в случае потоковой передачи, но оно читается из updateTable в случае snappy-sql. Это намеренно? Может быть, вы хотели использовать updateTable в обоих случаях?