Шаблон интеграции: как синхронизировать обработку сообщений, полученных от нескольких систем
Я строю систему, которая будет получать сообщения через брокер сообщений (в настоящее время JMS) из разных систем. Все сообщения от всех систем отправителей имеют идентификатор устройства, и порядок приема сообщений отсутствует. Например, система A может отправить сообщение с deviceId=1, а система b может отправить сообщение с deviceId=2.
Моя цель - не начинать обработку сообщений, относящихся к одному и тому же идентификатору устройства, если я не получил все сообщения от всех отправителей с одинаковым идентификатором устройства.
Например, если у меня есть 3 системы A, B и C, отправляющие сообщения в мою систему:
System A sends messageA1 with deviceId=1
System B sends messageB1 with deviceId=1
System C sends messageC1 with deviceId=3
System C sends messageC2 with deviceId=1 <--- here I should start processing of messageA1, messageB1 and messageC2 because they are having the same deviceID 1.
Должна ли эта проблема быть решена с помощью какого-либо механизма синхронизации в моей системе, с помощью брокера сообщений или интегрированной среды, такой как Spring-Integration / Apache Camel?
3 ответа
Аналогичное решение с Агрегатором (о чем упоминал @Artem Bilan) также может быть реализовано в Camel с пользовательским AggregationStrategy
и с контролем завершения Агрегатора с помощью Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP
имущество.
Следующее может быть хорошей отправной точкой. ( Вы можете найти образец проекта с тестами здесь)
Маршрут:
from("direct:start")
.log(LoggingLevel.INFO, "Received ${headers.system}${headers.deviceId}")
.aggregate(header("deviceId"), new SignalAggregationStrategy(3))
.log(LoggingLevel.INFO, "Signaled body: ${body}")
.to("direct:result");
SignalAggregationStrategy.java
public class SignalAggregationStrategy extends GroupedExchangeAggregationStrategy implements Predicate {
private int numberOfSystems;
public SignalAggregationStrategy(int numberOfSystems) {
this.numberOfSystems = numberOfSystems;
}
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Exchange exchange = super.aggregate(oldExchange, newExchange);
List<Exchange> aggregatedExchanges = exchange.getProperty("CamelGroupedExchange", List.class);
// Complete aggregation if we have "numberOfSystems" (currently 3) different messages (where "system" headers are different)
// https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#completing-current-group-decided-from-the-aggregationstrategy
if (numberOfSystems == aggregatedExchanges.stream().map(e -> e.getIn().getHeader("system", String.class)).distinct().count()) {
exchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
}
return exchange;
}
@Override
public boolean matches(Exchange exchange) {
// make it infinite (4th bullet point @ https://github.com/apache/camel/blob/master/camel-core/src/main/docs/eips/aggregate-eip.adoc#about-completion)
return false;
}
}
Надеюсь, поможет!
Вы можете сделать это в Apache Camel, используя компонент кэширования. Я думаю, что есть компонент EHCache.
По существу:
- Вы получите сообщение с указанным deviceId скажем deviceId1.
- Вы ищите в своем кэше, чтобы увидеть, какие сообщения были получены для deviceId1.
- Пока вы не получили все три, вы добавляете текущую систему / сообщение в кеш.
- Как только все сообщения будут там, вы обрабатываете и очищаете кеш.
Затем вы могли бы, конечно, перенаправить каждое входящее сообщение в определенную очередь на основе deviceId для временного хранения. Это может быть JMS, ActiveMQ или что-то подобное.
Spring Integration предоставляет компоненты именно для таких задач - не выбрасывайте, пока не собрана вся группа. И это имя Агрегатор. Ваш deviceId
определенно correlationKey
, releaseStrategy
действительно может быть основано на количестве систем - сколько deviceId1
сообщения, которые вы ждете, прежде чем перейти к следующему шагу.