Как убедиться, что кортежи с одинаковым ключом обрабатываются по порядку
Я сделал работу с Hazelcast Jet, который преобразует поток измерений IoT в поток сигналов тревоги.
Таким образом, всякий раз, когда уровень влажности одного датчика превышает пороговое значение, возникает аварийный сигнал. Когда он снова падает ниже порога, сигнал тревоги сбрасывается. Может быть до 3 уровней порогов (серьезность).
В настоящее время у меня есть проблемы, когда работа начинается. Он удалит все буферизованные события из моего источника RabbitMQ. Итак, дальние события упорядочены, потому что локальный параллелизм один (давайте предположим, что здесь один кластер-член). Но мы события отправляются в пул кооперативных потоков, нет гарантии на заказ. Могу ли я поручить Jet обработать все события с одним и тем же идентификатором датчика по порядку?
Вот текущее определение моего конвейера:
StreamStage<Notification> ss = l
.drawFrom(
Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
.map(e -> makeMeasurement(e))
.flatMap(e -> checkThresholds(e))
.flatMap(e -> checkNotification(e));
ss.drainTo(Sinks.logger());
checkNotification сравнивает серьезность события с последней серьезностью для этого датчика. Вот почему порядок важен.
Я попытался реализовать решение, предложенное Гоханом Онером: я изменил исходный код для вывода объектов SimpleMeasurement. Таким образом, я могу добавить метку времени сразу после источника.
StreamStage<Notification> ss = l
.drawFrom(Sources.<SimpleEntry<Integer, SimpleMeasurement>> streamFromProcessor("rabbitmq",
ReadRabbitMQP.readRabbitMQ(mGroupNames, mLocalParallelism)))
.addTimestamps(e -> e.getValue().getTimestamp().toEpochMilli(), 1000)
.flatMap(e -> checkThresholds(e))
.groupingKey(e -> e.getSensorId())
.window(WindowDefinition.tumbling(1))
.aggregate(AggregateOperations.sorting(DistributedComparator.comparing(e -> e.getPeakTime())))
.flatMap(e -> checkNotification(e));
ss.drainTo(Sinks.logger());
С этим кодом события по-прежнему не обрабатываются для того же идентификатора датчика. Более того, с момента считывания события из источника до 20 секунд происходит задержка до момента его обработки в 'checkNotification'.
1 ответ
@PeeWee2201, так как это распределенный поток, нет гарантированного заказа. Но если вы хотите обрабатывать уведомления от тех же датчиков по порядку, то вам необходимо:
- добавить отметку времени к событиям
- группировать по идентификатору датчика
- определить окно, 10 секунд, 30 секунд и т. д., чтобы события могли быть агрегированы в этом окне
- Сортировать все события на основе любого свойства в одном окне
Так что работа должна выглядеть так:
StreamStage<Notification> ss = l
.drawFrom(
Sources.<SimpleEntry<String, String>> streamFromProcessor("rabbitmq", ReadRabbitMQP.readRabbitMQ()))
.addTimestamps(...., ...)
.groupingKey(....)
.window(WindowDefinition.tumbling(....))
.aggregate(AggregateOperations.sorting(....))
Если makeMeasurement(e)
это шаг, который преобразует данные и может выполняться параллельно, вы можете добавить его перед группировкой.
После этого у вас будет список объектов для checkThresholds
Метод: все сообщения в окне для одного и того же сенсора идентифицируются по времени прибытия или по любому порядку сортировки, который вы использовали.
Я верю, что это поможет решить вашу проблему.