Почему я продолжаю получать сообщения об отклонении от AMQP?

У меня есть приложение на Quarkus, которое получает сообщения AmqpMessages и отправляет их в другую тему.

Я все время получаю сообщение об ошибке от smallrye о том, что сообщение было отклонено.

Вот свойства

mp.messaging.incoming.data.address=incoming
mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=false

mp.messaging.outgoing.position.address=outgoing
mp.messaging.outgoing.position.connector=smallrye-amqp
mp.messaging.outgoing.position.host=localhost
mp.messaging.outgoing.position.port=5672
mp.messaging.outgoing.position.durable=false

Сам класс

    @Incoming("data")
    @Outgoing("position")
    public CompletionStage handleMessage(final String topic, final MessagingMessage messageToProcess) {
        final String message = messageToProcess.getMessageString();
        final String tenant = messageToProcess.getTenant();
        final String Id = messageToProcess.Id();
        final Message message = _gson.fromJson(message, Message.class);
        return _service.getStuff(tenant, id)
                .thenApply(stuff -> calculate(message, thing))
                .thenApply(Data -> buildAmqpMessage(tenant, id, message, Data))
                .exceptionally(ex -> {
                    _logger.errorv("Error handling message: {0} ", ex);
                    return null;
                });
    }

    public AmqpMessage buildAmqpMessage(final String tenant, final String id,
                                        final Message message, final Data data) {
        final OpenMessage messageToSend = buildMessage(message, openClosePercentageData);
        return OutgoingAmqpMessage.builder()
                .withSubject(_gson.toJson(messageToSend))
                .build();
    }

Вывод логов:

2020-05-10 20:35:36,376 DEBUG [io.sma.rea.mes.amq.AmqpConnector] (vert.x-eventloop-thread-1) Sending AMQP message to address `outgoing` 
2020-05-10 20:35:36,377 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: LINK_FLOW
2020-05-10 20:35:36,523 FINE  [pro.trace] (vert.x-eventloop-thread-1) IN: CH[0] : Flow{nextIncomingId=2, incomingWindow=2147483647, nextOutgoingId=0, outgoingWindow=2147483647, handle=0, deliveryCount=1, linkCredit=250, available=null, drain=false, echo=false, properties=null}
2020-05-10 20:35:36,523 FINE  [pro.trace] (vert.x-eventloop-thread-1) IN: CH[0] : Disposition{role=RECEIVER, first=0, last=null, settled=true, state=Rejected{error=Error{condition=amqp:not-found, description='Deliveries cannot be sent to an unavailable address', info=null}}, batchable=false}
2020-05-10 20:35:36,523 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: LINK_FLOW
2020-05-10 20:35:36,523 FINEST [io.ver.pro.imp.ProtonTransport] (vert.x-eventloop-thread-1) New Proton Event: DELIVERY
2020-05-10 20:35:36,524 ERROR [io.sma.rea.mes.amq.AmqpConnector] (vert.x-eventloop-thread-1) Unable to send the AMQP message: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: message rejected (REJECTED
        at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
        at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
        at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1137)
        at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
        at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2159)
        at io.vertx.axle.AsyncResultCompletionStage.lambda$toCompletionStage$0(AsyncResultCompletionStage.java:20)
        at io.vertx.amqp.impl.AmqpSenderImpl.lambda$doSend$5(AmqpSenderImpl.java:157)
        at io.vertx.proton.impl.ProtonDeliveryImpl.fireUpdate(ProtonDeliveryImpl.java:158)
        at io.vertx.proton.impl.ProtonTransport.handleSocketBuffer(ProtonTransport.java:160)
        at io.vertx.core.net.impl.NetSocketImpl$DataMessageHandler.handle(NetSocketImpl.java:386)
        at io.vertx.core.net.impl.NetSocketImpl.lambda$new$2(NetSocketImpl.java:101)
        at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:237)
        at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:127)
        at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:364)
        at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:369)
        at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:43)
        at io.vertx.core.impl.ContextImpl.executeFromIO(ContextImpl.java:232)
        at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:173)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:830)

Как вы, ребята, можете видеть, сообщение отклонено и больше нет вывода, объясняющего, почему это происходит. До этого я также могу обнаружить:description='Deliveries cannot be sent to an unavailable address

Есть идеи, почему это происходит. До этого у нас была реализация JMS с теми же темами, и она работала нормально.

1 ответ

Ваш брокер AMQP не может "автоматически создавать" адреса и, таким образом, отклонять сообщения. Вы пытались предварительно настроить своего брокера для создания этих адресов и их типа (одноадресная / многоадресная)?

Другие вопросы по тегам