不镜像消费群体偏移

8aqjt8rx  于 2021-06-05  发布在  Kafka
关注(0)|答案(3)|浏览(504)

我已设置mirrormaker2在两个dc之间复制数据。
我的财产,


# mm2.properties

name=source->dest
clusters=source, dest

source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092

source->dest.enabled=true

offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1

启动时请看下面的。

[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
    admin.timeout.ms = 60000
    checkpoints.topic.replication.factor = 3
    config.action.reload = restart
    config.properties.blacklist = [follower\.replication\.throttled\.replicas, leader\.replication\.throttled\.replicas, message\.timestamp\.difference\.max\.ms, message\.timestamp\.type, unclean\.leader\.election\.enable, min\.insync\.replicas]
    config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
    connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
    consumer.poll.timeout.ms = 1000
    emit.checkpoints.enabled = true
    emit.checkpoints.interval.seconds = 60
    emit.heartbeats.enabled = true
    emit.heartbeats.interval.seconds = 1
    enabled = true
    errors.log.enable = false
    errors.log.include.messages = false
    errors.retry.delay.max.ms = 60000
    errors.retry.timeout = 0
    errors.tolerance = none
    group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
    groups = [.*]
    groups.blacklist = [console-consumer-.*, connect-.*, __.*]
    header.converter = null
    heartbeats.topic.replication.factor = 3
    key.converter = null
    metric.reporters = null
    name = source->dest
    offset-syncs.topic.replication.factor = 3
    offset.lag.max = 100
    refresh.groups.enabled = true
    refresh.groups.interval.seconds = 600
    refresh.topics.enabled = true
    refresh.topics.interval.seconds = 600
    replication.factor = 2
    replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
    replication.policy.separator = .
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    source.cluster.alias = source
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    sync.topic.acls.enabled = true
    sync.topic.acls.interval.seconds = 600
    sync.topic.configs.enabled = true
    sync.topic.configs.interval.seconds = 600
    target.cluster.alias = dest
    task.assigned.groups = null
    task.assigned.partitions = null
    tasks.max = 1
    topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
    topics = [.*]
    topics.blacklist = [.*[\-\.]internal, .*\.replica, __.*]
    transforms = []
    value.converter = null
 (org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)

我的数据正在按预期复制。源主题在目标集群中创建为源。。但是,消费群体的补偿并没有被复制。
已在源群集中启动使用者组。

./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group

消耗了几条消息并阻止了它。在此主题中发布了新消息,mirror maker还将数据镜像到目标集群。
我尝试使用来自目标集群的消息,如下所示。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group

因为我使用的是同一个用户组,所以我希望我的偏移量也能同步,并且不会使用与我在cluster1中使用的相同的消息。但是,仍然会消耗所有消息。有什么我不知道的吗。

qv7cva1a

qv7cva1a1#

我看到你在检查点上的配置

emit.checkpoints.enabled = true 
emit.checkpoints.interval.seconds = 60

因此,您的checkpoints主题将仅在60秒后反映新的更改。如果你立即尝试,它不会工作,所以,尝试后1分钟。

nkcskrwz

nkcskrwz2#

我的数据正在按预期复制。源主题在目标集群中创建为源。。但是,消费群体的补偿并没有被复制。
默认情况下,mm2不会从 kafka-console-consumer . 在启动日志中,我们可以看到 groups.blacklist = [console-consumer-.*, connect-.*, __.*] . 我相信你可以在你的 mm2.properties 配置文件。
因为我使用的是同一个用户组,所以我希望我的偏移量也能同步,并且不会使用与我在cluster1中使用的相同的消息。
一旦正确镜像了使用者组并启用了检查点,就应该有一个在目标集群中自动创建的内部主题(类似于 dest.checkpoints.internal ). 此检查点主题包含每个使用者组中镜像主题分区的源集群和目标集群中最后提交的偏移量。
然后,您可以使用kafka的remoteclusterutils实用程序类来转换这些偏移量,并获取的同步偏移量 source.test-1 Map到消费者最后承诺的 test-1 . 如果您最终使用java创建了一个消费者,那么您可以添加 RemoteClusterUtils 作为项目的依赖项:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-mirror-client</artifactId>
    <version>2.4.0</version>
</dependency>

否则,很可能您将不得不编写一个 Package RemoteClusterUtils.java 得到平移的偏移量。这个功能或类似的东西看起来是作为mm2未来版本的一部分来计划的。

rbl8hiat

rbl8hiat3#

复制偏移非常重要,有几个基本原因:
Kafka是一个至少有一次的系统(忽略了宣传)。这意味着mirror maker,因为它建立在kafka消费者和生产者之上,每个消费者和生产者都可以超时/断开连接,这将导致某种程度的重复记录被传递到目标。这意味着偏移量不会在源和目标之间Map1:1。即使您尝试使用“精确一次”支持(mm2 kip明确表示它没有使用),它所能做的就是跳过部分交付的批,但是这些批仍然会占用目的地的偏移量
如果在源主题开始过期记录很久之后设置镜像,则目标主题将从偏移量0开始,而源主题的“最旧”偏移量将高得多。有人试图解决这个问题(见kip-391),但从未被接受
一般来说,不能保证镜像拓扑从单个源镜像到单个目标。例如,linkedin拓扑将多个源集群镜像到“聚合”层集群。对于这样的拓扑,Map偏移是没有意义的
在mm2 kip中,提到了一个“偏移同步主题”。在代码中,可以使用remoteclusterutils类在集群之间转换检查点:

Map<TopicPartition, Long> newOffsets = RemoteClusterUtils.translateOffsets(
   newClusterProperties, oldClusterName, consumerGroupId
);
consumer.seek(newOffsets);

这是从以下陈述中提取的-https://www.slideshare.net/confluentinc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019
或者,您可以使用seek by timespamp api在目标上启动使用者组,直到数据传递到目标(或者传递到源,如果目标上的日志附加时间戳的代理设置不覆盖这些时间)的大致时间。为了安全起见,你需要倒卷一点。

相关问题