使用mongodb kafka连接器的多个集合

1cosmwyk  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(568)

根据文档,如果您不提供值,它将从所有集合中读取
“数据库中要监视更改的集合的名称。如果未设置,则将监视所有集合。“
我看到了连接器源代码并确认了这一点:
https://github.com/mongodb/mongo-kafka/blob/k133/src/main/java/com/mongodb/kafka/connect/source/mongosourcetask.java#l462
但是,如果不提供集合,则会出现如下错误:

ERROR WorkerSourceTask{id=mongo-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:186)
org.apache.kafka.connect.errors.ConnectException: com.mongodb.MongoCommandException: Command failed with error 73 (InvalidNamespace): '{aggregate: 1} is not valid for '$changeStream'; a collection is required.' on server localhost:27018. The full response is {"operationTime": {"$timestamp": {"t": 1603928795, "i": 1}}, "ok": 0.0, "errmsg": "{aggregate: 1} is not valid for '$changeStream'; a collection is required.", "code": 73, "codeName": "InvalidNamespace", "$clusterTime": {"clusterTime": {"$timestamp": {"t": 1603928795, "i": 1}}, "signature": {"hash": {"$binary": "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "$type": "00"}, "keyId": {"$numberLong": "0"}}}}

这是我的配置文件

name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration

connection.uri=mongodb://localhost:27017,localhost:27018/order
database=order
collection=

topic.prefix=redemption
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options

pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=
copy.existing=true
errors.tolerance=all

如果使用了集合,我就可以使用连接器并生成主题。
查看日志,连接器似乎正在连接到数据库:
正在“order”(com.mongodb.kafka.connect.source.mongos)上监视数据库更改的信息ourcetask:620)
源代码

else if (collection.isEmpty()) {
      LOGGER.info("Watching for database changes on '{}'", database);
      MongoDatabase db = mongoClient.getDatabase(database);
      changeStream = pipeline.map(db::watch).orElse(db.watch());
    } else

如果我去我的mongo控制台,我有以下内容:

rs0:SECONDARY> db.watch()
2020-10-28T18:13:50.344-0600 E QUERY    [thread1] TypeError: db.watch is not a function :
@(shell):1:1
rs0:SECONDARY> db.watch
test.watch
9rygscc1

9rygscc11#

我使用的是mongo3.6版本,它支持监视集合,但不支持监视数据库或部署(示例),因此我遇到了这些错误。
我在文件上找到这个:
从mongodb4.0开始,您可以打开单个数据库(不包括admin、local和config数据库)的变更流游标,以监视其所有非系统集合的变更。
https://docs.mongodb.com/manual/changestreams/#watch-集合数据库部署

相关问题