How to get the Kafka producer message count

mfpqipee · posted 2021-06-05 · in Kafka

I use the code below to create a producer that sends about 2000 messages.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class ProducerDemoWithCallback {

    public static void main(String[] args) {

        final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);

        String bootstrapServers = "localhost:9092";
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (int i = 0; i < 2000; i++) {
            // create a producer record
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("TwitterProducer", "Hello World " + Integer.toString(i));

            // send data - asynchronous
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // executes every time a record is successfully sent or an exception is thrown
                    if (e == null) {
                        // the record was successfully sent
                        logger.info("Received new metadata. \n" +
                                "Topic:" + recordMetadata.topic() + "\n" +
                                "Partition: " + recordMetadata.partition() + "\n" +
                                "Offset: " + recordMetadata.offset() + "\n" +
                                "Timestamp: " + recordMetadata.timestamp());

                    } else {
                        logger.error("Error while producing", e);
                    }
                }
            });
        }

        // flush data
        producer.flush();
        // flush and close producer
        producer.close();
    }
}

I want to count these messages and get the total as an int. The following command works, but I am trying to get that count from code instead.

"bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TwitterProducer --time -1"

The result is

- TwitterProducer:0:2000

The code to do the same thing programmatically looks like this, but I am not sure whether this is the right way to get the count:

int valueCount = (int) recordMetadata.offset();
System.out.println("Offset value " + valueCount);

Can someone help me get the Kafka message count (the offset value) from code?


0ejtzxu1 (Answer 1)

You can look at how GetOffsetShell is implemented.
Here is simplified code rewritten in Java:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.stream.Collectors;

public class GetOffsetCommand {

    private static final Set<String> TopicNames = new HashSet<>();

    static {
        TopicNames.add("my-topic");
        TopicNames.add("not-my-topic");
    }

    public static void main(String[] args) {
        TopicNames.forEach(topicName -> {
            final Map<TopicPartition, Long> offsets = getOffsets(topicName);

            new ArrayList<>(offsets.entrySet()).forEach(System.out::println);
            System.out.println(topicName + ":" + offsets.values().stream().reduce(0L, Long::sum));
        });
    }

    private static Map<TopicPartition, Long> getOffsets(String topicName) {
        // try-with-resources so the consumer is closed once the end offsets have been fetched
        try (KafkaConsumer<String, String> consumer = makeKafkaConsumer()) {
            final List<TopicPartition> partitions = listTopicPartitions(consumer, topicName);
            return consumer.endOffsets(partitions);
        }
    }

    private static KafkaConsumer<String, String> makeKafkaConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "get-offset-command");

        return new KafkaConsumer<>(props);
    }

    private static List<TopicPartition> listTopicPartitions(KafkaConsumer<String, String> consumer, String topicName) {
        return consumer.listTopics().entrySet().stream()
                .filter(t -> topicName.equals(t.getKey()))
                .flatMap(t -> t.getValue().stream())
                .map(p -> new TopicPartition(p.topic(), p.partition()))
                .collect(Collectors.toList());
    }
}

It prints the offset for each partition of the topic and their sum (the total message count), for example:

my-topic-0=184
my-topic-2=187
my-topic-4=189
my-topic-1=196
my-topic-3=243
my-topic:999
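
One detail worth noting when reading these numbers: endOffsets() returns, for each partition, the offset of the next record to be written, so the sum matches the total number of messages produced only while nothing has been removed by retention or compaction. If records may have been deleted, you can count what is still in the topic by subtracting the beginning offsets. A minimal sketch of a helper that could be added to GetOffsetCommand (countRecords is an illustrative name, not part of the answer above):

    // Illustrative helper (not in the original answer): counts the records currently in the topic
    // as end offset minus beginning offset, summed over all partitions.
    private static long countRecords(KafkaConsumer<String, String> consumer, List<TopicPartition> partitions) {
        final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
        final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(partitions);
        return partitions.stream()
                .mapToLong(tp -> endOffsets.get(tp) - beginningOffsets.get(tp))
                .sum();
    }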

dbf7pr2w (Answer 2)

Why do you need this value? If you share more detail about the purpose, I can give you better suggestions.
As for your last question: using the offset from the callback as the message count is not correct in general. It only works when the topic has a single partition and a single producer; you have to account for the topic having several partitions. For example, if the 2000 records end up spread evenly across three partitions, the offset reported in any single callback will be around 666, not 2000.
If you want the number of messages sent by each producer, you can count them in the onCompletion() callback, as in the sketch below.
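
A minimal sketch of counting acknowledged sends in the producer from the question (successCount is an illustrative name, not part of the original code):

// illustrative counter, not in the original code; needs java.util.concurrent.atomic.AtomicLong
final AtomicLong successCount = new AtomicLong();

// ... inside the existing send loop, the callback becomes:
producer.send(record, new Callback() {
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            successCount.incrementAndGet();   // count every record acknowledged by the broker
        } else {
            logger.error("Error while producing", e);
        }
    }
});

// ... after the loop:
producer.flush();   // blocks until the outstanding sends (and their callbacks) have completed
logger.info("Successfully produced " + successCount.get() + " messages");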
Or you can fetch the last offsets with a consumer client, like this:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-brokers");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

Consumer<String, String> consumer = new KafkaConsumer<>(props);

// assign the topic's partitions explicitly: after subscribe() the assignment stays
// empty until the first poll(), so assignment() would return nothing at this point
List<TopicPartition> partitions = consumer.partitionsFor("topic_name").stream()
        .map(info -> new TopicPartition(info.topic(), info.partition()))
        .collect(Collectors.toList());
consumer.assign(partitions);

// jump to the end of every partition; position() then returns the end offset
consumer.seekToEnd(partitions);

for (TopicPartition tp : partitions) {
    long offsetPosition = consumer.position(tp);
}
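
To turn those per-partition positions into a single total, you can sum them and close the consumer, for example (a sketch under the same assumptions as the snippet above):

// sum the end-of-partition positions to approximate the total number of messages in the topic
long totalCount = 0L;
for (TopicPartition tp : partitions) {
    totalCount += consumer.position(tp);
}
System.out.println("Total messages in topic_name: " + totalCount);
consumer.close();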
