Как проверить, была ли отправлена новая запись в данный период времени, используя kafka и faust
Я использую тестовую настройку, включая платформу слияния (докер), и обрабатываю записи со следующей информацией: идентификатор датчика, метка времени, значение. Используя robinhood faust (похожий на Kafka Streams, но в python), я пытаюсь сделать следующее:
Всякий раз, когда есть новая запись для датчика, должен быть "таймер", и если новая запись для этого идентификатора датчика не получена в течение заданного времени, должна быть ошибка, указывающая на возможную неисправность этого датчика / машины.
Я пытался использовать time.sleep()
но происходит то, что он будет просто спать в течение 10 секунд, а затем обработать следующую запись.
Можно ли вообще сделать что-то подобное с помощью настроек, которые я использую?
1 ответ
Вы можете использовать изменение окна KSQL:
Создать поток сенсорной информации;
CREATE STREAM sensorinformation \
(sensorid VARCHAR, \
sensortimestamp BIGINT, \
value VARCHAR) \
WITH (KAFKA_TOPIC='sensorinformationtopic', \
VALUE_FORMAT='DELIMITED', \
KEY='sensorid', \
TIMESTAMP='sensortimestamp');
И, наконец, создайте таблицу, содержащую неисправные датчики, которые появляются только один раз в течение временного окна в 10 секунд:
CREATE TABLE faulty_sensors AS \
SELECT sensorid, \
count(*) \
FROM sensorinformation \
WINDOW TUMBLING (SIZE 10 SECONDS) \
GROUP BY sensorid \
HAVING count(*) = 1;