Может ли Flink производить ежечасные снимки агрегированных / скользящих / накопленных данных?

Пример обработки потока в учебнике - программа подсчета слов с метками времени. Со следующим образцом данных

mario 10:00
luigi 10:01
mario 11:00
mario 12:00

Я видел программы подсчета слов, созданные за:

Общий набор данных

mario 3
luigi 1

Набор разделов временного окна

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 1
mario 12:00-13:00 1

Однако я не нашел пример программы подсчета слов в скользящем временном окне, т.е. я хотел бы, чтобы подсчет слов производился ежечасно для каждого слова с начала времени:

mario 10:00-11:00 1
luigi 10:00-11:00 1
mario 11:00-12:00 2
luigi 11:00-12:00 1
mario 12:00-13:00 3
luigi 12:00-13:00 1

Возможно ли это с Apache Flink или любой другой библиотекой обработки потоков? Спасибо!

редактировать:

До сих пор я пробовал вариант подхода Дэвида Андерсона, меняя только время обработки для времени события, поскольку данные имеют временную выборку. Это не работает, как я ожидал, хотя. Вот код, пример данных, результаты, которые он предоставляет, и мои дополнительные вопросы:

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
            .setParallelism(1)
            .setMaxParallelism(1);

    env.setStreamTimeCharacteristic(EventTime);


    String fileLocation = "full file path here";
    DataStreamSource<String> rawInput = env.readFile(new TextInputFormat(new Path(fileLocation)), fileLocation);

    rawInput.flatMap(parse())
            .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<TimestampedWord>() {
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(TimestampedWord lastElement, long extractedTimestamp) {
                    return new Watermark(extractedTimestamp - 1);
                }

                @Override
                public long extractTimestamp(TimestampedWord element, long previousElementTimestamp) {
                    return element.getTimestamp();
                }
            })
            .keyBy(TimestampedWord::getWord)
            .process(new KeyedProcessFunction<String, TimestampedWord, Tuple3<String, Long, Long>>() {
                private transient ValueState<Long> count;

                @Override
                public void open(Configuration parameters) throws Exception {
                    count = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Long.class));
                }

                @Override
                public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    if (count.value() == null) {
                        count.update(0L);
                    }

                    long l = ((value.getTimestamp() / 10) + 1) * 10;
                    ctx.timerService().registerEventTimeTimer(l);

                    count.update(count.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
                    long currentWatermark = ctx.timerService().currentWatermark();
                    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
                }
            })
            .addSink(new PrintlnSink());

    env.execute();
}

private static long fileCounter = 0;

private static FlatMapFunction<String, TimestampedWord> parse() {
    return new FlatMapFunction<String, TimestampedWord>() {
        @Override
        public void flatMap(String value, Collector<TimestampedWord> out) {
            out.collect(new TimestampedWord(value, fileCounter++));
        }
    };
}

private static class TimestampedWord {
    private final String word;
    private final long timestamp;

    private TimestampedWord(String word, long timestamp) {
        this.word = word;
        this.timestamp = timestamp;
    }

    public String getWord() {
        return word;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

private static class PrintlnSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<Tuple3<String, Long, Long>> {
    @Override
    public void invoke(Tuple3<String, Long, Long> value, Context context) throws Exception {
        System.out.println(value.getField(0) + "=" + value.getField(1) + " at " + value.getField(2));
    }
}

С файлом со следующими словами, каждое в новой строке:

Марио, Луиджи, Марио, Марио, Вилма, Фреда, боб, боб, Марио, дан, Дилан, Дилан, Фреда, Марио, Марио, деревенщина, BamBam, лето, анна, анна, Эда, анна, анна, анна, анна, Анна

Производит следующий вывод:

mario=4 at 10
luigi=1 at 10
dan=1 at 10
bob=2 at 10
fred=1 at 10
vilma=1 at 10
dylan=2 at 20
fred=2 at 20
carl=1 at 20
anna=3 at 20
summer=1 at 20
bambam=1 at 20
mario=6 at 20
anna=7 at 9223372036854775807
edu=1 at 9223372036854775807

Что-то явно не так. Я получаю счет 3 для anna в 20, хотя третий экземпляр слова anna не появляется до позиции 22. Как ни странно edu появляется только в последнем снимке, даже если он появился раньше annaТретий экземпляр. Как я могу инициировать создание снимка каждые 10 "единиц времени", даже если сообщения не поступают (т.е. должны быть получены те же данные)?

Если бы кто-нибудь мог указать мне правильное направление, я был бы очень благодарен!

1 ответ

Решение

Да, это не только возможно сделать с Flink, но это легко. Вы можете сделать это с помощью KeyedProcessFunction, которая поддерживает счетчик в состоянии ключа для количества раз, которое каждое слово / ключ появилось до сих пор во входном потоке. Затем используйте таймер для запуска отчетности.

Вот пример, который использует таймеры времени обработки. Он распечатывает отчет каждые 10 секунд.

public class DSExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SocketTextStreamFunction("localhost", 9999, "\n", -1))
            .keyBy(x -> x)
            .process(new KeyedProcessFunction<String, String, Tuple3<Long, String, Integer>>() {
                private transient ValueState<Integer> counter;

                @Override
                public void open(Configuration parameters) throws Exception {
                    counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class));
                }

