java,如何在ApacheKafka中获取一个主题中的消息数

zte4gxcn  于 2021-06-08  发布在  Kafka
关注(0)|答案(17)|浏览(506)

我正在使用apachekafka进行消息传递。我已经用java实现了生产者和消费者。我们如何获得一个主题中的消息数?

noj0wjuj

noj0wjuj1#

我发现的最简单的方法是使用kafdroprestapi /topic/topicName 并指定键: "Accept" /价值: "application/json" 以获取json响应。
这里有记录。

6ovsh4lw

6ovsh4lw2#

使用https://prestodb.io/docs/current/connector/kafka-tutorial.html
一个超级sql引擎,由facebook提供,连接多个数据源(cassandra、kafka、jmx、redis…)。
prestodb是作为一个服务器运行的,带有可选的worker(有一个没有额外worker的独立模式),然后使用一个小的可执行jar(称为presto cli)进行查询。
配置好presto服务器后,就可以使用传统的sql:

SELECT count(*) FROM TOPIC_NAME;
muk1a3rh

muk1a3rh3#

ConsumerOffsetChecker 不再支持,您可以使用此命令检查主题中的所有邮件:

bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand \
    --group my-group \
    --bootstrap-server localhost:9092 \
    --describe

哪里 LAG 主题分区中的邮件数:

你也可以尝试使用Kafka卡特。这是一个开源项目,可以帮助您从主题和分区读取消息并将它们打印到stdout。下面是一个从中读取最后10条消息的示例 sample-kafka-topic 主题,然后退出:

kafkacat -b localhost:9092 -t sample-kafka-topic -p 0 -o -10 -e
nnsrf1az

nnsrf1az4#

使用kafka 2.11-1.0.0的java客户端,可以执行以下操作:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());

            // after each message, query the number of messages of the topic
            Set<TopicPartition> partitions = consumer.assignment();
            Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
            for(TopicPartition partition : offsets.keySet()) {
                System.out.printf("partition %s is at %d\n", partition.topic(), offsets.get(partition));
            }
        }
    }

输出如下:

offset = 10, key = null, value = un
partition test is at 13
offset = 11, key = null, value = deux
partition test is at 13
offset = 12, key = null, value = trois
partition test is at 13
kgqe7b3p

kgqe7b3p5#

我也有同样的问题,我是这样做的,从一个Kafka消费者,在Kotlin:

val messageCount = consumer.listTopics().entries.filter { it.key == topicName }
    .map {
        it.value.map { topicInfo -> TopicPartition(topicInfo.topic(), topicInfo.partition()) }
    }.map { consumer.endOffsets(it).values.sum() - consumer.beginningOffsets(it).values.sum()}
    .first()

非常粗糙的代码,因为我刚刚得到这个工作,但基本上你想从结束偏移量减去主题的开始偏移量,这将是主题的当前消息计数。
不能因为其他配置(清理策略、保留ms等)而仅仅依赖结束偏移量,这些配置最终可能会导致从主题中删除旧邮件。偏移量只向前“移动”,所以beggining偏移量将向前移动到更接近结束偏移量的位置(如果主题现在不包含消息,则最终移动到相同的值)。
基本上,结束偏移量表示通过该主题的消息总数,两者之间的差异表示该主题当前包含的消息数。

7jmck4yq

7jmck4yq6#

在Kafka管理器的最新版本中,有一个标题为“最近偏移量总和”的列。

jjjwad0x

jjjwad0x7#

有时,我们的兴趣在于知道每个分区中的消息数,例如,在测试自定义分区器时。随后的步骤已经过测试,可以使用confluent 3.2中的kafka 0.10.2.1-2。有了Kafka的主题, kt 以及以下命令行:

$ kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list host01:9092,host02:9092,host02:9092 --topic kt

打印示例输出,显示三个分区中的消息计数:

kt:2:6138
kt:1:6123
kt:0:6137

根据主题的分区数,行数可以是更多或更少。

7qhs6swi

7qhs6swi8#

要获取为主题存储的所有消息,可以将使用者搜索到每个分区的流的开头和结尾,并对结果求和

