Flink: как хранить состояние и использовать в другом потоке?

У меня есть сценарий использования для Flink, где мне нужно прочитать информацию из файла, сохранить каждую строку, а затем использовать это состояние для фильтрации другого потока.

У меня все это работает прямо сейчас с connect оператор и RichCoFlatMapFunction, но это кажется слишком сложным. Кроме того, я обеспокоен тем, что flatMap2 может начать выполняться до того, как из файла будет загружено все состояние:

fileStream
    .connect(partRecordStream.keyBy((KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId()))
    .keyBy((KeySelector<String, String>) partId -> partId, (KeySelector<PartRecord, String>) partRecord -> partRecord.getPartId())
    .flatMap(new RichCoFlatMapFunction<String, PartRecord, PartRecord>() {
        private transient ValueState<String> storedPartId;
        @Override
        public void flatMap1(String partId, Collector<PartRecord> out) throws Exception {
            // store state
            storedPartId.update(partId);
        }

        @Override
        public void flatMap2(PartRecord record, Collector<PartRecord> out) throws Exception {
            if (record.getPartId().equals(storedPartId.value())) {
                out.collect(record);
            } else {
                // do nothing
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<String> descriptor =
                    new ValueStateDescriptor<>(
                            "partId", // the state name
                            TypeInformation.of(new TypeHint<String>() {}),
                            null);
            storedPartId = getRuntimeContext().getState(descriptor);
        }
    });

Есть ли лучший способ (начиная с Flink 1.1.3) выполнить этот шаблон состояния загрузки, затем использовать его в последующих потоках?

1 ответ

Решение

Ваши опасения по поводу CoFlatMapFunction верны. Порядок, в котором flatMap1 а также flatMap2 Вызываемые не могут контролироваться и зависят от порядка поступления данных. Так, flatMap2 может быть вызван до того, как все данные будут прочитаны flatMap1,

Единственный способ в Flink 1.1.3 прочитать все данные перед началом обработки потока - это использовать данные в open() метод RichFlatMapFunctionт.е. вы должны вручную прочитать и проанализировать файл.

Это в основном стратегия объединения в широковещательном режиме, т. Е. Каждый параллельный экземпляр оператора будет делать это. Недостатком является то, что данные файла будут реплицированы. Преимущество заключается в том, что вам не нужно перетасовывать "основной" поток (не нужно использовать keyBy()).

Другие вопросы по тегам