Буферный пул уничтожен по типу POJO

У меня есть пользовательский источник, который испускает пользовательский тип данных BaseEvent. Следующий код прекрасно работает, когда BaseEvent не POJO. Но когда я изменил его на POJO, добавив конструктор по умолчанию, я получил исключение времени выполнения "Буферный пул уничтожен" в методе Collect. Я бегу Flink 1.7.0

        DataStream<BaseEvent> eventStream = see.addSource(new AgoraSource(configFile, instance));

        DataStream<Tuple4<String, Long, Double, String>> result_order = eventStream
                .filter(e -> e instanceof OrderEvent)
                .map(e -> (OrderEvent)e)
                .map(e -> new Tuple3<>(e.SecurityID, Long.valueOf(1), Double.valueOf(e.OriginalQuantity))).returns(info_tuple3)
                .keyBy(e -> e.f0)
                .timeWindow(Time.seconds(5))
                .reduce((a, b) -> new Tuple3<>(a.f0, a.f1 + b.f1, a.f2 + b.f2))
                .map(e -> new Tuple4<>(e.f0, e.f1, e.f2, "Order")).returns(info_tuple4);

0 ответов

Другие вопросы по тегам