Spring Boot и Spring AMQP RPC - не найден конвертер для преобразования исключения

У меня есть несколько учебных пособий, работающих с Spring Boot и RPC через RabbitMQ. Тем не менее, как только я пытаюсь добавить конвертер сообщений JSON Джексона, все рушится.

Удаленный вызов успешно получен сервером, поэтому я уверен, что это не конфигурация клиента.

Exchange    DATAFLOW_EXCHANGE
Routing Key     dataflowRunner
Redelivered     ○
Properties  
reply_to:   amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAAr0wAAAAAB.MmIZ6Htejtc1qB11G7BBQw==
priority:   0
delivery_mode:  2
headers:    
__TypeId__: org.springframework.remoting.support.RemoteInvocation
content_encoding:   UTF-8
content_type:   application/json
Payload
675 bytes
Encoding: string


{"methodName":"run","parameterTypes":["dw.dataflow.Dataflow"],"arguments":[{ Valid Dataflow JSON Removed for Brevity } ]}

Однако выводится следующее исключение:

Caused by: org.springframework.messaging.converter.MessageConversionException: 
No converter found to convert to class dw.dataflow.Dataflow, message=GenericMessage 
[payload=RemoteInvocation: method name 'run'; parameter types [dw.dataflow.Dataflow], headers={amqp_receivedExchange=DATAFLOW_EXCHANGE, amqp_deliveryTag=1, amqp_replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAdXNoeWRnbmFkaXBhbHZ4AAArRAAAAAQC.PA/bJ6lcUfaP3csAP5v5NA==, amqp_consumerQueue=DATAFLOW_QUEUE, amqp_redelivered=false, amqp_receivedRoutingKey=dataflowRunner, amqp_contentEncoding=UTF-8, amqp_deliveryMode=PERSISTENT, id=adb37c77-c0da-16bd-8df4-b739cfddf89f, amqp_consumerTag=amq.ctag-N_tFCc_Hp9UtQkiXl7FZ8g, contentType=application/json, __TypeId__=org.springframework.remoting.support.RemoteInvocation, timestamp=1462560945203}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:118)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:98)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:112)
... 12 common frames omitted

Таким образом, при доставке он ЗНАЕТ, что это должен быть объект dw.dataflow.Dataflow, он просто не может найти конвертер. Тем не менее, мой конвертер определен ВЕЗДЕ.

Конфигурация сервера

@Configuration
@EnableRabbit
public class RabbitListenerConfiguration {
    @Autowired
    ConnectionFactory connectionFactory;
    @Autowired
    ObjectMapper      jacksonObjectMapper;

@Bean
public TopicExchange exchange() {
    return new TopicExchange("DATAFLOW_EXCHANGE", true, false);
}

@Bean
public Queue queue() {
    return new Queue("DATAFLOW_QUEUE", true);
}

@Bean
public AmqpInvokerServiceExporter amqpInvokerServiceExporter() {
    AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter() ;
    exporter.setAmqpTemplate(rabbitTemplate());
    exporter.setMessageConverter(jackson2JsonMessageConverter());
    exporter.setServiceInterface(DataflowRunner.class);
    exporter.setService(dataflowRunner());
    return exporter ;
}

@Bean
public DataflowRunner dataflowRunner() {
    return new DataflowRunnerServerImpl();
}

@Bean
public MessageConverter jackson2JsonMessageConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    converter.setJsonObjectMapper(jacksonObjectMapper);
    return converter;
}

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(jackson2JsonMessageConverter());
    return template;
}


@Bean(name="rabbitListenerContainerFactory")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setMessageConverter(jackson2JsonMessageConverter());
    factory.setDefaultRequeueRejected(false); 
    return factory;
}

Вот интерфейс Сервиса:

public interface DataflowRunner {
    String run(Dataflow dataflow) throws Exception;
}

И конкретная реализация:

