Использовать Kinesis Analytics для анализа событий и связанных с ними пропущенных событий, разделенных по времени?
У меня есть поток событий для различных устройств, которые могут быть "подключены" или "отключены".
Т.е. событие имеет следующую структуру:
- отметка времени
- идентификатор устройства
- событие ("подключено" или "отключено")
Я хочу немедленно запустить действие, когда устройство было отключено и не подключено в течение (настраиваемого устройства) периода времени, например, 1 часа. Я хочу запускать только один раз за "отключенное" событие.
Это можно сделать с помощью AWS Kinesis Analytics и, если да, как будет выглядеть запрос? Если нет, то можно ли это сделать с помощью какого-либо другого инструмента, или я должен сделать это по индивидуальному заказу?
1 ответ
Это возможно с помощью Drools Kinesis Analytics (управляемый сервис на Amazon):
Типы:
package com.test;
import java.util.Set;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;
declare DeviceConfig
@DynamoDBTable(tableName="DeviceConfig")
deviceId: int @DynamoDBHashKey(attributeName="device_id");
timeoutMillis: int @DynamoDBAttribute(attributeName="timeout_millis");
end
declare DeviceEvent
@role( event )
// attributes
deviceId: int;
timestamp: java.util.Date;
event: String;
end
declare DisconnectAlert
deviceId: int;
end
Правила:
package com.test;
// setup dynamic timer
rule "disconnect timer"
timer( expr: $timeout )
when
$event : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
DeviceConfig(deviceId == $event.deviceId, $timeout : timeoutMillis) from entry-point configs
then
insertLogical(new DisconnectAlert($event.getDeviceId()));
end
rule "remove dups"
when
$event : DeviceEvent( $id : deviceId, $state : event ) from entry-point events
$dup : DeviceEvent(this != $event, deviceId == $event.deviceId, event == $state, this after $event) from entry-point events
then
delete($dup);
end
// on connect event remove "disconnected" state
rule "connect device"
when
$disconnected : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
DeviceEvent(deviceId == $disconnected.deviceId, event == "connected", this after $disconnected) from entry-point events
then
delete($disconnected);
end
// cleanup "connected" state to free up memory (not needed any more)
rule "delete connected state"
when
$connected : DeviceEvent(event == "connected") from entry-point events
then
delete($connected);
end
Обратите внимание, что есть 2 типа входов:
- DeviceConfig, который является в основном статической конфигурацией устройства, расположен в DynamoDB.
- DeviceEvent, который представляет собой поток событий устройства Kinesis.