我正在使用alpakkafka和scala应用程序。我的Kafka正在docker内部运行,我正在尝试使用我的代码在Kafka生产者上发布消息。我的代码如下
def sendMsg(xmlFile: String): Future[Done] = {
futureToFutureTry {
val producer = SendProducer(producerSettings)
producer.send(new ProducerRecord("topic_name", "Key", xmlFile)).map(result => {
producer.close()
})
} flatMap {
case Success(v) => v
case Failure(e) =>
Future.failed(e)
}
}
代码很好,但当我发送大的xml文件时 an error that org.apache.kafka.common.errors.RecordTooLargeException: The message is 22093081 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
我可以看到stackoverflow本身的一些解决方案,但没有一个解释我需要在哪里进行更改。如何增加最大请求大小。我是Kafka的新人。当应用程序启动时,我可以看到它正在打印默认的max.request.size,但不知道它是如何打印的,以及如何、在哪里以及我需要做什么来解决它。请帮忙
1条答案
按热度按时间imzjd6km1#
如果您计划向kafka生成大于默认大小1048588字节的消息,则需要更改三个不同位置的设置:
主题配置
生产者配置
消费者配置
主题配置
创建主题时,需要确保
max.message.bytes
更大的价值。在创建Kafka主题时可以使用kafka-topics
脚本:此配置的描述如下:
kafka允许的最大记录批大小(如果启用了压缩,则在压缩之后)。如果此值增加并且存在早于0.10.2的消费者,消费者的获取大小也必须增加,以便他们能够获取如此大的记录批。在最新的消息格式版本中,为了提高效率,总是将记录分组到批中。在以前的消息格式版本中,未压缩的记录不会分组到批中,在这种情况下,此限制仅适用于单个记录。
生产者配置
在变量中
producerSettings
你需要增加max.request.size
,例如:此设置的说明如下:
请求的最大大小(字节)。此设置将限制生产者在单个请求中发送的记录批数,以避免发送大量请求。这实际上也是对最大未压缩记录批处理大小的限制。请注意,服务器对记录批大小有自己的上限(如果启用了压缩,则在压缩之后),这可能与此不同。
消费者配置
此外,还需要在设置中通过增加
max.partition.fetch.bytes
.描述如下:
服务器将返回的每个分区的最大数据量。消费者批量获取记录。如果获取的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批,以确保使用者可以取得进展。代理接受的最大记录批大小通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义。有关限制使用者请求大小的信息,请参阅fetch.max.bytes。