我使用下面的代码创建一个生产者,它产生大约2000条消息。
public class ProducerDemoWithCallback {
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(ProducerDemoWithCallback.class);
String bootstrapServers = "localhost:9092";
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// create the producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i=0; i<2000; i++ ) {
// create a producer record
ProducerRecord<String, String> record =
new ProducerRecord<String, String>("TwitterProducer", "Hello World " + Integer.toString(i));
// send data - asynchronous
producer.send(record, new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
// executes every time a record is successfully sent or an exception is thrown
if (e == null) {
// the record was successfully sent
logger .info("Received new metadata. \n" +
"Topic:" + recordMetadata.topic() + "\n" +
"Partition: " + recordMetadata.partition() + "\n" +
"Offset: " + recordMetadata.offset() + "\n" +
"Timestamp: " + recordMetadata.timestamp());
} else {
logger .error("Error while producing", e);
}
}
});
}
// flush data
producer.flush();
// flush and close producer
producer.close();
}
}
我想计算这些消息并得到int值。我使用这个命令,它的工作,但我试图得到这个计数使用代码。
"bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic TwitterProducer --time -1"
结果是
- TwitterProducer:0:2000
以编程方式执行相同操作的代码如下所示,但我不确定这是否是获取计数的正确方法:
int valueCount = (int) recordMetadata.offset();
System.out.println("Offset value " + valueCount);
有人能帮我得到Kafka消息计数偏移值使用代码。
2条答案
按热度按时间0ejtzxu11#
您可以查看getoffsetshell的实现细节。
下面是用java重新编写的简化代码:
它为每个主题的分区和总和(消息总数)生成偏移量,如:
dbf7pr2w2#
你为什么要得到这个值?如果你分享更多关于目的的细节,我可以给你更多好的提示。
对于您的最后一个问题,使用偏移量值获取邮件计数是不正确的。如果您的主题有一个分区,而生产者是一个分区,则可以使用它。您需要考虑主题有几个分区。
如果要获取每个生产者的消息数,可以在oncompletion()的回调函数中对其进行计数
或者,您可以使用消费客户端获得最后一个偏移量,如下所示: