我使用的是kafka的0.9.0.0版本,我想计算一个主题中的消息数,而不使用管理脚本kafka-console-consumer.sh。我已经尝试了答案java中的所有命令,如何在apachekafka中获取一个主题中的消息数,但是没有一个得到结果。有人能帮帮我吗?
nszi6y051#
从技术上讲,您可以简单地使用主题中的所有消息并对它们进行计数:例子:
kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 --topic XYZ --partition 0*
然而 kafka.tools.GetOffsetShell 该方法将给出偏移量,而不是主题中消息的实际数量。这意味着,如果主题被压缩,那么如果您通过使用消息或读取偏移量来计算消息,您将得到两个不同的数字。主题压缩:https://kafka.apache.org/documentation.html#design_compactionbasics
kafka.tools.GetOffsetShell
evrscar22#
您也可以使用awk和一个简单的循环来实现这一点
for i in `kafka-run-class kafka.tools.GetOffsetShell --broker-list broker:9092 --time -1 --topic topic_name| awk -F : '{print $3}'`; do sum=$(($sum+$i)); done
drkbr07n3#
获取主题中的记录数
brokers="<broker1:port>" topic=<topic-name> sum_1=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -1 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}') sum_2=$(/usr/hdp/current/kafka-broker/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $brokers --topic $topic --time -2 | grep -e ':[[:digit:]]*:' | awk -F ":" '{sum += $3} END {print sum}') echo "Number of records in topic ${topic}: "$((sum_1 - sum_2))
brjng4g34#
您可以尝试执行以下命令:
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test-topic --time -1
然后,将每个分区的所有计数相加。更新:java实现
Properties props = new Properties(); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); ...... try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { consumer.subscribe(Arrays.asList("your_topic")); Set<TopicPartition> assignment; while ((assignment = consumer.assignment()).isEmpty()) { consumer.poll(Duration.ofMillis(100)); } final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment); final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(assignment); assert (endOffsets.size() == beginningOffsets.size()); assert (endOffsets.keySet().equals(beginningOffsets.keySet())); Long totalCount = beginningOffsets.entrySet().stream().mapToLong(entry -> { TopicPartition tp = entry.getKey(); Long beginningOffset = entry.getValue(); Long endOffset = endOffsets.get(tp); return endOffset - beginningOffset; }).sum(); System.out.println(totalCount); }
qlckcl4x5#
您可以使用以下公式来汇总所有计数:
.../bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list <<broker_1>>:9092,<<broker_2:9092>>... --topic <<your_topic_name>> --time -1 | while IFS=: read topic_name partition_id number; do echo "$number"; done | paste -sd+ - | bc
zu0ti5jz6#
如果你不想为“原创”Kafka剧本的麻烦买单,还有Kafka。基本思想是使用每个分区的最后一条消息将偏移相加(校正基于零的偏移)。让我们开发这个。
kafkacat -C -b <broker> -t <topic> -o -1 -f '%p\t%o\n'
这将输出如下内容(加上stderr上的“到达分区结尾”通知):
0 77 1 75 2 78
现在, kafkacat 不终止,但一直等待新消息。我们可以通过添加一个超时来避免这种情况(选择一个足够大的值,这样就可以获得给定环境中的所有分区):
kafkacat
timeout --preserve-status 1 kafkacat <snip>
现在我们可以继续计算第二列(每个列增加1个)--但是如果在该超时间隔期间有新消息,我们可能会得到如下结果:
0 77 1 75 2 78 1 76
所以我们必须考虑到这一点,这是很容易做到的一点 awk :
awk
timeout --preserve-status 1 kafkacat <snip> 2> /dev/null \ | awk '{lastOffsets[$1] = $2} END {count = 0; for (i in lastOffsets) { count += lastOffsets[i] + 1 }; print count}'
请注意,我们如何使用(哈希)Map来记住每个分区最后一次看到的偏移量,直到触发超时,然后在数组上循环以计算总和。
6条答案
按热度按时间nszi6y051#
从技术上讲,您可以简单地使用主题中的所有消息并对它们进行计数:
例子:
然而
kafka.tools.GetOffsetShell
该方法将给出偏移量,而不是主题中消息的实际数量。这意味着,如果主题被压缩,那么如果您通过使用消息或读取偏移量来计算消息,您将得到两个不同的数字。主题压缩:https://kafka.apache.org/documentation.html#design_compactionbasics
evrscar22#
您也可以使用awk和一个简单的循环来实现这一点
drkbr07n3#
获取主题中的记录数
brjng4g34#
您可以尝试执行以下命令:
然后,将每个分区的所有计数相加。
更新:java实现
qlckcl4x5#
您可以使用以下公式来汇总所有计数:
zu0ti5jz6#
如果你不想为“原创”Kafka剧本的麻烦买单,还有Kafka。
基本思想是
使用每个分区的最后一条消息
将偏移相加(校正基于零的偏移)。
让我们开发这个。
这将输出如下内容(加上stderr上的“到达分区结尾”通知):
现在,
kafkacat
不终止,但一直等待新消息。我们可以通过添加一个超时来避免这种情况(选择一个足够大的值,这样就可以获得给定环境中的所有分区):现在我们可以继续计算第二列(每个列增加1个)--但是如果在该超时间隔期间有新消息,我们可能会得到如下结果:
所以我们必须考虑到这一点,这是很容易做到的一点
awk
:请注意,我们如何使用(哈希)Map来记住每个分区最后一次看到的偏移量,直到触发超时,然后在数组上循环以计算总和。