Используя Grails 3.3.2 с плагином rabbitmq-native (3.4.4), одиночный потребитель с prefetch=1 прекращает потребление сообщений после возникновения исключения

Я попытался установить значение true. Насколько я понимаю, сообщение должно постоянно доставляться потребителю снова и снова. Вместо этого он просто сидит там, не потребляя поставленное в очередь сообщение или какие-либо новые сообщения. Я включил регистрацию для com.budjb и com.rabbitmq и org.springframework.amqp вплоть до TRACE и не вижу, что происходит какое-либо отключение... Heeelppp

application.groovy

rabbitmq {
uri = new URI(System.env.CLOUDAMQP_URL ?: "amqp://test:test@localhost/test")
username = uri.userInfo.split(":")[0]
password = uri.userInfo.split(":")[1]

connections = [
        [name              : 'main',
         host              : uri.host,
         port              : 5672,
         username          : username,
         requestedHeartbeat: 10,
         automaticReconnect: true,
         virtualHost       : uri.path.substring(1),   //remove leading slash
         password          : password]
]

queues = [[name: com.coco.jms.RabbitQueues.INDEX_TRANSACTION.destinationName, autoDelete: false, durable: true, exclusive: false]]

Потребитель:

class IndexTransactionConsumer implements MessageConsumerEventHandler {

static rabbitConfig = [
        connection: 'main',
        consumers : 1,
        queue     : Boolean.valueOf((String) System.getProperty("is_amqp_consumer")) ? RabbitQueues.INDEX_TRANSACTION.destinationName : null,
        transacted: true,
        autoAck   : AutoAck.POST,
        retry     : true
]

def handleMessage(Map body, MessageContext messageContext) {
    log.info("RABBITMQ - *CONSUME* Received event to index transaction (Map). " + body)

    throw new Exception("Force fail")
}
....
}

ОБНОВЛЕНИЕ, кажется, что txRollback(), который запускается внутри AbstractConsumerContext.groovy, когда transacted=true и autoAck = AutoAck.POST мешает элементу BasicReject достичь сервера RabbitMQ.

if (configuration.getTransacted()) {
    context.getChannel().txRollback()
}

if (configuration.getAutoAck() == AutoAck.POST) {
            context.getChannel().basicReject(context.getEnvelope().deliveryTag, configuration.getRetry())
}

1 ответ

Я решил свою проблему, не позволив исключениям покинуть слушателя и самостоятельно управляя ack/nack. Я думаю, что есть большой плагин rabbitmq-native, где transacted=true. Мне кажется, что он откатывает дужку, которая должна срабатывать, когда ловится исключение.

def handleMessage(Map body, MessageContext context) {

    log.info("RABBITMQ - *CONSUME* Received event. " + body)

    try {
        //ensure casting by JMS to Integer is reverted
        body.conflictIDList = body.conflictIDList.collect { ((Number) it).toLong() }

        //do work

        context.channel.basicAck(context.envelope.deliveryTag, false)
    } catch (Exception ex) {

        ConsumerUtility.handleMessageException(rabbitMessagePublisher, body, context, ex)
    }
}

Из ConsumerUtility

def
static handleMessageException(RabbitMessagePublisher rabbitMessagePublisher, Map body, MessageContext context, Throwable ex) {

    log.warn("E_ception caught attempting digest message sent to " + context.envelope.routingKey + ". body=" + body + ", reason=" + ex.message)
    if (body.retryCount < 3) {

        //pull current message off queue, sleep thread and republish onto back of queue
        context.channel.basicAck(context.envelope.deliveryTag, false)

        body.retryCount = body.retryCount + 1

        //upon failure sleep for 3, 6, then 9 seconds
        sleep(3000 * (Integer) body.retryCount)

        rabbitMessagePublisher.send {
            channel = context.channel
            routingKey = context.envelope.routingKey
            setBody(body)
        }
    } else {

        log.error("Rejecting message after three failed tries onto DLQ. body=" + body, ex)
        context.channel.basicReject(context.envelope.deliveryTag, false)
    }
}
Другие вопросы по тегам