Spring AMQP выбрасывает «Listener not registered» при втором вызове, когда включены подтверждения издателя (publisher confirms)
Я пытаюсь настроить издателя Spring RabbitMQ с включёнными подтверждениями издателя (publisher confirms). Первый вызов проходит нормально, и обратный вызов подтверждения срабатывает. Начиная со второго вызова происходит сбой со следующим сообщением об ошибке:
javax.servlet.ServletException: org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:169)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.Server.handle(Server.java:564)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:317)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:251)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:279)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:110)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:124)
at org.eclipse.jetty.util.thread.Invocable.invokePreferred(Invocable.java:128)
at org.eclipse.jetty.util.thread.Invocable$InvocableExecutor.invoke(Invocable.java:222)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:294)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:199)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:672)
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:590)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.web.util.NestedServletException: Request processing failed; nested exception is org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:982)
at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:872)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:846)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:841)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:188)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1253)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:168)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:166)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1155)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)
... 15 common frames omitted
Caused by: org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:83)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1650)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1531)
at org.springframework.amqp.rabbit.core.RabbitTemplate.sendAndReceive(RabbitTemplate.java:1320)
at com.example.messagequeueintegration.impl.RabbitMQFacade.sendMessage(RabbitMQFacade.java:83)
at com.example.messagequeueintegration.impl.RabbitMQFacade.sendMessage(RabbitMQFacade.java:68)
at com.example.messagequeueintegration.impl.RabbitMQFacade.sendPaymentMessage(RabbitMQFacade.java:51)
at com.example.messagequeueintegration.server.rest.controllers.SendMessageController.sendPaymentMessage(SendMessageController.java:35)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:205)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:133)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:116)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:827)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:738)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:963)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:897)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:970)
... 29 common frames omitted
Caused by: java.lang.IllegalArgumentException: Listener not registered: org.springframework.amqp.rabbit.core.RabbitTemplate@42802df2 []
at org.springframework.util.Assert.notNull(Assert.java:134)
at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.addPendingConfirm(PublisherCallbackChannelImpl.java:937)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:981)
at com.sun.proxy.$Proxy47.addPendingConfirm(Unknown Source)
at org.springframework.amqp.rabbit.core.RabbitTemplate.setupConfirm(RabbitTemplate.java:2007)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1979)
at org.springframework.amqp.rabbit.core.RabbitTemplate.exchangeMessages(RabbitTemplate.java:1732)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveAsListener(RabbitTemplate.java:1683)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1645)
... 48 common frames omitted
Вот так выглядит моя конфигурация:
@Configuration
public class RabbitMQConfig {
@Autowired
private Environment env;
/**
 * Creates the RabbitMQ connection factory bean.
 *
 * <p>Host and port come from the required properties named by {@code QUEUE_HOST} and
 * {@code QUEUE_PORT}; publisher confirms and publisher returns are both enabled.
 *
 * @return a {@link CachingConnectionFactory} with confirms and returns switched on
 */
@Bean
public ConnectionFactory connectionFactory() {
    String host = env.getRequiredProperty(QUEUE_HOST);
    Integer port = env.getRequiredProperty(QUEUE_PORT, Integer.class);
    CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
    factory.setPublisherConfirms(true);
    factory.setPublisherReturns(true);
    return factory;
}
/**
 * Exposes an {@link AmqpAdmin} bean backed by a new {@link RabbitAdmin}.
 *
 * <p>NOTE(review): {@code rabbitAdmin(ConnectionFactory)} in this class also builds a
 * {@code RabbitAdmin} from the same factory - confirm that both beans are needed.
 *
 * @param connectionFactory the connection factory the admin operates on
 * @return the admin instance
 */
@Bean
public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
    AmqpAdmin admin = new RabbitAdmin(connectionFactory);
    return admin;
}
/**
 * Creates the {@link RabbitTemplate} bean on top of the given connection factory.
 * No further template settings are applied here.
 *
 * @param connectionFactory the connection factory the template sends through
 * @return a template with default settings
 */
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    return new RabbitTemplate(connectionFactory);
}
/**
 * Creates a {@link RabbitAdmin} bean bound to the given connection factory.
 *
 * @param connectionFactory the connection factory the admin operates on
 * @return the admin instance
 */
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    RabbitAdmin admin = new RabbitAdmin(connectionFactory);
    return admin;
}
/**
 * Creates the {@code MessageUtil} bean.
 *
 * @param objectMapper the {@code ObjectMapper} bean qualified as "messagequeueintegration"
 * @return a {@code MessageUtil} wrapping that mapper
 */
@Bean
public MessageUtil messageUtil(@Qualifier("messagequeueintegration") ObjectMapper objectMapper) {
    MessageUtil util = new MessageUtil(objectMapper);
    return util;
}
/**
 * Creates the {@code BrokerFacade} bean, implemented by {@code RabbitMQFacade}.
 *
 * @param objectMapper the {@code ObjectMapper} bean qualified as "messagequeueintegration"
 * @param environment the Spring environment handed to the facade
 * @param messageUtil the message helper handed to the facade
 * @return the RabbitMQ-backed facade
 */
