Могу ли я печатать отдельные элементы DataSteam<T> в Apache Flink без использования встроенной функции print()
Я пытаюсь напечатать значения предупреждений, которые были обнаружены в Flink
// Генерируем температурные предупреждения для каждого соответствующего шаблона предупреждения
DataStream<TemperatureEvent> warnings = tempPatternStream.select(
(Map<String, MonitoringEvent> pattern) -> {
TemperatureEvent first = (TemperatureEvent) pattern.get("first");
return new TemperatureEvent(first.getRackID(), first.getTemperature()) ;
}
);
// Print the warning and alert events to stdout
warnings.print();
Я получаю вывод, как показано ниже (согласно toString функции eventSource)
Rack id = 99 and temprature = 76.0
Может кто-нибудь сказать мне, если есть какой-нибудь способ, которым я могу напечатать значения DataStream без использования print? Например, если я хочу печатать только температуру, как я могу получить доступ к отдельным элементам в DataStream.
Заранее спасибо
1 ответ
Я нашел способ доступа к отдельным элементам. Предположим, у нас есть DataStream
HeartRate<Integer,Integer>
Имеет 2 атрибута
private Integer Patient_id ;
private Integer HR;
// Генерация Datasteam с помощью пользовательской функции
DataStream<HREvent> hrEventDataStream = envrionment
.addSource(new HRGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());
Предполагая, что вы сгенерировали группу данных с помощью пользовательской функции, теперь мы можем напечатать значения отдельных элементов HeartRateEvent, как показано ниже
hrEventDataStream.keyBy(new KeySelector<HREvent, Integer>() {
@Override
public Integer getKey(HREvent hrEvent) throws Exception {
return hrEvent.getPatient_id();
}
})
.window(TumblingEventTimeWindows.of(milliseconds(10)))
.apply(new WindowFunction<HREvent, Object, Integer, TimeWindow>() {
@Override
public void apply(Integer integer, TimeWindow timeWindow, Iterable<HREvent> iterable, Collector<Object> collector) throws Exception {
for(HREvent in : iterable){
System.out.println("Patient id = " + in.getPatient_id() + " Heart Rate = " + in.getHR());
}//for
}//apply
});
Надеюсь, поможет!