我正在建造一个 kafka
java中的consumer,它将只显示特定对象的分区(我有10个分区)和偏移量 topic
以及 group id
. 我的当前代码显示给定输入的每条记录(或每行数据)。如果我有10个分区和15行数据,它将显示15行和分区的多个示例。
以下是我对消费者的设置:
private static Consumer<Long, String> createConsumer() {
System.out.println("CREATE CONSUMER");
//Configure consumer settings/properties
final Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// Create the consumer using props.
final Consumer<Long, String> consumer = new KafkaConsumer<>(props);
// Subscribe to the topic.
consumer.subscribe(Collections.singletonList(TOPIC));
return consumer;
}
下面是显示输出的代码:
while (noRecordsCount < giveUp)
{
final ConsumerRecords<Long, String> consumerRecords = consumer.poll(500);
if (consumerRecords.count() == 0)
{
noRecordsCount++;
if (noRecordsCount > giveUp) break;
else continue;
}
//Stores each topic and partition to a specific array list for easier output manipulation
consumerRecords.forEach(record -> {
partitionrecord.add(record.partition());
offsetrecord.add(record.offset());
System.out.printf("Consumer Record: %s (%d, %d)"+"\n", TOPIC, record.partition(), record.offset());
});
}
代码输出:
预期输出显示的是每个分区的示例,而不是每个记录的示例(主题名称、分区号、偏移量):
我需要做的是显示10个分区,而不是显示每个(15)记录及其特定信息(偏移量、分区、值等)。我需要在代码中添加任何特定的命令或函数吗?我是新来的堆栈溢出,我道歉,如果我的查询很长。
1条答案
按热度按时间6tdlim6h1#
对于分配给使用者示例的所有分区,您总是会获得从其开始的最新偏移量之后的所有记录。
如果运行10个使用者,则每个示例只能看到一个分区,但仍然可以看到所有偏移量。
没有只获取一条记录的设置,因为这取决于生产者在消费者开始等待消息之后只均匀地发送n条消息。
对于分配了多个分区的使用者示例,也不能保证跨分区排序
但是,您可以使用treemap或max heap数据结构来存储数据点,然后按顺序在分区上循环并输出每个分区的最大消耗偏移量
换句话说,您当前正在打印每个记录,而不是在分区上的所有循环之后打印,因此您将获得第一个显示的输出
所以,在“Kafka”里,你没有办法做你想做的事情,但你要问的问题是,在你得到记录时,你是如何批处理记录的,然后只存储最大值,最后在哪里输出信息。
注:以下为
GetOffsetShell
命令已经可以查询所有分区的最大当前偏移量