Объединение адаптеров входящего канала и передатчика потока

Я играю с реактивным весенним облачным потоком и сталкиваюсь с проблемой.

Рассмотрим следующий код:

@InboundChannelAdapter("list", poller = [(Poller(fixedDelay = "\${thetis.listInterval:60000}"))])
fun timerMessageSource(): Flux<Center> = config.centers.toFlux()

Моя цель здесь состоит в том, чтобы создать поток, который должен быть поглощен чем-то в форме:

 @StreamListener("list") @Output("download")
 fun processList(center: Center): Flux<Product> = ...

Но это, похоже, не работает. Адаптер канала правильно генерирует потоки, но говорит следующее:

 org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'FluxIterable': was expecting ('true', 'false' or 'null')

Я думал, что я добавлю StreamEmitter аннотации вдоль адаптера входящего канала, но это, похоже, не работает.

Как правильно реализовать такой поток?

Спасибо и всего наилучшего,

Фернандо

1 ответ

Решение

Исключение довольно ясно: вы производите Flux объект как payload сообщения для отправки list канал для передачи в целевое место назначения в промежуточном программном обеспечении обмена сообщениями. И что это полностью правильно, что Flux as is не совместим с JSON для сериализации.

С другой стороны, я не уверен, что такое Kotlin и компилирует ваш код в Java, но у нас есть что-то вроде этого (для Java):

@StreamEmitter
@Output("list")
public Flux<Center> timerMessageSource() {
     return config.centers.toFlux();
}

И каждый испущенный элемент в потоке будет сериализован и отправлен в виде записи или сообщения в Binder. Если твой Center конечно, совместим с JSON. Для этого вам нужен spring-cloud-stream-reactive зависимость.

Верно, @InboundChannelAdapter здесь не помогает или даже мешает.

Если вы беспокоитесь о какой-то периодической эмиссии, стоит рассмотреть возможность планирования поддержки в Project Reactor.

Другие вопросы по тегам