Как использовать RMQ и весенний облачный поток для создания потребителя на основе разделов

Я могу разработать образец потребителя, используя облачный поток и rabbit mq, если у меня есть 3 раздела, созданные производителем, и если я разверну 3 экземпляра в CF, каждый выбирает одну очередь и обрабатывает сообщения с использованием индекса, как описано в документации.

Теперь вопрос: если у меня есть 10 разделов, кажется, мне нужно 10 экземпляров, что является пустой тратой ресурсов, может ли один потребитель прослушать несколько разделов. Причина, по которой у меня есть производитель разделов, потому что для меня важна последовательность сообщений.

2 ответа

Решение

Вот один из способов...

@SpringBootApplication
@EnableBinding(TwoInputs.class)
public class So43661064Application {

    public static void main(String[] args) {
        SpringApplication.run(So43661064Application.class, args);
    }

    @StreamListener("input1")
    public void foo1(String in) {
        doFoo(in);
    }

    @StreamListener("input2")
    public void foo2(String in) {
        doFoo(in);
    }

    protected void doFoo(String in) {
        System.out.println(in);
    }

    public interface TwoInputs {

        @Input("input1")
        SubscribableChannel input1();

        @Input("input2")
        SubscribableChannel input2();

    }

}

а также

spring.cloud.stream.bindings.input1.group=bar-0
spring.cloud.stream.bindings.input1.destination=foo
spring.cloud.stream.rabbit.bindings.input1.consumer.bindingRoutingKey=foo-0

spring.cloud.stream.bindings.input2.group=bar-1
spring.cloud.stream.bindings.input2.destination=foo
spring.cloud.stream.rabbit.bindings.input2.consumer.bindingRoutingKey=foo-1

Это будет использовать из 2 разделов, созданных производителем в ответ на ваш другой вопрос.

Там в настоящее время нет способа иметь @StreamListener слушать сразу 2 раздела.

РЕДАКТИРОВАТЬ

Вот еще один способ, используя exchange->exchange связывание...

Режиссер

@SpringBootApplication
@EnableBinding(Source.class)
public class So43614477Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43614477Application.class, args).close();
    }

    @Autowired
    private MessageChannel output;

    @Autowired
    private AmqpAdmin admin;

    @Value("${spring.cloud.stream.bindings.output.producer.partition-count}")
    private int partitionCount;

    @Value("${spring.cloud.stream.bindings.output.destination}")
    private String destination;

    @Override
    public void run(String... args) throws Exception {
        for (int i = 0; i < this.partitionCount; i++) {
            String partition = this.destination + "-" + i;
            TopicExchange exchange = new TopicExchange(partition);
            this.admin.declareExchange(exchange);
            Binding binding = BindingBuilder.bind(exchange).to(new TopicExchange(this.destination))
                    .with(partition);
            this.admin.declareBinding(binding);
        }

        output.send(MessageBuilder.withPayload("fiz").setHeader("whichPart", 0).build());
        output.send(MessageBuilder.withPayload("buz").setHeader("whichPart", 1).build());
    }

}

а также

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['whichPart']
spring.cloud.stream.bindings.output.producer.partition-count=2

потребитель

@SpringBootApplication
@EnableBinding(Sink.class)
public class So43661064Application {

    public static void main(String[] args) {
        SpringApplication.run(So43661064Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void foo1(String in) {
        System.out.println(in);
    }

}

а также

spring.cloud.stream.bindings.input.group=bar
spring.cloud.stream.bindings.input.destination=foo-0,foo-1

Разделы от первичного обмена направляются на обмен разделами, и потребитель получает список обменов, чтобы связать его очереди.

Вы можете передать этот список в командной строке.

Почему вы думаете, что это пустая трата ресурсов? Если ваше требование диктует необходимость обработки с сохранением состояния и вы разбиваетесь на несколько разделов, вам понадобится N потребителей для N разделов.

Если вы смешаете сообщения разных разделов в одной и той же очереди, ваш порядок пострадает. Если только вы не добавите логику на свою сторону для агрегирования сообщений на основе определенных метаданных.

Другие вопросы по тегам