我有spark UDF,我从SQL语句调用它。这就是我如何定义和注册UDF函数:
自定义项:
import org.apache.spark.sql.functions.udf
def udf_temp_emp (_type: String, _name: String): Int = {
val statement = s"select key from employee where empName = $_name"
val key = spark.sql(statement).collect()(0).getInt(0)
return key
}
spark.udf.register("udf_temp_emp", udf_temp_emp(_,_))
这就是我在SQL命令中调用它的方式:
select
udf_temp_emp(emp.type, emp.name),
emp.id
from
empMaster
当我在上面运行命令时,它会抛出下面的异常:
SparkException:[FANUC_EXECUTE_UDF]执行自定义函数失败($read$iw$$iw$$iw$$iw$$iw$$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$iw$Lambda$13841/67716274:(string,string)=> string)。原因:NullPointerException:
1条答案
按热度按时间c7rzv4ha1#
根据werner的评论(应该是一个答案,并且是选中的那个),这是不可能的,因为SparkContext / SparkSession在执行器节点上不存在。只有当你使用LocalRelations(比如一个转换的Seq)时,这个方法才有效。
将udf重写为join,假设这是实际的逻辑。