Как использовать все сообщения, необходимые в Spring IntegrationFlow, когда число сообщений превышает число одновременных потребителей?

У меня есть поток интеграции, определенный следующим образом:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> a.correlationExpression("payload.entityId")
                                    .releaseExpression("size() eq iterator().next().payload.batchSize")
                                    .sendPartialResultOnExpiry(true)
                                    .groupTimeout(2000)
                                    .expireGroupsUponCompletion(true)
                                    .outputProcessor(myMessageGroupProcessor))
                    .handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
                    .get();

Чья цель состоит в том, чтобы сгруппировать несколько связанных сообщений, которые отправляются в "пакете". Вот пример:

// Message 1
{ "name": "Message1", 
  "entityId": "someId"
  "batchSize": 2,
  "batchIndex": 1, 
  .... }

// Message 2
{ "name": "Message2",
  "entityId": "someId"
  "batchSize": 2,
  "batchIndex": 2, 
  .... }

По причинам, описанным здесь, мы используем ручное подтверждение: RabbitMQ, чтобы избежать потери сообщений.

Поток интеграции отлично работает для пакетов размером 2, но как только в пакете появляется более 2 сообщений, мы сталкиваемся с проблемами:

[my-service] 2017-12-04 17:46:07.966  INFO 1 --- [ask-scheduler-5] x.y.EntityUpdater : Will update entity [entitId] from messages: Message1, Message2 
[my-service] 2017-12-04 17:46:09.976  INFO 1 --- [ask-scheduler-3] x.y.EntityUpdater : Will update entity [entitId] from messages: Message3

Обратите внимание, что время между зарегистрированными сообщениями составляет примерно 2 секунды (то есть то, что мы подтвердили как groupTimeout).

Я подозреваю, что причина этого в том, что Spring потребляет 2 сообщения (которые не проверяются автоматически), затем агрегация ожидает 3-го сообщения (так как batchSize 3 в этом случае). Но это сообщение никогда не будет использовано в течение 2-секундного окна, поскольку есть только два одновременных потребителя.

Увеличение concurrentConsumers считать до 3 решает эту конкретную проблему. Проблема в том, что мы не знаем размер пакетов, которые мы получаем, и они могут быть довольно большими, возможно, размером 50 или около того. Это означает, что просто увеличение concurrentConsumers не является жизнеспособным вариантом.

Как правильно обращаться с этим весной?

1 ответ

Решение

Как я уже обсуждал в комментариях к этому ответу...

При использовании этого шаблона concurrency * prefetch должен быть достаточно большим, чтобы содержать сообщения для всех ожидающих пакетов.

По этой причине я не одобряю использование шаблона, если у вас нет достаточно предсказуемых данных.

Другие вопросы по тегам