Как проверить, была ли отправлена ​​новая запись в данный период времени, используя kafka и faust

Я использую тестовую настройку, включая платформу слияния (докер), и обрабатываю записи со следующей информацией: идентификатор датчика, метка времени, значение. Используя robinhood faust (похожий на Kafka Streams, но в python), я пытаюсь сделать следующее:

Всякий раз, когда есть новая запись для датчика, должен быть "таймер", и если новая запись для этого идентификатора датчика не получена в течение заданного времени, должна быть ошибка, указывающая на возможную неисправность этого датчика / машины.

Я пытался использовать time.sleep() но происходит то, что он будет просто спать в течение 10 секунд, а затем обработать следующую запись.

Можно ли вообще сделать что-то подобное с помощью настроек, которые я использую?

1 ответ

Вы можете использовать изменение окна KSQL:

Создать поток сенсорной информации;

CREATE STREAM sensorinformation \
  (sensorid VARCHAR, \
   sensortimestamp BIGINT, \
   value VARCHAR) \
 WITH (KAFKA_TOPIC='sensorinformationtopic', \
       VALUE_FORMAT='DELIMITED', \
       KEY='sensorid', \
       TIMESTAMP='sensortimestamp');

И, наконец, создайте таблицу, содержащую неисправные датчики, которые появляются только один раз в течение временного окна в 10 секунд:

CREATE TABLE faulty_sensors AS \
  SELECT sensorid, \
         count(*) \
  FROM sensorinformation \
  WINDOW TUMBLING (SIZE 10 SECONDS) \
  GROUP BY sensorid \
  HAVING count(*) = 1;
Другие вопросы по тегам