Kafka连接在飞行中添加更多的主题

ymzxtsji  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(291)

我有一个ElasticSearch kafka-connect 一些主题。
具有以下配置:

{
    connection.url": "https://my-es-cluster:443",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.ignore": "true",
    "topics": "topic1,topic2",
    ...
}

我可以在它运行时添加更多的主题吗?
会发生什么?
如果我从列表中删除一些主题,然后再添加它们呢。
我想添加一个新的 topic3 在这里:

{
    ...
    "topics": "topic1,topic2,topic3",
    ...
}

我要删除什么 topic2 ? 其他主题是否会被重复使用

{
    ...
    "topics": "topic1,topic3",
    ...
}
lfapxunr

lfapxunr1#

既然你已经有了 kafka 以及 kafka-connect 运行时,可以使用的restapi kafka-connect 你自己检查一下:https://docs.confluent.io/current/connect/references/restapi.html
如果添加新主题( topic3 ),将使用该主题中当前的所有邮件(根据保留策略)。

PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
   ...
   "topics": "topic1,topic2,topic3",
   ...
}

检查此连接器的状态和配置:

GET http://kafka-connect:8083/connectors/my-test-connector

如果要禁用某个主题,只需使用 PUT 更新该连接器的配置。

PUT http://kafka-connect:8083/connectors/my-test-connector/config
{
   ...
   "topics": "topic1,topic3",
   ...
}

什么都不会改变 topic1 以及 topic3 . 只是 topic2 不会再被消耗了。
但是如果你想把它还给我 topic2 将从上次提交的偏移量开始使用,而不是从开始使用。
对于每个 consumer group 上次提交 offset 是存储的,从配置中删除主题一段时间并不重要。在这种情况下,消费者群体将 connect-my-test-connector .
即使你删除了连接器( DELETE http://kafka-connect:8083/connectors/my-test-connector )然后用相同的名称再次创建它,偏移量将被保存,并且当您删除它时,消费将从它们开始继续(注意保留政策,通常是7天)。

相关问题