Распечатать Кафка Стрим Вводить на консоль?
Я просматривал много документации Kafka для Java-приложения, над которым я работаю. Я пытался войти в лямбда-синтаксис, представленный в Java 8, но я немного отрывочен на этом основании и не чувствую себя слишком уверенным, что это должно быть то, что я использую до сих пор.
У меня Kafka/Zookeeper Service работает без каких-либо проблем, и я хочу написать небольшую примерную программу, которая на основе входных данных запишет ее, но не выполняет подсчет слов, так как примеров уже так много.
Что касается примеров данных, я получу строку следующей структуры:
Пример данных
This a sample string containing some keywords such as GPS, GEO and maybe a little bit of ACC.
Вопрос
Я хочу быть в состоянии извлечь 3-буквенные ключевые слова и распечатать их с System.out.println
, Как получить строковую переменную, содержащую входные данные? Я знаю, как применять регулярные выражения или даже просто искать по строке, чтобы получить ключевые слова.
Код
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app_id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "0:0:0:0:0:0:0:1:9092");
props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "0:0:0:0:0:0:0:1:2181");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
//How do I assign the input from in-stream to the following variable?
String variable = ?
}
У меня работает zookeeper, kafka, продюсер и потребитель, подключенные к одной и той же теме, поэтому я хочу в основном увидеть одно и то же String
появляются во всех экземплярах (производитель, потребитель и поток).
1 ответ
Если вы используете Kafka Streams, вам необходимо применить функции / операторы к вашим потокам данных. В вашем случае вы создаете KStream
объект, таким образом, вы хотите применить оператор к source
,
В зависимости от того, что вы хотите сделать, есть операторы, которые применяют функцию к каждой записи в потоке независимо (например, map()
) или другие операторы, которые применяют функцию для нескольких записей вместе (например, aggregateByKey()
). Вы должны заглянуть в документацию: http://docs.confluent.io/3.0.0/streams/developer-guide.html и примеры https://github.com/confluentinc/examples/tree/kafka-0.10.0.0-cp-3.0.0/kafka-streams
Таким образом, вы никогда не создаете локальные переменные, используя потоки Kafka, как показано в примере выше, а просто внедряете все в операторы / функции, которые объединяются в цепочки.
Например, если вы хотите распечатать все входные записи на стандартный вывод, вы можете сделать
KStream<String, String> source = builder.stream(stringSerde, stringSerde, "in-stream");
source.foreach(new ForeachAction<String, String>() {
void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
Таким образом, после запуска приложения через streams.start()
, он получит записи из вашей входной темы и для каждой записи вашей темы вызов apply(...)
сделано, что печатает запись на стандартный вывод.
Конечно, более естественным способом печати потока на консоль будет использование source.print()
(который внутри в основном такой же, как показано foreach()
оператор с уже данным ForeachAction
.)
Для вашего примера с назначением строки локальной переменной вам нужно поместить ваш код в apply(...)
и делать ваши регулярные выражения и т. д. там, чтобы "извлечь 3-х буквенные ключевые слова".
Лучший способ выразить это, однако, будет через комбинацию flatMapValues()
а также print()
(То есть, source.flatMapValues(...).print()
). flatMapValues()
вызывается для каждой входной записи (в вашем случае, я предполагаю, что ключ будет null
так что можешь проигнорировать это). В вашем flatMapValue
Функция, вы применяете свое регулярное выражение и для каждого совпадения, вы добавляете совпадение в список значений, которые вы, наконец, возвращаете.
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
ArrayList<String> keywords = new ArrayList<String>();
// apply regex to value and for each match add it to keywords
return keywords;
}
}
Выход из flatMapValues
будет KStream
опять же, содержащий запись для каждого найденного ключевого слова (т. е. выходной поток является "объединением" по всем спискам, которые вы возвращаете в ValueMapper#apply()
). Наконец, вы просто распечатываете свой результат в консоли через print()
, (Конечно, вы также можете использовать один foreach
вместо flatMapValue
+print
но это было бы менее модульным.)