Облачная функция Spring для создания GlobalKTable из потока

Есть ли пример того, как создать GlobalKTable для подсчета из KStream, используя поток Spring Cloud и используя функциональный подход?

2 ответа

Согласно документации GlobalKTables доступны только для чтения, вы не можете изменять глобальную таблицу во время обработки.

Поскольку GlobalKTables являются потребителями темы Kafka, вы можете просто отправить свои данные в исходную тему GlobalKTable, и в конечном итоге они будут добавлены в таблицу. Но нельзя быть уверенным, что GlobalKTable будет обновлен сразу.

Является ли реализация интерфейса процессора правильным?

    @Bean
    public Consumer<KStream<String, Long>> processorsample() {
        return input -> input.process(() -> new Processor<String, Long>() {

            @Override
            public void init(ProcessorContext context) {
                if (state == null) {
                    state = (KeyValueStore<String, Long>) context.getStateStore("mystate");
                } 
            }

            @Override
            public void process(String key, Long value) {
                if (state != null) {
                    if (key != null) {
                        Long currentCount = state.get(key);
                        if (currentCount == null) {
                            state.put(key, value);
                        } else {
                            state.put(key, currentCount + value);
                        }

                    }

                }
            }

            @Override
            public void close() {
                if (state != null) {
                    state.close();
                }
            }
        }, "mystate");

    }
Другие вопросы по тегам