我运行的是kafka2.11-0.9.0.0和一个基于java的生产者/消费者。对于70 kb的消息,一切正常。然而,在生产者将一个更大的70mb消息排队后,kafka似乎停止了向消费者传递消息。i、 不仅大消息没有被传递,而且随后的小消息也没有被传递。我知道producer成功了,因为我使用了kafka回调进行确认,并且我可以在kafka消息日志中看到消息。
Kafka配置自定义更改:
message.max.bytes=200000000
replica.fetch.max.bytes=200000000
使用者配置:
props.put("fetch.message.max.bytes", "200000000");
props.put("max.partition.fetch.bytes", "200000000");
2条答案
按热度按时间dphi5xsq1#
有帮助的是提高了java堆的大小。
7rtdyuoh2#
您需要增加消费者可以消费的消息的大小,这样它就不会在试图阅读太大的消息时陷入困境。
服务器将返回的每个分区的最大数据量。请求使用的最大总内存为#partitions*max.partition.fetch.bytes。此大小必须至少与服务器允许的最大邮件大小一样大,否则生产者可能发送的邮件比使用者可以获取的邮件大。如果发生这种情况,消费者可能会在尝试获取某个分区上的大消息时陷入困境。