在kafka中处理大型xml文件

x6h2sr28  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(467)

我正在使用alpakkafka和scala应用程序。我的Kafka正在docker内部运行,我正在尝试使用我的代码在Kafka生产者上发布消息。我的代码如下

def sendMsg(xmlFile: String): Future[Done] = {
    futureToFutureTry {
      val producer = SendProducer(producerSettings)
      producer.send(new ProducerRecord("topic_name", "Key", xmlFile)).map(result => {
        producer.close()
      })
    } flatMap {
      case Success(v) => v
      case Failure(e) =>
        Future.failed(e)
    }
  }

代码很好,但当我发送大的xml文件时 an error that org.apache.kafka.common.errors.RecordTooLargeException: The message is 22093081 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 我可以看到stackoverflow本身的一些解决方案,但没有一个解释我需要在哪里进行更改。如何增加最大请求大小。我是Kafka的新人。当应用程序启动时,我可以看到它正在打印默认的max.request.size,但不知道它是如何打印的,以及如何、在哪里以及我需要做什么来解决它。请帮忙

imzjd6km

imzjd6km1#

如果您计划向kafka生成大于默认大小1048588字节的消息,则需要更改三个不同位置的设置:
主题配置
生产者配置
消费者配置

主题配置

创建主题时,需要确保 max.message.bytes 更大的价值。在创建Kafka主题时可以使用 kafka-topics 脚本:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topic-name --partitions 1 --replication-factor 1 --config max.message.bytes=50000000

此配置的描述如下:
kafka允许的最大记录批大小(如果启用了压缩,则在压缩之后)。如果此值增加并且存在早于0.10.2的消费者,消费者的获取大小也必须增加,以便他们能够获取如此大的记录批。在最新的消息格式版本中,为了提高效率,总是将记录分组到批中。在以前的消息格式版本中,未压缩的记录不会分组到批中,在这种情况下,此限制仅适用于单个记录。

生产者配置

在变量中 producerSettings 你需要增加 max.request.size ,例如:

val producerSettingsNew = producerSettings + ("max.request.size" -> "50000000")

此设置的说明如下:
请求的最大大小(字节)。此设置将限制生产者在单个请求中发送的记录批数,以避免发送大量请求。这实际上也是对最大未压缩记录批处理大小的限制。请注意,服务器对记录批大小有自己的上限(如果启用了压缩,则在压缩之后),这可能与此不同。

消费者配置

此外,还需要在设置中通过增加 max.partition.fetch.bytes .
描述如下:
服务器将返回的每个分区的最大数据量。消费者批量获取记录。如果获取的第一个非空分区中的第一个记录批大于此限制,则仍将返回该批,以确保使用者可以取得进展。代理接受的最大记录批大小通过message.max.bytes(代理配置)或max.message.bytes(主题配置)定义。有关限制使用者请求大小的信息,请参阅fetch.max.bytes。

相关问题