Snappydata - sql, помещенный в сервер заданий, не агрегирует значения

Я пытаюсь создать банку для запуска на оболочке snappy-job с потоковой передачей. У меня есть функция агрегации, и она отлично работает в Windows. Но мне нужно иметь таблицу с одним значением для каждого ключа. Основываясь на примере из github a, создайте jar-файл, и теперь у меня проблема с командой put в sql.

Мой код для агрегации:

val resultStream: SchemaDStream = snsc.registerCQ("select publisher, cast(sum(bid)as int) as bidCount from " +
  "AggrStream window (duration 1 seconds, slide 1 seconds) group by publisher")

val conf = new ConnectionConfBuilder(snsc.snappySession).build()

resultStream.foreachDataFrame(df => {

  df.write.insertInto("windowsAgg")

    println("Data received in streaming window")
    df.show()

    println("Updating table updateTable")
    val conn = ConnectionUtil.getConnection(conf)
    val result = df.collect()

  val stmt = conn.prepareStatement("put into updateTable (publisher, bidCount) values  " +
    "(?,?+(nvl((select bidCount from updateTable where publisher = ?),0)))")

    result.foreach(row => {
      println("row" + row)
      val publisher = row.getString(0)
      println("publisher " + publisher)
      val bidCount = row.getInt(1)
      println("bidcount : " + bidCount)

      stmt.setString(1, publisher)
      stmt.setInt(2, bidCount)
      stmt.setString(3, publisher)

      println("Prepared Statement after bind variables set: " + stmt.toString())

      stmt.addBatch()
    }
    )
    stmt.executeBatch()
    conn.close()
})

snsc.start()
snsc.awaitTermination()
}

Я должен обновить или вставить в таблицу updateTable, но во время команды обновления текущее значение должно быть добавлено к значению из потока. И сейчас:

Что я вижу, когда я выполняю код:

select * from updateTable;
PUBLISHER                       |BIDCOUNT   
--------------------------------------------
publisher333                    |10  

Затем я отправил сообщение Кафке:

1488487984048,publisher333,adv1,web1,geo1,11,c1 

и снова выберите из таблицы обновлений:

select * from updateTable;
PUBLISHER                       |BIDCOUNT   
--------------------------------------------
publisher333                    |11  

значение Bidcount перезаписывается, а не добавляется. Но когда я выполняю команду put in из оболочки snappy-sql, она работает отлично:

put into updateTable (publisher, bidcount) values ('publisher333',4+
(nvl((select bidCount from updateTable where publisher = 
'publisher333'),0)));
1 row inserted/updated/deleted
snappy> select * from updateTable;
PUBLISHER                       |BIDCOUNT   
--------------------------------------------
publisher333                    |15   

Не могли бы вы помочь мне с этим делом? Может, у кого-то есть другое решение для вставки или обновления значения с использованием snappydata?

Заранее благодарю.

1 ответ

Значение bidCount читается из таблицы tomi_update в случае потоковой передачи, но оно читается из updateTable в случае snappy-sql. Это намеренно? Может быть, вы хотели использовать updateTable в обоих случаях?

Другие вопросы по тегам