Kafka 如何使用MirrorMaker 2.0迁移消费者偏移?

b1uwtaje  于 2023-05-16  发布在  Apache
关注(0)|答案(2)|浏览(340)

在Kafka 2.7.0中,我使用MirroMaker 2.0作为Kafka-connect连接器,将所有主题从主Kafka集群复制到备份集群。
除了__consumer_offsets之外,所有主题都被完美地复制。以下是连接配置:

{
    "name": "test-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "topics.blacklist": "some-random-topic",
      "replication.policy.separator": "",
      "source.cluster.alias": "",
      "target.cluster.alias": "",
      "exclude.internal.topics":"false",
      "tasks.max": "10",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
      "target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
      "topics": "test-topic-from-primary,primary-kafka-connect-offset,primary-kafka-connect-config,primary-kafka-connect-status,__consumer_offsets"
    }
}

在类似的问题here中,接受的答案如下:
在consumer.config中添加以下内容:

exclude.internal.topics=false

在producer.config中添加以下内容:

client.id=__admin_client

在配置中的何处添加这些内容?
这里的连接器配置属性没有这样的名为client.id的属性,我已经将exclude.internal.topics的值设置为false
我是不是漏掉了什么?

更新

我了解到Kafka 2.7及更高版本支持使用MirrorCheckpointTask的自动消费者偏移同步,如here所述。
我为此创建了一个连接器,具有以下配置:

{
    "name": "mirror-checkpoint-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "sync.group.offsets.enabled": "true",
      "source.cluster.alias": "",
      "target.cluster.alias": "",
      "exclude.internal.topics":"false",
      "tasks.max": "10",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
      "target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
      "topics": "__consumer_offsets"
    }
}

还是没有帮助。这是正确的做法吗?有什么需要吗?

vbopmzt1

vbopmzt11#

你不想复制connsumer_offsets。由于各种原因,从src到目标群集的偏移量将不相同。
MirrorMaker2提供了进行偏移平移的能力。它将使用从src集群生成的转换偏移量填充目标集群。https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0

e4eetjau

e4eetjau2#

默认情况下忽略__consumer_offsets

topics.exclude = [.*[\-\.]internal, .*\.replica, __.*]

你需要覆盖这个配置

相关问题