Совокупные сообщения в Spring Cloud Stream

Я новичок в Spring Cloud и хочу изменить нашу моноструктуру на микро-сервисы. Сначала я сказал, что я пытаюсь сделать следующее:

  1. Получать запросы на вызов веб-службы (внешней системы) из разных источников. В любое определенное время это может быть 1 запрос или до 100 000 запросов.
  2. Массовая поддержка внешней системы, поэтому лучше, если я смогу объединять сообщения и отправлять их навалом. Например, продолжайте агрегирование до тех пор, пока не будет достигнут порог числа (сообщение 100) или порог времени 2 секунды.
  3. Также, если я получил ошибку, я хочу отступить в геометрической прогрессии

Моя первая идея - создать процессор перед мойкой, который будет выполнять вышеупомянутую агрегацию.

Это правильный способ мышления в облачных вычислениях или это другой путь, который нужно пройти?


Рабочий раствор

@EnableBinding(Processor.class)
class Configuration {

    @Autowired
    Processor processor;


    @ServiceActivator(inputChannel = Processor.INPUT)
    @Bean
    public MessageHandler aggregator() {

        AggregatingMessageHandler aggregatingMessageHandler =
                new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                        new SimpleMessageStore(10));

        //AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean();
        //aggregatorFactoryBean.setMessageStore();
        aggregatingMessageHandler.setOutputChannel(processor.output());
        //aggregatorFactoryBean.setDiscardChannel(processor.output());
        aggregatingMessageHandler.setSendPartialResultOnExpiry(true);
        aggregatingMessageHandler.setSendTimeout(1000L);
        aggregatingMessageHandler.setCorrelationStrategy(new  ExpressionEvaluatingCorrelationStrategy("'FOO'"));
        aggregatingMessageHandler.setReleaseStrategy(new MessageCountReleaseStrategy(3)); //ExpressionEvaluatingReleaseStrategy("size() == 5")
        aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
        aggregatingMessageHandler.setGroupTimeoutExpression(new ValueExpression<>(3000L)); //size() ge 2 ? 5000 : -1
        aggregatingMessageHandler.setExpireGroupsUponTimeout(true);
        return aggregatingMessageHandler;
    }
}

1 ответ

Решение

Вы можете написать aggregator приложение процессора, которое объединяет несколько сообщений в одно сообщение. Для получения дополнительной информации о агрегаторе Spring Integration, пожалуйста, обратитесь сюда

Другие вопросы по тегам