List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
        .map(p -> new TopicPartition(topic, p.partition()))
        .collect(Collectors.toList());
    consumer.assign(partitions); 
    consumer.seekToEnd(Collections.emptySet());
Map<TopicPartition, Long> endPartitions = partitions.stream()
        .collect(Collectors.toMap(Function.identity(), consumer::position));
    consumer.seekToBeginning(Collections.emptySet());
System.out.println(partitions.stream().mapToLong(p -> endPartitions.get(p) - consumer.position(p)).sum());
efzxgjgh

efzxgjgh9#

如果您可以访问服务器的jmx接口,则起始偏移量和结束偏移量位于:

kafka.log:type=Log,name=LogStartOffset,topic=TOPICNAME,partition=PARTITIONNUMBER
kafka.log:type=Log,name=LogEndOffset,topic=TOPICNAME,partition=PARTITIONNUMBER

(您需要更换 TOPICNAME & PARTITIONNUMBER ). 请记住,您需要检查给定分区的每个副本,或者您需要找出哪个代理是给定分区的领导者(这可能会随着时间的推移而改变)。
或者,您可以使用kafka消费者方法 beginningOffsets 以及 endOffsets .

wfypjpf4

wfypjpf410#

从消费者的Angular 来看,唯一能想到的方法就是实际消费这些消息,然后对它们进行计数。
kafka代理为自启动以来接收的消息数公开jmx计数器,但您无法知道其中有多少已被清除。
在最常见的场景中,kafka中的消息最好被看作是一个无限流,而获取当前保存在磁盘上的离散值是不相关的。此外,当处理一组代理时,事情变得更加复杂,这些代理在一个主题中都有一部分消息。

cgyqldqp

cgyqldqp11#

apache kafka命令获取主题的所有分区上未处理的消息:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group

印刷品:

Group      Topic        Pid Offset          logSize         Lag             Owner
test_group test         0   11051           11053           2               none
test_group test         1   10810           10812           2               none
test_group test         2   11027           11028           1               none

第6列是未处理的消息。把它们加起来如下:

kafka-run-class kafka.tools.ConsumerOffsetChecker 
    --topic test --zookeeper localhost:2181 
    --group test_group 2>/dev/null | awk 'NR>1 {sum += $6} 
    END {print sum}'

awk读取行,跳过标题行,将第6列相加,最后打印总和。
印刷品

5
zbsbpyhn

zbsbpyhn12#

运行以下命令(假设 kafka-console-consumer.sh 在路上):

kafka-console-consumer.sh  --from-beginning \
--bootstrap-server yourbroker:9092 --property print.key=true  \
--property print.value=false --property print.partition \
--topic yourtopic --timeout-ms 5000 | tail -n 10|grep "Processed a total of"
c86crjj0

c86crjj013#

Kafka文献节选
0.9.0.0中的弃用
kafka-consumer-offset-checker.sh(kafka.tools.consumerofsfsetchecker)已被弃用。接下来,请使用kafka-consumer-groups.sh(kafka.admin.consumergroupcommand)实现此功能。
我运行的kafka代理服务器和客户端都启用了ssl。下面是我使用的命令 kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --list --command-config /tmp/ssl_config kafka-consumer-groups.sh --bootstrap-server Broker_IP:Port --command-config /tmp/ssl_config --describe --group group_name_x 其中/tmp/ssl\u config如下

security.protocol=SSL
ssl.truststore.location=truststore_file_path.jks
ssl.truststore.password=truststore_password
ssl.keystore.location=keystore_file_path.jks
ssl.keystore.password=keystore_password
ssl.key.password=key_password
oxcyiej7

oxcyiej714#

我自己也没试过,但似乎有道理。
你也可以使用 kafka.tools.ConsumerOffsetChecker (来源)。

f45qwnt8

f45qwnt815#

你可以用Kafka托尔。请检查此链接->http://www.kafkatool.com/download.html
kafka工具是一个用于管理和使用apachekafka集群的gui应用程序。它提供了一个直观的用户界面,允许用户快速查看Kafka集群中的对象以及集群主题中存储的消息。

相关问题