Spring AMQP Java Конфигурация

У меня есть проект Maven Spring MVC, я реализовал внутренний обмен сообщениями между сервисами RabbitMQ и библиотекой spring-amqp.

Это мой весенний конфигурационный код amqp java:

@PropertySource({"classpath:hello.properties"})
@Configuration
@ComponentScan("com.example.hello")
public class MessageConfig {

    @Value("${amqp.host}")
    private String host;

    @Value("${amqp.port}")
    private int port;

    @Value("${amqp.usr}")
    private String username;

    @Value("${amqp.pwd}")
    private String password;

    @Value("${amqp.vhost}")
    private String virtual_host;

    @Value("${amqp.ex}")
    private String exchange;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost(host);
        connectionFactory.setPort(port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtual_host);
        return connectionFactory;
    }

    @Bean
    public TopicExchange emsExchange() {
        return new TopicExchange(exchange, true, false);
    }

    @Bean    
    public Queue systemQueque() {
        return new Queue("system");
    }

    @Bean 
    public Binding systemBinding() {
        return BindingBuilder.bind(systemQueque()).to(emsExchange()).with(systemQueque().getName());
    }

    @Bean
    public SimpleMessageListenerContainer listenerSystemQueque() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setQueueNames(systemQueque().getName());
        container.setMessageListener(listenerAdapterSystem());
        return container;
    }

    @Bean
    public MessageConsumerSystem listenerAdapterSystem() {
        return new MessageConsumerSystem();
    }
}

Это мой "потребитель"

public class MessageConsumerSystem implements MessageListener {

    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumerSystem.class);

    @Override
    public void onMessage(Message message) {
        try {
            final MessageProperties messageProperties = message.getMessageProperties();
            final String body = new String(message.getBody());

            LOGGER.debug("*********** AMQP Message **********");
            LOGGER.debug(" Id          : " + messageProperties.getMessageId());
            LOGGER.debug(" CorrelId    : " + messageProperties.getCorrelationId());
            LOGGER.debug(" Timestamp   : " + messageProperties.getTimestamp());
            LOGGER.debug(" Service     : " + messageProperties.getHeaders().get("service"));
            LOGGER.debug(" Content-Type: " + messageProperties.getContentType());
            LOGGER.debug(" Encoding    : " + messageProperties.getContentEncoding());
            LOGGER.debug(" Message     : " + body);
            LOGGER.debug("*************** End ***************");

            JAXBContext jaxbContext = JAXBContext.newInstance(ObjectFactory.class);
            Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
            unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
            Object objectJAXB = unmarshaller.unmarshal(new StringReader(body));

            if (objectJAXB instanceof ServiceStart) {   
            }
            else if (objectJAXB instanceof ServiceStop) {
            }
        } catch (JAXBException ex) {
            LOGGER.error("AMQP Message unmarshalling error: " + ex.getMessage());
        }
    }
}

Я также реализовал "производителя": когда я запускаю приложение, очередь и привязка не создаются автоматически, я должен создавать их вручную из веб-менеджера RabbitMQ.

Вот некоторые журналы:

DEBUG [AMQP Connection 192.168.0.11:5672] org.springframework.amqp.rabbit.connection.CachingConnectionFactory | Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'system' in vhost 'ems', class-id=50, method-id=10)

DEBUG [listenerSnmpQueque-1] org.springframework.amqp.rabbit.connection.CachingConnectionFactory | Detected closed channel on exception.  Re-initializing: AMQChannel(amqp://admin@192.168.0.11:5672/ems,3)

WARN [listenerSnmpQueque-1] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer | Failed to declare queue: system

WARN [listenerSnmpQueque-1] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer | Queue declaration failed; retries left=1
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[system]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:587) ~[spring-rabbit-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:964) [spring-rabbit-2.0.1.

Я использую тот же код в проекте Spring Boot другого сервиса и работаю правильно, очереди и привязки создаются автоматически.

Вы можете мне помочь?

1 ответ

Вы должны добавить RabbitAdmin@Bean чтобы автоматически обрабатывать объявления, он обнаруживает bean-компоненты в контексте приложения - загрузчик делает это автоматически при автоматической настройке RabbitMQ. Смотрите документацию Spring AMQP.

Другие вопросы по тегам