Apache Beam: обновление побочного ввода, который я читаю из MongoDB, используя MongoDbIO.read() Часть 2

Не уверен в том, как эта GenerateSequence работает для меня, так как мне нужно периодически читать значения из Mongo ежечасно или ежедневно, создать ParDo, который читает MongoDB, а также добавил окно в GlobalWindows с триггером (триггер, который я буду обновлять как требование pr). Но ниже фрагмент кода дает ошибку типа возвращаемого значения, так что не могли бы вы помочь мне исправить приведенные ниже строки кода? Также найдите снимок ошибки. Также, как эта функция "Создание последовательности" помогает в моем случае?

PCollectionView<List<String>> list_of_vins = pipeline
                  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
                  .apply(ParDo.of(new DoFn<Long, List<String>>() {
                    @ProcessElement
                    public void process(ProcessContext c) {
                      // Read entire DB, and output as a List<String>
                        final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
                        MongoClient mongoClient = MongoClients.create(uriString);
                        MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
                        MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
                        c.output((List<String>) ((View) mongoCollection).asList());
                    }
                  })
                  .apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));

2 ответа

@danielm и все такое,

Я обновил свой код и, похоже, он работает, но мало вопросов и требуется пояснение, чтобы продолжить,

PCollection<String> list_of_vins_1 = pipeline
            // Generate a tick every 15 seconds
            .apply("Ticker", GenerateSequence.from(0).withRate(1, Duration.standardMinutes(2)))
            // Just to check if individual ticks are being generated once every day
            .apply("Read Data from Mongo DB",ParDo.of(new DoFn<Long, Document>() {
                    @ProcessElement
                    public void processElement(@Element Long tick, OutputReceiver<Document> out) {
                            // reading values from Mongo DB
                            out.output(mongoDocuments);
                        }
                    }
                }
            )).apply("Window", Window.<Document>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).discardingFiredPanes())
            .apply(ParDo.of(new ConvertDocuemntToStringFn()));

// convert to mongodata to list of string
PCollectionView<List<String>> list_of_data_1 = list_of_vins_1.apply(View.<String> asList());

Я могу читать значение из mongo db в соответствии с указанной продолжительностью тикера, но я не уверен, что это увеличит мой размер побочного ввода. Подобно тому, как я передаю этот list_of_data_1 в качестве побочного входа, в конвейере он показывает, что количество добавленных элементов увеличивается.

Предположим, что если mongo db имеет 20000 коллекций, и если этот тикер запускается каждые 2 минуты, то количество добавленных элементов будет 20000, умноженное на количество запусков тикера, то есть 20,000 + 20,0000 + 20,000 + ..... и так далее.

Итак, мой вопрос: каждый раз, когда элементы добавляются в боковые входы или боковые входы, обновляются, а боковые входы всегда имеют 20000 значений или что-то еще, что есть в MongoDB, добавляется или переопределяется?

Вам нужно будет указать типы в преобразовании окна следующим образом:

.apply(Window.<List<String>>into(...));
Другие вопросы по тегам