Below is my producer config. As you can see, the compression type is gzip. Even though I have set the compression type, why is the message not being published, and why does it fail?
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, edi856KafkaConfig.getBootstrapServersConfig());
props.put(ProducerConfig.RETRIES_CONFIG, edi856KafkaConfig.getRetriesConfig());
props.put(ProducerConfig.BATCH_SIZE_CONFIG, edi856KafkaConfig.getBatchSizeConfig());
props.put(ProducerConfig.LINGER_MS_CONFIG, edi856KafkaConfig.getIntegerMsConfig());
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, edi856KafkaConfig.getBufferMemoryConfig());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(Edi856KafkaProducerConstants.SSL_PROTOCOL, edi856KafkaConfig.getSslProtocol());
props.put(Edi856KafkaProducerConstants.SECURITY_PROTOCOL, edi856KafkaConfig.getSecurityProtocol());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_LOCATION, edi856KafkaConfig.getSslKeystoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_KEYSTORE_PASSWORD, edi856KafkaConfig.getSslKeystorePassword());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_LOCATION, edi856KafkaConfig.getSslTruststoreLocation());
props.put(Edi856KafkaProducerConstants.SSL_TRUSTSTORE_PASSWORD, edi856KafkaConfig.getSslTruststorePassword());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
The error I am getting is as follows:
org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
2017-12-07_12:34:10.037 [http-nio-8080-exec-1] ERROR c.tgt.trans.producer.Edi856Producer - Exception while writing mesage to topic= '{}'
org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.RecordTooLargeException: The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
We need to use the string representation of the Kafka message on the consumer side.
2 Answers

Answer nafvub8i1#
Just read the error message :)
The message is 1170632 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration
The message is larger than 1 MB, which is the default maximum Apache Kafka allows. To allow larger messages, see the answers to "How can I send large messages with Kafka (over 15 MB)?"

Answer 0vvn1miw2#
Unfortunately, you are hitting a rather odd issue with the new producer implementation in Kafka. While the message-size limit that Kafka applies at the broker level applies to a single compressed record set (which may contain multiple messages), the new producer currently applies the max.request.size limit to the record before any compression. This is tracked in https://issues.apache.org/jira/browse/kafka-4169 (created 14 September 2016, still unresolved at the time of writing).
If you are certain that the compressed size of your message (plus any record-set overhead) will be smaller than the broker's configured max.message.bytes, you may be able to get away with increasing the max.request.size property on the producer without changing any configuration on the broker. This lets the producer code accept the pre-compression size of the payload, which it will then compress and send to the broker. Note, however, that if the producer ever does send a request that is too large for the broker's configuration, the broker will reject the message, and it will be up to your application to handle that correctly.
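Under the assumption above (the compressed payload stays within the broker's max.message.bytes), the producer-side workaround can be sketched as follows. The 2 MB value is only an illustration, not a recommendation; pick a value that covers your largest uncompressed record (here, the 1170632-byte one from the error):

```java
import java.util.Properties;

public class ProducerSizingSketch {

    // Builds only the size-related part of the producer config.
    // "max.request.size" is the key behind ProducerConfig.MAX_REQUEST_SIZE_CONFIG;
    // its default is 1048576 bytes (1 MB), which the 1170632-byte record exceeds.
    static Properties sizingProps() {
        Properties props = new Properties();
        props.put("compression.type", "gzip");
        // Raise the pre-compression limit checked by the producer.
        // 2 MB is an arbitrary example value.
        props.put("max.request.size", Integer.toString(2 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        Properties props = sizingProps();
        System.out.println(props.getProperty("max.request.size"));
    }
}
```

These entries would simply be merged into the existing props shown in the question. If even the *compressed* record can exceed 1 MB, the broker-side message.max.bytes (and the corresponding topic-level setting) must be raised as well, as the first answer suggests.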