Могу ли я печатать отдельные элементы DataSteam<T> в Apache Flink без использования встроенной функции print()

Я пытаюсь напечатать значения предупреждений, которые были обнаружены в Flink

// Генерируем температурные предупреждения для каждого соответствующего шаблона предупреждения

    DataStream<TemperatureEvent> warnings = tempPatternStream.select(
        (Map<String, MonitoringEvent> pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("first");


            return new TemperatureEvent(first.getRackID(), first.getTemperature()) ;
        }
    );



    // Print the warning and alert events to stdout



    warnings.print();

Я получаю вывод, как показано ниже (согласно toString функции eventSource)

Rack id = 99 and temprature = 76.0

Может кто-нибудь сказать мне, если есть какой-нибудь способ, которым я могу напечатать значения DataStream без использования print? Например, если я хочу печатать только температуру, как я могу получить доступ к отдельным элементам в DataStream.

Заранее спасибо

1 ответ

Решение

Я нашел способ доступа к отдельным элементам. Предположим, у нас есть DataStream

 HeartRate<Integer,Integer>

Имеет 2 атрибута

private Integer Patient_id ;
private  Integer HR;

// Генерация Datasteam с помощью пользовательской функции

DataStream<HREvent> hrEventDataStream = envrionment
                .addSource(new HRGenerator()).assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

Предполагая, что вы сгенерировали группу данных с помощью пользовательской функции, теперь мы можем напечатать значения отдельных элементов HeartRateEvent, как показано ниже

hrEventDataStream.keyBy(new KeySelector<HREvent, Integer>() {
    @Override
    public Integer getKey(HREvent hrEvent) throws Exception {
        return hrEvent.getPatient_id();
    }
         })
        .window(TumblingEventTimeWindows.of(milliseconds(10)))
        .apply(new WindowFunction<HREvent, Object, Integer, TimeWindow>() {
            @Override
            public void apply(Integer integer, TimeWindow timeWindow, Iterable<HREvent> iterable, Collector<Object> collector) throws Exception {

                for(HREvent in : iterable){

                    System.out.println("Patient id  = " + in.getPatient_id() + " Heart Rate  = " + in.getHR());
                }//for

            }//apply
        });

Надеюсь, поможет!

Другие вопросы по тегам