                @Override
                public void processElement(String s, Context context, Collector<Tuple3<Long, String, Integer>> collector) throws Exception {
                    if (counter.value() == null) {
                        counter.update(0);
                        long now = context.timerService().currentProcessingTime();
                        context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    }
                    counter.update(counter.value() + 1);
                }

                @Override
                public void onTimer(long timestamp, OnTimerContext context, Collector<Tuple3<Long, String, Integer>> out) throws Exception {
                    long now = context.timerService().currentProcessingTime();
                    context.timerService().registerProcessingTimeTimer((now + 10000) - (now % 10000));
                    out.collect(new Tuple3(now, context.getCurrentKey(), counter.value()));
                }
            })
            .print();

        env.execute();
    }
}

Обновлено:

Всегда лучше использовать время события, но это добавляет сложности. Большая часть дополнительной сложности связана с тем фактом, что в реальных приложениях вам, скорее всего, придется иметь дело с событиями не по порядку, которых вы избежали в своем примере, поэтому в этом случае мы можем сойти с рук довольно простым реализация.

Если вы измените две вещи, вы получите ожидаемые результаты. Сначала установите водяные знаки на extractedTimestamp - 1 причина неправильных результатов (например, именно поэтому анна =3 в 20). Если вы установите водяной знак на extractedTimestamp вместо этого эта проблема исчезнет.

Пояснение: Именно прибытие третьей анны создает водяной знак, который закрывает окно в момент времени 20. Третья анна имеет временную метку 21, и поэтому в потоке за ней следует водяной знак в 20, который закрывает второе окно. и выдает отчет: анна =3. Да, первый edu прибыл раньше, но это был первый edu с отметкой времени 20. В момент поступления edu таймер для edu не установлен, а созданный таймер правильно настроен на 30, поэтому мы не слышите об edu до тех пор, пока не появится Водяной знак не менее 30.

Другая проблема - логика таймера. Flink создает отдельный таймер для каждой клавиши, и вам нужно создавать новый таймер каждый раз, когда срабатывает таймер. В противном случае вы будете получать отчеты только о словах, которые поступили во время окна. Вы должны изменить код так, чтобы он был таким:

@Override
public void processElement(TimestampedWord value, Context ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    if (count.value() == null) {
        count.update(0L);
        setTimer(ctx.timerService(), value.getTimestamp());
    }

    count.update(count.value() + 1);
}

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple3<String, Long, Long>> out) throws Exception {
    long currentWatermark = ctx.timerService().currentWatermark();
    out.collect(new Tuple3(ctx.getCurrentKey(), count.value(), currentWatermark));
    if (currentWatermark < Long.MAX_VALUE) {
        setTimer(ctx.timerService(), currentWatermark);
    }
}

private void setTimer(TimerService service, long t) {
    service.registerEventTimeTimer(((t / 10) + 1) * 10);
}

С этими изменениями я получаю следующие результаты:

mario=4 at 10
luigi=1 at 10
fred=1 at 10
bob=2 at 10
vilma=1 at 10
dan=1 at 10
vilma=1 at 20
luigi=1 at 20
dylan=2 at 20
carl=1 at 20
bambam=1 at 20
mario=6 at 20
summer=1 at 20
anna=2 at 20
bob=2 at 20
fred=2 at 20
dan=1 at 20
fred=2 at 9223372036854775807
dan=1 at 9223372036854775807
carl=1 at 9223372036854775807
dylan=2 at 9223372036854775807
vilma=1 at 9223372036854775807
edu=1 at 9223372036854775807
anna=7 at 9223372036854775807
summer=1 at 9223372036854775807
bambam=1 at 9223372036854775807
luigi=1 at 9223372036854775807
bob=2 at 9223372036854775807
mario=6 at 9223372036854775807

Теперь, если бы вам нужно было обрабатывать неупорядоченные события, это было бы немного сложнее. Было бы необходимо, чтобы водяные знаки отставали от временных отметок на некоторую реалистичную величину, отражающую фактическую величину неупорядоченности, присутствующей в потоке, что затем потребовало бы возможности обрабатывать, если открыто более одного окна одновременно. Любое данное событие / слово может не принадлежать окну, которое будет закрываться следующим, и поэтому не должно увеличивать свой счетчик. Например, вы можете буферизовать эти "ранние" события в другом фрагменте состояния (например, ListState) или каким-либо образом поддерживать несколько счетчиков (возможно, в MapState). Кроме того, некоторые события могут быть запоздалыми, что делает недействительными более ранние отчеты, и вы захотите определить политику для их обработки.

Другие вопросы по тегам