Flink применить функцию на timeWindow

В настоящее время я делаю проект Flink. Основная идея проекта - прочитать поток данных JSON (сетевых журналов), сопоставить их и сгенерировать новый JSON, который представляет собой комбинацию различной информации JSON.

В этот момент я могу читать JSON, генерировать KeyedStream (на основе машины, которая генерирует журнал), а затем генерировать поток окна за 5 секунд.

Следующим шагом, который я хочу выполнить, является использование функции применения к окну и объединение информации о каждом JSON. Я немного запутался в том, как это сделать.

Код, который у меня сейчас есть, следующий:

DataStream<Tuple2<String,JSONObject>> MetaAlert = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new generateMetaAlert());




public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {

        @Override
        public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
                Collector<Tuple2<String, JSONObject>> arg3) throws Exception {


        }

Часть.apply(new generateMetaAlert()) жалуется на следующую ошибку:

Метод apply(WindowFunction,R,Tuple,TimeWindow>) в типе WindowedStream,Tuple,TimeWindow> не применим для аргументов (MetaAlertGenerator.generateMetaAlert)

Любое другое предложение структуры кода, отличное от того, которое я составил

Заранее спасибо за вашу помощь

1 ответ

Решение

Когда вы применяете keyBy Функция (без использования анонимного класса) тип ключа в вашем пользовательском WindowFunction (3-е поле) должно быть Tuple потому что компилятор не может определить тип вашего ключа. Этот код компилируется без ошибок (учтите, что я попытался заполнить пробелы фиктивным кодом):

public class Test {

    public Test() {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<String> events = env.readTextFile("datastream.log");

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());

    }

    public class JSONObject {
    }

    public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
        @Override
        public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

}

Но самый простой способ - использовать анонимный класс, чтобы вы могли сохранить String тип:

DataStream<Tuple2<String, JSONObject>> MetaAlert
        = events
        .flatMap(new JSONParser())
        .keyBy(0)
        .timeWindow(Time.seconds(5))
        .apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
                // Your code here
            }
        });

Наконец, если вы хотите сохранить класс, но также хотите сохранить тип вашего ключа таким, какой он есть, вы можете реализовать KeySelector:

public class Test {

    public Test() {

        DataStream<Tuple2<String, JSONObject>> MetaAlert
                = events
                .flatMap(new JSONParser())
                .keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
                    @Override
                    public String getKey(Tuple2<String, JSONObject> json) throws Exception {
                        return json.f0;
                    }
                })
                .timeWindow(Time.seconds(5))
                .apply(new GenerateMetaAlert());
    }

    public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
        @Override
        public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {

        }
    }

}
Другие вопросы по тегам