Понимание весенних облачных сообщений с rabbitmq
Я думаю, что у меня проблемы с пониманием весенних облачных сообщений и я не могу найти ответ на "проблему", с которой я сталкиваюсь.
У меня есть следующие настройки (с помощью Spring-Boot 2.0.3.RELEASE).
application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
cloud:
stream:
bindings:
input:
destination: foo
group: fooGroup
fooChannel:
destination: foo
Сервисный класс
@Autowired
FoodOrderController foodOrderController;
@Bean
public CommandLineRunner runner() {
return (String[] args) -> {
IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
};
}
@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
System.out.println("This was a great meal!: "+ meal);
}
@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
System.out.println("This was a great meal!: "+ meal);
}
FoodOrderController
public class FoodOrderController {
@Autowired
FoodOrderSource foodOrderSource;
public String orderFood(){
var foodOrder = new FoodOrder();
foodOrder.setCustomerAddress(UUID.randomUUID().toString());
foodOrder.setOrderDescription(UUID.randomUUID().toString());
foodOrder.setRestaurant("foo");
foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
// System.out.println(foodOrder.toString());
return "food ordered!";
}
}
FoodOrderSource
public interface FoodOrderSource {
String INPUT = "foo";
String OUTPUT = "fooChannel";
@Input("foo")
SubscribableChannel foo();
@Output("fooChannel")
MessageChannel foodOrders();
}
FoodOrderPublisher
@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}
Настройка работает, за исключением того, что оба StreamListener
получать те же сообщения. Таким образом, все записывается дважды. Читая документацию, он говорит, указав group
внутри привязок очередей оба слушателя будут зарегистрированы внутри группы, и только один слушатель получит одно сообщение. Я знаю, что приведенный выше пример не имеет смысла, но я хочу имитировать многоузловую среду с настройкой нескольких слушателей.
Почему сообщение получают оба слушателя? И как я могу убедиться, что сообщение получено только один раз в группе настройки?
Согласно документации, сообщения также должны быть автоматически подтверждены по умолчанию, но я не могу найти ничего, что указывает на то, что сообщения фактически подтверждаются. Я что-то здесь упускаю?
Вот несколько скриншотов кролика админа
2 ответа
Читая документацию, говорится, что указание группы внутри привязок очередей, оба слушателя будут зарегистрированы внутри группы, и только один слушатель получит одно сообщение.
Это верно, когда слушатели находятся в разных экземплярах приложения. Когда в одном экземпляре несколько слушателей, все они получают одно и то же сообщение. Это обычно используется с condition
где каждый слушатель может выразить заинтересованность в том, какие блюда им интересны.
По сути, конкурирующий потребитель - это сама привязка, которая отправляет сообщение фактическому @StreamListener
с в приложении.
Таким образом, вы не можете "имитировать многоузловую среду с настройкой нескольких слушателей" таким образом.
но я не могу найти ничего, что указывает на то, что сообщения на самом деле получают подтверждение
Что ты имеешь в виду? Если сообщение успешно обработано, контейнер получает сообщение и удаляется из очереди.
Как правильно ответить на этот пост уже ответил, но вы все еще можете посмотреть на это: