注:Kafka版本使用: kafka_2.11-0.9.0.1
我有一个主题叫 test-kafka
作为 3 partitions
以及 1 replication factor
这个主题有一些字符串数据,例如。 key and value pair
在每个分区中。
当我试图通过消费者api获取记录时, consumer.poll
函数没有获取任何记录并给出 records.count =0
但是当我用同样的逻辑来处理 single parition
然后它运行良好并获取记录。
我以为Kafka会在内部处理多个分区,并从每个分区中逐个获取记录,我只需要 subscribe
到 topic
.
有人能帮忙吗?
供参考的代码段:
KafkaConsumer<String,String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(Arrays.asList(topic));
while(true)
{
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println(records.count());
for(ConsumerRecord<String,String> record: records)
{
System.out.printf("Key: %s, Value = %s", record.key(), record.value());
}
}
上面的代码正在返回 "zero"
在记录中计数。
1条答案
按热度按时间13z8s7eq1#
尝试使用更长的轮询间隔。我尝试了100毫秒我的主题,但通话超时之前,任何消息可以检索。换成1000毫秒对我很有效。