Как использовать все сообщения, необходимые в Spring IntegrationFlow, когда число сообщений превышает число одновременных потребителей?
У меня есть поток интеграции, определенный следующим образом:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> a.correlationExpression("payload.entityId")
.releaseExpression("size() eq iterator().next().payload.batchSize")
.sendPartialResultOnExpiry(true)
.groupTimeout(2000)
.expireGroupsUponCompletion(true)
.outputProcessor(myMessageGroupProcessor))
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
Чья цель состоит в том, чтобы сгруппировать несколько связанных сообщений, которые отправляются в "пакете". Вот пример:
// Message 1
{ "name": "Message1",
"entityId": "someId"
"batchSize": 2,
"batchIndex": 1,
.... }
// Message 2
{ "name": "Message2",
"entityId": "someId"
"batchSize": 2,
"batchIndex": 2,
.... }
По причинам, описанным здесь, мы используем ручное подтверждение: RabbitMQ, чтобы избежать потери сообщений.
Поток интеграции отлично работает для пакетов размером 2, но как только в пакете появляется более 2 сообщений, мы сталкиваемся с проблемами:
[my-service] 2017-12-04 17:46:07.966 INFO 1 --- [ask-scheduler-5] x.y.EntityUpdater : Will update entity [entitId] from messages: Message1, Message2
[my-service] 2017-12-04 17:46:09.976 INFO 1 --- [ask-scheduler-3] x.y.EntityUpdater : Will update entity [entitId] from messages: Message3
Обратите внимание, что время между зарегистрированными сообщениями составляет примерно 2 секунды (то есть то, что мы подтвердили как groupTimeout
).
Я подозреваю, что причина этого в том, что Spring потребляет 2 сообщения (которые не проверяются автоматически), затем агрегация ожидает 3-го сообщения (так как batchSize
3 в этом случае). Но это сообщение никогда не будет использовано в течение 2-секундного окна, поскольку есть только два одновременных потребителя.
Увеличение concurrentConsumers
считать до 3 решает эту конкретную проблему. Проблема в том, что мы не знаем размер пакетов, которые мы получаем, и они могут быть довольно большими, возможно, размером 50 или около того. Это означает, что просто увеличение concurrentConsumers
не является жизнеспособным вариантом.
Как правильно обращаться с этим весной?
1 ответ
Как я уже обсуждал в комментариях к этому ответу...
При использовании этого шаблона concurrency * prefetch
должен быть достаточно большим, чтобы содержать сообщения для всех ожидающих пакетов.
По этой причине я не одобряю использование шаблона, если у вас нет достаточно предсказуемых данных.