Шаблон интеграции: как синхронизировать обработку сообщений, полученных от нескольких систем

Я строю систему, которая будет получать сообщения через брокер сообщений (в настоящее время JMS) из разных систем. Все сообщения от всех систем отправителей имеют идентификатор устройства, и порядок приема сообщений отсутствует. Например, система A может отправить сообщение с deviceId=1, а система b может отправить сообщение с deviceId=2.

Моя цель - не начинать обработку сообщений, относящихся к одному и тому же идентификатору устройства, если я не получил все сообщения от всех отправителей с одинаковым идентификатором устройства.

Например, если у меня есть 3 системы A, B и C, отправляющие сообщения в мою систему:

System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.

Должна ли эта проблема быть решена с помощью какого-либо механизма синхронизации в моей системе, с помощью брокера сообщений или интегрированной среды, такой как Spring-Integration / Apache Camel?

3 ответа

Решение

Аналогичное решение с Агрегатором (о чем упоминал @Artem Bilan) также может быть реализовано в Camel с пользовательским AggregationStrategy и с контролем завершения Агрегатора с помощью Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP имущество.

Следующее может быть хорошей отправной точкой. ( Вы можете найти образец проекта с тестами здесь)

Маршрут:

from("direct:start")
    .log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
    .aggregate(header("deviceId"), new SignalAggregationStrategy(3))
    .log(LoggingLevel.INFO, "Signaled body: ${body}")
    .to("direct:result");

SignalAggregationStrategy.java

public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {

    private int numberOfSystems;

    public SignalAggregationStrategy(int numberOfSystems) {
        this.numberOfSystems = numberOfSystems;
    }

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Exchange exchange = super.aggregate(oldExchange, newExchange);

        List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);

        // Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
        // https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
        if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
            exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
        }

        return exchange;
    }

    @Override
    public boolean matches(Exchange exchange) {
        // make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
        return false;
    }
}

Надеюсь, поможет!

Вы можете сделать это в Apache Camel, используя компонент кэширования. Я думаю, что есть компонент EHCache.

По существу:

  1. Вы получите сообщение с указанным deviceId скажем deviceId1.
  2. Вы ищите в своем кэше, чтобы увидеть, какие сообщения были получены для deviceId1.
  3. Пока вы не получили все три, вы добавляете текущую систему / сообщение в кеш.
  4. Как только все сообщения будут там, вы обрабатываете и очищаете кеш.

Затем вы могли бы, конечно, перенаправить каждое входящее сообщение в определенную очередь на основе deviceId для временного хранения. Это может быть JMS, ActiveMQ или что-то подобное.

Spring Integration предоставляет компоненты именно для таких задач - не выбрасывайте, пока не собрана вся группа. И это имя Агрегатор. Ваш deviceId определенно correlationKey, releaseStrategy действительно может быть основано на количестве систем - сколько deviceId1 сообщения, которые вы ждете, прежде чем перейти к следующему шагу.

Другие вопросы по тегам