如何从sparkscala调用db2数据库中的存储过程

yv5phkfx  于 2021-05-27  发布在  Spark
关注(0)|答案(3)|浏览(408)

我必须调用db2中的一个存储过程,它接受3个参数并返回一个整数。有人能帮我从spark scala代码中调用这个sp吗。下面是db2中的存储过程。

CREATE PROCEDURE TEST_PROC(IN V_DATE DATE,IN V_GROUP VARCHAR(20),IN V_FREQ 
VARCHAR(20),IN V_RULE VARCHAR(20), OUT ID INTEGER)
LANGUAGE SQL
MODIFIES SQL DATA
BEGIN
LOCK TABLE CAL_LOG IN EXCLUSIVE MODE;
SET ID = (10+ COALESENCE((SELECT MAX(ID) FROM CAL_LOG WITH UR),0));
INSERT INTO CAL_RESULT(ID,P_DATE,GROUP,FREQ,RULE)
VALUES(ID,V-DATE,V_GROUP,V_FREQ,V_RULE);
COMMIT:
END;

proc已创建并按预期工作。
现在我想从spark scala代码调用这个过程。我正在尝试下面的代码

val result = spark.read.format("jdbc")
.options(Map(
"url"-> //the db2 url
"driver" - > // my db2 driver
"user name" - > // username
"password" -> // password
""dbtable" -> "(CALL TEST_PROC('2020-07-08','TEST',''TEST','TEST,?)) as proc_result;"
)).load()

但代码片段给出了以下错误

DB@ SQL Error: SQLCODE=-104, SQLSTATE=42601
com.ibm.db2.jcc.am.SqlSyntaxErrorException: DB2 SQL Error: SQLCODE=-104, SQLSTATE=42601
thtygnil

thtygnil1#

虽然使用sparkload加载相同的数据,但不能使用apachespark调用存储过程
从db2加载

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df= sqlContext.load("jdbc", Map(
                       "url" -> "jdbc:db2://xx.xx.xx.xx:50000/SQLDB:securityMechanism=9;currentSchema=vaquarkhan;user=<ur-username>;password=xxxxx;",
        "driver" -> "com.ibm.db2.jcc.DB2Driver",
        "dbtable" -> "scheam.TableName"))

创建temp表/df并添加过滤器以获得所需的响应。

kokeuurv

kokeuurv2#

我推荐scalikejdbc
maven坐标(scala 2.11): org.scalikejdbc:scalikejdbc_2.11:3.4.1 ```
import scalikejdbc._

// Initialize JDBC driver & connection pool
Class.forName()
ConnectionPool.singleton(, , )

// ad-hoc session provider on the REPL
implicit val session = AutoSession

// Now you can run anything you want
sql"""
CREATE PROCEDURE TEST_PROC(IN V_DATE DATE,IN V_GROUP VARCHAR(20),IN V_FREQ
VARCHAR(20),IN V_RULE VARCHAR(20), OUT ID INTEGER)
LANGUAGE SQL
MODIFIES SQL DATA
BEGIN
LOCK TABLE CAL_LOG IN EXCLUSIVE MODE;
SET ID = (10+ COALESENCE((SELECT MAX(ID) FROM CAL_LOG WITH UR),0));
INSERT INTO CAL_RESULT(ID,P_DATE,GROUP,FREQ,RULE)
VALUES(ID,V-DATE,V_GROUP,V_FREQ,V_RULE);
COMMIT:
END;""".execute.apply()

获取数据如下

sql"""(CALL TEST_PROC('2020-07-08','TEST',''TEST','TEST,?))
as proc_result;""".execute.apply()

如果需要,可以将结果再次转换为Dataframe。
wfsdck30

wfsdck303#

我认为应该直接使用jdbc连接而不是spark,因为存储过程只返回整数。如果需要该值,可以从对存储过程的调用中检索该值,但是使用scala而不使用spark。
你可以在https://www.ibm.com/support/knowledgecenter/ssepek_12.0.0/java/src/tpc/imjcc_tjvcscsp.html
这是任何语言的标准称呼方式:
如果需要替换参数,可以使用上面链接中描述的preparecall
用registerparameter(in或out)指定参数值
在sp返回整数时运行executequery
关闭连接

相关问题