我有一个ElasticSearch kafka-connect
一些主题。
具有以下配置:
{
connection.url": "https://my-es-cluster:443",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.ignore": "true",
"topics": "topic1,topic2",
...
}
我可以在它运行时添加更多的主题吗?
会发生什么?
如果我从列表中删除一些主题,然后再添加它们呢。
我想添加一个新的 topic3
在这里:
{
...
"topics": "topic1,topic2,topic3",
...
}
我要删除什么 topic2
? 其他主题是否会被重复使用
{
...
"topics": "topic1,topic3",
...
}
1条答案
按热度按时间lfapxunr1#
既然你已经有了
kafka
以及kafka-connect
运行时,可以使用的restapikafka-connect
你自己检查一下:https://docs.confluent.io/current/connect/references/restapi.html如果添加新主题(
topic3
),将使用该主题中当前的所有邮件(根据保留策略)。检查此连接器的状态和配置:
如果要禁用某个主题,只需使用
PUT
更新该连接器的配置。什么都不会改变
topic1
以及topic3
. 只是topic2
不会再被消耗了。但是如果你想把它还给我
topic2
将从上次提交的偏移量开始使用,而不是从开始使用。对于每个
consumer group
上次提交offset
是存储的,从配置中删除主题一段时间并不重要。在这种情况下,消费者群体将connect-my-test-connector
.即使你删除了连接器(
DELETE http://kafka-connect:8083/connectors/my-test-connector
)然后用相同的名称再次创建它,偏移量将被保存,并且当您删除它时,消费将从它们开始继续(注意保留政策,通常是7天)。