public class DataflowRunnerServerImpl implements DataflowRunner {
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", queues="DATAFLOW_QUEUE")
public String run(Dataflow dataflow) throws Exception {
    // SNIP
}

Для ухмылок я также попытался настроить класс реализации сервера со следующими аннотациями, но он имеет ту же ошибку:

@RabbitHandler
@RabbitListener(
        bindings = @QueueBinding(key = "dataflowRunner",
                value = @Queue(value = "DATAFLOW_QUEUE", durable = "true", autoDelete = "false", exclusive = "false"),
                exchange = @Exchange(value = "DATAFLOW_EXCHANGE", durable = "true", autoDelete = "false", type = "topic")) )
public String run(Dataflow dataflow) throws Exception {

Конфигурация клиента

    @Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitHost, rabbitPort);
    connectionFactory.setUsername(rabbitUser);
    connectionFactory.setPassword(rabbitPassword);
    connectionFactory.setAddresses(rabbitAddresses);
    return connectionFactory;
}

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setMessageConverter(jackson2MessageConverter());
    return template;
}

Что-то кажется неправильно настроенным? Что мне не хватает? У меня есть конвертер, установленный на экспортере сервисов, и фабрика-слушатель контейнера.

Любая помощь и / или мысли приветствуются.

3 ответа

Решение

@RabbitListener не предназначен для использования с экспортером сервисов - просто простой класс Java.

Для Spring Remoting через RPC экспортером сервисов является MessageListener для SimpleMessageListenerContainer,

С @RabbitListenerесть специальный адаптер слушателя, который оборачивает метод pojo.

Таким образом, вы, кажется, смешиваете две разные парадигмы.

ServiceExporter (Весеннее удаленное взаимодействие) ожидается в паре с AmqpProxyFactoryBean на стороне клиента с экспортером сервиса в качестве слушателя на стороне сервера.

Для простого POJO RPC (который намного новее, чем использование Spring Remoting поверх RabbitMQ), используйте @RabbitListener а также RabbitTemplate.convertSendAndReceive() на стороне клиента. Избавьтесь от ПФБ и ЮВ.

Можете ли вы объяснить, что привело вас на этот путь, на случай, если нам потребуется добавить некоторые пояснения к документации.

РЕДАКТИРОВАТЬ

Если вы действительно хотите использовать Spring Remoting (внедрить интерфейс на стороне клиента и заставить его "волшебным образом" вызывать службу на стороне сервера), вам нужно избавиться от всех вещей фабрики контейнеров и просто подключить SimpleMessageListenerContainer и введите сервисный экспортер как MessageListener,

В справочном руководстве есть пример XML, но вы можете подключить SMLC как @Bean,

EDIT2

Я запустил несколько тестов, и Spring Remoting через AMQP не работает с JSON, потому что объект верхнего уровня является RemoteInvocation - хотя конвертер сообщений может воссоздать этот объект, он не имеет информации о типе фактических аргументов, поэтому оставляет его в виде связанной хэш-карты.

На данный момент, если вы должны использовать JSON, шаблон convertSendAndReceive в сочетании с @RabbitListener это путь сюда. Я открою проблему JIRA, чтобы посмотреть, сможем ли мы решить проблему с помощью Spring Remoting RPC с JSON, но она действительно была разработана для сериализации Java.

Я потратил несколько минут на это, и мне удалось решить проблему с помощью ужасного взлома, который, кажется, работает.

Я в основном расширил классы, участвующие в вызове с обеих сторон, чтобы обеспечить преобразование внутренних аргументов и значений в / из строк JSON.

С немного большей любовью это может быть улучшено для работы с другими типами данных с использованием других конвертеров, но у меня не было времени на это. Я оставляю это вам, если вы достаточно смелы, чтобы попробовать:-)

На стороне сервера

Во-первых, я подкласс AmqpInvokerServiceExporter чтобы иметь возможность добавить поддержку преобразования в / из объектов JSON. Первым шагом является преобразование аргументов метода из JSON в соответствующие им типы. Второй шаг - преобразовать возвращаемое значение из объекта в соответствующую ему строку JSON, чтобы отправить его обратно.

