Spring Integration AMQP отвечает с помощью DirectExchnage

У меня есть некоторый клиентский код, который вызывает Spring Integration через вызов DirectExchange, например

    Map<String, Object> result = (Map<String, Object>) rabbitTemplate.convertSendAndReceive("rpc", "KEY", map);

Я вижу, что поток интеграции вызывается, собирает правильный результат и отправляет сообщение AMQP обратно, но сообщение так и не получено. Я должен использовать конфигурацию XML, и я пробовал различные комбинации канальных адаптеров и шлюзов, с разной степенью успеха, но ни одна комбинация, которая возвращает все время.

Просто входящий шлюз вызовет поток, и клиентский код увидит ответ, но половина ответов - это сам ввод:

<int-amqp:inbound-gateway auto-startup="true"
        request-channel="requestChannel"
        reply-channel="responseChannel"
        message-converter="jacksonMessageConverter"
        queue-names="rpc.KEY"/

Канальные адаптеры также вызывают поток, но ответ никогда не используется клиентским кодом, с выражениями обмена и маршрутизации или без них, хотя создается впечатление, что сообщение AMQP собрано и опубликовано.

<int-amqp:inbound-channel-adapter auto-startup="true"
    channel="requestChannel2"
    message-converter="jacksonMessageConverter"
    mapped-request-headers="*"
    queue-names="rpc.KEY"/>

<int-amqp:outbound-channel-adapter auto-startup="true"
        mapped-request-headers="*"
        exchange-name-expression="headers.amqp_receivedExchange"
        routing-key-expression="headers.amqp_replyTo"
        channel="responseChannel2"
        amqp-template="rpcTemplate"/>

При любом подходе какой шаг настройки я пропускаю?

Редактировать: полный образец контекста; использование входящего шлюза отвечает половину времени; Использование канальных адаптеров не позволяет. Поведение одинаково, использую ли я цепочку или отдельные каналы один за другим.

   <bean id = "jacksonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

   // to be clear, tests use gateway or channel adapters, not both at the same
   // time; just included here for completeness on the example
   <int-amqp:inbound-gateway auto-startup="true"
        request-channel="requestChannel2"
        reply-channel="responseChannel2"
        message-converter="jacksonMessageConverter"
        amqp-template="rpcTemplate"
        queue-names="rpc.KEY"/>

    <int-amqp:inbound-channel-adapter auto-startup="true"
        channel="requestChannel2"
        message-converter="jacksonMessageConverter"
        mapped-request-headers="*"
        concurrent-consumers="5"
        queue-names="rpc.KEY"/>

    <int-amqp:outbound-channel-adapter auto-startup="true"
            mapped-request-headers="*"
            channel="responseChannel2"
            amqp-template="rpcTemplate"/>

    <int:chain input-channel="requestChannel2" output-channel="responseChannel2">
        <int:header-enricher id="jsonContentType">
            <int:header name="Content-Type" value="application/json"/>
        </int:header-enricher>
        <int-http:outbound-gateway id="httpOutbound2"
                                   url="http://api.icndb.com/jokes/{stem}"
                                   http-method="GET"
                                   expected-response-type="java.lang.String">

            <int-http:uri-variable name="stem" expression="payload.stem" />
        </int-http:outbound-gateway>
        <int:json-to-object-transformer
                type="java.util.Map"/>
        <int:transformer>
            <int-script:script lang="groovy">
                <![CDATA[
                    def outputVariables = ['joke':payload.value.joke]
                    return outputVariables
                ]]>
            </int-script:script>
        </int:transformer>
    </int:chain>

    <int:channel id="requestChannel2"/>

    <int:channel id="responseChannel2"/>

Быстрое и грязное применение клиентского кода в тесте:

RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setExchange("rpc");
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setReplyTimeout((3000L));

Map<String, Object> map = new HashMap<>();
map.put("stem", "random");
int failures = 0;
for (int i=0;i<2;i++) {
    Map<String, Object> result = (Map<String, Object>) rabbitTemplate.convertSendAndReceive("rpc", "YAY", map);
    if (result == null || !result.containsKey("joke")) {
        failures++;
    }
}
assertEquals(0, failures);

0 ответов

Другие вопросы по тегам