假设自动提交间隔时间为30秒,则使用者由于某些原因无法处理消息并将其保留超过30秒,然后崩溃。自动提交偏移机制是否在使用者崩溃之前提交此偏移?如果我的假设是正确的,消息在提交其偏移量时丢失,但消息本身没有被处理?
rnmwe5a21#
让我们考虑您的使用者群组名称是test,并且使用者群组中只有一个使用者。启用自动提交时,偏移量仅在poll()调用和关闭使用者期间提交。例如-auto.commit.interval.ms为5秒,对poll()的每次调用都需要7秒。每次调用poll()时,它都会检查自动提交间隔是否已过,如果已过,则会提交偏移量,如上例所示。补偿也在消费者关闭期间提交。从文档中-“关闭使用者,等待30秒的默认超时,以便进行任何需要的清理。如果启用了自动提交,这将在默认超时内提交当前偏移量(如果可能)。你可以在这里了解更多-https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html现在,关于您的问题,如果poll()没有再次调用,或者consumer没有关闭,它将不会提交偏移量。
zpjtge222#
如果使用者接收到消息n,提交它,然后在完全处理它之前崩溃,那么默认情况下,使用者将认为此消息已处理。请注意,消息仍在代理上,因此可以重新使用它进行处理。但这需要应用程序中的一些逻辑,以便不仅从上次提交的位置重新启动,而且还检查以前的记录是否已成功处理。如果您的应用程序通常需要很长时间来处理消息,那么您可能希望切换到手动提交而不是自动提交。这样,您就可以更好地控制何时提交并避免此问题。
2条答案
按热度按时间rnmwe5a21#
让我们考虑您的使用者群组名称是test,并且使用者群组中只有一个使用者。
启用自动提交时,偏移量仅在poll()调用和关闭使用者期间提交。
例如-auto.commit.interval.ms为5秒,对poll()的每次调用都需要7秒。每次调用poll()时,它都会检查自动提交间隔是否已过,如果已过,则会提交偏移量,如上例所示。
补偿也在消费者关闭期间提交。
从文档中-
“关闭使用者,等待30秒的默认超时,以便进行任何需要的清理。如果启用了自动提交,这将在默认超时内提交当前偏移量(如果可能)。
你可以在这里了解更多-
https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html
现在,关于您的问题,如果poll()没有再次调用,或者consumer没有关闭,它将不会提交偏移量。
zpjtge222#
如果使用者接收到消息n,提交它,然后在完全处理它之前崩溃,那么默认情况下,使用者将认为此消息已处理。
请注意,消息仍在代理上,因此可以重新使用它进行处理。但这需要应用程序中的一些逻辑,以便不仅从上次提交的位置重新启动,而且还检查以前的记录是否已成功处理。
如果您的应用程序通常需要很长时间来处理消息,那么您可能希望切换到手动提交而不是自动提交。这样,您就可以更好地控制何时提交并避免此问题。