confluent:对表timestampincrementingtablequeryer mysql jdbc运行查询失败

ql3eal8s  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(387)

我正在尝试使用mysql的mode timestamp,因为我的表大小是2.6gb,所以行数有限。
以下是我正在使用的连接器属性:

{
        "name": "jdbc_source_mysql_registration_query",
        "config": {
                 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                 "key.converter": "io.confluent.connect.avro.AvroConverter",
                 "key.converter.schema.registry.url": "http://localhost:8081",
                 "value.converter": "io.confluent.connect.avro.AvroConverter",
                 "value.converter.schema.registry.url": "http://localhost:8081",
                 "connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
                 "query": "SELECT matriid,DateUpdated  from users.employee WHERE date(DateUpdated)>='2018-11-28' ",
                 "mode": "timestamp",
                 "timestamp.column.name": "DateUpdated",
                 "validate.non.null": "false",
                 "topic.prefix": "mysql-prod-kot-"
        }
}

我得到如下结果:
info timestampincrementingtablequeryer{table=null,query='select matriid,dateupdated from users.employee where date(dateupdated)>='2018-11-28',topicprefix='mysql-prod-kot-',incrementingcolumn='',timestampcolumns=[dateupdated]}准备的sql查询:select matriid,dateupdated from users.employee where date(dateupdated)>='2018-11-28'where DateUpdated > ? 以及 DateUpdated < ? 订货人 DateUpdated asc(io.confluent.connect.jdbc.source.timestampincrementingtab)lequerier:161)[2018-11-29 17:29:00981]对表TimestampIncrementTableQuerier{table=null,query='select matriid,dateupdated from users.employee where date(dateupdated)>='2018-11-28',topicprefix='mysql-prod-kot-',incrementingcolumn=',timestampcolumns=[dateupdated]}:{}(io.confluent.connect.jdbc.source.jdbcs)ourcetask:328)sqlsyntaxerrorexception:您的sql语法有错误;请查看与mysql服务器版本对应的手册,以了解在“where”附近使用的正确语法 DateUpdated >“1970-01-01 00:00:00.0”和 DateUpdated <'2018-11-29 17'一号线

xtfmy6hx

xtfmy6hx1#

发生这种情况是因为您试图同时使用这两种方法 "mode": "timestamp" 以及 query . TimestampIncrementingTableQuerier 附加 WHERE 与现有查询冲突的查询的子句 WHERE 合同条款 query .
jdbc source connector docs对此很清楚: query 如果指定,则执行查询以选择新的或更新的行。如果要联接表、选择表中列的子集或筛选数据,请使用此设置。如果使用,此连接器将仅使用此查询复制数据--整个表复制将被禁用。对于增量更新,仍然可以使用不同的查询模式,但是为了正确地构造增量查询,必须可以将where子句附加到此查询(即,不能使用where子句)。如果使用where子句,它必须自己处理增量查询。
作为一种变通方法,您可以将查询修改为(取决于您使用的sql风格)

SELECT * FROM ( SELECT * FROM table WHERE ...)

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

例如,在您的例子中,查询应该是

"query":"SELECT * FROM (SELECT matriid,DateUpdated  from users.employee WHERE date(DateUpdated)>='2018-11-28') o"
1sbrub3j

1sbrub3j2#

错误如下:

java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; 
check the manual that corresponds to your MySQL server version for the right syntax to use near 
'WHERE `DateUpdated` > '1970-01-01 00:00:00.0' AND `DateUpdated` < '2018-11-29 17' at line 1

这是因为你在用 query 而且 "mode": "timestamp" 因此连接器试图附加它自己的 WHERE 子句,这将导致无效的sql
jdbc源连接器的每个文档:
为了正确地构造增量查询,必须能够将where子句附加到此查询(即不能使用where子句)。如果使用where子句,它必须自己处理增量查询。

相关问题