当记录从多个源插入到源主题中时,kafka源连接器没有按预期提取记录

6gpjuf90  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(531)

在我的一个用例中,我试图创建一个管道
每当我从自定义分区发送消息时,我都以毫秒为单位发送长数据类型的时间戳,因为在模式中,时间戳列被定义为long。
我之前在自定义分区中使用的代码:

Date date = new Date();
long timeMilli = date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

发送记录前显示结果:
日期=美国东部时间2019年3月26日星期二22:02:04,时间单位:毫秒=1553652124063
表2中时间戳列中插入的值:
2019年3月27日凌晨2:02:04.063000
由于它采取英国时区(我相信),我把暂时的临时补丁,以减少4小时,从目前的时间戳,以便我可以匹配美国东部时间戳。

Date date = new Date();
Date adj_date = DateUtils.addHours(date,-4);
long timeMilli = adj_date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);

显示结果:
日期=2019年3月26日星期二22:04:43美国东部时间,时间单位:毫秒=1553637883826
表2中时间戳列中插入的值:
2019年3月26日晚上10:04:43.826000
请让我知道,如果我遗漏了什么,因为我不知道为什么这是发生时,我从自定义分区发送消息。

qlzsbp2j

qlzsbp2j1#

在引擎盖下jdbc源连接器使用以下查询:

SELECT * FROM someTable
WHERE
someTimestampColumn < $endTimetampValue
AND (
    (someTimestampColumn = $beginTimetampValue AND someIncrementalColumn > $lastIncrementedValue)
    OR someTimestampColumn > $beginTimetampValue)
ORDER BY someTimestampColumn, someIncrementalColumn ASC

摘要:如果行的timestamp列的值早于当前时间戳且晚于上次选中的时间戳,则查询将检索行。
以上参数为: beginTimetampValue -上次导入记录的timestamp列的值 endTimetampValue -根据数据库的当前时间戳 lastIncrementedValue -上次导入记录的增量列的值
我想在你的情况下 Producer 将具有更高时间戳的记录放入表中,而不是稍后手动插入(使用查询)。
当jdbc连接器检查要导入到kafka的新记录时,它会跳过它们(因为它们不完整) someTimestampColumn < $endTimetampValue 时间戳条件)
也可以将日志级别更改为 DEBUG 看看日志里是怎么回事

相关问题