根据这个合流页面: https://cwiki.apache.org/confluence/display/FLINK/FLIP-106%3A+Support+Python+UDF+in+SQL+Function+DDL
在flink1.11中,python自定义项可以与in-sql函数一起使用。
我去了Flink医院: https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sqlClient.html
在终端上尝试此操作,并使用以下参数启动sql-client.sh: $ sql-client.sh embedded --pyExecutable /Users/jonathanfigueroa/opt/anaconda3/bin/python --pyFiles /Users/jonathanfigueroa/Desktop/pyflink/inference/test1.py
然后:
> Create Temporary System Function func1 as 'test1.func1' Language PYTHON;
[INFO] Function has been created.
当我尝试时:
> Select func1(str) From (VALUES ("Name1", "Name2", "Name3"));
[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Instantiating python function 'test1.func1' failed.
我试过使用: -pyarch,--pyArchives, -pyexec,--pyExecutable, -pyfs,--pyFiles
在每一个组合中 .zip, .py
结果总是一样的。
顺便说一句,我的python文件如下所示:
def func1(s):
return s;
我有什么遗漏吗?
谨致问候,
乔纳森
1条答案
按热度按时间qfe3c7zg1#
python udf应该由
pyflink.table.udf
,如下所示:启动sql客户机时需要加载flink python jar,如下所示:
此外,您还需要添加
taskmanager.memory.task.off-heap.size: 79mb
至$FLINK_HOME/conf/flink-conf.yaml
或其他可用于设置配置的文件(例如sql客户端环境文件),否则在执行python udf时会出现错误:最好的,
世界环境学会