@Bean
public BrokerFacade brokerFacade(
        @Qualifier("messagequeueintegration") ObjectMapper objectMapper,
        Environment environment,
        MessageUtil messageUtil) {
    BrokerFacade facade = new RabbitMQFacade(objectMapper, environment, messageUtil);
    return facade;
}
/**
 * Declares the topic exchange named by {@code TOPIC_EXCHANGE_KEY}.
 *
 * @return the topic exchange bean
 */
@Bean
public TopicExchange topicExchange() {
    TopicExchange exchange = new TopicExchange(TOPIC_EXCHANGE_KEY);
    return exchange;
}
/**
 * Binds {@code queue2} to the topic exchange, using the value of the required
 * "queue2name" property as the routing key.
 *
 * @param queue2 the queue bean qualified as "queue2"
 * @param topicExchange the exchange to bind to
 * @return the binding
 */
@Bean
Binding queue2Binding(@Qualifier("queue2") Queue queue2, TopicExchange topicExchange) {
    String routingKey = env.getRequiredProperty("queue2name");
    return BindingBuilder.bind(queue2).to(topicExchange).with(routingKey);
}
/**
 * Binds {@code queue1} to the topic exchange, using the value of the required
 * "queue1name" property as the routing key.
 *
 * @param queue1 the queue bean qualified as "queue1"
 * @param topicExchange the exchange to bind to
 * @return the binding
 */
@Bean
Binding queue1Binding(@Qualifier("queue1") Queue queue1, TopicExchange topicExchange) {
    // Bind the injected queue1 parameter; the previous "queueq" was an undeclared identifier.
    return BindingBuilder.bind(queue1).to(topicExchange)
            .with(env.getRequiredProperty("queue1name"));
}
@Bean
public Queue queue1() {
return new Queue(
env.getRequiredProperty("queue1name"),
true,
false,
true;
}
/**
 * Declares the second queue, named by the required "queue2name" property.
 *
 * @return the queue bean, created with the flags (true, false, true)
 */
@Bean
public Queue queue2() {
    // Locals are named after the parameters of Spring AMQP's
    // Queue(name, durable, exclusive, autoDelete) constructor.
    String name = env.getRequiredProperty("queue2name");
    boolean durable = true;
    boolean exclusive = false;
    boolean autoDelete = true;
    return new Queue(name, durable, exclusive, autoDelete);
}
Я внедряю RabbitTemplate через @Autowired и устанавливаю для него обратные вызовы подтверждения издателя (publisher confirm callbacks), как показано ниже:
// Wrap the payload bytes in an AMQP message with freshly created, unmodified MessageProperties.
// NOTE(review): getBytes() is called without a charset - presumably `message` is a String,
// in which case the platform default charset is used; confirm that is intended.
Message amqpMessage = new Message(message.getBytes(), new MessageProperties());
// Print every publisher confirm together with its correlation data.
// NOTE(review): the callbacks below are assigned on every send; presumably they could be
// set once where the template is configured - verify against the RabbitTemplate docs.
rabbitTemplate.setConfirmCallback((cdata, ack, cause) -> {
System.out.println("Confirm callback"+ ack +" for "+cdata.toString());
});
// Print the routing key of any message handed back through the return callback.
rabbitTemplate.setReturnCallback((msg,replyCode,replyText,
exchange, routingKey)->{
System.out.println("call back returned for "+ routingKey);
});
rabbitTemplate.setMandatory(true);
// Request/reply send to the topic exchange. The routing key is the property whose name is
// given by the message type's queue name. This is the sendAndReceive call seen in the stack
// trace, which ends in PublisherCallbackChannelImpl.addPendingConfirm failing with
// "Listener not registered".
rabbitTemplate
.sendAndReceive(TOPIC_EXCHANGE_KEY,environment.getProperty(correlationData.getMessageType().getQueueName()),
amqpMessage, createCorrelationData(correlationData));
После долгой отладки я обнаружил, что сбой происходит потому, что шаблон не регистрируется как слушатель в новых экземплярах PublisherCallbackChannelImpl,
которые создаются при последующих вызовах после первого. Я не могу понять, является ли это проблемой конфигурации или проблемой в логике подтверждений издателя. Буду благодарен за любые пояснения. Спасибо.
Правка: я использую Spring-amqp версии 2.0.3-RELEASE и Spring-Messaging версии 5.0.6-RELEASE.
Правка 2: отладочные журналы для первого и второго вызовов: https://gist.github.com/jissjanardhanan/cbad51ba77fad3eda484d7c33c0b1517
0 ответов
// Workaround taken from the spring-amqp issue #846 comment linked below: switch off the
// direct reply-to container. The failing stack trace goes through
// RabbitTemplate.doSendAndReceiveWithDirect.
// NOTE(review): presumably this must be set where the RabbitTemplate is configured, before
// the first send - confirm.
rabbitTemplate.setUseDirectReplyToContainer(false);
См. https://github.com/spring-projects/spring-amqp/issues/846#issuecomment-439072434