Apache-Flink 1.11 Невозможно использовать Python UDF в SQL Function DDL
Согласно этой странице слияния:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
python udf во Flink 1.11 доступны для использования в функциях SQL.
Я пошел к документации flink здесь:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
и попробуйте это на терминале и запустите sql-client.sh со следующими параметрами:
$ sql-client.sh embedded --pyExecutable /Users/jonathanfigueroa/opt/anaconda3/bin/python --pyFiles /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py
а потом:
> Create Temporary System Function func1 as 'test1.func1' Language PYTHON;
[INFO] Function has been created.
и когда я попробовал:
> Select func1(str) From (VALUES ("Name1", "Name2", "Name3"));
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'test1.func1' failed.
Я пробовал использовать: -pyarch,--pyArchives, -pyexec,--pyExecutable, -pyfs,--pyFiles
в каждой комбинации .zip, .py
и всегда результат тот же.
кстати, мой файл python выглядит так:
def func1(s):
return s;
Что мне не хватает?
С уважением,
Джонатан
1 ответ
UDF python должен быть заключен в декоратор udf в pyflink.table.udf
, как это:
from pyflink.table.types import DataTypes
from pyflink.table.udf import udf
@udf(input_types=[DataTypes.INT()], result_type=DataTypes.INT())
def add_one(a):
return a + 1
И jar flink-python нужно загружать при запуске sql-client, вот так:
$ cd $FLINK_HOME/bin
$ ./start-cluster.sh
$ ./sql-client.sh embedded -pyfs xxx.py -j ../opt/flink-python_2.11-1.11.0.jar
Кроме того, вам нужно добавить taskmanager.memory.task.off-heap.size: 79mb
к $FLINK_HOME/conf/flink-conf.yaml
или другие файлы, которые можно использовать для установки конфигураций (например, файл среды клиента sql), в противном случае вы получите сообщение об ошибке при выполнении python udf:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key'taskmanager.memory .task.off-heap.size'.
С уважением,
Вэй