如何告诉mongosource(使用kafka connect)要序列化的键

omqzjyyz  于 2021-06-06  发布在  Kafka
关注(0)|答案(2)|浏览(422)

我使用mongo source来监听mongo change stream并将所有事件放入kafka,但我正在绞尽脑汁寻找从事件中提取“真实”密钥的方法。我尝试了转换,但没有成功,给了我一个错误:

Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String

在mongo source我找到了这条线
这基本上意味着它甚至没有一些密钥处理,而是查找“\u id”字段(它不是文档的id,而是一个resume token info)
相反,我想将主题的键设置为“documentkey”。
下面是连接器获取的事件示例:

{
 "_id": {
    "_data": "DSAD45543FFWEHTEY004....."
  },
  "operationType": "replace",
  "clusterTime": {
    "$timestamp": {
      "t": 1446707990,
      "i": 1
    }
  },
  "fullDocument": {
    "_id": {
      "$binary": "FxVFgHFRhrr/z+zUc/w==",
      "$type": "03"
    },
    ...
  },
  "ns": {
    "db": "somedb",
    "coll": "somecol"
  },
  "documentKey": {
    "_id": {
      "$binary": "FxVFgHFRhrr/z+zUc/w==",
      "$type": "03"
    }
  }
}

我使用了以下配置:

"transforms":"createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"documentKey"

我试过了:

org.apache.kafka.connect.json.JsonConverter

还有stringconverter(虽然我不认为这可以用string实现)

org.apache.kafka.connect.storage.StringConverter

有办法提取钥匙吗?请注意:架构已禁用。

xiozqbni

xiozqbni1#

这是因为kafka的mongodb源连接器还不支持它。它应该支持从1.3版开始的高级键选择。
https://jira.mongodb.org/browse/kafka-40

oug3syen

oug3syen2#

请注意:架构已禁用
在这种情况下,不能使用valuetokey转换。但是,即使可以,该转换也不支持有效负载中的嵌套值,在您的示例中类似于 documentKey._id.$binary

相关问题