带自定义消费者组名称的Kafka Sink连接器

aoyhnmkz  于 2022-10-07  发布在  Kafka
关注(0)|答案(2)|浏览(248)

在Kafka Connect中,所有接收器连接器将使用命名转换为connect-connector_name的不同组。但我想使用自定义名称作为前缀。(我们可以在接收器配置name properties中这样做,但希望默认设置它)

我尝试在consumer.properties文件中设置它,但没有用。

有人知道它是怎么设置的吗?另外,如果我为我的所有接收器连接器设置一个组,会发生什么情况?

46qrfjad

46qrfjad1#

接收器任务的ConsumerConfig``group.id前缀始终为connect-

https://issues.apache.org/jira/browse/KAFKA-4400

consumer.properties(可选)用于kafka-console-consumer,而不是Connect API
如果我为所有接收器连接器设置一个组,会发生什么情况?

你是说有一个name的单个连接器?然后,您会希望tasks.max等于其消耗的所有主题的总分区。

如果您指的是多个连接器,则不能;同一Connect群集中的所有连接器都需要唯一的name/connector.class

mctunoxg

mctunoxg2#

您可以重写任何使用者或生产者属性。您必须在Worker配置中使用connector.client.config.override.policy = All(默认为None)。然后,您可以在属性consumer.override.group.id中为您的任务重写Consumer er.group.id。例如:

{
  "consumer.override.group.id": "testgroup",
  "name": "Elasticsearch",
  "config": {
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "topics": "orders",
  "tasks.max": 1,
  "connection.url": "http://elasticsearch:9200",
  "type.name": "type.name=kafkaconnect",
  "key.ignore": "true",
  "schema.ignore": "false",
  "transforms": "renameTopic",
  "transforms.renameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
  "transforms.renameTopic.regex": "orders",
  "transforms.renameTopic.replacement": "orders-latest"
}'

文档在此处

如果从映像confluentinc/cp-kafka-connnect-base在docker中使用Kafka-CONNECT,则可以从环境变量CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY设置此配置

相关问题