Kafka producer无法创建主题,并在创建Debezium MySQL连接器后抛出连续错误

0pizxfdo  于 2023-05-16  发布在  Mysql
关注(0)|答案(4)|浏览(110)

我正在使用Debezium作为CDC工具来从MySql流式传输数据。在将Debezium MySQL连接器安装到Confluent OSS集群后,我试图将MySQL bin_log更改捕获到Kafka主题中。当我创建连接器时,在拍摄数据库快照之后,我会遇到一系列连续的错误。
我检查了MySql bin_log是ON,并尝试用不同的序列化器重新启动schema-registry和连接器。但我也犯了同样的错误。
错误日志显示:

[2019-06-21 13:56:14,885] INFO Step 8: - Completed scanning a total of 955 rows from table 'mydb.test' after 00:00:00.086 (io.debezium.connector.mysql.SnapshotReader:565)
[2019-06-21 13:56:14,886] INFO Step 8: scanned 1758 rows in 2 tables in 00:00:00.383 (io.debezium.connector.mysql.SnapshotReader:601)
[2019-06-21 13:56:14,886] INFO Step 9: committing transaction (io.debezium.connector.mysql.SnapshotReader:635)
[2019-06-21 13:56:14,887] INFO Completed snapshot in 00:00:01.055 (io.debezium.connector.mysql.SnapshotReader:701)
[2019-06-21 13:56:14,965] WARN [Producer clientId=producer-5] Error while fetching metadata with correlation id 11 : {kbserver=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:968)
[2019-06-21 13:56:15,066] WARN [Producer clientId=producer-5] Error while fetching metadata with correlation id 12 : {kbserver=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:968)
[2019-06-21 13:56:15,168] WARN [Producer clientId=producer-5] Error while fetching metadata with correlation id 13 : {kbserver=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:968)
[2019-06-21 13:56:15,269] WARN [Producer clientId=producer-5] Error while fetching metadata with correlation id 14 : {kbserver=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:968)
[2019-06-21 13:56:15,370] WARNDebezium [Producer clientId=producer-5] Error while fetching metadata with correlation id 15 : {kbserver=UNKNOWN_TOPIC_OR_PARTITION}

我发送的连接器有效负载如下所示:

{
      "name": "debezium-connector",
      "config": {
            "connector.class": "io.debezium.connector.mysql.MySqlConnector",
            "tasks.max": "1",
            "key.serializer": "io.confluent.connect.avro.AvroConverter",
            "value.serializer": "io.confluent.connect.avro.AvroConverter",
            "database.hostname": "localhost",
            "database.port": "3306",
            "database.user": "test",
            "database.password": "test@123",
            "database.whitelist": "mydb",
            "table.whitelist": "mydb.test",
            "database.server.id": "1",
            "database.server.name": "kbserver",
            "database.history.kafka.bootstrap.servers": "kafka:9092",
            "database.history.kafka.topic": "db-schema.mydb",
            "include.schema.changes": "true"
       }
    }

有人知道为什么会发生这种情况吗?或者我该如何解决?

xxb16uws

xxb16uws1#

请参考database.whitelisttable.whitelist。它们是不一致的。
它应该是mydbmydb.testdbdb.test,具体取决于数据库的名称。

pobjuy32

pobjuy322#

我也犯了同样的错误……在我的例子中,我没有提前创建“模式更改主题”。参见:https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#about-the-debezium-sqlserver-connector-schema-change-topic
一旦我创建了这个主题,错误就消失了,数据开始流动。

btxsgosb

btxsgosb3#

解决方案隐藏在评论中,但对我来说是成功的:在Kafka代理上启用主题的自动创建就足够了。
如果你使用strimzi,修改配置部分的属性如下:

auto.create.topics.enable: 'true'

在Kafka Connect中可以设置相同的配置。这是一个设计选择在哪里设置它。
如果你在broker上这样做,它将被视为默认值,并且它将对所有创建新Kafka主题的新请求有效。如果您在Connect上设置它,它将仅对通过连接的主题有效。
然而我会避免这两种解决方案,并在设置中手动创建主题。这是来自debezium文档
您需要创建的主题需要与具有相同的名称,其中serverName是在www.example.com中指定的逻辑服务器名称database.server.name

h7appiyu

h7appiyu4#

配置连接器以自动创建主题:https://debezium.io/documentation/reference/stable/configuration/topic-auto-create-config.html

"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",

相关问题