我创建了Debezium连接器,它的工作原理就像一个魅力:
{
"name": "debezium_title",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "xxx.xxx.xxx",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "***",
"database.server.name": "mysql_debezium",
"heartbeat.interval.ms": 5000,
"snapshot.mode": "when_needed",
"snapshot.new.tables": "parallel",
"database.include.list": "database",
"table.include.list": "database.table1,database.table2",
"database.history.kafka.topic": "mysql_debezium_history",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "xxx.xxx.xxx:9092",
"include.schema.changes": "false",
"transforms": "extractInt",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id"
}
}
现在,我想删除这个连接器,并创建一个没有白名单表的新连接器-以摄取整个数据库:
{
"name": "debezium_title_new",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "xxx.xxx.xxx",
"database.port": "3306",
"database.user": "kafkaconnect",
"database.password": "***",
"database.server.name": "mysql_debezium_new",
"heartbeat.interval.ms": 5000,
"snapshot.mode": "initial",
"database.include.list": "database",
"database.history.kafka.topic": "debezium_mysql_history_new",
"database.history.kafka.recovery.poll.interval.ms": 5000,
"database.history.kafka.bootstrap.servers": "xxx.xxx.xxx:9092"
}
}
新的:连接器名称,数据库服务器名称和数据库历史Kafka主题名称。
奇怪的是,它只为我在"table.include.list"
中为第一个连接器设置的表创建了主题!
kafka-topics --list --zookeeper xxx.xxx:2181
mysql_debezium_new.table1
mysql_debezium_new.table2
过了一会儿,出现了错误:Encountered change event for table database.table3 whose schema isn't known to this connector
(table 3是数据库中不在第一个连接器白名单上的表的示例)
我的数据库中的其余表在哪里?:)当我已经有了带有白名单表的连接器时,我如何强制Debezium从mysql中移动所有表(删除第一个连接器并创建一个没有白名单的新连接器没有帮助)。
我还尝试了只添加新表到白名单,它也不起作用-它不创建新主题,也不加载整个表-它等待表中的第一条新记录,然后发生错误:不知道此表的架构。
怎么会这样?Debezium是否为第一个连接器获取数据库快照,然后将其保存在某个地方并保存它?我如何重置它?
Debezium版本:1.5
3条答案
按热度按时间ndasle7k1#
该参数是可选的,如果您希望它获取所有非系统表,请完全忽略该参数。
来自文档:* 一个可选的逗号分隔的正则表达式列表,这些正则表达式匹配您希望Debezium捕获的表的全限定表标识符;不包括在table.include.list中的任何表都将从捕获中排除。每个标识符的格式为schemaName. tableName。默认情况下,连接器捕获指定架构的所有非系统表。不能与table. exclude. list一起使用。*
链接到上述Debezium文档:https://debezium.io/documentation/reference/connectors/sqlserver.html
vmdwslir2#
@Gignak,以下是我的一些建议:
要删除连接器,请用途:
要创建新连接器,请用途:
我还建议当你需要scheuma的所有表时,至少指定scheuma,这样你就可以在json中包含这个参数:
请记住,目前(2023/05),当您停止或删除连接器时,旧主题不会被删除,因此您应该以不同的方式进行操作。
lc8prwob3#
您只需删除Kafka主题并重新运行应用程序。会成功的