我 正在 使用 debezium mongodb 连接 器 实现 数据 提取 , 它 基于 官方 文档 :https://debezium.io/documentation/reference/stable/connectors/mongodb.html 的 最 大 值
一切 都 运行 得 很 好 - - 除了 有效 负载 包含 反 斜杠 , 正如 您 在 after
属性 中 看到 的 那样 。
{
"after": "{\"_id\": {\"$oid\": \"63626d5993801d8fd1140993\"},\"document\": \"29973569000204\",\"document_type\": \"CNPJ\"}",
"patch": null,
"filter": null,
"source": {
"version": "1.7.1.Final",
"connector": "mongodb",
"name": "xxxxxxxxxx",
"ts_ms": 8466513,
"snapshot": "false",
"db": "database",
"sequence": null,
"rs": "atlas-iurhise-shard-0",
"collection": "mongo_collection",
"ord": 1,
"h": null,
"tord": 4,
"stxnid": "281f4230-d8cc-3d23-a556-89923b45e25f:168"
},
"op": "c",
"ts_ms": 1667394905422,
"transaction": null
}
中 的 每 一 个
我 尝试 过 这个 解决 方案 , 但 它 对 我 不 起 作用 :Debezium Outbox Pattern property transforms.outbox.table.expand.json.payload not working 格式
这些 是 我 的 设置 :
{
"name": "DebeziumDataExtract",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "3",
"mongodb.hosts": "removed",
"mongodb.name": "removed",
"mongodb.user": "removed",
"mongodb.password": "removed",
"mongodb.ssl.enabled": "true",
"collection.whitelist": "removed",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"hstore.handling.mode": "json",
"decimal.handling.mode": "string",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"heartbeat.interval.ms": "1000",
"heartbeat.topics.prefix": "removed",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 1,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"transforms": "unwrap",
"transforms.unwrap.collection.expand.json.payload": "true"
}
}
格式
并 等待 这样 的 有效 负载 :
{
"after": {
"_id": {
"$oid": "63626d5993801d8fd1140993"
},
"document": "29973585214796",
"document_type": "CNPJ"
},
"patch": null,
"filter": null,
"source": {
"version": "1.7.1.Final",
"connector": "mongodb",
"name": "xxxxxxxxxx",
"ts_ms": 8466513,
"snapshot": "false",
"db": "database",
"sequence": null,
"rs": "atlas-iurhise-shard-0",
"collection": "mongo_collection",
"ord": 1,
"h": null,
"tord": 4,
"stxnid": "281f4230-d8cc-3d23-a556-89923b45e25f:168"
},
"op": "c",
"ts_ms": 1667394905422,
"transaction": null
}
格式
谁 能 帮帮 我 ?
# # # # # # # # # 更新 # # # # # # # #
在@onecricketeer 的 评论 之后 , 我 尝试 了 这个 :
{
"name": "DebeziumTransportPlanner",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "3",
"mongodb.hosts": "stg-transport-planner-0-shard-00-00-00.xmapa.mongodb.net,stg-transport-planner-0-shard-00-01.xmapa.mongodb.net,stg-transport-planner-0-shard-00-02.xmapa.mongodb.net",
"mongodb.name": "stg-transport-planner-01",
"mongodb.user": "oploguser-stg",
"mongodb.password": "vCh1NtV4PoY8PeSJ",
"mongodb.ssl.enabled": "true",
"collection.whitelist": "stg-transport-planner-01[.]aggregated_transfers",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"hstore.handling.mode": "json",
"decimal.handling.mode": "string",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"heartbeat.interval.ms": "1000",
"heartbeat.topics.prefix": "__debeziumtransport-planner-heartbeat",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 1,
"topic.creation.default.cleanup.policy": "compact",
"topic.creation.default.compression.type": "lz4",
"transforms": "unwrap",
"transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
"transforms.unwrap.collection.expand.json.payload": "true",
"transforms.unwrap.collection.fields.additional.placement": "route_external_id:header,transfer_index:header"
}
}
格式
1条答案
按热度按时间n6lpvg4x1#
如果希望数据是 *JSON对象 * 而不是String,则需要使用
JsonConverter
而不是StringConverter
。此外,您还缺少
transforms.unwrap.type