Весенний облачный поток - агрегаты
Я пытаюсь реализовать предложенные агрегаты СКС, но я не уверен, что понимаю их истинную цель, так как результаты меня удивляют. Во-первых, вот код...
Источник, поставщик сообщений для планирования:
@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {
private final Logger logger = LoggerFactory.getLogger(SourceApplication.class);
@Bean
@InboundChannelAdapter(Source.OUTPUT)
public MessageSource<String> createMessage() {
return () -> {
String payload = now().toString();
logger.warn("Sent: " + payload);
return new GenericMessage<>(payload);
};
}
}
Тогда простой процессор (преобразователь):
@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {
@Transformer(inputChannel = Processor.INPUT,
outputChannel = Processor.OUTPUT)
public String processMessage(String payload) {
return payload + " is the time.";
}
}
И это конечный потребитель (раковина):
@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {
private final Logger logger = LoggerFactory.getLogger(SinkApplication.class);
@ServiceActivator(inputChannel = Sink.INPUT)
public void loggerSink(Object payload) {
logger.warn("Received: " + payload);
}
}
И, наконец, совокупность связывает эти три:
@SpringBootApplication
public class SampleAggregateApplication {
public static void main(String[] args) {
new AggregateApplicationBuilder().web(false)
.from(SourceApplication.class)
.args("--spring.cloud.stream.bindings.output.destination=step1", "--fixedDelay=5000")
.via(ProcessorApplication.class)
.args("--spring.cloud.stream.bindings.input.destination=step1",
"--spring.cloud.stream.bindings.output.destination=step2")
.to(SinkApplication.class)
.args("--spring.cloud.stream.bindings.input.destination=step2")
.run(args);
}
}
Когда агрегат запущен, это отрывок полученных трасс:
2017-02-03 09:59:13.428 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.428
2017-02-03 09:59:13.949 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.949 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.996 WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:13.996
2017-02-03 09:59:14.430 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.430
2017-02-03 09:59:14.956 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.956
2017-02-03 09:59:14.956 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:14.956 is the time.
2017-02-03 09:59:14.999 WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:14.999
2017-02-03 09:59:15.432 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:15.432
2017-02-03 09:59:15.961 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:15.961
2017-02-03 09:59:15.961 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:15.961
2017-02-03 09:59:16.000 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16
2017-02-03 09:59:16.001 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:16
2017-02-03 09:59:16.437 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16.437
2017-02-03 09:59:16.966 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:16.966
2017-02-03 09:59:17.006 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.006 WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.443 WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.443
2017-02-03 09:59:17.971 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:17.971
2017-02-03 09:59:17.971 WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:17.971
2017-02-03 09:59:18.007 WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.007
2017-02-03 09:59:18.448 WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.448
2017-02-03 09:59:18.976 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:18.976
2017-02-03 09:59:18.976 WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:18.976 is the time.
2017-02-03 09:59:19.012 WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.012
2017-02-03 09:59:19.449 WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.449
2017-02-03 09:59:19.982 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:19.982
2017-02-03 09:59:19.982 WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:19.982
2017-02-03 09:59:20.018 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication : Sent: 2017-02-03T09:59:20.018
2017-02-03 09:59:20.018 WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SinkApplication : Received: 2017-02-03T09:59:20.018
2017
Я ожидал бы, что КАЖДОЕ сообщение следовало бы за определенным циклом: источник-процессор-приемник. Мы видим, что по крайней мере 2 из 3 сообщений потеряны, и только 1 из 4 сообщений преобразуется. Примечание: назначения каналов были добавлены во второй попытке, чтобы избежать предполагаемого смешения между приложениями (с использованием того же промежуточного программного обеспечения RabbitMQ).
Может кто-нибудь сказать мне, правильно ли я понял агрегаты цели и сделал ли правильные вещи для ее реализации? Заранее спасибо.
1 ответ
Пара вещей:
- Вы не должны указывать места назначения для своих приложений, поскольку приложения, являющиеся частью совокупности, взаимодействуют по внутренним каналам, а не через посредника;
- во-вторых (а это, к сожалению, не указано в нашей документации) - при использовании разные части агрегата должны принадлежать разным Java-пакетам.
@SpringBootApplication
на каждом агрегатном определении компонента. Что происходит в вашем случае, так это то, что вы получаете несколько потребителей по различным каналам вместо ожидаемой цепочки. Перемещение Source, Transformer и Sink в отдельные пакеты должно работать для вас. Кроме того, добавлен https://github.com/spring-cloud/spring-cloud-stream/issues/785 для отслеживания этого.