Kafka消费者第一次得到一张唱片时,从许多唱片中只得到一张?

niknxzdl  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(376)

我使用的是springkafka和springkafka测试版本1.0.2.release。
在我的一个测试中,我的应用程序使用kafkatemplate和大部分默认配置设置,将一行100条记录发送到嵌入kafka示例上的单个topicpartion。
我使用kafkatestutils.getrecords(consumer)方法尝试从kafka示例获取记录,并验证它们是否都已发送。
第一次调用getrecords时,我只收到一条记录。如果我再打一次,我会得到另外99个。
如果我显式地将consumer的位置设置为topicpartition的开头,然后调用getrecords,我将得到全部100个。
为什么getrecords第一次只能得到一个记录?有没有更好的方法一次获得所有100个,然后通过显式调用seektobegining在消费者身上?

p4rjhz4m

p4rjhz4m1#

很可能只是一个比赛条件-消费者坐在 poll() 代理在第一条消息到达时立即发送它。
请参见属性 fetch.min.bytes 以及 fetch.max.wait.ms 在Kafka的文件里。 fetch.min.bytes 默认情况下为1。
编辑
你也可以试试 flush() 使 KafkaTemplate 打电话之前 getRecords() .
但是,您的测试不应该真正依赖于在一次获取中获取所有消息—太脆弱了。

92dk7w1h

92dk7w1h2#

这听起来像是时间问题。很可能你第一次打电话时只有一条信息 poll() -该方法不能保证将获取多少消息。当你编写代码时,你不应该假设你会一次收到x条记录。Kafka0.10中有一个消费属性 max.poll.records 出于测试目的,您可能希望设置为1,然后执行接收循环,直到轮询了所有100个。

相关问题