Kapacitor: вычисление разницы между двумя потоками через соединение

Полное раскрытие: я также опубликовал вариант этого вопроса здесь.

У меня есть встроенное устройство, являющееся частью системы отопления, которое публикует два значения температуры, каждое в отдельной теме MQTT, каждые 5 секунд через брокера MQTT от Mosquitto. "mydevice/sensor1" - это температура предварительного нагрева, а "mydevice/sensor2" - температура после нагрева. Значения публикуются практически одновременно, поэтому обычно задержка между этими двумя сообщениями не превышает полсекунды, но они точно не синхронизируются.

Telegraf подписан на того же брокера и с радостью помещает эти измерения в базу данных InfluxDB под названием "telegraf.autogen". Оба измерения отображаются под одним измерением под названием "mqtt_consumer" с полем под названием "значение". В InfluxDB я могу различать значения с тегами по темам, фильтруя по тегу "topic":

SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s)

Кажется, все это работает правильно.

То, что я хочу сделать, - это рассчитать разницу между этими двумя значениями темы для каждой пары входящих значений, чтобы рассчитать разность температур и в конечном итоге рассчитать энергию, передаваемую системой отопления (скорость потока постоянна и известна). Я пытался сделать это с помощью запросов InfluxDB в Grafana, но это казалось довольно сложным (я потерпел неудачу), поэтому я подумал, что попробую использовать TICKscript, чтобы разбить мой процесс на маленькие шаги.

Я собираю TICKscript для расчета разницы на основе этого примера:

https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/

Однако в моем случае у меня нет двух отдельных измерений. Вместо этого я создаю два отдельных потока из одного измерения "mqtt_consumer", используя тег темы в качестве фильтра. Затем я пытаюсь соединить их с допуском 1 с (значения всегда публикуются достаточно близко во времени). я использую httpOut генерировать представление для отладки (кроме: это обновляется только каждые 10 секунд, пропуская каждое второе значение, даже если мой поток работает с 5-секундными интервалами - почему это так? Я вижу в новом БД, что значения все присутствуют, хотя),

После того, как я их объединю, я буду оценивать разницу в значениях и сохранять ее в новой базе данных под измерением, называемым "diff".

Вот мой сценарий до сих пор:

var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')
        .groupBy(*)
    |httpOut('sensor1')

var sensor2 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor2')
        .groupBy(*)
    |httpOut('sensor2')

sensor1
    |join(sensor2)
        .as('value1', 'value2')
        .tolerance(1s)
    |httpOut('join')
    |eval(lambda: "sensor1.value1" - "sensor1.value2")
        .as('diff')
    |httpOut('diff')
    |influxDBOut()
        .create()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('diff')

К сожалению, мой сценарий не может передать какие-либо элементы через join узел. В kapacitor show Я вижу, что httpOut оба узла передают элементы в join узел, но он не проходит дальше. Логи конденсатора также не показывают ничего очевидного. HTTP GET для httpOut('join') возвращает:

{"series":null}

У меня есть два вопроса:

  1. Действительно ли этот подход, использующий Kapacitor с TICKscript для расчета энергии на основе разницы между двумя значениями в одном измерении, действителен? Или есть лучший / более простой способ сделать это?
  2. почему не join узел, производящий какой-либо вывод? Что я могу сделать, чтобы отладить это дальше?

1 ответ

Попробуйте добавить | средний узел, чтобы рассчитать среднее значение поля, в обоих датчиках:

var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')
        .groupBy(*)
    |mean('field1')
    |httpOut('sensor1')

После объединения вы должны использовать вновь назначенное имя для потоков, а также исходные:

sensor1
    |join(sensor2)
        .as('value1', 'value2')
        .tolerance(1s)
    |httpOut('join')
    |eval(lambda: "value1.field1" - "value2.field2")
        .as('diff')
    |httpOut('diff')
    |influxDBOut()
        .create()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('diff')

Где средние поля - это поля, рассчитанные в моем предыдущем комментарии. Попробуйте!

Кроме того, для дальнейшей отладки попробуйте добавить узлы журнала, на которые вы хотите обратить внимание.

Надеюсь это поможет! С уважением

Другие вопросы по тегам