kafka代理并没有压缩我的大消息

6tqwzwtp  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(378)

下面是我的producer配置,如果您看到它们的is压缩类型是gzip,即使我提到了压缩类型,为什么消息没有发布,并且失败了

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
        props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
        props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
      ***props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");***
        props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
        props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
        props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
        props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
        props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
        props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
      **props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");**

我得到的错误如下

org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.

我们需要在消费端使用kafka消息的字符串表示

nafvub8i

nafvub8i1#

只需阅读错误消息:) The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration 消息大于1 MB,这是apache kafka允许的默认值。要允许大消息,请在“如何使用kafka发送大消息”(超过15mb)中检查答案?

0vvn1miw

0vvn1miw2#

不幸的是,在kafka中新的producer实现遇到了一个相当奇怪的问题。
虽然kafka在代理级别应用的消息大小限制应用于单个压缩记录集(可能是多个消息),但是新的生产者当前应用 max.request.size 任何压缩前对记录的限制。
这张照片是在https://issues.apache.org/jira/browse/kafka-4169 (2016年9月14日创建,在编写本报告时尚未解决)。
如果您确定消息的压缩大小(加上记录集的任何开销)将小于代理的配置大小 max.message.bytes ,则可以通过增加 max.request.size 属性,而无需更改代理上的任何配置。这将允许生产者代码接受预压缩负载的大小,然后将其压缩并发送到代理。
但是需要注意的是,如果生产者试图发送一个对于代理的配置来说太大的请求,代理将拒绝该消息,这将由您的应用程序来正确处理。

相关问题