jdbc合流连接器模式

nfzehxib  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(610)

我在jdbc-kafka源连接器中使用了一个自定义查询,有人能告诉我在jdbc-kafka源连接器中使用自定义查询时的模式吗?如果我使用的是批量模式,那么它将重新插入kafka主题中的所有数据。note:-i didn我的表中没有任何主键或时间戳列。

35g0bw71

35g0bw711#

您可以使用递增或时间戳 incrementing -在每个表上使用严格递增的列,以仅检测新行。请注意,这不会检测现有行的修改或删除。 timestamp -使用timestamp(或类似timestamp)列来检测新的和修改过的行。这假设列随着每次写入而更新,并且值是单调递增的,但不一定是唯一的。 timestamp+incrementing -使用两个列,一个timestamp列检测新的和修改的行,另一个严格递增的列为更新提供全局唯一的id,这样就可以为每一行分配一个唯一的流偏移量。
例如 timestamp :

name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products

mode=timestamp
timestamp.column.name=last_modified

topic.prefix=mysql-test-

例如 incrementing :

name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products

mode=incrementing
incrementing.column.name=id

topic.prefix=mysql-test-

例如 timestamp+incrementing :

name=mysql-source-test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass
table.whitelist=users,products

mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=last_modified

topic.prefix=mysql-test-
4dbbbstv

4dbbbstv2#

如果没有timestamp或递增id列,则无法执行基于查询的cdc,只能执行批量加载。
您的替代方法是使用基于日志的cdc和debezium等工具。
本讲座将详细介绍每个选项和可用工具:http://rmoff.dev/ksny19-no-more-silos

相关问题