Подключение двух каналов очереди, поддерживаемых JDBC Store в Spring Intgegration DSL

Я пытаюсь соединить два канала очереди (при поддержке JDBC Store).

@Configuration
public class DemoIntegration {
    @Bean
    public IntegrationFlow flow(MessageChannel firstJDBCChannel,
                                MessageChannel secondJDBCChannel) {
        return IntegrationFlows.from(firstJDBCChannel)
                .bridge(bridgeHandler -> bridgeHandler.poller(p -> p.fixedDelay(100L)))
                .handle(secondJDBCChannel)
                .get();
    }
}

Я попытался поместить различные конструкции между этими двумя каналами и все еще с ошибкой как:

Вызвано: java.lang.IllegalArgumentException: найден неоднозначный тип параметра [класс java.lang.Void] для соответствия метода: [public boolean org.springframework.integration.channel.AbstractPollableChannel.removeInterceptor(org.springframework.messorhanInInterter, общедоступный org.springframework.messaging.Message org.springframework.integration.channel.AbstractPollableChannel.receive (long), общедоступный конечный void org.springframework.integration.context.IntegrationObjectSupport.setPrimaryExpression(org. org.springframework.integration.channel.AbstractMessageChannel.setStatsEnabled (логическое значение), общедоступная пустота AbstractMessageChannel.setDatatypes(java.lang.Class...), публичный void org.springframework.integration.channel.AbstractMessageChannel.configureMetrics(org.springframework.integration.support.management.AbstractMessageChannelMetrics), общедоступный void org.springframework.integration.context.IntegrationObjectSupport.setComponentName(java.lang.String), общедоступный org.springsshangrapp.rg. integra.channel.AbstractPollableChannel.removeInterceptor(int), публичный void org.springframework.integration.context.IntegrationObjectSupport.setMessageBuilderFactory(org.springframework.integration.support.MessageBuilderpr.QueueChannel.purge (org.springframework.integration.core..springframework.integration.context.IntegrationObjectSupport.setBeanFactory (org.springframework.beans.factory.BeanFactory), общедоступный или g.springframework.expression.Expression org.springframework.integration.context.IntegrationObjectSupport.getExpression(), общедоступный void org.springframework.integration.channel.AbstractPollableChannel.setInterceptors(java.utiling.spr. org.info) org.info context.IntegrationObjectSupport.setChannelResolver(org.springframework.messaging.core.DestinationResolver)] в org.springframework.util.Assert.isNull(Assert.java:155) ~[spring-core-5.0.1.RELEASE.jar:5.0.1.RELEASE] at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:776) ~[spring-gration-core-5.0.0.RC1.jar:5.0.0.RC1] в org.spr integra.util.MessagingMethodInvokerHelper.(MessagingMethodInvokerHelper.java:379) ~[spring-gration-core-5.0.0.RC1.jar:5.0.0.RC1] в org.springframework.integration.util.MessagingMethodInvokerHelper.(MessagingMethod.javaHelper.:225) ~[spring-gration-core-5.0.0.RC1.jar:5.0.0.RC1] в org.springframework.integration.util.MessagingMethodInvokerHelper.(MessagingMethodInvokerHelper.java:220) ~[spring-gration-core-5.0.0.RC1.jar:5.0.0.RC1] в org.springframework.integration.handler.MethoageProcessor.(MethodInvokingMessageProcessor.java:60) ~[spring-gration-core-5.0.0.RC1.jar:5.0.0.RC1] в org.springframework.integration.handler.ServiceActivatingHandler.(ServiceActivatingHandler.java:38) ~[spring-integration-core-5.0.0.RC1.jar:5.0.0.RC1] в org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:924) ~[spring-gration-core-5.0.0.RC1.jar:5.0.0.RC1] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:904) ~[spring-gration-core-5.0.0.RC1.jar:5.0.0.RC1] at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:891) ~[spring-gration-core-5.0.0.RC1.jar:5.0.0.RC1] по адресу name.karwowski.blazej.integrationdemo2.DemoIntegration.flow(DemoIntegration.java:16) ~[classes/:na] в name.karwowski.blazej.integrationdemo2.DemoIntegration$$EnhancerBySpringCGLIB$$f82aadc3.CGLIB$flow$0() ~[classes/:na] по имени. karwowski.blazej.integrationDemo2.DemoIntegration [spring-core-5.0.1.RELEASE.jar: 5.0.1.RELEASE] в org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:361) ~[spring-context-5.0.1.RELEASE.jar:5.0.1.RELEASE] по адресу name.karwowski.blazej.integrationdemo2.DemoIntegration$$EnhancerBySpringCGLIB$$f82aadc3.flow() ~[классы /: на] в sun.reflect.NativeMethodAccessorImpl.inke метод) [na:1.8.0_151] в sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151] в sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151] в java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151] в org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:155) ~[spring-beans-5.0.1.RELEASE.jar:5.0.1.RELEASE] ... 19 общих кадров опущено

Как правильно соединить два (или более) канала очереди? Мне нужно сделать некоторую обработку между ними и позволить приложению сохранять сообщения в случае остановки.

Полный журнал и пример кода находятся на github: https://github.com/blazejkarwowski/integration-test

1 ответ

Решение

Вместо .handle(secondJDBCChannel) ты должен использовать .channel(secondJDBCChannel), Ничего не поделаешь - это канал между ними.

См. Справочное руководство для получения дополнительной информации.

Другие вопросы по тегам