Как подключить функциональные компоненты Spring Cloud Stream к Binder Kafka?
Я использую документацию Spring Cloud Streams, чтобы попытаться понять, как подключить мой микросервис к Kafka через связующее, уже загруженное в Gradle. Я пробовал создать простой@Bean Function<String, String>()
в моем классе Spring Boot Application и подтвердили, что он может разговаривать с Kafka, используя командную строку для взаимодействия с uppercase-in-0
а также uppercase-out-0
темы, как описано в начале документации, подтверждающей, что приложение может взаимодействовать с Kafka. На этом этапе я попытался создать следующий класс, ожидая, что он загрузится через автоматическое обнаружение:
package com.yuknis.loggingconsumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class LoggingConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(LoggingConsumerApplication.class, args);
}
}
package com.yuknis.loggingconsumer.functions;
import java.util.function.Function;
public class CharCounter implements Function<String, Integer> {
/**
* Applies this function to the given argument.
*
* @param s the function argument
* @return the function result
*/
@Override
public Integer apply(String s) {
return s.length();
}
}
С application.properties
файлы как таковые:
spring.cloud.function.scan.packages:com.yuknis.loggingconsumer.functions
Я не уверен на 100%, что должно произойти сейчас, но предполагаю, что он должен увидеть класс и автоматически создать charcounter-out-0
а также charcounter-in-0
тема, которую я мог бы использовать и опубликовать, с данными в этих темах, проходящими через эту функцию. Это не то, что происходит. Что я могу упустить? Этот класс должен создавать тему так же, как@Bean
было бы?
1 ответ
Несмотря на то, что каждая из функций загружена spring.cloud.function.scan.packages
установить в пакет и spring.cloud.function.scan.enabled
установлен в true
, он по-прежнему не создает темы. Вам все равно нужно будет установитьspring.cloud.function.scan.definition
к Function
, Consumer
, или Supplier
вы бы хотели общаться с Кафкой так:
spring.cloud.function:
scan:
enabled: true
packages: com.yuknis.loggingconsumer.functions
definition: charCounter;lowercase;uppercase
После этого он создаст charCounter-in-0
а также charCounter-out-0
темы, которые при необходимости можно сопоставить с spring.cloud.function.charCounter-in-0
или spring.cloud.function.charCounter-out-0
свойство выражения.