Spring AMQP Java Конфигурация
У меня есть проект Maven Spring MVC, я реализовал внутренний обмен сообщениями между сервисами RabbitMQ и библиотекой spring-amqp.
Это мой весенний конфигурационный код amqp java:
@PropertySource({"classpath:hello.properties"})
@Configuration
@ComponentScan("com.example.hello")
public class MessageConfig {
@Value("${amqp.host}")
private String host;
@Value("${amqp.port}")
private int port;
@Value("${amqp.usr}")
private String username;
@Value("${amqp.pwd}")
private String password;
@Value("${amqp.vhost}")
private String virtual_host;
@Value("${amqp.ex}")
private String exchange;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtual_host);
return connectionFactory;
}
@Bean
public TopicExchange emsExchange() {
return new TopicExchange(exchange, true, false);
}
@Bean
public Queue systemQueque() {
return new Queue("system");
}
@Bean
public Binding systemBinding() {
return BindingBuilder.bind(systemQueque()).to(emsExchange()).with(systemQueque().getName());
}
@Bean
public SimpleMessageListenerContainer listenerSystemQueque() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(systemQueque().getName());
container.setMessageListener(listenerAdapterSystem());
return container;
}
@Bean
public MessageConsumerSystem listenerAdapterSystem() {
return new MessageConsumerSystem();
}
}
Это мой "потребитель"
public class MessageConsumerSystem implements MessageListener {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumerSystem.class);
@Override
public void onMessage(Message message) {
try {
final MessageProperties messageProperties = message.getMessageProperties();
final String body = new String(message.getBody());
LOGGER.debug("*********** AMQP Message **********");
LOGGER.debug(" Id : " + messageProperties.getMessageId());
LOGGER.debug(" CorrelId : " + messageProperties.getCorrelationId());
LOGGER.debug(" Timestamp : " + messageProperties.getTimestamp());
LOGGER.debug(" Service : " + messageProperties.getHeaders().get("service"));
LOGGER.debug(" Content-Type: " + messageProperties.getContentType());
LOGGER.debug(" Encoding : " + messageProperties.getContentEncoding());
LOGGER.debug(" Message : " + body);
LOGGER.debug("*************** End ***************");
JAXBContext jaxbContext = JAXBContext.newInstance(ObjectFactory.class);
Unmarshaller unmarshaller = jaxbContext.createUnmarshaller();
unmarshaller.setEventHandler(new javax.xml.bind.helpers.DefaultValidationEventHandler());
Object objectJAXB = unmarshaller.unmarshal(new StringReader(body));
if (objectJAXB instanceof ServiceStart) {
}
else if (objectJAXB instanceof ServiceStop) {
}
} catch (JAXBException ex) {
LOGGER.error("AMQP Message unmarshalling error: " + ex.getMessage());
}
}
}
Я также реализовал "производителя": когда я запускаю приложение, очередь и привязка не создаются автоматически, я должен создавать их вручную из веб-менеджера RabbitMQ.
Вот некоторые журналы:
DEBUG [AMQP Connection 192.168.0.11:5672] org.springframework.amqp.rabbit.connection.CachingConnectionFactory | Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'system' in vhost 'ems', class-id=50, method-id=10)
DEBUG [listenerSnmpQueque-1] org.springframework.amqp.rabbit.connection.CachingConnectionFactory | Detected closed channel on exception. Re-initializing: AMQChannel(amqp://admin@192.168.0.11:5672/ems,3)
WARN [listenerSnmpQueque-1] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer | Failed to declare queue: system
WARN [listenerSnmpQueque-1] org.springframework.amqp.rabbit.listener.BlockingQueueConsumer | Queue declaration failed; retries left=1
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[system]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:711) ~[spring-rabbit-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:587) ~[spring-rabbit-2.0.1.RELEASE.jar:2.0.1.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:964) [spring-rabbit-2.0.1.
Я использую тот же код в проекте Spring Boot другого сервиса и работаю правильно, очереди и привязки создаются автоматически.
Вы можете мне помочь?
1 ответ
Вы должны добавить RabbitAdmin
@Bean
чтобы автоматически обрабатывать объявления, он обнаруживает bean-компоненты в контексте приложения - загрузчик делает это автоматически при автоматической настройке RabbitMQ. Смотрите документацию Spring AMQP.