писать в несколько тем Kafka в apache-beam?
Я выполняю простую программу подсчета слов, в которой я использовал одну тему Kafka (производитель) в качестве источника ввода, а затем применяю к ней pardo для расчета количества слов. Теперь мне нужна помощь, чтобы написать слова на разные темы, исходя из их частоты. Допустим, все слова с одинаковой частотой попадут в тему 1, а остальные - в тему 2.
может кто-нибудь помочь мне с примером?
1 ответ
Это можно сделать с помощью метода writeRecord Kafka.io, который принимает Producer
ниже код -:
static class ExtractWordsFn extends DoFn<String, String> {
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
private final Distribution lineLenDist =
Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> receiver) {
lineLenDist.update(element.length());
if (element.trim().isEmpty()) {
emptyLines.inc();
}
String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
for (String word : words) {
if (!word.isEmpty()) {
receiver.output(word);
}
}
}
}
public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, ProducerRecord<String,String>> {
@Override
public ProducerRecord<String, String> apply(KV<String, Long> input) {
if(input.getValue()%2==0)
return new ProducerRecord("test",input.getKey(),input.getKey()+" "+input.getValue().toString());
else
return new ProducerRecord("copy",input.getKey(),input.getKey()+" "+input.getValue().toString());
}
}
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
p.apply("ReadLines", KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("copy")// use withTopics(List<String>) to read from multiple topics.
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(ImmutableMap.of("group.id", "my_beam_app_1"))
.updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
.withLogAppendTime()
.withReadCommitted()
.commitOffsetsInFinalize()
.withProcessingTime()
.withoutMetadata()
)
.apply(Values.create())
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn())) //PCollection<ProducerRecord<string,string>>
.setCoder(ProducerRecordCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
.apply("WriteCounts", (KafkaIO.<String, String>writeRecords()
.withBootstrapServers("localhost:9092")
//.withTopic("test")
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
))