KafkaIO с Apache Beam застрял в бесконечном цикле на DirectRunner

Я пытаюсь запустить этот простой пример, в котором данные из темы Kafka отфильтрованы: https://www.talend.com/blog/2018/08/07/developing-data-processing-job-using-apache-beam-streaming-pipeline/

У меня аналогичная настройка с брокером localhost с настройками по умолчанию, но я даже не могу прочитать из этой темы.

При запуске приложение застревает в бесконечном цикле и ничего не происходит. Я попытался дать своему брокеру бессмысленный URL-адрес, чтобы узнать, сможет ли он связаться с ними - это не так. Кластер запущен, и я могу добавлять сообщения в тему. Вот где я указываю брокера и тему:

        pipeline
            .apply(
                    KafkaIO.<Long, String>read()
                            .withBootstrapServers("localhost:9092")
                            .withTopic("BEAM_IN")
                            .withKeyDeserializer(LongDeserializer.class)
                            .withValueDeserializer(StringDeserializer.class)
                            )

Ошибок не вижу и в выходной теме ничего не написано.

При отладке я вижу, что он застрял в этом цикле:

        while(Instant.now().isBefore(completionTime)) {
        ExecutorServiceParallelExecutor.VisibleExecutorUpdate update = this.visibleUpdates.tryNext(Duration.millis(25L));
        if (update == null && ((State)this.pipelineState.get()).isTerminal()) {
            return (State)this.pipelineState.get();
        }

        if (update != null) {
            if (this.isTerminalStateUpdate(update)) {
                return (State)this.pipelineState.get();
            }

            if (update.thrown.isPresent()) {
                Throwable thrown = (Throwable)update.thrown.get();
                if (thrown instanceof Exception) {
                    throw (Exception)thrown;
                }

                if (thrown instanceof Error) {
                    throw (Error)thrown;
                }

                throw new Exception("Unknown Type of Throwable", thrown);
            }
        }

В методе isKeyed(PValue pvalue) в классе ExecutorServiceParallelExecutor.

Что мне не хватает?

0 ответов

Другие вопросы по тегам