Регистрация агрегатного UDF в Apache Flink

Я пытаюсь выполнить описанные здесь шаги, чтобы создать базовый UDF Flink Aggregate. Я добавил зависимости () и реализовал

public class MyAggregate extends AggregateFunction<Long, TestAgg> {..}

Я реализовал обязательные методы, а также несколько других: accumulate, merge, etc, Все это строит без ошибок. Теперь в соответствии с документами, я должен быть в состоянии зарегистрировать это как

    StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment sTableEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
    sTableEnv.registerFunction("MyMin", new MyAggregate());

Но registerFucntion кажется, хочет ScalarFunction только в качестве ввода. Я получаю несовместимую ошибку типа: The method registerFunction(String, ScalarFunction) in the type TableEnvironment is not applicable for the arguments (String, MyAggregate)

Любая помощь будет отличной.

1 ответ

Решение

Вам необходимо импортировать StreamTableEnvironment для выбранного вами языка, который в вашем случае org.apache.flink.table.api.java.StreamTableEnvironment,

org.apache.flink.table.api.StreamTableEnvironment является общим абстрактным классом для вариантов Java и Scala StreamTableEnvironment, Мы заметили, что эта часть API вводит пользователей в заблуждение, и мы улучшим ее в будущем.

Другие вопросы по тегам