有一个包含80到100条记录的arraylist,试图流式传输每个记录(pojo,而不是整个列表)并将其发送到kafka主题(eventhub)。每小时安排一个cron作业,将这些记录(pojo)发送到事件中心。
能够看到发送到eventhub的消息,但在3到4次成功运行后,会出现以下异常(其中包括多条正在发送的消息和多条失败的以下异常)
Expiring 14 record(s) for eventhubname: 30125 ms has passed since batch creation plus linger time
以下是使用的生产者配置,
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, "3");
消息保留期-7分区-6使用spring-kafka(2.2.3)发送事件方法标记为 @Async
Kafka寄信的地方
@Async
protected void send() {
kafkatemplate.send(record);
}
预期-没有从kafka引发异常实际-已引发org.apache.kafka.common.errors.timeoutexception
2条答案
按热度按时间t40tm48m1#
此异常表示您正在以比发送记录更快的速度对记录进行排队。一旦记录被添加到一个批中,就有一个发送该批的时间限制,以确保它在指定的持续时间内被发送。这由producer配置参数request.timeout.ms控制。如果批处理的排队时间超过超时限制,则将引发异常。该批处理中的记录将从发送队列中删除。
请检查下面类似的问题,这可能会有更好的帮助。
Kafka制作者timeoutexception:过期1个记录
你也可以查看这个链接
when-does-the-apache-kafka-client-throw-a-batch-expired-exception/34794261#34794261了解有关批过期异常的更多详细信息。
还要实施适当的重试策略。
注意这不考虑扫描仪端的任何网络问题。由于网络问题,您将无法发送到任何一个集线器。
希望有帮助。
vd2z7a6w2#
prakash—我们已经看到了一些问题,尖头生产者模式会看到批处理超时。
这里的问题是,生产者有两个tcp连接可以闲置超过4分钟-在这一点上,azure负载平衡器关闭空闲连接。kafka客户机不知道连接已经关闭,所以它尝试在一个死连接上发送一个批处理,这个批处理超时,此时重试开始。
将connections.max.idle.ms设置为<4mins–这允许kafka客户端的网络客户端层优雅地处理生产者的消息发送tcp连接的连接关闭
将metadata.max.age.ms设置为<4mins–这对于生产者元数据tcp连接来说是有效的保持活动状态
请随时联系github上的裕利安怡产品团队,我们在应对问题方面相当出色-https://github.com/azure/azure-event-hubs-for-kafka