Интеграция Spring AMQP - подтверждение для потребителя

Я тестирую Spring-AMQP с Spring-Integration поддержка, я имею следующую конфигурацию и тест:

<rabbit:connection-factory id="connectionFactory" />
<rabbit:queue name="durableQ"/>
<int:channel id="consumingChannel">
    <int:queue capacity="2"/> <!-- Message get Acked as-soon-as filled in Q -->
</int:channel>

<int-amqp:inbound-channel-adapter 
    channel="consumingChannel"
    queue-names="durableQ" 
    connection-factory="connectionFactory"
    concurrent-consumers="1"
    acknowledge-mode="AUTO"
    />


public static void main(String[] args) {
System.out.println("Starting consumer with integration..");
    AbstractApplicationContext context = new ClassPathXmlApplicationContext(
    "classpath:META-INF/spring/integration/spring-integration-context-consumer.xml");

    PollableChannel consumingChannel = context.getBean("consumingChannel",   
                                                          PollableChannel.class);           
        int count = 0;
        while (true) {
            Message<?> msg = consumingChannel.receive(1000);
            System.out.println((count++) + " \t -> " + msg);

            try { //sleep to check number of messages in queue
                Thread.sleep(50000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

В этой конфигурации было очевидно, что как только приходит сообщение consumingChannel они Acked и, следовательно, удалены из очереди. Я подтвердил это, поместив высокий sleep после receive и проверить queue-size, Там нет дальнейшего контроля над этим.

Теперь, если я установлю acknowledge-mode=MANUALнет способов вроде бы сделать ручным ack с помощью пружинной интеграции.

Мне нужно обработать сообщение и после обработки сделать manual-ack так до ack сообщение остается в durableQ,

Есть ли способ справиться MANUAL с spring-amqp-integration? Я хочу избежать прохождения ChannelAwareMessageListener в inbound-channel-adapter так как я хочу иметь контроль над потребителем receive,

Обновить:

Это даже не представляется возможным при использовании собственного listener-container с inbound-channel-adapter:

// Below creates a default direct-channel (spring-integration channel) named "adapter", to receive poll this channel which is same as above
<int-amqp:inbound-channel-adapter id="adapter" listener-container="amqpListenerContainer" /> 

<bean id="amqpListenerContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="durableQ" />
    <property name="acknowledgeMode" value="MANUAL" />

// messageListener not allowed when using with adapter, so no way of having own ChannelAwareMessageListener, so no channel exposed onMessage, hence no way to ack
    <property name="messageListener" ref="listener"/>
</bean>
<bean id="listener" class="com.sd.springint.rmq.MsgListener"/>

Выше конфигурации выдает ошибку как messageListener свойство не допускается, см. встроенный комментарий к тегу. Итак, цель использования listner-container потерпел поражение (за разоблачение channel с помощью ChannelAwareMessageListener).

Мне spring-integration не может быть использован для manual-acknowledgement (Я знаю, это трудно сказать!), Кто-нибудь может мне помочь в проверке этого или Есть какой-то конкретный подход / конфигурация, необходимая для этого, которую я пропускаю?

2 ответа

Решение

Проблема в том, что вы используете асинхронную передачу обслуживания, используя QueueChannel, Обычно лучше контролировать параллелизм в контейнере (concurrent-consumers="2") и не делайте никаких асинхронных передач в вашем потоке (используйте DirectChannelс). Таким образом, AUTO ACK будет работать просто отлично. Вместо получения от PollableChannel подписаться new MessageHandler() к SubscribableChannel,

Обновить:

Обычно вам не нужно иметь дело с сообщениями в приложении SI, но эквивалент вашего теста с DirectChannel будет...

    SubscribableChannel channel = context.getBean("fromRabbit", SubscribableChannel.class);

    channel.subscribe(new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Got " + message);
        }
    });

РУКОВОДСТВО Ack допускается только через Channel.basicAck(), Итак, вы должны иметь доступ к Channel, на который ваше сообщение было получено.

Попробуйте поиграть с advice-chain из <int-amqp:inbound-channel-adapter>:

  1. Реализовать некоторые Advice как MethodBeforeAdvice
  2. advice-chain на контейнер применяется для ContainerDelegate#invokeListener
  3. Первый аргумент этого метода точно Channel
  4. Предположим, вы можете разместить на MessageProperties.headers тот Channel в этом Advice
  5. И настроить <int-amqp:inbound-channel-adapter> с mapped-request-headers к этому Channel,
  6. И, в конце концов, попытаться вызвать basicAck() на что Channel Заголовок из сообщения интеграции Spring в любом месте вашего нисходящего потока.
Другие вопросы по тегам