Есть ли интеграция функции весеннего облака webflux + поток весеннего облака с http источником

Я пытаюсь интегрировать весенний облачный поток с весенней облачной функцией webflux

поскольку в будущих выпусках они осуждают реактивные потоки Spring Cloud, я пытаюсь использовать функции Spring Cloud https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html

Веб-функция Spring Cloud может предоставить конечную точку своей функции путями, как в документе

https://cloud.spring.io/spring-cloud-static/spring-cloud-function/1.0.0.RELEASE/single/spring-cloud-function.html

из облачного потока я вижу, что источник должен быть определен как поставщик https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.1.2.RELEASE/single/spring-cloud-stream.html

но мой вариант использования заключается в том, чтобы получить данные POST из реактивной конечной точки http и ввести их в kafka. Есть ли способ добиться этого с помощью функции весенней облачной сети и весеннего облачного потока?

из документа doc для функции весеннего облака с потоком весеннего облака

@SpringBootApplication
@EnableBinding(Source.class)
public static class SourceFromSupplier {
    public static void main(String[] args) {
        SpringApplication.run(SourceFromSupplier.class, "--spring.cloud.stream.function.definition=date");
    }
    @Bean
    public Supplier<Date> date() {
        return () -> new Date(12345L);
    }
}

и если я запускаю это, я вижу, что дата вставляется в kafka каждую 1 секунду, и если я вызываю конечную точку get для поставщика, например localhost:/8080/date приводит к ответу на дату, есть ли способ ввести paylaod из post в кафка с весенней облачной функцией?

1 ответ

Решение

Существует проблема, которую ваш вопрос помог обнаружить, и она связана с несоответствием жизненного цикла между автоконфигурациями, предоставляемыми функцией и потоком. Проблема проявляется в том, что точка отдыха, созданная Spring Cloud Functions, не может видеть привязки, так как она создана намного раньше.

Таким образом, мы рассмотрим проблему в ближайшее время. Между тем есть обходной путь, который потребует от вас доступа output канал из ApplicationContext (см. ниже):

@SpringBootApplication
@EnableBinding(Source.class)
public class SimpleFunctionRabbitDemoApplication {

  public static void main(String[] args) throws Exception {      
    SpringApplication.run(SimpleFunctionRabbitDemoApplication.class);
  }

  @Bean
  public Consumer<String> storeSync(ApplicationContext context) {
     return v -> {
        MessageChannel channel = context.getBean(Source.OUTPUT, MessageChannel.class);
        channel.send(MessageBuilder.withPayload(v).build());
     };
  }
}
Другие вопросы по тегам