如何更改kafka连接源连接器生成的主题的名称

ezykj2lf  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(451)

我有一个已经运行的生产部署kafka集群,主题是“现有主题”。我正在使用debezium的mongodb源连接器。
在这里,我只想将cdc事件直接推送到主题“existing topic”上,以便已经在收听该主题的消费者能够处理它。
我没有找到任何资源这样做,但它提到的主题是创建在下面的格式-
如果mongodb.name参数为a,数据库名称为b,集合名称为c,则来自数据库a和集合c的数据将加载到主题a.b.c下
我可以将主题更改为“现有主题”并将事件推送到它吗?

wwodge7n

wwodge7n1#

根据文件,
Kafka主题的名字总是以 logicalName.databaseName.collectionName ,在哪里 logicalName 是用指定的连接器的逻辑名称 mongodb.name 配置属性, databaseName 发生操作的数据库的名称,以及 collectionName 存在受影响文档的mongodb集合的名称。
这意味着如果连接器的逻辑名称是 myConnector 还有你的数据库 myDatabase 有两个收藏 users 以及 orders ```
{
"name": "myConnector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "mongo-db-host:27017",
"mongodb.name": "myDatabase",
"collection.whitelist": "myDatabase[.]*",
}
}

然后kafka connect将用名称填充两个主题:
myConnector.myDatabase.users `myConnector.myDatabase.orders` 现在,如果仍要更改目标主题的名称,可以使用kafka connect single message transforms(smt)。更准确地说, `ExtractTopic` 我应该帮你。请注意,尽管这个smt可以帮助您从消息的键或值中提取主题名称,但是您还是需要在负载中包含所需的主题名称。
例如,下面的smt将提取字段的值 `myField` 并以此作为唱片的主题:

transforms.ValueFieldExample.type=io.confluent.connect.transforms.ExtractTopic$Value
transforms.ValueFieldExample.field=myField

vltsax25

vltsax252#

我在使用jdbc源代码连接器时遇到了同样的问题,并找到了不同的解决方案:
使用 RegexRouter 单个消息转换 dropPrefix 您可以覆盖整个主题名称:

"transforms":"dropPrefix",
"transforms.dropPrefix.regex":"A.B.C",                 // whole created topic name
"transforms.dropPrefix.replacement":"existing-topic"   // whole exisiting topic name

而且它可以与regex一起工作,因此如果您使用多个表/集合,并且您创建的主题名称不是常量,那么您应该能够使其成为动态的。
从技术上讲,我删除了整个主题名,然后添加了一个新的主题名,这不是最好的解决方案。

相关问题