在Kafka 2.7.0中,我使用MirroMaker 2.0作为Kafka-connect连接器,将所有主题从主Kafka集群复制到备份集群。
除了__consumer_offsets
之外,所有主题都被完美地复制。以下是连接配置:
{
"name": "test-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"topics.blacklist": "some-random-topic",
"replication.policy.separator": "",
"source.cluster.alias": "",
"target.cluster.alias": "",
"exclude.internal.topics":"false",
"tasks.max": "10",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
"target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
"topics": "test-topic-from-primary,primary-kafka-connect-offset,primary-kafka-connect-config,primary-kafka-connect-status,__consumer_offsets"
}
}
在类似的问题here中,接受的答案如下:
在consumer.config中添加以下内容:
exclude.internal.topics=false
在producer.config中添加以下内容:
client.id=__admin_client
在配置中的何处添加这些内容?
这里的连接器配置属性没有这样的名为client.id
的属性,我已经将exclude.internal.topics
的值设置为false
。
我是不是漏掉了什么?
更新
我了解到Kafka 2.7及更高版本支持使用MirrorCheckpointTask
的自动消费者偏移同步,如here所述。
我为此创建了一个连接器,具有以下配置:
{
"name": "mirror-checkpoint-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"sync.group.offsets.enabled": "true",
"source.cluster.alias": "",
"target.cluster.alias": "",
"exclude.internal.topics":"false",
"tasks.max": "10",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
"target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
"topics": "__consumer_offsets"
}
}
还是没有帮助。这是正确的做法吗?有什么需要吗?
2条答案
按热度按时间vbopmzt11#
你不想复制connsumer_offsets。由于各种原因,从src到目标群集的偏移量将不相同。
MirrorMaker2提供了进行偏移平移的能力。它将使用从src集群生成的转换偏移量填充目标集群。https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0
e4eetjau2#
默认情况下忽略__consumer_offsets
你需要覆盖这个配置