kafka streams错误-分区上的偏移提交失败,请求超时

lf3rwulv  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(643)

我们使用kafka流来消费、处理和生成消息,在prod env上,我们在多个主题上遇到了错误:

ERROR org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=app-xxx-StreamThread-3-consumer, groupId=app] 
Offset commit failed on partition xxx-1 at offset 13920: 
The request timed out.[]

这些错误很少发生在小负载的主题上,但是对于高负载(和峰值)的主题,每个主题一天会发生几十次错误。主题有多个分区(例如10)。似乎这个问题并不影响数据的处理(尽管有性能),因为在抛出异常(甚至可能是同一偏移量的多个错误)之后,使用者稍后会重新读取消息并成功地处理它。
我看到这个错误信息出现在 kafka-clients 版本 1.0.0 由于公关,但在以前 kafka-clients 相同用例的版本( Errors.REQUEST_TIMED_OUT 关于消费者)类似的信息( Offset commit for group {} failed: {} )已记录为 debug 水平。对于我来说,对于这样的用例,将日志级别更新为warning更符合逻辑。
如何解决这个问题?根本原因是什么?也许更改使用者属性或分区设置有助于解决此类问题。
我们使用以下实现来创建kafka流:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.<String, String>stream(topicName);
stream.foreach((key, value) -> processMessage(key, value));
Topology topology = builder.build();
StreamsConfig streamsConfig = new StreamsConfig(consumerSettings);
new KafkaStreams(streamsTopology, streamsConfig);

我们的Kafka消费者设置:

bootstrap.servers: xxx1:9092,xxx2:9092,...,xxx5:9092
application.id: app
state.dir: /tmp/kafka-streams/xxx
commit.interval.ms: 5000       # also I tried default value 30000
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
timestamp.extractor: org.apache.kafka.streams.processor.WallclockTimestampExtractor

Kafka经纪人版本: kafka_2.11-0.11.0.2 . Kafka流的两个版本都出现错误: 1.0.1 以及 1.1.0 .

xdyibdwo

xdyibdwo1#

看起来您对kafka集群有问题,kafka使用者在尝试提交偏移量时超时。您可以尝试增加kafka使用者的连接相关配置
request.timeout.ms(默认305000ms)
配置控制客户端等待请求响应的最长时间
connections.max.idle.ms(默认540000ms)
在此配置指定的毫秒数之后关闭空闲连接。

相关问题