Вопросы о составленном процессоре и приемнике

Я использую Spring Could Data Flow 1.7.2.RELEASE и пытаюсь подписаться на этот пост

создать "комбинацию процессор + приемник в одном приложении:" новый приемник "".

Когда я структурировал свой код в качестве примера блога, у меня возникли проблемы, и я подумал, что это связано с тем, что в примере блога использовалась функция java.util.Function, которая сродни процессору.

Я догадался, что должен использовать java.util.Consumer, потому что я пытаюсь превратить мой существующий Sink в гибрид Processor-Sink

Мой класс выглядит так:

@EnableBinding(Sink.class)
public class SampleCombinedSink extends Something {

    String modifiedPayload;
    Logger log;

    Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };

    public void accept(String s){
      log.info("SampleCombinedSink.accept() s="+s);
    }

    @StreamListener(Sink.INPUT)
    public void doSink(String payload) {

      consumer.accept(payload);
      log.info("SampleCombinedSink.doSink() Payload received.");
      log.info("SampleCombinedSink.doSink() payload="+ payload);
      log.info("SampleCombinedSink.doSink() modifiedPayload="+ modifiedPayload);
    }
}

Мой вывод выглядит так:

SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:53.330+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:53.330+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:54.332+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:54.332+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:55.333+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:55.333+0000]
SampleCombinedSink.doSink() Payload received.
SampleCombinedSink.doSink() payload=Friday 11 January 2019 19:03:56.313+0000
SampleCombinedSink.doSink() modifiedPayload=STUFF ADDED BY CONSUMER i=[Friday 11 January 2019 19:03:56.313+0000]

Мой источник генерирует метку времени каждую секунду.

Я смущен моим потребителем.

Consumer<String> consumer = i -> { modifiedPayload="STUFF ADDED BY CONSUMER i=["+i+"]"; };

Я думал, что смогу сделать что-то вроде:

Consumer<String> consumer = i -> { i="STUFF ADDED BY CONSUMER i=["+i+"]"; };

а затем полезная нагрузка в

@StreamListener(Sink.INPUT)
public void doSink(String payload) {

будет содержать "STUFF ADDED BY CONSUMER i = [timestamp]"

Это не так.

Я хочу изменить вход на doSink и изменить его, добавив "STUFF ADDED BY CONSUMER", чтобы при вводе полезная нагрузка doSink(String payload) содержала "STUFF ADDED BY CONSUMER i = [timestamp]"

Как мне этого добиться?

1 ответ

Решение

В этом случае вам не нужно ничего менять Sink вместо приложения, просто используйте ту же функцию сантехника на Sink сторона приложения.

Например, чтобы сделать:

a combination of processor + sink into a single application: “a new sink”."

все, что вам нужно, это иметь свой function бобы как часть Sink приложение или даже имея function бобы в отдельном артефакте, но в classpath из Sink приложение. Если у вас есть это, вы можете определить spring.cloud.stream.function.definition для Sink приложение.

Вы можете увидеть образец для этого здесь. Приложение log-composed имеет определенные функции bean.

Чтобы запустить образец:

dataflow:>app register --name http-transformer --type source --uri file:///Users/igopinathan/dev/git/ilayaperumalg/sandbox/function-composition/http-transformer/target/http-transformer-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'source:http-transformer'

dataflow:>app register --name log-composed --type sink --uri file:///Users/igopinathan/.m2/repository/org/springframework/cloud/stream/app/log-composed/2.1.0.BUILD-SNAPSHOT/log-composed-2.1.0.BUILD-SNAPSHOT.jar
Successfully registered application 'sink:log-composed'

dataflow:>stream create helloComposed --definition "http-transformer --server.port=9001 | log-composed"
Created new stream 'helloComposed'


dataflow:>stream deploy helloComposed --properties "app.log-composed.spring.cloud.stream.function.definition=upper|concat,deployer.*.local.inheritLogging=true"
Deployment request has been sent for stream 'helloComposed'

dataflow:>http post --data "friend" --target "http://localhost:9001"

в stream deploy команда, вы можете увидеть приложение, используемое для указания function composition является log-composed,

Другие вопросы по тегам