Кафка разница во времени последние две записи, KSQL или другие?
Поэтому я оцениваю Кафку. В нашем случае использования пришлось бы создавать новые темы, содержащие "прошедшее время" от одного события к другому, по существу, так как датчик будет сообщать как "включен" или "выключен" в Kafka. Таким образом, имея метку времени, сенсорное имя и состояние, создайте новые темы с продолжительностью состояния "включено" и "выключено".
- Это выполнимо в KSQL и как?
- Или действительно нужно оставить это потребителям или потоковым процессорам, чтобы выяснить это?
Мои данные примерно такие:
{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on}
чтобы получить результат
{ 2019:02:15 00:30:00, sensor1, off, 30sec }.
По сути, необходимо объединить состояния нескольких датчиков, чтобы определить объединенное состояние машины. Сотни, если не в конечном итоге тысячи датчиков на заводе
0 ответов
Это довольно просто в Kafka Streams, поэтому я бы выбрал 2.
Сначала вам нужно правильно смоделировать входные данные. В вашем примере используется местное время, что делает невозможным вычисление продолжительности между двумя отметками времени. Используйте что-то вроде времени эпохи.
Начните с исходной модели данных, например
interface SensorState {
String getId();
Instant getTime();
State getState();
enum State {
OFF,
ON
}
}
и цель
interface SensorStateWithDurationX {
SensorState getEvent();
Duration getDuration();
}
Теперь, когда вы определили поток ввода и вывода (но см. " Типы данных и сериализация"), вам просто нужно преобразовать значения (" Применение процессоров и преобразователей"), просто определив ValueTransformer
.
Он должен делать 2 вещи:
Проверьте хранилище состояний на предмет исторических данных для датчика и при необходимости обновите его новыми данными.
Когда доступны исторические данные, рассчитайте разницу между отметками времени и отправьте данные вместе с рассчитанной продолжительностью.
class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
KeyValueStore<String, SensorState> store;
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.store = (KeyValueStore<String, SensorState>) context.getStateStore("SensorStates");
}
public SensorStateWithDuration transform(SensorState sensorState) {
// Nothing to do
if (sensorState == null) {
return null;
}
// Check for the previous state, update if necessary
var oldState = checkAndUpdateSensorState(sensorState);
// When we have historical data, return duration so far. Otherwise return null
return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
}
public void close() {}
/**
* Checks the state store for historical state based on sensor ID and updates it, if necessary.
*
* @param sensorState The new sensor state
* @return The old sensor state
*/
Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
// The Sensor ID is our index
var index = sensorState.getId();
// Get the historical state (might be null)
var oldState = store.get(index);
if (neetToUpdate(oldState, sensorState)) {
// Update the state store to the new state
store.put(index, sensorState);
}
return Optional.ofNullable(oldState);
}
/**
* Check if we need to update the state in the state store.
*
* <p>Either we have no historical data, or the state has changed.
*
* @param oldState The old sensor state
* @param sensorState The new sensor state
* @return Flag whether we need to update
*/
boolean neetToUpdate(SensorState oldState, SensorState sensorState) {
return oldState == null || oldState.getState() != sensorState.getState();
}
/**
* Wrap the old state with a duration how log it lasted.
*
* @param oldState The state of the sensor so far
* @param sensorState The new state of the sensor
* @return Wrapped old state with duration
*/
SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
var duration = Duration.between(oldState.getTime(), sensorState.getTime());
return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
}
}
Собираем все вместе (" Соединение процессоров и государственных хранилищ") в простой топологии:
var builder = new StreamsBuilder();
// Our state store
var storeBuilder =
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("SensorStates"),
Serdes.String(),
storeSerde);
// Register the store builder
builder.addStateStore(storeBuilder);
builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
.transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
.to("result-topic", Produced.with(Serdes.String(), resultSerde));
var topology = builder.build();
Полное приложение находится на https://github.com/melsicon/kafka-sensors.
Следуя идее https://github.com/confluentinc/ksql/issues/2562 об использовании самостоятельного соединения, я придумал следующее решение:
- Создайте данные
#kafka-topics --bootstrap-server localhost:9092 --delete --topic temptest
echo '{"temp": 3.0, "counter": 1}' | kafkacat -b localhost -t temptest
echo '{"temp": 4.0, "counter": 2}' | kafkacat -b localhost -t temptest
echo '{"temp": 6.0, "counter": 3}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.0, "counter": 4}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 6}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 5}' | kafkacat -b localhost -t temptest
Здесь мы предполагаем, что у последовательных событий уже есть свойство counter. Такой счетчик также можно добавить с помощью ksql, просто суммируя счетчики событий с течением времени.
- Дифференцируйте функцию
-- import the topic into ksql
CREATE STREAM temp_json (ingesttime BIGINT, row VARCHAR, temp DOUBLE, counter INTEGER) WITH (kafka_topic='temptest', value_format='JSON', KEY='counter');
--- change format to avro and repartion
CREATE STREAM temp WITH (VALUE_FORMAT='AVRO') AS SELECT temp, counter, CAST(counter AS VARCHAR) as counter_key FROM temp_json PARTITION BY counter_key;
--- create second stream with shifted counter
CREATE STREAM temp_shift AS SELECT temp, counter as counter_orig, counter+ 1 as counter from temp PARTITION BY counter;
-- join the streams by counter
CREATE STREAM temp_diff AS SELECT
prev.temp-cur.temp as temp_difference, cur.temp as temp, prev.temp as prev_temp, cur.counter as counter
FROM temp cur
LEFT JOIN temp_shift prev WITHIN 2 HOURS
ON cur.counter = prev.counter;
Попробуй это
ksql> SELECT * FROM temp_diff LIMIT 4;
1574321370281 | 1 | null | 3.0 | null | 1
1574321372307 | 2 | -1.0 | 4.0 | 3.0 | 2
1574321372319 | 3 | -2.0 | 6.0 | 4.0 | 3
1574321372331 | 4 | 3.0 | 3.0 | 6.0 | 4
Сам датчик не используется для упрощения решения. Однако его можно легко добавить, используя составной ключ для разделения, как описано в https://www.confluent.io/stream-processing-cookbook/ksql-recipes/creating-composite-key