Apache-Flink 1.11 Невозможно использовать Python UDF через DDL-функцию SQL в задании потоковой передачи Java Flink

В Flip-106 есть пример того, как вызвать определяемую пользователем функцию python в java-приложении пакетного задания через SQL Function DDL...

BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.getConfig().getConfiguration().setString("python.files", "/home/my/test1.py");
tEnv.getConfig().getConfiguration().setString("python.client.executable", "python3");

tEnv.sqlUpdate("create temporary system function func1 as 'test1.func1' language python");
Table table = tEnv.fromDataSet(env.fromElements("1", "2", "3")).as("str").select("func1(str)");
tEnv.toDataSet(table, String.class).collect();

Я пытался воспроизвести этот же пример в java-приложении потоковой передачи, и это мой код:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
Table table = fsTableEnv.fromValues("1", "2", "3").as("str").select("func1(str)");
/* Missing line */

для этой конкретной строки в пакетном задании:

tEnv.toDataSet(table, String.class).collect();

Я не нашел эквивалента для работы с потоковой передачей

1. Не могли бы вы помочь мне отобразить этот пример flip-106 из партии в поток?

В конечном итоге я хочу вызвать с помощью flink 1.11 функцию python в java-приложении flink для потокового задания следующим образом:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");
System.out.println("Result --> " + table.select($("umid")) + " --> End of Result");

и использовать результат этого udf для дальнейшей обработки (не обязательно распечатывать его в консоли)

Я редактировал test.py файл, чтобы увидеть, выполняется ли что-то в python, по крайней мере, независимо от безымянной таблицы.

from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
from os import getcwd

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def func1(line):
    print(line)
    print(getcwd())
    with open("test.txt", "a") as myfile:
        myfile.write(line)
    return line

и ничего не печатается, файл test.txt не создается и значение не возвращается в задание потоковой передачи. Так что в основном эта функция python не вызывается.

2. Что мне здесь не хватает?

Спасибо Дэвиду, Вей и Синбо за поддержку, потому что каждая предложенная деталь сработала для меня.

С наилучшими пожеланиями,

Джонатан

1 ответ

Вы можете попробовать это:

final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(EnvironmentConfiguration.getEnv(), fsSettings);
fsTableEnv.getConfig().getConfiguration().setString("python.files", "/Users/jf/Desktop/flink/fca/test.py");
fsTableEnv.getConfig().getConfiguration().setString("python.client.executable", "/Users/jf/opt/anaconda3/bin/python");

// You need to specify the python interpreter used to run the python udf on cluster.
// I assume this is a local program so it is the same as the "python.client.executable".
fsTableEnv.getConfig().getConfiguration().setString("python.executable", "/Users/jf/opt/anaconda3/bin/python");

fsTableEnv.sqlUpdate("CREATE TEMPORARY SYSTEM FUNCTION func1 AS 'test.func1' LANGUAGE PYTHON");
final Table table = fsTableEnv.fromDataStream(stream_filtered.map(x->x.idsUmid)).select("func1(f0)").as("umid");

// 'table.select($("umid"))' will not trigger job execution. You need to call the "execute()" method explicitly.
table.execute().print();
Другие вопросы по тегам