Kafka Strimzi mirrormaker2使用者偏移量未转换到目标群集

dl5txlt9  于 2023-11-16  发布在  Apache
关注(0)|答案(1)|浏览(105)

我们使用Strimzi Kafka Maker2将一些主题从一个Kafka集群复制到另一个Kafka集群(单向),以实现灾难恢复。
作为一个poc,我创建了一个快速生产者和一个慢速消费者的示例,以模拟滞后,并希望确保源集群中的最新消费者偏移量定期复制到目标集群。这里的目标是当主区域关闭时,DR区域应该继续接近源集群的最新偏移量。
但是,消费者偏移量不会被复制到目标集群,其值仍为1。
下面是应用的配置。Kafka版本3.5.1

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaMirrorMaker2
metadata:
  name: iflight-mm2-cluster
  namespace: kafka
  labels:
    app: iflight-mm2-cluster
spec:
  version: 3.5.1
  replicas: 1
  connectCluster: my-cluster-target
  clusters:
    - alias: my-cluster-source
      bootstrapServers: iflight-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
    - alias: my-cluster-target
      bootstrapServers: iflight-dr-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
      config:
        config.storage.replication.factor: 3
        offset.storage.replication.factor: 3
        status.storage.replication.factor: 3
  mirrors:
    - sourceCluster: my-cluster-source
      targetCluster: my-cluster-target
      sourceConnector:
        tasksMax: 10
        config:
          replication.factor: 3
          offset-syncs.topic.replication.factor: 3
          sync.topic.acls.enabled: "false"
          refresh.topics.enabled: "true"
          refresh.topics.interval.seconds: 5
          sync.group.offsets.enabled: "true"
          emit.checkpoints.enabled: "true"
          replication.policy.separator: ""
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      heartbeatConnector:
        config:
          heartbeats.topic.replication.factor: 3
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
          replication.policy.separator: ""
      checkpointConnector:
        config:
          checkpoints.topic.replication.factor: 3
          refresh.groups.enabled: "true"
          refresh.groups.interval.seconds: 5
          sync.topics.configs.enabled: "true"
          sync.group.offsets.enabled: "true"
          sync.group.offsets.interval.seconds: 5
          emit.checkpoints.enabled: "true"
          emit.checkpoints.interval.seconds: 5
          replication.policy.separator: ""
          replication.policy.class: "org.apache.kafka.connect.mirror.IdentityReplicationPolicy"
      topicsPattern: IFLIGHT-.*
      groupsPattern: iflight_.*

字符串

源集群,消费者偏移量:2570


的数据

目标集群consumer offset:1(启动consumer连接到目标集群,查看是从哪个offset开始消耗的,值为51。由于batchSize为50,第一批消耗后,值为51)。



是否存在未转换的消费者偏移的任何配置缺失?
我已经发布了另一个相关的问题,同样的场景也在mirrormaker2中尝试过。上面的strimzi配置相当于mirrormaker2中的this配置。请提出任何遗漏的地方。

92dk7w1h

92dk7w1h1#

我们也有类似的问题,虽然滞后不是那么高。对我们来说,这是设置,偏移滞后最大值默认设置为100。我们将其更改为0,以便每当偏移量更改时同步。

sourceConnector:
  config:
    offset.lag.max: 0

字符串

相关问题