我给Kafkav。0.8和java producer api。如果邮件大小约为15 mb,我会得到一个 MessageSizeTooLargeException
. 我已经试过了 message.max.bytes
到40MB,但我还是得到了例外。小消息没有问题。
(例外情况出现在producer中,我在此应用程序中没有使用者。)
我能做些什么来消除这个异常?
我的示例生产者配置
private ProducerConfig kafkaConfig() {
Properties props = new Properties();
props.put("metadata.broker.list", BROKERS);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
props.put("message.max.bytes", "" + 1024 * 1024 * 40);
return new ProducerConfig(props);
}
错误日志:
4709 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN kafka.producer.async.DefaultEventHandler - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler - Failed to send requests for topics datasift with correlation ids in [213,224]
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
7条答案
按热度按时间cmssoen21#
您需要调整三(或四)个属性:
消费者方面:
fetch.message.max.bytes
-这将决定使用者可以获取的消息的最大大小。经纪人方面:
replica.fetch.max.bytes
-这将允许代理中的副本在集群内发送消息,并确保消息被正确复制。如果这个值太小,那么消息将永远不会被复制,因此,使用者将永远看不到消息,因为消息将永远不会被提交(完全复制)。经纪人方面:
message.max.bytes
-这是代理可以从生产者接收的最大消息大小。代理端(每个主题):
max.message.bytes
-这是代理允许附加到主题的最大消息大小。此大小经过预压缩验证(默认为经纪人的message.max.bytes
.)我发现第二个问题很难解决——你不会从Kafka那里得到任何异常、消息或警告,所以在发送大消息时一定要考虑到这一点。
goucqfw62#
要记住的一点是
message.max.bytes
属性必须与使用者的fetch.message.max.bytes
财产。fetch大小必须至少与最大消息大小一样大,否则可能会出现生产者发送的消息比使用者可以使用/获取的消息大的情况。也许值得一看。你用的是哪个版本的Kafka?同时提供一些你得到的详细信息。有没有像。。。
payload size of x larger than 1000000
记录在案?blmhpbnm3#
“笑男”的回答相当准确。不过,我还是想向KafkaMaven斯蒂芬马雷克(stephanemaarek)提出一个建议。
Kafka不是用来处理大量信息的。
您的api应该使用云存储(例如awss3),只需将s3的引用推送到kafka或任何消息代理。你必须找到一个地方来保存你的数据,也许它是一个网络驱动器,也许它是什么,但它不应该是消息代理。
现在,如果你不想用上面的方法
消息的最大大小是1mb(代理中的设置称为
message.max.bytes
)Apache·Kafka。如果您真的非常需要它,您可以增加这个大小,并确保为您的生产者和消费者增加网络缓冲区。如果您真的关心拆分消息,请确保每个拆分的消息都具有完全相同的键,以便将其推送到同一分区,并且您的消息内容应该报告一个“part id”,以便您的使用者可以完全重建消息。
如果您的消息是基于文本的(gzip、snappy、lz4压缩),那么您还可以探索压缩,这可能会减少数据大小,但不是神奇的。
同样,您必须使用一个外部系统来存储该数据,并且只需将一个外部引用推送到kafka。这是一个非常普遍的架构,你应该去和广泛接受。
请记住,只有当信息量巨大而不是大小时,Kafka才能发挥最佳效果。
资料来源:https://www.quora.com/how-do-i-send-large-messages-80-mb-in-kafka
5jdjgkvh4#
对于使用landoop kafka的用户:您可以在以下环境变量中传递配置值:
如果使用rdkafka,则在producer配置中传递message.max.bytes,如下所示:
同样,对于消费者来说,
lf5gs5x25#
这样做的目的是让Kafka生产者向Kafka经纪人发送的消息大小相等,然后由Kafka消费者接收。
Kafka制作人-->Kafka经纪人-->Kafka消费者
假设要求发送15mb的消息,那么生产者、代理和使用者三者都需要同步。
Kafka生产者发送15 mb-->Kafka经纪人允许/存储15 mb-->Kafka消费者接收15 mb
因此,设置应为:
a) 关于经纪人:
b) 关于消费者:
os8fio9y6#
您需要覆盖以下属性:
代理配置($kafka\u home/config/server.properties)
replica.fetch.max.字节
message.max.字节
消费者配置($kafka\u home/config/consumer.properties)
这一步对我不起作用。我把它添加到消费者应用程序中,效果很好
fetch.message.max.字节
重新启动服务器。
有关详细信息,请参阅此文档:http://kafka.apache.org/08/configuration.html
8ljdwjyq7#
Kafka0.10和新的消费者相比,需要做一些小的改变
经纪人:没有变化,你还是需要增加财产
message.max.bytes
以及replica.fetch.max.bytes
.message.max.bytes
必须等于或小于()replica.fetch.max.bytes
.生产商:增加
max.request.size
发送更大的信息。消费者:增加
max.partition.fetch.bytes
接收更大的信息。()阅读评论以了解更多信息
message.max.bytes
<=replica.fetch.max.bytes