Как убедиться, что кортежи с одинаковым ключом обрабатываются по порядку

Я сделал работу с Hazelcast Jet, который преобразует поток измерений IoT в поток сигналов тревоги.

Таким образом, всякий раз, когда уровень влажности одного датчика превышает пороговое значение, возникает аварийный сигнал. Когда он снова падает ниже порога, сигнал тревоги сбрасывается. Может быть до 3 уровней порогов (серьезность).

В настоящее время у меня есть проблемы, когда работа начинается. Он удалит все буферизованные события из моего источника RabbitMQ. Итак, дальние события упорядочены, потому что локальный параллелизм один (давайте предположим, что здесь один кластер-член). Но мы события отправляются в пул кооперативных потоков, нет гарантии на заказ. Могу ли я поручить Jet обработать все события с одним и тем же идентификатором датчика по порядку?

Вот текущее определение моего конвейера:

  StreamStage<Notification> ss = l
        .drawFrom(
              Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
        .map(e -> makeMeasurement(e))
        .flatMap(e -> checkThresholds(e))
        .flatMap(e -> checkNotification(e));

  ss.drainTo(Sinks.logger());  

checkNotification сравнивает серьезность события с последней серьезностью для этого датчика. Вот почему порядок важен.


Я попытался реализовать решение, предложенное Гоханом Онером: я изменил исходный код для вывода объектов SimpleMeasurement. Таким образом, я могу добавить метку времени сразу после источника.

StreamStage<Notification> ss = l
      .drawFrom(Sources.<SimpleEntry<Integer, SimpleMeasurement>> streamFromProcessor("rabbitmq",
                  ReadRabbitMQP.readRabbitMQ(mGroupNames, mLocalParallelism)))
      .addTimestamps(e -> e.getValue().getTimestamp().toEpochMilli(), 1000)
      .flatMap(e -> checkThresholds(e))
      .groupingKey(e -> e.getSensorId())
      .window(WindowDefinition.tumbling(1))
      .aggregate(AggregateOperations.sorting(DistributedComparator.comparing(e -> e.getPeakTime())))
      .flatMap(e -> checkNotification(e));

ss.drainTo(Sinks.logger());

С этим кодом события по-прежнему не обрабатываются для того же идентификатора датчика. Более того, с момента считывания события из источника до 20 секунд происходит задержка до момента его обработки в 'checkNotification'.

1 ответ

@PeeWee2201, так как это распределенный поток, нет гарантированного заказа. Но если вы хотите обрабатывать уведомления от тех же датчиков по порядку, то вам необходимо:

  • добавить отметку времени к событиям
  • группировать по идентификатору датчика
  • определить окно, 10 секунд, 30 секунд и т. д., чтобы события могли быть агрегированы в этом окне
  • Сортировать все события на основе любого свойства в одном окне

Так что работа должна выглядеть так:

  StreamStage<Notification> ss = l
        .drawFrom(
              Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
        .addTimestamps(...., ...)
        .groupingKey(....)
        .window(WindowDefinition.tumbling(....))
        .aggregate(AggregateOperations.sorting(....))

Если makeMeasurement(e) это шаг, который преобразует данные и может выполняться параллельно, вы можете добавить его перед группировкой.

После этого у вас будет список объектов для checkThresholds Метод: все сообщения в окне для одного и того же сенсора идентифицируются по времени прибытия или по любому порядку сортировки, который вы использовали.

Я верю, что это поможет решить вашу проблему.

Другие вопросы по тегам