我使用mongo source来监听mongo change stream并将所有事件放入kafka,但我正在绞尽脑汁寻找从事件中提取“真实”密钥的方法。我尝试了转换,但没有成功,给了我一个错误:
Caused by: org.apache.kafka.connect.errors.DataException: Only Struct objects supported for [copying fields from value to key], found: java.lang.String
在mongo source我找到了这条线
这基本上意味着它甚至没有一些密钥处理,而是查找“\u id”字段(它不是文档的id,而是一个resume token info)
相反,我想将主题的键设置为“documentkey”。
下面是连接器获取的事件示例:
{
"_id": {
"_data": "DSAD45543FFWEHTEY004....."
},
"operationType": "replace",
"clusterTime": {
"$timestamp": {
"t": 1446707990,
"i": 1
}
},
"fullDocument": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
},
...
},
"ns": {
"db": "somedb",
"coll": "somecol"
},
"documentKey": {
"_id": {
"$binary": "FxVFgHFRhrr/z+zUc/w==",
"$type": "03"
}
}
}
我使用了以下配置:
"transforms":"createKey",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"documentKey"
我试过了:
org.apache.kafka.connect.json.JsonConverter
还有stringconverter(虽然我不认为这可以用string实现)
org.apache.kafka.connect.storage.StringConverter
有办法提取钥匙吗?请注意:架构已禁用。
2条答案
按热度按时间xiozqbni1#
这是因为kafka的mongodb源连接器还不支持它。它应该支持从1.3版开始的高级键选择。
https://jira.mongodb.org/browse/kafka-40
oug3syen2#
请注意:架构已禁用
在这种情况下,不能使用valuetokey转换。但是,即使可以,该转换也不支持有效负载中的嵌套值,在您的示例中类似于
documentKey._id.$binary