Kapacitor: вычисление разницы между двумя потоками через соединение
Полное раскрытие: я также опубликовал вариант этого вопроса здесь.
У меня есть встроенное устройство, являющееся частью системы отопления, которое публикует два значения температуры, каждое в отдельной теме MQTT, каждые 5 секунд через брокера MQTT от Mosquitto. "mydevice/sensor1" - это температура предварительного нагрева, а "mydevice/sensor2" - температура после нагрева. Значения публикуются практически одновременно, поэтому обычно задержка между этими двумя сообщениями не превышает полсекунды, но они точно не синхронизируются.
Telegraf подписан на того же брокера и с радостью помещает эти измерения в базу данных InfluxDB под названием "telegraf.autogen". Оба измерения отображаются под одним измерением под названием "mqtt_consumer" с полем под названием "значение". В InfluxDB я могу различать значения с тегами по темам, фильтруя по тегу "topic":
SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s)
Кажется, все это работает правильно.
То, что я хочу сделать, - это рассчитать разницу между этими двумя значениями темы для каждой пары входящих значений, чтобы рассчитать разность температур и в конечном итоге рассчитать энергию, передаваемую системой отопления (скорость потока постоянна и известна). Я пытался сделать это с помощью запросов InfluxDB в Grafana, но это казалось довольно сложным (я потерпел неудачу), поэтому я подумал, что попробую использовать TICKscript, чтобы разбить мой процесс на маленькие шаги.
Я собираю TICKscript для расчета разницы на основе этого примера:
https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/
Однако в моем случае у меня нет двух отдельных измерений. Вместо этого я создаю два отдельных потока из одного измерения "mqtt_consumer", используя тег темы в качестве фильтра. Затем я пытаюсь соединить их с допуском 1 с (значения всегда публикуются достаточно близко во времени). я использую httpOut
генерировать представление для отладки (кроме: это обновляется только каждые 10 секунд, пропуская каждое второе значение, даже если мой поток работает с 5-секундными интервалами - почему это так? Я вижу в новом БД, что значения все присутствуют, хотя),
После того, как я их объединю, я буду оценивать разницу в значениях и сохранять ее в новой базе данных под измерением, называемым "diff".
Вот мой сценарий до сих пор:
var sensor1 = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mqtt_consumer')
.where(lambda: "topic" == 'mydevice/sensor1')
.groupBy(*)
|httpOut('sensor1')
var sensor2 = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mqtt_consumer')
.where(lambda: "topic" == 'mydevice/sensor2')
.groupBy(*)
|httpOut('sensor2')
sensor1
|join(sensor2)
.as('value1', 'value2')
.tolerance(1s)
|httpOut('join')
|eval(lambda: "sensor1.value1" - "sensor1.value2")
.as('diff')
|httpOut('diff')
|influxDBOut()
.create()
.database('mydb')
.retentionPolicy('myrp')
.measurement('diff')
К сожалению, мой сценарий не может передать какие-либо элементы через join
узел. В kapacitor show
Я вижу, что httpOut
оба узла передают элементы в join
узел, но он не проходит дальше. Логи конденсатора также не показывают ничего очевидного. HTTP GET для httpOut('join')
возвращает:
{"series":null}
У меня есть два вопроса:
- Действительно ли этот подход, использующий Kapacitor с TICKscript для расчета энергии на основе разницы между двумя значениями в одном измерении, действителен? Или есть лучший / более простой способ сделать это?
- почему не
join
узел, производящий какой-либо вывод? Что я могу сделать, чтобы отладить это дальше?
1 ответ
Попробуйте добавить | средний узел, чтобы рассчитать среднее значение поля, в обоих датчиках:
var sensor1 = stream
|from()
.database('telegraf')
.retentionPolicy('autogen')
.measurement('mqtt_consumer')
.where(lambda: "topic" == 'mydevice/sensor1')
.groupBy(*)
|mean('field1')
|httpOut('sensor1')
После объединения вы должны использовать вновь назначенное имя для потоков, а также исходные:
sensor1
|join(sensor2)
.as('value1', 'value2')
.tolerance(1s)
|httpOut('join')
|eval(lambda: "value1.field1" - "value2.field2")
.as('diff')
|httpOut('diff')
|influxDBOut()
.create()
.database('mydb')
.retentionPolicy('myrp')
.measurement('diff')
Где средние поля - это поля, рассчитанные в моем предыдущем комментарии. Попробуйте!
Кроме того, для дальнейшей отладки попробуйте добавить узлы журнала, на которые вы хотите обратить внимание.
Надеюсь это поможет! С уважением