Spring Cloud: поставщик постоянно публикует события Kafka вместо одного?
Spring Cloud: поставщик постоянно публикует события Kafka, как публиковать только одно?
public static HashMap<String, Ticker> transactionsOfAccount = new HashMap<>(0);
public LinkedList<Ticker> lists = new LinkedList<>();
Производитель.класс
@Bean
public Supplier<Message<Ticker>> messageSupplier() {
return () -> {
if (tickerPublisher.lists.peek() != null) {
Message<Ticker> msg = MessageBuilder
.withPayload(tickerPublisher.lists.peek())
.build();
log.info("Total Size is {}",tickerPublisher.lists.size());
log.info("Message: {}", msg.getPayload());
tickerPublisher.lists.get(0).setStatus(Status.SUCESS);
return msg;
} else {
return null;
}
};
}
RestController.класс
@GetMapping(value = "/quote-mono", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<Ticker> getQuoteMono(@RequestParam("symbol") String symbol) {
tickerPublisher.publisherMono(symbol);
return mono;
}
PublisherService.класс
public void publisherMono(String ticker) {
String path = ticker.toUpperCase() + "/prices/realtime?api_key=" + apiKey;
this.webClient
.get()
.uri(path)
.retrieve()
.bodyToMono(Ticker.class)
.flatMap(data -> sendToKafka(ticker, data))
.doOnNext(data-> {
log.info("next events from published : {}", data);
if (transactionsOfAccount.containsKey(ticker) && !lists.isEmpty()) {
log.info("list is clear now ");
transactionsOfAccount.clear();
}
})
.subscribe(
data -> {
log.info("data is {}", data);
this.sinkMono.tryEmitValue((Ticker) data);
},
(err) -> log.info(String.valueOf(err)),
() -> {
log.info("Completed");
}
)
;
log.info(lists.toString());
}
Проблема заключается в том, что класс поставщиков постоянно публикует повторяющиеся события в KafkaBroker.
введите описание изображения здесь
Нужна помощь, где возникают проблемы, как мы можем публиковать события Mono Single, если ответ представляет собой один объект
Реакция отдыха
{"last_price":245.3,"last_time":"2022-12-09T22:44:53.000Z","last_size":null,"bid_price":244.98,"bid_size":100,"ask_price":250.0,"ask_size":96,"open_price":246.4,"close_price":null,"high_price":248.2,"low_price":244.37,"exchange_volume":1168680,"market_volume":null,"updated_on":"2022-12-09T22:59:58.183Z","source":"bats_delayed","security":{"id":"sec_XaL6mg","ticker":"MSFT","exchange_ticker":"MSFT:UW","figi":"BBG000BPHFS9","composite_figi":"BBG000BPH459"}}
Ожидается публикация только одного события с помощью WebClient.
1 ответ
В вашем случае, вероятно, лучшим выбором будет использование StreamBridge.
@Autowired
private final StreamBridge streamBridge;
@Override
public void send(T t) {
Message<T> message = message(t);
boolean success = streamBridge.send(bindingName(), message);
if (success) {
log.info("send message to kafka {}", message);
} else {
String msg = "Message was not sent to stream (kafka producer): " + message;
log.warn(msg);
throw exception(msg);
}
}