如何在kafka connect jdbc源连接器中添加显式where子句

dba5bblo  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(424)

我正在使用kafka connect to source data from db2 to kafka主题,我正在配置sql query以从db2读取数据,下面是查询

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'

我正在使用设置 "timestamp.column.name": "CREATE_TS" 这里的问题是他们的查询已经存在了 WHERE kafka connect试图添加另一个带有timestamp列的where子句,它正在创建一个问题,另一个问题是如果我从sql子句中删除where子句,如下所示

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR

然后我的substr出错了,如下所示

SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26

任何人都可以在这两个问题上提出建议,我被困在这一点上。

kgqe7b3p

kgqe7b3p1#

发生这种情况是因为您试图同时使用这两种方法 "mode": "timestamp" 以及 query . TimestampIncrementingTableQuerier 附加 WHERE 与现有查询冲突的查询的子句 WHERE 合同条款 query .
jdbc source connector docs对此很清楚: query 如果指定,则执行查询以选择新的或更新的行。如果要联接表、选择表中列的子集或筛选数据,请使用此设置。如果使用,此连接器将仅使用此查询复制数据--整个表复制将被禁用。对于增量更新,仍然可以使用不同的查询模式,但是为了正确地构造增量查询,必须可以将where子句附加到此查询(即,不能使用where子句)。如果使用where子句,它必须自己处理增量查询。
作为一种变通方法,您可以将查询修改为(取决于您使用的sql风格)

SELECT * FROM ( SELECT * FROM table WHERE ...)

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

例如,在您的例子中,查询应该是

"query":"SELECT * FROM (SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL') o"

相关问题