Кафка разница во времени последние две записи, KSQL или другие?

Поэтому я оцениваю Кафку. В нашем случае использования пришлось бы создавать новые темы, содержащие "прошедшее время" от одного события к другому, по существу, так как датчик будет сообщать как "включен" или "выключен" в Kafka. Таким образом, имея метку времени, сенсорное имя и состояние, создайте новые темы с продолжительностью состояния "включено" и "выключено".

  1. Это выполнимо в KSQL и как?
  2. Или действительно нужно оставить это потребителям или потоковым процессорам, чтобы выяснить это?

Мои данные примерно такие:

{ 2019:02:15 00:00:00, sensor1, off}
{ 2019:02:15 00:00:30, sensor1, on} 

чтобы получить результат

{ 2019:02:15 00:30:00, sensor1, off, 30sec }. 

По сути, необходимо объединить состояния нескольких датчиков, чтобы определить объединенное состояние машины. Сотни, если не в конечном итоге тысячи датчиков на заводе

0 ответов

Это довольно просто в Kafka Streams, поэтому я бы выбрал 2.

Сначала вам нужно правильно смоделировать входные данные. В вашем примере используется местное время, что делает невозможным вычисление продолжительности между двумя отметками времени. Используйте что-то вроде времени эпохи.

Начните с исходной модели данных, например

interface SensorState {
  String getId();
  Instant getTime();
  State getState();
  enum State {
    OFF,
    ON
  }
}

и цель

interface SensorStateWithDurationX {
  SensorState getEvent();
  Duration getDuration();
}

Теперь, когда вы определили поток ввода и вывода (но см. " Типы данных и сериализация"), вам просто нужно преобразовать значения (" Применение процессоров и преобразователей"), просто определив ValueTransformer.

Он должен делать 2 вещи:

  1. Проверьте хранилище состояний на предмет исторических данных для датчика и при необходимости обновите его новыми данными.

  2. Когда доступны исторические данные, рассчитайте разницу между отметками времени и отправьте данные вместе с рассчитанной продолжительностью.

class DurationProcessor implements ValueTransformer<SensorState, SensorStateWithDuration> {
  KeyValueStore<String, SensorState> store;

  @SuppressWarnings("unchecked")
  public void init(ProcessorContext context) {
    this.store = (KeyValueStore<String, SensorState>) context.getStateStore("SensorStates");
  }

  public SensorStateWithDuration transform(SensorState sensorState) {
    // Nothing to do
    if (sensorState == null) {
      return null;
    }

    // Check for the previous state, update if necessary
    var oldState = checkAndUpdateSensorState(sensorState);

    // When we have historical data, return duration so far. Otherwise return null
    return oldState.map(state -> addDuration(state, sensorState)).orElse(null);
  }

  public void close() {}

  /**
   * Checks the state store for historical state based on sensor ID and updates it, if necessary.
   *
   * @param sensorState The new sensor state
   * @return The old sensor state
   */
  Optional<SensorState> checkAndUpdateSensorState(SensorState sensorState) {
    // The Sensor ID is our index
    var index = sensorState.getId();

    // Get the historical state (might be null)
    var oldState = store.get(index);
    if (neetToUpdate(oldState, sensorState)) {
      // Update the state store to the new state
      store.put(index, sensorState);
    }
    return Optional.ofNullable(oldState);
  }

  /**
   * Check if we need to update the state in the state store.
   *
   * <p>Either we have no historical data, or the state has changed.
   *
   * @param oldState The old sensor state
   * @param sensorState The new sensor state
   * @return Flag whether we need to update
   */
  boolean neetToUpdate(SensorState oldState, SensorState sensorState) {
    return oldState == null || oldState.getState() != sensorState.getState();
  }

  /**
   * Wrap the old state with a duration how log it lasted.
   *
   * @param oldState The state of the sensor so far
   * @param sensorState The new state of the sensor
   * @return Wrapped old state with duration
   */
  SensorStateWithDuration addDuration(SensorState oldState, SensorState sensorState) {
    var duration = Duration.between(oldState.getTime(), sensorState.getTime());
    return SensorStateWithDuration.builder().setEvent(oldState).setDuration(duration).build();
  }
}

Собираем все вместе (" Соединение процессоров и государственных хранилищ") в простой топологии:

var builder = new StreamsBuilder();

// Our state store
var storeBuilder =
    Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("SensorStates"),
        Serdes.String(),
        storeSerde);

// Register the store builder
builder.addStateStore(storeBuilder);

builder.stream("input-topic", Consumed.with(Serdes.String(), inputSerde))
    .transformValues(DurationProcessor::new, DurationProcessor.SENSOR_STATES)
    .to("result-topic", Produced.with(Serdes.String(), resultSerde));

var topology = builder.build();

Полное приложение находится на https://github.com/melsicon/kafka-sensors.

Следуя идее https://github.com/confluentinc/ksql/issues/2562 об использовании самостоятельного соединения, я придумал следующее решение:

  1. Создайте данные
#kafka-topics --bootstrap-server localhost:9092  --delete --topic temptest
echo '{"temp": 3.0, "counter": 1}' | kafkacat -b localhost -t temptest
echo '{"temp": 4.0, "counter": 2}' | kafkacat -b localhost -t temptest
echo '{"temp": 6.0, "counter": 3}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.0, "counter": 4}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 6}' | kafkacat -b localhost -t temptest
echo '{"temp": 3.1, "counter": 5}' | kafkacat -b localhost -t temptest

Здесь мы предполагаем, что у последовательных событий уже есть свойство counter. Такой счетчик также можно добавить с помощью ksql, просто суммируя счетчики событий с течением времени.

  1. Дифференцируйте функцию
-- import the topic into ksql
CREATE STREAM temp_json (ingesttime BIGINT, row VARCHAR, temp DOUBLE, counter INTEGER) WITH (kafka_topic='temptest', value_format='JSON', KEY='counter');

--- change format to avro and repartion
CREATE STREAM temp WITH (VALUE_FORMAT='AVRO') AS SELECT temp, counter, CAST(counter AS VARCHAR) as counter_key FROM temp_json PARTITION BY counter_key;

--- create second stream with shifted counter
CREATE STREAM temp_shift AS SELECT temp, counter as counter_orig, counter+ 1 as counter from temp PARTITION BY counter;

-- join the streams by counter
CREATE STREAM temp_diff AS SELECT
  prev.temp-cur.temp as temp_difference, cur.temp as temp,  prev.temp as prev_temp, cur.counter as counter
  FROM temp cur
  LEFT JOIN temp_shift prev WITHIN 2 HOURS
  ON cur.counter = prev.counter;

Попробуй это

ksql> SELECT * FROM temp_diff LIMIT 4;
1574321370281 | 1 | null | 3.0 | null | 1
1574321372307 | 2 | -1.0 | 4.0 | 3.0 | 2
1574321372319 | 3 | -2.0 | 6.0 | 4.0 | 3
1574321372331 | 4 | 3.0 | 3.0 | 6.0 | 4

Сам датчик не используется для упрощения решения. Однако его можно легко добавить, используя составной ключ для разделения, как описано в https://www.confluent.io/stream-processing-cookbook/ksql-recipes/creating-composite-key

Другие вопросы по тегам