Использование пользовательской функции Python в задании Java Flink

Есть ли способ использовать определяемую пользователем функцию python в задании Java Flink или как-либо еще, чтобы сообщить, например, результат преобразования, выполненного с помощью flink с помощью java с пользовательской функцией python, чтобы применить некоторые вещи машинного обучения:

Я знаю, что из pyFlink можно сделать что-то вроде этого:

table_env.register_java_function("hash_code", "my.java.function.HashCode")

Но мне нужно сделать что-то подобное, но добавить функцию python из java, или как я могу передать результат преобразования java напрямую в задание Python UDF Flink?

Я надеюсь, что эти вопросы не слишком сумасшедшие, но мне нужно знать, существуют ли какие-либо способы связи Flink DataStream API с Python Table API с Java в качестве основного языка? это означает, что из Java мне нужно сделать: Source -> Transformations -> Sink, но некоторые из этих преобразований могут запускать функцию Python, или функция Python будет ждать завершения некоторого преобразования Java, чтобы что-то сделать с результатом Stream.

Надеюсь, кто-то поймет, что я здесь пытаюсь сделать.

С уважением!

1 ответ

Поддержка Python UDF (определяемых пользователем функций) была добавлена ​​в Flink 1.10 - см. PyFlink: Введение поддержки Python для UDF в API таблиц Flink. Например, вы можете сделать это:

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
table_env.register_function("add", add)
my_table.select("add(a, b)")

Дополнительные примеры и т. Д. См. В сообщении в блоге, указанном выше, или в стабильной документации.

В Flink 1.11 (выпуск ожидается на следующей неделе) была добавлена ​​поддержка векторизованных UDF Python, обеспечивающая взаимодействие с Pandas, Numpy и т. Д. Этот выпуск также включает поддержку UDF Python в SQL DDL и в клиенте SQL. Для документации см. Основную документацию.

Похоже, вы хотите вызвать Python из Java. API функций с отслеживанием состояния поддерживает это более полно - см. Удаленные функции. Но для вызова Python из Java DataStream API, я думаю, ваш единственный вариант - использовать поддержку SQL DDL, добавленную в Flink 1.11. См. FLIP-106 и документы.

FLIP-106 имеет этот пример:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();

которые вы должны иметь возможность преобразовать для использования вместо этого API DataStream.

Пример такой интеграции: эта зависимость необходима в вашем pom.xml, если предполагается, что Flink 1.11 является текущей версией.

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-blink_2.11</artifactId>
  <version>1.11.2</version>
  <scope>provided</scope>
</dependency>

Создайте Среды:

private StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

private StreamTableEnvironment tableEnv = getTableAPIEnv(env);

/*this SingleOutputStreamOperator will contains the result of the consumption from the  defined source*/
private SingleOutputStreamOperator<Event> stream; 


public static StreamTableEnvironment getTableAPIEnv(StreamExecutionEnvironment env) {
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.getConfig().getConfiguration().setString("python.files", path/function.py);
        tableEnv.getConfig().getConfiguration().setString("python.client.executable", path/python);
        tableEnv.getConfig().getConfiguration().setString("python.executable", path/python);
        tableEnv.getConfig().getConfiguration().setString("taskmanager.memory.task.off-heap.size", "79mb");
/*pass here the function.py and the name of the function into the python script*/
        tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION FunctionName AS 'function.FunctionName' LANGUAGE PYTHON");
        return tableEnv;
    }

Начните с преобразований, которые вы хотите сделать, например:

SingleOutputStreamOperator<EventProfile> profiles = createUserProfile(stream.keyBy(k -> k.id));

/*The result of that ProcessFunction `createUserProfile()` will be sent into the Python function to update some values of the profile and return them back into a defined function in Flink with Java: map function for example*/
profiles = turnIntoTable(profiles).map((MapFunction<Row, EventProfile>) x -> {
  /*you custom code here to do the mapping*/
});
profiles.addSink(new yourCustomSinkFunction());

/*this function will process the Event and create the EventProfile class for this example but you can also use another operators (map, flatMap, etc)*/
 private SingleOutputStreamOperator<EventProfile> createUserProfile(KeyedStream<Event, String> stream) {
        return stream.process(new UserProfileProcessFunction());
    }


/*This function will receive a SingleOutputStreamOperator and sent each record to the Python function trough the TableAPI and returns a Row of String(you can change the Row type) that will be mapped back into EventProfile class*/
@FunctionHint(output = @DataTypeHint("ROW<a STRING>"))
private DataStream<Row> turnIntoTable(SingleOutputStreamOperator<EventProfile> rowInput) {
        Table events = tableEnv.fromDataStream(rowInput,
                $("id"), $("noOfHits"), $("timestamp"))
                .select("FunctionName(id, noOfHits, timestamp)");
        return tableEnv.toAppendStream(events, Row.class);
    }

И наконец

env.execute("Job Name");

Пример функции Python с именем FunctionName в function.py сценарий:

@udf(
    input_types=[
        DataTypes.STRING(), DataTypes.INT(), DataTypes.TIMESTAMP(precision=3)
    ],
    result_type=DataTypes.STRING()
)
def FunctionName(id, noOfHits, timestamp):
    # function code here
    return f"{id}|{noOfHits}|{timestamp}"
Другие вопросы по тегам