groovy 在Nifi中使用PutSql获取存储过程的输出

lp0sw83n  于 2022-11-01  发布在  其他
关注(0)|答案(1)|浏览(233)

我必须在Nifi中获得Oracle存储过程的输出。我已经尝试使用下面的sql语句运行PutSql:
声明VARCHAR 2; 1,输出);结束;
它工作正常,但它只是执行脚本。我如何才能得到输出'out'的值?编辑:我在这里尝试了Groovy脚本:
https://community.cloudera.com/t5/Support-Questions/Does-ExecuteSQL-processor-allow-to-execute-stored-procedure/td-p/158922
出现以下错误:
2022-06-17 1 - 3:38:53,353错误[定时器驱动的进程线程-9] o. a. n. p. groovyx.执行GroovyScript执行GroovyScript [id= 26 ab 18f 1 - 3b 0 c-18 cf-d90 b-3d 5904676458] groovy.lang.缺少方法异常:无方法签名:Script6a6d0a35$_run_closure1.doCall()适用于参数类型:(字符串、字符串、java.sql.日期、空值、字符串、空值、空值、字符串...)值:【xxxx,xxxx,2022-05-30,空,好,空,空,......】:groovy.lang.MissingMethodException:无方法签名:Script6a6d0a35$_run_closure1.doCall()适用于参数类型:(字符串、字符串、java.sql.日期、空值、字符串、空值、空值、字符串...)值:【xxxx,xxxx,2022-05-30,空,好,空,空,......】
在代码运行时,您可以使用以下方法来创建一个新的类:
所以我有过程的输出,但我得到了错误!
脚本:

import org.apache.commons.io.IOUtils
import org.apache.nifi.controller.ControllerService
import org.apache.nifi.processor.io.StreamCallback
import java.nio.charset.*
import groovy.sql.OutParameter
import groovy.sql.Sql
import java.sql.ResultSet

////Get the session values from Nifi flow Start 
def flowFile = session.get()
if(!flowFile) return
  String TYPE_NOTIFICATION = flowFile.getAttribute('TYPE_NOTIFICATION')
  String ID_NOTIFICATION = flowFile.getAttribute('ID_NOTIFICATION')
////Get the session values from Nifi flow END

String sqlString ="""{call PKG_TEST.P_TEST(?,?,?,?,?,?,?,?,?,?,?)}""";

def parametersList = [ID_NOTIFICATION, TYPE_NOTIFICATION,Sql.VARCHAR,Sql.VARCHAR,Sql.DATE,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.VARCHAR,Sql.DATE ];

SQL.mydbxx.call(sqlString, parametersList) {out1, out2,...->
    flowFile.putAttribute("out1",out1)...
};

session.transfer(flowFile, REL_SUCCESS)

我的存储程序签章:谢谢你!

sc4hvdpw

sc4hvdpw1#

我不能测试它-所以,它只是一个参考代码
使用ExecuteGroovyScript处理器
在处理器级别上添加SQL.mydb参数,并将其链接到所需的DBCP池。
将其近似设置为脚本主体

def ff=session.get()
if(!ff)return

def statement = '''
  declare out VARCHAR2; 
  begin
    PKG_TEST.P_TEST(?, out);
  end;
  ? := out;
'''
//parameters for each ? placeholder
def params = [
  ff.param_input as Long, //get parameter value from flowfile attribute
  SQL.mydb.VARCHAR, //out varchar parameter https://docs.groovy-lang.org/latest/html/api/groovy/sql/Sql.html#VARCHAR
]
SQL.mydb.call(statement, params){p_out-> //we have only one out patameter
    //closure to process output parameters
    ff.param_output = p_out //assign value into flowfile attribute
}

//transfer flowfile to success
REL_SUCCESS << ff

相关问题