public class JSONAmqpInvokerServiceExporter extends AmqpInvokerServiceExporter {

    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void onMessage(Message message) {
        Address replyToAddress = message.getMessageProperties().getReplyToAddress();
        if (replyToAddress == null) {
            throw new AmqpRejectAndDontRequeueException("No replyToAddress in inbound AMQP Message");
        }

        Object invocationRaw = getMessageConverter().fromMessage(message);

        RemoteInvocationResult remoteInvocationResult;
        if (invocationRaw == null || !(invocationRaw instanceof RemoteInvocation)) {
            remoteInvocationResult = new RemoteInvocationResult(
                new IllegalArgumentException("The message does not contain a RemoteInvocation payload"));
        }
        else {
            RemoteInvocation invocation = (RemoteInvocation) invocationRaw;
            int argCount = invocation.getArguments().length;
            if (argCount > 0) {
                Object[] arguments = invocation.getArguments();
                Class<?>[] parameterTypes = invocation.getParameterTypes();
                for (int i = 0; i < argCount; i++) {
                    try {
                        //convert arguments from JSON strings to objects
                        arguments[i] = objectMapper.readValue(arguments[i].toString(), parameterTypes[i]);
                    }
                    catch (IOException cause) {
                        throw new MessageConversionException(
                            "Failed to convert JSON to value: " + arguments[i] + " of type" + parameterTypes[i], cause);
                    }
                }
            }

            remoteInvocationResult = invokeAndCreateResult(invocation, getService());
        }
        send(remoteInvocationResult, replyToAddress);
    }

    private void send(RemoteInvocationResult result, Address replyToAddress) {
        Object value = result.getValue();
        if (value != null) {
            try {
                //convert the returning value from a model to a JSON string
                //before we send it back
                Object json = objectMapper.writeValueAsString(value);
                result.setValue(json);
            }
            catch (JsonProcessingException cause) {
                throw new MessageConversionException("Failed to convert value to JSON: " + value, cause);
            }
        }
        Message message = getMessageConverter().toMessage(result, new MessageProperties());

        getAmqpTemplate().send(replyToAddress.getExchangeName(), replyToAddress.getRoutingKey(), message);
    }

}

Теперь, когда этот класс определен, я изменил определение моего прослушивателя службы на что-то вроде этого:

<bean id="toteServiceListener" class="amqphack.FFDAmqpInvokerServiceExporter">
    <property name="serviceInterface" value="ampqphack.ToteService"/>
    <property name="service" ref="defaultToteService"/>
    <property name="amqpTemplate" ref="rabbitTemplate"/>
</bean>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="toteServiceListener" queue-names="tote-service"/>
</rabbit:listener-container>

Я использовал обычный AmqTemplate в этом случае, поскольку я знаю, что ResultInvocationValue всегда будет всегда преобразовываться в строку JSON, поэтому я не против, если InvocationResult сериализуется с использованием традиционной Java-сериализации.

На стороне клиента

В клиенте мне пришлось поменяться на вещи. Во-первых, мне нужно, чтобы все аргументы, которые мы отправляем в вызове, были преобразованы в строки JSON, но мы по-прежнему сохраняем их типы параметров. К счастью, существующие AmqpProxyFactoryBean принимает remoteInvocationFactory параметр, где мы можем перехватить вызов и изменить его. Итак, я сначала определился и новый RemoteInvocationFactory:

public class JSONRemoteInvocationFactory implements RemoteInvocationFactory {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public RemoteInvocation createRemoteInvocation(MethodInvocation methodInvocation) {
        RemoteInvocation invocation = new RemoteInvocation(methodInvocation);
        if (invocation.getParameterTypes() != null) {
            int paramCount = invocation.getParameterTypes().length;
            Object[] arguments = new Object[paramCount];
            try {
                for (int i = 0; i < paramCount; i++) {
                    arguments[i] = mapper.writeValueAsString(invocation.getArguments()[i]);
                }
                invocation.setArguments(arguments);
            }
            catch (JsonProcessingException cause) {
                throw new RuntimeException(
                    "Failed converting arguments to json: " + Arrays.toString(invocation.getArguments()), cause);
            }
        }
        return invocation;
    }
}

Но этого недостаточно. Когда мы вернем результат, нам нужно снова превратить его результат в объект Java. Для этого мы можем использовать интерфейс сервиса ожидаемого типа возврата. И для этого я продлил существование AmqpProxyFactoryBean просто преобразовать его результат, который, как я знаю, всегда будет строкой, в модель Java.

public class JSONAmqpProxyFactoryBean extends AmqpProxyFactoryBean {

    private final ObjectMapper mapper = DefaultObjectMapper.createDefaultObjectMapper();

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        Object ret = super.invoke(invocation);
        return mapper.readValue(ret.toString(), invocation.getMethod().getReturnType());
    }

}

И с этим я смог определить свою клиентскую сторону примерно так:

