如何在使用spark结构化流时更新kafka consumer max.request.size配置

iyr7buue  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(519)

Spark readStream 因为Kafka失败了,错误如下:
org.apache.kafka.common.errors.recordtoolargeexception(序列化时消息为1166569字节,大于使用max.request.size配置配置的最大请求大小。)
我们怎么把车撞上去 max.request.size ?
代码:

val ctxdb = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ip:port")
  .option("subscribe","topic")
  .option("startingOffsets", "earliest")
  .option(" failOnDataLoss", "false")
  .option("max.request.size", "15728640")

我们已尝试更新 option("max.partition.fetch.bytes", "15728640") 没有运气。

myzjeezk

myzjeezk1#

你需要添加 kafka 写入流设置的前缀:

.option("kafka.max.request.size", "15728640")

相关问题