Flink применить функцию на timeWindow
В настоящее время я делаю проект Flink. Основная идея проекта - прочитать поток данных JSON (сетевых журналов), сопоставить их и сгенерировать новый JSON, который представляет собой комбинацию различной информации JSON.
В этот момент я могу читать JSON, генерировать KeyedStream (на основе машины, которая генерирует журнал), а затем генерировать поток окна за 5 секунд.
Следующим шагом, который я хочу выполнить, является использование функции применения к окну и объединение информации о каждом JSON. Я немного запутался в том, как это сделать.
Код, который у меня сейчас есть, следующий:
DataStream<Tuple2<String,JSONObject>> MetaAlert = events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new generateMetaAlert());
public static class generateMetaAlert implements WindowFunction<Tuple2<String,JSONObject>, Tuple2<String,JSONObject>, String, Window> {
@Override
public void apply(String arg0, Window arg1, Iterable<Tuple2<String, JSONObject>> arg2,
Collector<Tuple2<String, JSONObject>> arg3) throws Exception {
}
Часть.apply(new generateMetaAlert()) жалуется на следующую ошибку:
Метод apply(WindowFunction,R,Tuple,TimeWindow>) в типе WindowedStream,Tuple,TimeWindow> не применим для аргументов (MetaAlertGenerator.generateMetaAlert)
Любое другое предложение структуры кода, отличное от того, которое я составил
Заранее спасибо за вашу помощь
1 ответ
Когда вы применяете keyBy
Функция (без использования анонимного класса) тип ключа в вашем пользовательском WindowFunction
(3-е поле) должно быть Tuple
потому что компилятор не может определить тип вашего ключа. Этот код компилируется без ошибок (учтите, что я попытался заполнить пробелы фиктивным кодом):
public class Test {
public Test() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<String> events = env.readTextFile("datastream.log");
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new GenerateMetaAlert());
}
public class JSONObject {
}
public class JSONParser implements FlatMapFunction<String, Tuple2<String, JSONObject>> {
@Override
public void flatMap(String s, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow> {
@Override
public void apply(Tuple key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
}
Но самый простой способ - использовать анонимный класс, чтобы вы могли сохранить String
тип:
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(0)
.timeWindow(Time.seconds(5))
.apply(new WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, Tuple, TimeWindow>() {
@Override
public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
// Your code here
}
});
Наконец, если вы хотите сохранить класс, но также хотите сохранить тип вашего ключа таким, какой он есть, вы можете реализовать KeySelector
:
public class Test {
public Test() {
DataStream<Tuple2<String, JSONObject>> MetaAlert
= events
.flatMap(new JSONParser())
.keyBy(new KeySelector<Tuple2<String,JSONObject>, String>() {
@Override
public String getKey(Tuple2<String, JSONObject> json) throws Exception {
return json.f0;
}
})
.timeWindow(Time.seconds(5))
.apply(new GenerateMetaAlert());
}
public class GenerateMetaAlert implements WindowFunction<Tuple2<String, JSONObject>, Tuple2<String, JSONObject>, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<Tuple2<String, JSONObject>> iterable, Collector<Tuple2<String, JSONObject>> collector) throws Exception {
}
}
}