Понимание весенних облачных сообщений с rabbitmq

Я думаю, что у меня проблемы с пониманием весенних облачных сообщений и я не могу найти ответ на "проблему", с которой я сталкиваюсь.

У меня есть следующие настройки (с помощью Spring-Boot 2.0.3.RELEASE).

application.yml

spring:
    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
      virtual-host: /
    cloud:
      stream:
        bindings:
          input:
            destination: foo
            group: fooGroup
          fooChannel:
            destination: foo

Сервисный класс

@Autowired
FoodOrderController foodOrderController;

@Bean
public CommandLineRunner runner() {
    return (String[] args) -> {
       IntStream.range(0,50).forEach(e -> foodOrderController.orderFood());
    };
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

@StreamListener(target = FoodOrderSource.INPUT)
public void processCheapMeals1(String meal){
    System.out.println("This was a great meal!: "+ meal);
}

FoodOrderController

public class FoodOrderController {

    @Autowired
    FoodOrderSource foodOrderSource;

    public String orderFood(){
        var foodOrder = new FoodOrder();
        foodOrder.setCustomerAddress(UUID.randomUUID().toString());
        foodOrder.setOrderDescription(UUID.randomUUID().toString());
        foodOrder.setRestaurant("foo");
        foodOrderSource.foodOrders().send(MessageBuilder.withPayload(foodOrder).build());
       // System.out.println(foodOrder.toString());
        return "food ordered!";
    }
}

FoodOrderSource

public interface FoodOrderSource {
    String INPUT = "foo";
    String OUTPUT = "fooChannel";

    @Input("foo")
    SubscribableChannel foo();
    @Output("fooChannel")
    MessageChannel foodOrders();
}

FoodOrderPublisher

@EnableBinding(FoodOrderSource.class)
public class FoodOrderPublisher {
}

Настройка работает, за исключением того, что оба StreamListener получать те же сообщения. Таким образом, все записывается дважды. Читая документацию, он говорит, указав group внутри привязок очередей оба слушателя будут зарегистрированы внутри группы, и только один слушатель получит одно сообщение. Я знаю, что приведенный выше пример не имеет смысла, но я хочу имитировать многоузловую среду с настройкой нескольких слушателей.

Почему сообщение получают оба слушателя? И как я могу убедиться, что сообщение получено только один раз в группе настройки?

Согласно документации, сообщения также должны быть автоматически подтверждены по умолчанию, но я не могу найти ничего, что указывает на то, что сообщения фактически подтверждаются. Я что-то здесь упускаю?

Вот несколько скриншотов кролика админа

2 ответа

Решение

Читая документацию, говорится, что указание группы внутри привязок очередей, оба слушателя будут зарегистрированы внутри группы, и только один слушатель получит одно сообщение.

Это верно, когда слушатели находятся в разных экземплярах приложения. Когда в одном экземпляре несколько слушателей, все они получают одно и то же сообщение. Это обычно используется с condition где каждый слушатель может выразить заинтересованность в том, какие блюда им интересны.

По сути, конкурирующий потребитель - это сама привязка, которая отправляет сообщение фактическому @StreamListener с в приложении.

Таким образом, вы не можете "имитировать многоузловую среду с настройкой нескольких слушателей" таким образом.

но я не могу найти ничего, что указывает на то, что сообщения на самом деле получают подтверждение

Что ты имеешь в виду? Если сообщение успешно обработано, контейнер получает сообщение и удаляется из очереди.

Как правильно ответить на этот пост уже ответил, но вы все еще можете посмотреть на это:

https://github.com/jinternals/spring-cloud-stream

Другие вопросы по тегам