Облачная функция Spring для создания GlobalKTable из потока
Есть ли пример того, как создать GlobalKTable для подсчета из KStream, используя поток Spring Cloud и используя функциональный подход?
2 ответа
Согласно документации GlobalKTables доступны только для чтения, вы не можете изменять глобальную таблицу во время обработки.
Поскольку GlobalKTables являются потребителями темы Kafka, вы можете просто отправить свои данные в исходную тему GlobalKTable, и в конечном итоге они будут добавлены в таблицу. Но нельзя быть уверенным, что GlobalKTable будет обновлен сразу.
Является ли реализация интерфейса процессора правильным?
@Bean
public Consumer<KStream<String, Long>> processorsample() {
return input -> input.process(() -> new Processor<String, Long>() {
@Override
public void init(ProcessorContext context) {
if (state == null) {
state = (KeyValueStore<String, Long>) context.getStateStore("mystate");
}
}
@Override
public void process(String key, Long value) {
if (state != null) {
if (key != null) {
Long currentCount = state.get(key);
if (currentCount == null) {
state.put(key, value);
} else {
state.put(key, currentCount + value);
}
}
}
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "mystate");
}