Kafka制作人timeoutexception

sauutmhj  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(697)

我正在运行一个samza流作业,该作业正在将数据写入kafka主题。Kafka正在运行一个3节点集群。samza任务部署在Yarn上。我们在容器日志中看到很多这样的异常:

INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:181)
        at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
        at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for Topic3-16 due to 30332 ms has passed since last attempt plus backoff time

这三种类型的例外情况经常出现。

59088 org.apache.kafka.common.errors.TimeoutException: Expiring 115 record(s) for Topic3-1 due to 30028 ms has passed since last attempt plus backoff time

61015 org.apache.kafka.common.errors.TimeoutException: Expiring 60 record(s) for Topic3-1 due to 74949 ms has passed since batch creation plus linger time

62275 org.apache.kafka.common.errors.TimeoutException: Expiring 176 record(s) for Topic3-4 due to 74917 ms has passed since last append

请帮助我理解这里的问题是什么。不管什么时候,samza容器都会重新启动。

col17t5w

col17t5w1#

此错误表示将某些记录放入队列的速度快于从客户端发送这些记录的速度。
当生产者发送消息时,它们被存储在缓冲区中(在将消息发送到目标代理之前),记录被分组到一起,以提高吞吐量。当一个新记录被添加到批处理中时,它必须在一个可配置的时间窗口内发送,这个时间窗口由 request.timeout.ms (默认设置为30秒)。如果批处理在队列中的时间较长,则 TimeoutException 则批处理记录将从队列中删除,并且不会传递到代理。
增加 request.timeout.ms 应该帮你搞定。
如果这不起作用,您也可以尝试减少 batch.size 以便更频繁地发送批处理(但这次将包含更少的消息),并确保 linger.ms 设置为0(这是默认值)。
请注意,在更改任何配置参数后,需要重新启动kafka代理。
如果你仍然得到的错误,我认为是有什么不对劲,你的网络。你启用ssl了吗?

相关问题