zookeeper如何从\uu consumer\u offset主题检索consumer offset?

ufj5ltwl  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(391)

根据armando ballaci提供的答案,这是“zookeeper在哪里存储Kafka集群和相关信息”的后续问题。
现在很明显,消费偏移量存储在kafka集群中的一个名为 __consumer_offsets . 很好,我只是想知道如何检索这些补偿工作。
主题不像RDB,在RDB上我们可以根据某个 predicate 查询任意数据。例如-如果数据存储在rdbms中,下面这样的查询可能会为某个使用者组的特定使用者获取主题的特定分区的使用者偏移量。 select consumer_offset__read, consumer_offset__commited from consumer_offset_table where consumer-grp-id="x" and partitionid="y" 但显然,这种检索是不可能的Kafka主题。那么,主题检索机制是如何工作的呢?有人能详细说明一下吗?
(来自kafka分区的数据是在fifo中读取的,如果遵循kafka使用者模型来检索特定的偏移量,则必须处理大量额外的数据,而且速度会很慢。所以我想知道是不是用别的方法……)

jgovgodb

jgovgodb1#

当我在日常工作中偶然发现这一点时,我可以在网上找到一些关于这一点的描述如下:
在kafka版本0.8.1.1中,消费者将其补偿提交给zookeeper。当存在大量偏移量(即,使用者计数*分区计数)时,zookeeper不能很好地伸缩(特别是对于写入)。幸运的是,kafka现在提供了一种理想的机制来存储消费补偿。消费者可以将他们的补偿写在一个持久的(复制的)和高度可用的主题上,从而在kafka中实现。使用者可以通过读取此主题来获取偏移量(尽管我们提供了内存中的偏移量缓存,以便更快地访问)。i、 例如,偏移提交是常规的生产者请求(这是廉价的),偏移获取是快速的内存查找。
Kafka的官方文档描述了该功能是如何工作的,以及如何将偏移从zookeeper迁移到Kafka。这个wiki提供了示例代码,演示如何使用新的基于kafka的偏移存储机制。

try {
        BlockingChannel channel = new BlockingChannel("localhost", 9092,
                BlockingChannel.UseDefaultBufferSize(),
                BlockingChannel.UseDefaultBufferSize(),
                5000 /* read timeout in millis */);
        channel.connect();
        final String MY_GROUP = "demoGroup";
        final String MY_CLIENTID = "demoClientId";
        int correlationId = 0;
        final TopicAndPartition testPartition0 = new TopicAndPartition("demoTopic", 0);
        final TopicAndPartition testPartition1 = new TopicAndPartition("demoTopic", 1);
        channel.send(new ConsumerMetadataRequest(MY_GROUP, ConsumerMetadataRequest.CurrentVersion(), correlationId++, MY_CLIENTID));
        ConsumerMetadataResponse metadataResponse = ConsumerMetadataResponse.readFrom(channel.receive().buffer());

        if (metadataResponse.errorCode() == ErrorMapping.NoError()) {
            Broker offsetManager = metadataResponse.coordinator();
            // if the coordinator is different, from the above channel's host then reconnect
            channel.disconnect();
            channel = new BlockingChannel(offsetManager.host(), offsetManager.port(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          BlockingChannel.UseDefaultBufferSize(),
                                          5000 /* read timeout in millis */);
            channel.connect();
        } else {
            // retry (after backoff)
        }
    }
    catch (IOException e) {
        // retry the query (after backoff)
    }
6yoyoihd

6yoyoihd2#

在kafka版本0.8.1.1中,消费者将其补偿提交给zookeeper。当存在大量偏移量(即,使用者计数*分区计数)时,zookeeper不能很好地伸缩(特别是对于写入)。幸运的是,kafka现在提供了一种理想的机制来存储消费补偿。消费者可以将他们的补偿写在一个持久的(复制的)和高度可用的主题上,从而在kafka中实现。使用者可以通过读取此主题来获取偏移量(尽管我们提供了内存中的偏移量缓存,以便更快地访问)。i、 例如,偏移提交是常规的生产者请求(这是廉价的),偏移获取是快速的内存查找。
Kafka的官方文档描述了该功能是如何工作的,以及如何将偏移从zookeeper迁移到Kafka。
其思想是,如果您需要所描述的功能,那么您需要将数据存储在rdbs、nosql数据库或elk堆栈中。一个好的模式将通过Kafka连接使用接收器连接器。kafka中的正常消息处理是通过consummer或流定义来完成的,这些流定义在事件发生时对事件做出React。在某些情况下,您当然可以寻求偏移量或时间戳,这是完全可能的。。。
在Kafka的最新版本中,偏移量不再保存在zookeeper中。所以zookeeper不参与场景处理的消费者。

相关问题