Flink Kafka Source异步自动提交偏移失败

5n0oy7gb  于 2022-12-09  发布在  Apache
关注(0)|答案(1)|浏览(231)

Flink版本:版本1.15.2
Apache Flink有一个问题:Flink任务的Kafka Source表与其他Kafka消费者使用相同的组ID,Flink提交偏移量失败,问题场景描述如下:
1.我有一个Kafka使用者的Java应用程序,它使用使用者组'TopicA'来使用主题'topic_a'中的数据
1.有一个Flink任务,其Kafka Source表使用的Kafka使用者组也是“TopicA”,但使用主题“topic_b”的数据
此时,Flink任务的日志信息中会出现以下错误:偏移量{topic_B-0=OffsetAndMetadata{偏移量=xxx,leaderEpoch=0,元数据=''}}的异步自动提交失败:无法完成提交,因为组已重新平衡并将分区分配给另一个成员。这意味着对poll()的后续调用之间的时间比配置的www.example.com长max.poll.interval.ms,这通常意味着轮询循环花费了太多时间处理消息。可以通过增加max.poll.interval.ms或减少轮询中返回的批处理的最大大小来解决此问题()的最大轮询记录数。

shyt4zoc

shyt4zoc1#

The Java application uses the Spring-Kafka framework, which uses the subcribe() method by default.
The Flink Kafka Connector uses the assign() method. They use the same consumer group ID, so it is inevitable to report an error when submitting an Offset in Flink.
The solution is to specify the partitions that need to be consumed on the KafkaListener annotation. e.g:

@KafkaListener(topicPartitions = {@TopicPartition(topic = "topic_a", partitions = {"0"})})

相关问题