我使用的是springkafka和springkafka测试版本1.0.2.release。
在我的一个测试中,我的应用程序使用kafkatemplate和大部分默认配置设置,将一行100条记录发送到嵌入kafka示例上的单个topicpartion。
我使用kafkatestutils.getrecords(consumer)方法尝试从kafka示例获取记录,并验证它们是否都已发送。
第一次调用getrecords时,我只收到一条记录。如果我再打一次,我会得到另外99个。
如果我显式地将consumer的位置设置为topicpartition的开头,然后调用getrecords,我将得到全部100个。
为什么getrecords第一次只能得到一个记录?有没有更好的方法一次获得所有100个,然后通过显式调用seektobegining在消费者身上?
2条答案
按热度按时间p4rjhz4m1#
很可能只是一个比赛条件-消费者坐在
poll()
代理在第一条消息到达时立即发送它。请参见属性
fetch.min.bytes
以及fetch.max.wait.ms
在Kafka的文件里。fetch.min.bytes
默认情况下为1。编辑
你也可以试试
flush()
使KafkaTemplate
打电话之前getRecords()
.但是,您的测试不应该真正依赖于在一次获取中获取所有消息—太脆弱了。
92dk7w1h2#
这听起来像是时间问题。很可能你第一次打电话时只有一条信息
poll()
-该方法不能保证将获取多少消息。当你编写代码时,你不应该假设你会一次收到x条记录。Kafka0.10中有一个消费属性max.poll.records
出于测试目的,您可能希望设置为1,然后执行接收循环,直到轮询了所有100个。