如何在java中从db读入kafka producer表的delta?

h43kikqp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(188)

我试图在java代码中使用db数据作为我的kafka生产者。源数据连续增长(例如每秒20行)。每次将新记录插入db表时,都会从db读取整个数据并将其添加到kafka主题中。我只希望新添加的行被发送到主题(即,如果表已经包含10行,并且又添加了4行,那么只需要将这4行发送到主题)。
如果我们还可以使用kafkaapi,有没有一种方法可以在java中实现这一点??

izj3ouym

izj3ouym1#

一个更简单的方法是使用变更数据捕获将变更从数据库提供给kafka主题。尝试自己建造这个是重新发明一个已经完善的轮子;-)
你的源数据库是什么?对于专有的rdbms(oracle、db2、mssql等),您可以使用goldengate、attunity、dbvisit等商业工具。对于开源rdbms(例如mysql、postgresql),您应该看看开源debezium工具。所有这些cdc工具都直接与Kafka集成。
根据您的用例、规模等,您还有另一个选择,就是使用jdbc-kafka-connect连接器从数据库中提取更改的行。这并不像cdc那样灵活或可伸缩,但仍然比自己尝试轮询数据库更有用、更容易。

相关问题