<bean id="toteService" class="amqphack.JSONAmqpProxyFactoryBean">
    <property name="amqpTemplate" ref="rabbitTemplate"/>
    <property name="serviceInterface" value="amqphack.ToteService"/>
    <property name="routingKey" value="tote-service"/>
    <property name="remoteInvocationFactory" ref="remoteInvocationFactory"/>
</bean>

И после этого все заработало как шарм

ToteService toteService = context.getBean("toteService", ToteService.class);
ToteModel tote = toteService.findTote("18251", "ABCD");

Поскольку я не изменяю традиционный конвертер, это означает, что исключения по-прежнему правильно сериализуются в InvocationResult,

Не знаю, нужно ли это по-прежнему, но я решил проблему использования JSON с AmqpProxyFactoryBean / AmqpInvokerServiceExporter, На стороне клиента я использую Jackson2JsonMessageConverter конвертер и на стороне сервера RemoteInvocationAwareMessageConverterAdapter который оборачивает Jackson2JsonMessageConverter конвертер.

ClientConfig.java:

import com.stayfriends.commons.services.interfaces.GameService;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.remoting.client.AmqpProxyFactoryBean;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ClientConfig {

    @Bean
    public RabbitTemplate gameServiceTemplate(ConnectionFactory connectionFactory,
                                              Jackson2JsonMessageConverter messageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setExchange("rpc");
        template.setMessageConverter(messageConverter);
        return template;
    }

    @Bean
    public ServiceAmqpProxyFactoryBean gameServiceProxy2(@Qualifier("gameServiceTemplate") RabbitTemplate template) {
        return new ServiceAmqpProxyFactoryBean(template);
    }


    public static class ServiceAmqpProxyFactoryBean implements FactoryBean<Service>, InitializingBean {
        private final AmqpProxyFactoryBean proxy;

        ServiceAmqpProxyFactoryBean(RabbitTemplate template) {
            proxy = new AmqpProxyFactoryBean();
            proxy.setAmqpTemplate(template);
            proxy.setServiceInterface(GameService.class);
            proxy.setRoutingKey(GameService.class.getSimpleName());
        }

        @Override
        public void afterPropertiesSet() {
            proxy.afterPropertiesSet();
        }

        @Override
        public Service getObject() throws Exception {
            return (Service) proxy.getObject();
        }

        @Override
        public Class<?> getObjectType() {
            return Service.class;
        }

        @Override
        public boolean isSingleton() {
            return proxy.isSingleton();
        }
    }

}

ServerConfig.java

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.remoting.service.AmqpInvokerServiceExporter;
import org.springframework.amqp.support.converter.RemoteInvocationAwareMessageConverterAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ServerConfig {

    @Bean
    public DirectExchange serviceExchange() {
        return new DirectExchange("rpc");
    }

    @Bean
    public Queue serviceQueue() {
        return new Queue(Service.class.getSimpleName());
    }

    @Bean
    public Binding binding(@Qualifier("serviceQueue") Queue queue, @Qualifier("serviceExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(Service.class.getSimpleName()).noargs();
    }

    @Bean("remoteInvocationAwareMessageConverter")
    @Primary
    public RemoteInvocationAwareMessageConverterAdapter remoteInvocationAwareMessageConverterAdapter(
        Jackson2JsonMessageConverter jsonMessageConverter) {
        return new RemoteInvocationAwareMessageConverterAdapter(jsonMessageConverter);
    }

    @Bean
    public AmqpInvokerServiceExporter exporter(RabbitTemplate template, ServiceImpl service,
                                               RemoteInvocationAwareMessageConverterAdapter messageConverter) {
        AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter();
        exporter.setAmqpTemplate(template);
        exporter.setService(service);
        exporter.setServiceInterface(Service.class);
        exporter.setMessageConverter(messageConverter);
        return exporter;
    }

    @Bean
    public MessageListenerContainer container(ConnectionFactory connectionFactory,
                                              @Qualifier("serviceQueue") Queue queue,
                                              AmqpInvokerServiceExporter exporter) {
        DirectMessageListenerContainer container = new DirectMessageListenerContainer(connectionFactory);
        container.setQueues(queue);
        container.setMessageListener(exporter);
        container.setConsumersPerQueue(5);
        return container;
    }
}
Другие вопросы по тегам