ES sink connector (Debezium) stops with an error

hof1towb  posted on 2021-06-04  in Kafka

To understand how CDC works, I have been following this example from the Debezium site: https://debezium.io/blog/2018/03/08/creating-ddd-aggregates-with-debezium-and-kafka-streams/
In this example, if I change the sink connector from MongoDB to Elasticsearch and then start the ES sink connector, it fails with the following error:

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
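
For context, with schemas.enable=true the JsonConverter expects every message to be wrapped in a schema/payload envelope; a plain JSON document without those two top-level fields fails with exactly this error. A minimal illustration of the expected envelope (field names and values invented):

{
    "schema": {
        "type": "struct",
        "fields": [
            { "field": "id", "type": "int32", "optional": false }
        ],
        "optional": false
    },
    "payload": {
        "id": 1001
    }
}

A message that is just { "id": 1001 } can only be read by a converter configured with schemas.enable=false.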

The MySQL Debezium source connector properties are as follows (the URLs have been edited, please ignore them):

{
    "name": "mysql-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "localhost",
        "database.port": "3306",
        "database.user": "cdc",
        "database.password": "password",
        "database.server.id": "1840514",
        "database.server.name": "dbserver1",
        "table.whitelist": "inventory.customers,inventory.addresses",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.unwrap.drop.tombstones": "false"
    }
}
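
For reference, the UnwrapFromEnvelope transform flattens Debezium's change-event envelope so that only the new row state is forwarded downstream. Roughly, an event like this (an illustrative sketch with invented values):

{
    "before": null,
    "after": { "id": 1001, "first_name": "Sally" },
    "source": { "db": "inventory", "table": "customers" },
    "op": "c",
    "ts_ms": 1622793600000
}

is reduced to just the contents of the after field:

{ "id": 1001, "first_name": "Sally" }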
The Elasticsearch sink connector used here comes from https://debezium.io/blog/2018/01/17/streaming-to-elasticsearch/
The Elasticsearch sink connector properties are as follows (the URLs have been edited, please ignore them):

{
    "name": "elastic-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "product-cdc,final_ddd_aggregates,dbserver1.inventory.customers,dbserver1.inventory.addresses",
        "connection.url": "https://localhost:9243",
        "transforms": "unwrap,key",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "key.ignore": "false",
        "schema.ignore": "true",
        "value.converter.schemas.enable": "true",
        "type.name": "final_ddd_aggregates"
    }
}
Please help me with this.

bvhaajcl1#

In your configuration you need to do what the error message tells you and set schemas.enable=false. Using the example from the article, instead of:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "final_ddd_aggregates",
        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
        "mongodb.collection": "customers_with_addresses",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
        "mongodb.delete.on.null.values": true
    }
}

you would have:

{
    "name": "mongodb-sink",
    "config": {
        "connector.class": "at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector",
        "tasks.max": "1",
        "topics": "final_ddd_aggregates",
        "mongodb.connection.uri": "mongodb://mongodb:27017/inventory?w=1&journal=true",
        "mongodb.collection": "customers_with_addresses",
        "mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.FullKeyStrategy",
        "mongodb.delete.on.null.values": true,
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false"
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false"
    }
}
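
Applied to the Elasticsearch sink from the question, the same change would look like this (a sketch; it assumes the rest of the sink config is unchanged and that the topics contain schemaless JSON):

{
    "name": "elastic-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "product-cdc,final_ddd_aggregates,dbserver1.inventory.customers,dbserver1.inventory.addresses",
        "connection.url": "https://localhost:9243",
        "transforms": "unwrap,key",
        "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
        "transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.key.field": "id",
        "key.ignore": "false",
        "schema.ignore": "true",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "type.name": "final_ddd_aggregates"
    }
}

Note that schema.ignore stays true here: without an embedded schema the connector cannot derive an Elasticsearch mapping from the messages, so Elasticsearch infers one dynamically.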

To learn more about converters and serialization, see https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained and http://rmoff.dev/ksldn19-kafka-connect

ef1yzkbh2#

As the error message suggests, the topic you are reading from probably contains JSON messages that were stored without a schema. You need to either enable schemas on the source side or disable them on the sink side.
For more details, check this FAQ entry.
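
If you would rather keep schemas enabled on the sink side, the source connector has to embed them; a sketch of the converter settings you would add to the source connector config (these can also be set globally in the Connect worker properties):

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"

Embedding the schema in every JSON message adds considerable overhead, which is why disabling it on the sink, or switching to Avro with a schema registry, is usually preferred.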
