我正在尝试使用mysql的mode timestamp,因为我的表大小是2.6gb,所以行数有限。
以下是我正在使用的连接器属性:
{
"name": "jdbc_source_mysql_registration_query",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
"query": "SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28' ",
"mode": "timestamp",
"timestamp.column.name": "DateUpdated",
"validate.non.null": "false",
"topic.prefix": "mysql-prod-kot-"
}
}
我得到如下结果:
info timestampincrementingtablequeryer{table=null,query='select matriid,dateupdated from users.employee where date(dateupdated)>='2018-11-28',topicprefix='mysql-prod-kot-',incrementingcolumn='',timestampcolumns=[dateupdated]}准备的sql查询:select matriid,dateupdated from users.employee where date(dateupdated)>='2018-11-28'where DateUpdated
> ? 以及 DateUpdated
< ? 订货人 DateUpdated
asc(io.confluent.connect.jdbc.source.timestampincrementingtab)lequerier:161)[2018-11-29 17:29:00981]对表TimestampIncrementTableQuerier{table=null,query='select matriid,dateupdated from users.employee where date(dateupdated)>='2018-11-28',topicprefix='mysql-prod-kot-',incrementingcolumn=',timestampcolumns=[dateupdated]}:{}(io.confluent.connect.jdbc.source.jdbcs)ourcetask:328)sqlsyntaxerrorexception:您的sql语法有错误;请查看与mysql服务器版本对应的手册,以了解在“where”附近使用的正确语法 DateUpdated
>“1970-01-01 00:00:00.0”和 DateUpdated
<'2018-11-29 17'一号线
2条答案
按热度按时间xtfmy6hx1#
发生这种情况是因为您试图同时使用这两种方法
"mode": "timestamp"
以及query
.TimestampIncrementingTableQuerier
附加WHERE
与现有查询冲突的查询的子句WHERE
合同条款query
.jdbc source connector docs对此很清楚:
query
如果指定,则执行查询以选择新的或更新的行。如果要联接表、选择表中列的子集或筛选数据,请使用此设置。如果使用,此连接器将仅使用此查询复制数据--整个表复制将被禁用。对于增量更新,仍然可以使用不同的查询模式,但是为了正确地构造增量查询,必须可以将where子句附加到此查询(即,不能使用where子句)。如果使用where子句,它必须自己处理增量查询。作为一种变通方法,您可以将查询修改为(取决于您使用的sql风格)
或
例如,在您的例子中,查询应该是
1sbrub3j2#
错误如下:
这是因为你在用
query
而且"mode": "timestamp"
因此连接器试图附加它自己的WHERE
子句,这将导致无效的sqljdbc源连接器的每个文档:
为了正确地构造增量查询,必须能够将where子句附加到此查询(即不能使用where子句)。如果使用where子句,它必须自己处理增量查询。