在我的一个用例中,我试图创建一个管道
每当我从自定义分区发送消息时,我都以毫秒为单位发送长数据类型的时间戳,因为在模式中,时间戳列被定义为long。
我之前在自定义分区中使用的代码:
Date date = new Date();
long timeMilli = date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);
发送记录前显示结果:
日期=美国东部时间2019年3月26日星期二22:02:04,时间单位:毫秒=1553652124063
表2中时间戳列中插入的值:
2019年3月27日凌晨2:02:04.063000
由于它采取英国时区(我相信),我把暂时的临时补丁,以减少4小时,从目前的时间戳,以便我可以匹配美国东部时间戳。
Date date = new Date();
Date adj_date = DateUtils.addHours(date,-4);
long timeMilli = adj_date.getTime();
System.out.println("date = " + date.toString() + " , time in millis = " + timeMilli);
显示结果:
日期=2019年3月26日星期二22:04:43美国东部时间,时间单位:毫秒=1553637883826
表2中时间戳列中插入的值:
2019年3月26日晚上10:04:43.826000
请让我知道,如果我遗漏了什么,因为我不知道为什么这是发生时,我从自定义分区发送消息。
1条答案
按热度按时间qlzsbp2j1#
在引擎盖下jdbc源连接器使用以下查询:
摘要:如果行的timestamp列的值早于当前时间戳且晚于上次选中的时间戳,则查询将检索行。
以上参数为:
beginTimetampValue
-上次导入记录的timestamp列的值endTimetampValue
-根据数据库的当前时间戳lastIncrementedValue
-上次导入记录的增量列的值我想在你的情况下
Producer
将具有更高时间戳的记录放入表中,而不是稍后手动插入(使用查询)。当jdbc连接器检查要导入到kafka的新记录时,它会跳过它们(因为它们不完整)
someTimestampColumn < $endTimetampValue
时间戳条件)也可以将日志级别更改为
DEBUG
看看日志里是怎么回事