我在jdbc-kafka源连接器中使用了一个自定义查询,有人能告诉我在jdbc-kafka源连接器中使用自定义查询时的模式吗?如果我使用的是批量模式,那么它将重新插入kafka主题中的所有数据。note:-i didn我的表中没有任何主键或时间戳列。
35g0bw711#
您可以使用递增或时间戳 incrementing -在每个表上使用严格递增的列,以仅检测新行。请注意,这不会检测现有行的修改或删除。 timestamp -使用timestamp(或类似timestamp)列来检测新的和修改过的行。这假设列随着每次写入而更新,并且值是单调递增的,但不一定是唯一的。 timestamp+incrementing -使用两个列,一个timestamp列检测新的和修改的行,另一个严格递增的列为更新提供全局唯一的id,这样就可以为每一行分配一个唯一的流偏移量。例如 timestamp :
incrementing
timestamp
timestamp+incrementing
name=mysql-source-test connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=10 connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass table.whitelist=users,products mode=timestamp timestamp.column.name=last_modified topic.prefix=mysql-test-
例如 incrementing :
name=mysql-source-test connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=10 connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass table.whitelist=users,products mode=incrementing incrementing.column.name=id topic.prefix=mysql-test-
例如 timestamp+incrementing :
name=mysql-source-test connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=10 connection.url=jdbc:mysql://mysql.example.com:3306/my_database?user=myuser&password=mypass table.whitelist=users,products mode=timestamp+incrementing incrementing.column.name=id timestamp.column.name=last_modified topic.prefix=mysql-test-
4dbbbstv2#
如果没有timestamp或递增id列,则无法执行基于查询的cdc,只能执行批量加载。您的替代方法是使用基于日志的cdc和debezium等工具。本讲座将详细介绍每个选项和可用工具:http://rmoff.dev/ksny19-no-more-silos
2条答案
按热度按时间35g0bw711#
您可以使用递增或时间戳
incrementing
-在每个表上使用严格递增的列,以仅检测新行。请注意,这不会检测现有行的修改或删除。timestamp
-使用timestamp(或类似timestamp)列来检测新的和修改过的行。这假设列随着每次写入而更新,并且值是单调递增的,但不一定是唯一的。timestamp+incrementing
-使用两个列,一个timestamp列检测新的和修改的行,另一个严格递增的列为更新提供全局唯一的id,这样就可以为每一行分配一个唯一的流偏移量。例如
timestamp
:例如
incrementing
:例如
timestamp+incrementing
:4dbbbstv2#
如果没有timestamp或递增id列,则无法执行基于查询的cdc,只能执行批量加载。
您的替代方法是使用基于日志的cdc和debezium等工具。
本讲座将详细介绍每个选项和可用工具:http://rmoff.dev/ksny19-no-more-silos