我正在尝试配置kafka连接器以使用mongodb作为源,并将记录发送到kakfa主题中。我已经成功地做到了这一点,但我正在尝试使用jsonconverter来实现这一点,以便还可以用有效负载保存模式。
我的问题是连接器正在按如下方式保存数据:
{ "schema": { "type": "string" } , "payload": "{....}" }
换句话说,它自动假设实际的json是一个字符串,并将模式保存为字符串。
我就是这样设置连接器的:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "newtopic",
"config": {
"tasks.max":1,
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enabled": "true",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enabled": "true",
"connection.uri":"[MONGOURL]",
"database":"dbname",
"collection":"collname",
"pipeline":"[]",
"topic.prefix": "",
"publish.full.document.only": "true"
}}'
我是不是在配置上遗漏了什么?它只是不能猜测mongodb中存储的文档的模式,所以它使用string吗?
2条答案
按热度按时间lkaoscv71#
mongodb连接器的最新版本1.3应该可以解决这个问题。
它甚至提供了一个推断源模式的选项。
c90pui9n2#
没什么好猜的。mongo是无模式的,上次我检查过,所以模式是一个字符串或字节。我建议使用avroconverter或将schema enabled设置为false
你也可以尝试使用debezium来看看你是否得到了不同的结果