KafkaIO с Apache Beam застрял в бесконечном цикле на DirectRunner
Я пытаюсь запустить этот простой пример, в котором данные из темы Kafka отфильтрованы: https://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam-streaming-pipeline/
У меня аналогичная настройка с брокером localhost с настройками по умолчанию, но я даже не могу прочитать из этой темы.
При запуске приложение застревает в бесконечном цикле и ничего не происходит. Я попытался дать своему брокеру бессмысленный URL-адрес, чтобы узнать, сможет ли он связаться с ними - это не так. Кластер запущен, и я могу добавлять сообщения в тему. Вот где я указываю брокера и тему:
pipeline
.apply(
KafkaIO.<Long, String>read()
.withBootstrapServers("localhost:9092")
.withTopic("BEAM_IN")
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
)
Ошибок не вижу и в выходной теме ничего не написано.
При отладке я вижу, что он застрял в этом цикле:
while(Instant.now().isBefore(completionTime)) {
ExecutorServiceParallelExecutor.VisibleExecutorUpdate update = this.visibleUpdates.tryNext(Duration.millis(25L));
if (update == null && ((State)this.pipelineState.get()).isTerminal()) {
return (State)this.pipelineState.get();
}
if (update != null) {
if (this.isTerminalStateUpdate(update)) {
return (State)this.pipelineState.get();
}
if (update.thrown.isPresent()) {
Throwable thrown = (Throwable)update.thrown.get();
if (thrown instanceof Exception) {
throw (Exception)thrown;
}
if (thrown instanceof Error) {
throw (Error)thrown;
}
throw new Exception("Unknown Type of Throwable", thrown);
}
}
В методе isKeyed(PValue pvalue) в классе ExecutorServiceParallelExecutor.
Что мне не хватает?