Объединение адаптеров входящего канала и передатчика потока
Я играю с реактивным весенним облачным потоком и сталкиваюсь с проблемой.
Рассмотрим следующий код:
@InboundChannelAdapter("list", poller = [(Poller(fixedDelay = "\${thetis.listInterval:60000}"))])
fun timerMessageSource(): Flux<Center> = config.centers.toFlux()
Моя цель здесь состоит в том, чтобы создать поток, который должен быть поглощен чем-то в форме:
@StreamListener("list") @Output("download")
fun processList(center: Center): Flux<Product> = ...
Но это, похоже, не работает. Адаптер канала правильно генерирует потоки, но говорит следующее:
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'FluxIterable': was expecting ('true', 'false' or 'null')
Я думал, что я добавлю StreamEmitter
аннотации вдоль адаптера входящего канала, но это, похоже, не работает.
Как правильно реализовать такой поток?
Спасибо и всего наилучшего,
Фернандо
1 ответ
Исключение довольно ясно: вы производите Flux
объект как payload
сообщения для отправки list
канал для передачи в целевое место назначения в промежуточном программном обеспечении обмена сообщениями. И что это полностью правильно, что Flux
as is не совместим с JSON для сериализации.
С другой стороны, я не уверен, что такое Kotlin и компилирует ваш код в Java, но у нас есть что-то вроде этого (для Java):
@StreamEmitter
@Output("list")
public Flux<Center> timerMessageSource() {
return config.centers.toFlux();
}
И каждый испущенный элемент в потоке будет сериализован и отправлен в виде записи или сообщения в Binder. Если твой Center
конечно, совместим с JSON. Для этого вам нужен spring-cloud-stream-reactive
зависимость.
Верно, @InboundChannelAdapter
здесь не помогает или даже мешает.
Если вы беспокоитесь о какой-то периодической эмиссии, стоит рассмотреть возможность планирования поддержки в Project Reactor.