Не проверяйте сообщения RabbitMQ до тех пор, пока метод-активатор службы IntegrationFlow не вернется успешно?
У меня есть поток интеграции, определенный следующим образом:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
Где serviceActivatorBean
определяется так:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(Collection<MyEvent> events) {
....
}
}
А также requestHandlerRetryAdviceForIntegrationFlow()
определяется так:
public static RequestHandlerRetryAdvice requestHandlerRetryAdviceForIntegrationFlow() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(MAX_VALUE);
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setListeners(new RetryListenerSupport[]{new RetryListenerSupport() {
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.error("Caught exception {} (retry count {}), will retry again!", throwable.getClass().getSimpleName(),
context.getRetryCount(), throwable);
}
}});
advice.setRetryTemplate(retryTemplate);
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setMaxInterval(5000L);
backOffPolicy.setInitialInterval(200L);
backOffPolicy.setMultiplier(2);
retryTemplate.setBackOffPolicy(backOffPolicy);
return advice;
}
Проблема, с которой мы сталкиваемся, это когда events
Коллекция в активаторе службы содержит 2 или более событий и по какой-то причине обработка myMethod
не удается и сбой сервера. Кажется, что происходит то, что IntegrationFlow
потребляет и проверяет одно сообщение за один раз из RabbitMQ, поэтому, если сбой сервера во время обработки myMethod
все, кроме последнего события, потеряно. Это не хорошо и не достаточно безопасно для нас. Есть ли что-то, что мы можем сделать, чтобы настроить IntegrationFlow
НЕ подтверждать какое-либо сообщение, пока myMethod
в сервис активатор был успешно завершен?
1 ответ
Вы можете использовать РУЧНОЙ режим подтверждения и затем подтвердить заголовки:
https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html