Как подключить функциональные компоненты Spring Cloud Stream к Binder Kafka?

Я использую документацию Spring Cloud Streams, чтобы попытаться понять, как подключить мой микросервис к Kafka через связующее, уже загруженное в Gradle. Я пробовал создать простой@Bean Function<String, String>() в моем классе Spring Boot Application и подтвердили, что он может разговаривать с Kafka, используя командную строку для взаимодействия с uppercase-in-0 а также uppercase-out-0темы, как описано в начале документации, подтверждающей, что приложение может взаимодействовать с Kafka. На этом этапе я попытался создать следующий класс, ожидая, что он загрузится через автоматическое обнаружение:

package com.yuknis.loggingconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class LoggingConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(LoggingConsumerApplication.class, args);
    }

}

package com.yuknis.loggingconsumer.functions;

import java.util.function.Function;

public class CharCounter implements Function<String, Integer> {

    /**
     * Applies this function to the given argument.
     *
     * @param s the function argument
     * @return the function result
     */
    @Override
    public Integer apply(String s) {
        return s.length();
    }

}

С application.properties файлы как таковые:

spring.cloud.function.scan.packages:com.yuknis.loggingconsumer.functions

Я не уверен на 100%, что должно произойти сейчас, но предполагаю, что он должен увидеть класс и автоматически создать charcounter-out-0 а также charcounter-in-0тема, которую я мог бы использовать и опубликовать, с данными в этих темах, проходящими через эту функцию. Это не то, что происходит. Что я могу упустить? Этот класс должен создавать тему так же, как@Bean было бы?

1 ответ

Несмотря на то, что каждая из функций загружена spring.cloud.function.scan.packages установить в пакет и spring.cloud.function.scan.enabled установлен в true, он по-прежнему не создает темы. Вам все равно нужно будет установитьspring.cloud.function.scan.definition к Function, Consumer, или Supplier вы бы хотели общаться с Кафкой так:

spring.cloud.function:
  scan:
    enabled: true
    packages: com.yuknis.loggingconsumer.functions
  definition: charCounter;lowercase;uppercase

После этого он создаст charCounter-in-0 а также charCounter-out-0 темы, которые при необходимости можно сопоставить с spring.cloud.function.charCounter-in-0 или spring.cloud.function.charCounter-out-0 свойство выражения.

Другие вопросы по тегам