如何使用kafka(超过15mb)发送大消息?

yrefmtwq  于 2021-06-07  发布在  Kafka
关注(0)|答案(7)|浏览(436)

我给Kafkav。0.8和java producer api。如果邮件大小约为15 mb,我会得到一个 MessageSizeTooLargeException . 我已经试过了 message.max.bytes 到40MB,但我还是得到了例外。小消息没有问题。
(例外情况出现在producer中,我在此应用程序中没有使用者。)
我能做些什么来消除这个异常?

我的示例生产者配置

private ProducerConfig kafkaConfig() {
    Properties props = new Properties();
    props.put("metadata.broker.list", BROKERS);
    props.put("serializer.class", "kafka.serializer.StringEncoder");
    props.put("request.required.acks", "1");
    props.put("message.max.bytes", "" + 1024 * 1024 * 40);
    return new ProducerConfig(props);
}

错误日志:

4709 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 214 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
4869 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with    correlation id 217 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5035 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with   correlation id 220 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5198 [main] WARN  kafka.producer.async.DefaultEventHandler  - Produce request with correlation id 223 failed due to [datasift,0]: kafka.common.MessageSizeTooLargeException
5305 [main] ERROR kafka.producer.async.DefaultEventHandler  - Failed to send requests for topics datasift with correlation ids in [213,224]

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
at kafka.producer.async.DefaultEventHandler.handle(Unknown Source)
at kafka.producer.Producer.send(Unknown Source)
at kafka.javaapi.producer.Producer.send(Unknown Source)
cmssoen2

cmssoen21#

您需要调整三(或四)个属性:
消费者方面: fetch.message.max.bytes -这将决定使用者可以获取的消息的最大大小。
经纪人方面: replica.fetch.max.bytes -这将允许代理中的副本在集群内发送消息,并确保消息被正确复制。如果这个值太小,那么消息将永远不会被复制,因此,使用者将永远看不到消息,因为消息将永远不会被提交(完全复制)。
经纪人方面: message.max.bytes -这是代理可以从生产者接收的最大消息大小。
代理端(每个主题): max.message.bytes -这是代理允许附加到主题的最大消息大小。此大小经过预压缩验证(默认为经纪人的 message.max.bytes .)
我发现第二个问题很难解决——你不会从Kafka那里得到任何异常、消息或警告,所以在发送大消息时一定要考虑到这一点。

goucqfw6

goucqfw62#

要记住的一点是 message.max.bytes 属性必须与使用者的 fetch.message.max.bytes 财产。fetch大小必须至少与最大消息大小一样大,否则可能会出现生产者发送的消息比使用者可以使用/获取的消息大的情况。也许值得一看。
你用的是哪个版本的Kafka?同时提供一些你得到的详细信息。有没有像。。。 payload size of x larger than 1000000 记录在案?

blmhpbnm

blmhpbnm3#

“笑男”的回答相当准确。不过,我还是想向KafkaMaven斯蒂芬马雷克(stephanemaarek)提出一个建议。
Kafka不是用来处理大量信息的。
您的api应该使用云存储(例如awss3),只需将s3的引用推送到kafka或任何消息代理。你必须找到一个地方来保存你的数据,也许它是一个网络驱动器,也许它是什么,但它不应该是消息代理。
现在,如果你不想用上面的方法
消息的最大大小是1mb(代理中的设置称为 message.max.bytes )Apache·Kafka。如果您真的非常需要它,您可以增加这个大小,并确保为您的生产者和消费者增加网络缓冲区。
如果您真的关心拆分消息,请确保每个拆分的消息都具有完全相同的键,以便将其推送到同一分区,并且您的消息内容应该报告一个“part id”,以便您的使用者可以完全重建消息。
如果您的消息是基于文本的(gzip、snappy、lz4压缩),那么您还可以探索压缩,这可能会减少数据大小,但不是神奇的。
同样,您必须使用一个外部系统来存储该数据,并且只需将一个外部引用推送到kafka。这是一个非常普遍的架构,你应该去和广泛接受。
请记住,只有当信息量巨大而不是大小时,Kafka才能发挥最佳效果。
资料来源:https://www.quora.com/how-do-i-send-large-messages-80-mb-in-kafka

5jdjgkvh

5jdjgkvh4#

对于使用landoop kafka的用户:您可以在以下环境变量中传递配置值:

docker run -d --rm -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083  -p 9581-9585:9581-9585 -p 9092:9092
 -e KAFKA_TOPIC_MAX_MESSAGE_BYTES=15728640 -e KAFKA_REPLICA_FETCH_MAX_BYTES=15728640  landoop/fast-data-dev:latest `

如果使用rdkafka,则在producer配置中传递message.max.bytes,如下所示:

const producer = new Kafka.Producer({
        'metadata.broker.list': 'localhost:9092',
        'message.max.bytes': '15728640',
        'dr_cb': true
    });

同样,对于消费者来说,

const kafkaConf = {
   "group.id": "librd-test",
   "fetch.message.max.bytes":"15728640",
   ... .. }
lf5gs5x2

lf5gs5x25#

这样做的目的是让Kafka生产者向Kafka经纪人发送的消息大小相等,然后由Kafka消费者接收。
Kafka制作人-->Kafka经纪人-->Kafka消费者
假设要求发送15mb的消息,那么生产者、代理和使用者三者都需要同步。
Kafka生产者发送15 mb-->Kafka经纪人允许/存储15 mb-->Kafka消费者接收15 mb
因此,设置应为:
a) 关于经纪人:

message.max.bytes=15728640 
replica.fetch.max.bytes=15728640

b) 关于消费者:

fetch.message.max.bytes=15728640
os8fio9y

os8fio9y6#

您需要覆盖以下属性:
代理配置($kafka\u home/config/server.properties)
replica.fetch.max.字节
message.max.字节
消费者配置($kafka\u home/config/consumer.properties)
这一步对我不起作用。我把它添加到消费者应用程序中,效果很好
fetch.message.max.字节
重新启动服务器。
有关详细信息,请参阅此文档:http://kafka.apache.org/08/configuration.html

8ljdwjyq

8ljdwjyq7#

Kafka0.10和新的消费者相比,需要做一些小的改变
经纪人:没有变化,你还是需要增加财产 message.max.bytes 以及 replica.fetch.max.bytes . message.max.bytes 必须等于或小于(replica.fetch.max.bytes .
生产商:增加 max.request.size 发送更大的信息。
消费者:增加 max.partition.fetch.bytes 接收更大的信息。
)阅读评论以了解更多信息 message.max.bytes <= replica.fetch.max.bytes

相关问题