Spring Cloud: поставщик постоянно публикует события Kafka вместо одного?

Spring Cloud: поставщик постоянно публикует события Kafka, как публиковать только одно?

        public static HashMap<String, Ticker> transactionsOfAccount = new HashMap<>(0);
    public LinkedList<Ticker> lists = new LinkedList<>();

Производитель.класс

      @Bean
public Supplier<Message<Ticker>> messageSupplier() {

    return () -> {
        if (tickerPublisher.lists.peek() != null) {
            Message<Ticker> msg = MessageBuilder
                    .withPayload(tickerPublisher.lists.peek())
                    .build();
            log.info("Total Size is {}",tickerPublisher.lists.size());
            log.info("Message: {}", msg.getPayload());
            tickerPublisher.lists.get(0).setStatus(Status.SUCESS);
            return  msg;
        } else {
            return null;
        }
    };
}

RestController.класс

      @GetMapping(value = "/quote-mono", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<Ticker> getQuoteMono(@RequestParam("symbol") String symbol) {
    tickerPublisher.publisherMono(symbol);
    return mono;
}

PublisherService.класс

       public void publisherMono(String ticker) {
        String path = ticker.toUpperCase() + "/prices/realtime?api_key=" + apiKey;
        this.webClient
                .get()
                .uri(path)
                .retrieve()
                .bodyToMono(Ticker.class)
                .flatMap(data -> sendToKafka(ticker, data))
                .doOnNext(data-> {
                    log.info("next events from published : {}", data);
                    if (transactionsOfAccount.containsKey(ticker) && !lists.isEmpty()) {
                        log.info("list is clear now ");
                        transactionsOfAccount.clear();
                    }
                })
                .subscribe(
                        data -> {
                            log.info("data is {}", data);
                            this.sinkMono.tryEmitValue((Ticker) data);
                        },
                        (err) -> log.info(String.valueOf(err)),
                        () -> {
                            log.info("Completed");
                        }
                )
        ;
        log.info(lists.toString());
    }

Проблема заключается в том, что класс поставщиков постоянно публикует повторяющиеся события в KafkaBroker.

введите описание изображения здесь

Нужна помощь, где возникают проблемы, как мы можем публиковать события Mono Single, если ответ представляет собой один объект

Реакция отдыха

      {"last_price":245.3,"last_time":"2022-12-09T22:44:53.000Z","last_size":null,"bid_price":244.98,"bid_size":100,"ask_price":250.0,"ask_size":96,"open_price":246.4,"close_price":null,"high_price":248.2,"low_price":244.37,"exchange_volume":1168680,"market_volume":null,"updated_on":"2022-12-09T22:59:58.183Z","source":"bats_delayed","security":{"id":"sec_XaL6mg","ticker":"MSFT","exchange_ticker":"MSFT:UW","figi":"BBG000BPHFS9","composite_figi":"BBG000BPH459"}}

Ожидается публикация только одного события с помощью WebClient.

1 ответ

В вашем случае, вероятно, лучшим выбором будет использование StreamBridge.

        @Autowired
  private final StreamBridge streamBridge;

  @Override
  public void send(T t) {
    Message<T> message = message(t);
    boolean success = streamBridge.send(bindingName(), message);
    if (success) {
      log.info("send message to kafka {}", message);
    } else {
      String msg = "Message was not sent to stream (kafka producer): " + message;
      log.warn(msg);
      throw exception(msg);
    }
  }
Другие вопросы по тегам