Kafka 带有MongoDB的Debezium-生成的记录的有效负载包含反斜杠

j91ykkif  于 2022-11-21  发布在  Apache
关注(0)|答案(1)|浏览(211)

我 正在 使用 debezium mongodb 连接 器 实现 数据 提取 , 它 基于 官方 文档 :https://debezium.io/documentation/reference/stable/connectors/mongodb.html 的 最 大 值
一切 都 运行 得 很 好 - - 除了 有效 负载 包含 反 斜杠 , 正如 您 在 after 属性 中 看到 的 那样 。

{
  "after": "{\"_id\": {\"$oid\": \"63626d5993801d8fd1140993\"},\"document\": \"29973569000204\",\"document_type\": \"CNPJ\"}",
  "patch": null,
  "filter": null,
  "source": {
    "version": "1.7.1.Final",
    "connector": "mongodb",
    "name": "xxxxxxxxxx",
    "ts_ms": 8466513,
    "snapshot": "false",
    "db": "database",
    "sequence": null,
    "rs": "atlas-iurhise-shard-0",
    "collection": "mongo_collection",
    "ord": 1,
    "h": null,
    "tord": 4,
    "stxnid": "281f4230-d8cc-3d23-a556-89923b45e25f:168"
  },
  "op": "c",
  "ts_ms": 1667394905422,
  "transaction": null
}

中 的 每 一 个
我 尝试 过 这个 解决 方案 , 但 它 对 我 不 起 作用 :Debezium Outbox Pattern property transforms.outbox.table.expand.json.payload not working 格式
这些 是 我 的 设置 :

{
  "name": "DebeziumDataExtract",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "3",
    "mongodb.hosts": "removed",
    "mongodb.name": "removed",
    "mongodb.user": "removed",
    "mongodb.password": "removed",
    "mongodb.ssl.enabled": "true",
    "collection.whitelist": "removed",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "hstore.handling.mode": "json",
    "decimal.handling.mode": "string",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "heartbeat.interval.ms": "1000",
    "heartbeat.topics.prefix": "removed",
    "topic.creation.default.replication.factor": 3,  
    "topic.creation.default.partitions": 1,  
    "topic.creation.default.cleanup.policy": "compact",  
    "topic.creation.default.compression.type": "lz4",
    "transforms": "unwrap",
    "transforms.unwrap.collection.expand.json.payload": "true"
  }
}

格式
并 等待 这样 的 有效 负载 :

{
  "after": {
    "_id": {
      "$oid": "63626d5993801d8fd1140993"
    },
    "document": "29973585214796",
    "document_type": "CNPJ"
  },
  "patch": null,
  "filter": null,
  "source": {
    "version": "1.7.1.Final",
    "connector": "mongodb",
    "name": "xxxxxxxxxx",
    "ts_ms": 8466513,
    "snapshot": "false",
    "db": "database",
    "sequence": null,
    "rs": "atlas-iurhise-shard-0",
    "collection": "mongo_collection",
    "ord": 1,
    "h": null,
    "tord": 4,
    "stxnid": "281f4230-d8cc-3d23-a556-89923b45e25f:168"
  },
  "op": "c",
  "ts_ms": 1667394905422,
  "transaction": null
}

格式
谁 能 帮帮 我 ?

# # # # # # # # # 更新 # # # # # # # #

在@onecricketeer 的 评论 之后 , 我 尝试 了 这个 :

{
  "name": "DebeziumTransportPlanner",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "tasks.max": "3",
    "mongodb.hosts": "stg-transport-planner-0-shard-00-00-00.xmapa.mongodb.net,stg-transport-planner-0-shard-00-01.xmapa.mongodb.net,stg-transport-planner-0-shard-00-02.xmapa.mongodb.net",
    "mongodb.name": "stg-transport-planner-01",
    "mongodb.user": "oploguser-stg",
    "mongodb.password": "vCh1NtV4PoY8PeSJ",
    "mongodb.ssl.enabled": "true",
    "collection.whitelist": "stg-transport-planner-01[.]aggregated_transfers",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "hstore.handling.mode": "json",
    "decimal.handling.mode": "string",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "heartbeat.interval.ms": "1000",
    "heartbeat.topics.prefix": "__debeziumtransport-planner-heartbeat",
    "topic.creation.default.replication.factor": 3,  
    "topic.creation.default.partitions": 1,  
    "topic.creation.default.cleanup.policy": "compact",  
    "topic.creation.default.compression.type": "lz4",
    "transforms": "unwrap",
    "transforms.unwrap.type":"io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.collection.expand.json.payload": "true",
    "transforms.unwrap.collection.fields.additional.placement": "route_external_id:header,transfer_index:header"
  }
}

格式

n6lpvg4x

n6lpvg4x1#

如果希望数据是 *JSON对象 * 而不是String,则需要使用JsonConverter而不是StringConverter
此外,您还缺少transforms.unwrap.type

相关问题