我正在使用kafka connect to source data from db2 to kafka主题,我正在配置sql query以从db2读取数据,下面是查询
SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'
我正在使用设置 "timestamp.column.name": "CREATE_TS"
这里的问题是他们的查询已经存在了 WHERE
kafka connect试图添加另一个带有timestamp列的where子句,它正在创建一个问题,另一个问题是如果我从sql子句中删除where子句,如下所示
SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR
然后我的substr出错了,如下所示
SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26
任何人都可以在这两个问题上提出建议,我被困在这一点上。
1条答案
按热度按时间kgqe7b3p1#
发生这种情况是因为您试图同时使用这两种方法
"mode": "timestamp"
以及query
.TimestampIncrementingTableQuerier
附加WHERE
与现有查询冲突的查询的子句WHERE
合同条款query
.jdbc source connector docs对此很清楚:
query
如果指定,则执行查询以选择新的或更新的行。如果要联接表、选择表中列的子集或筛选数据,请使用此设置。如果使用,此连接器将仅使用此查询复制数据--整个表复制将被禁用。对于增量更新,仍然可以使用不同的查询模式,但是为了正确地构造增量查询,必须可以将where子句附加到此查询(即,不能使用where子句)。如果使用where子句,它必须自己处理增量查询。作为一种变通方法,您可以将查询修改为(取决于您使用的sql风格)
或
例如,在您的例子中,查询应该是