使用jsonconverter的kafka connect mongodb kafka连接器不工作

disbfnqx  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(381)

我正在尝试配置kafka连接器以使用mongodb作为源,并将记录发送到kakfa主题中。我已经成功地做到了这一点,但我正在尝试使用jsonconverter来实现这一点,以便还可以用有效负载保存模式。
我的问题是连接器正在按如下方式保存数据:

{ "schema": { "type": "string" } , "payload": "{....}" }

换句话说,它自动假设实际的json是一个字符串,并将模式保存为字符串。
我就是这样设置连接器的:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
 "name": "newtopic",
 "config": { 
"tasks.max":1,
 "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector", 
 "key.converter":"org.apache.kafka.connect.json.JsonConverter", 
 "key.converter.schemas.enabled": "true",
 "value.converter":"org.apache.kafka.connect.json.JsonConverter", 
 "value.converter.schemas.enabled": "true",
 "connection.uri":"[MONGOURL]", 
 "database":"dbname", 
 "collection":"collname", 
 "pipeline":"[]", 
 "topic.prefix": "", 
 "publish.full.document.only": "true" 
 }}'

我是不是在配置上遗漏了什么?它只是不能猜测mongodb中存储的文档的模式,所以它使用string吗?

lkaoscv7

lkaoscv71#

mongodb连接器的最新版本1.3应该可以解决这个问题。
它甚至提供了一个推断源模式的选项。

c90pui9n

c90pui9n2#

没什么好猜的。mongo是无模式的,上次我检查过,所以模式是一个字符串或字节。我建议使用avroconverter或将schema enabled设置为false
你也可以尝试使用debezium来看看你是否得到了不同的结果

相关问题