我们使用Strimzi Kafka Maker2将一些主题从一个Kafka集群复制到另一个Kafka集群(单向),以实现灾难恢复。
作为一个poc,我创建了一个快速生产者和一个慢速消费者的示例,以模拟滞后,并希望确保源集群中的最新消费者偏移量定期复制到目标集群。这里的目标是当主区域关闭时,DR区域应该继续接近源集群的最新偏移量。
但是,消费者偏移量不会被复制到目标集群,其值仍为1。
下面是应用的配置。Kafka版本3.5.1
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
name: iflight-mm2-cluster
namespace: kafka
labels:
app: iflight-mm2-cluster
spec:
version: 3.5.1
replicas: 1
connectCluster: my-cluster-target
clusters:
- alias: my-cluster-source
bootstrapServers: iflight-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
- alias: my-cluster-target
bootstrapServers: iflight-dr-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
config:
config.storage.replication.factor: 3
offset.storage.replication.factor: 3
status.storage.replication.factor: 3
mirrors:
- sourceCluster: my-cluster-source
targetCluster: my-cluster-target
sourceConnector:
tasksMax: 10
config:
replication.factor: 3
offset-syncs.topic.replication.factor: 3
sync.topic.acls.enabled: "false"
refresh.topics.enabled: "true"
refresh.topics.interval.seconds: 5
sync.group.offsets.enabled: "true"
emit.checkpoints.enabled: "true"
replication.policy.separator: ""
replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
heartbeatConnector:
config:
heartbeats.topic.replication.factor: 3
replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
replication.policy.separator: ""
checkpointConnector:
config:
checkpoints.topic.replication.factor: 3
refresh.groups.enabled: "true"
refresh.groups.interval.seconds: 5
sync.topics.configs.enabled: "true"
sync.group.offsets.enabled: "true"
sync.group.offsets.interval.seconds: 5
emit.checkpoints.enabled: "true"
emit.checkpoints.interval.seconds: 5
replication.policy.separator: ""
replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
topicsPattern: IFLIGHT-.*
groupsPattern: iflight_.*
字符串
源集群,消费者偏移量:2570
的数据
目标集群consumer offset:1(启动consumer连接到目标集群,查看是从哪个offset开始消耗的,值为51。由于batchSize为50,第一批消耗后,值为51)。
的
是否存在未转换的消费者偏移的任何配置缺失?
我已经发布了另一个相关的问题,同样的场景也在mirrormaker2中尝试过。上面的strimzi配置相当于mirrormaker2中的this配置。请提出任何遗漏的地方。
1条答案
按热度按时间92dk7w1h1#
我们也有类似的问题,虽然滞后不是那么高。对我们来说,这是设置,偏移滞后最大值默认设置为100。我们将其更改为0,以便每当偏移量更改时同步。
字符串