我读了多篇文章,让Kafka了解消费者群体。我有一个疑问,Kafka如何确保一个信息只被一个消费群体中的一个消费者处理一次?考虑到消费者群体中有多个消费者。Kafka是否对每一条信息都有某种跟踪,并在每一位消费者身上一个接一个地按顺序进行尝试?任何参考或帮助将不胜感激。
sirbozc51#
首先,当你的主题有超过一个分区时,Kafka消费者小组会帮助我们。请考虑以下内容scenarios:-分区数-3,耗电元件数-3Kafka将一个分区分配给一个使用者。除非某些使用者失败并且发生使用者重新平衡(将分区重新分配给使用者),否则所有使用者都将Map到其分区,并按顺序使用与这些分区相关的事件。分区数-1,用户数-3如果使用者多于分区数,则kafka没有足够的分区来分配使用者。因此,组中的一个使用者被分配到分区,而组中的其他使用者将处于空闲状态。分区数-4,耗电元件数-3在这个场景中,一个使用者获得2个分区,在使用者重新平衡期间,另一个使用者可能获得2个分区。关于Kafka是否保持某种轨道来维持秩序的问题?是的-在分区级别-它在每个分区中维护提交偏移量并按顺序使用。否-在主题级别(除非只有一个分区)。
**@mike在上面解释了如何使用commit offset在分区级别维护序列。
kuarbcqp2#
使用者可以提交从主题中读取的消息,以避免再次读取。这基本上可以通过两种不同的方法实现:启用 enable.auto.commit :“如果为true,则使用者的偏移量将定期在后台提交。”这在默认情况下处于启用状态,您可以使用使用者属性 auto.commit.interval.ms 更改提交发生的时间。间隔的默认值设置为5秒。关于消费者配置的所有细节都在kafka文档中给出呼叫 consumer.commitSync() (或 commitAsync() )在你的代码中轮询数据之后。由于一个特定分区最多只能由一个使用者组中的一个使用者使用,因此提交基于使用者组、分区和偏移量。上面的javadocs KafkaConsumer 类实际上相当不错,它提供了“自动偏移提交”和“手动偏移控制”的所有细节和示例注意:您的措辞是“Kafka如何确保消息只被处理一次…”我不确定您是否在这里谈论“精确一次传递语义”,但请记住,如果不付出任何额外的努力,上述方法仍可能导致消费者群体两次消费一条消息。想象一下这个场景:启用自动提交的时间间隔为5秒你的Kafka消费者调查数据,你就要处理它2秒后,处理导致异常,作业失败。这意味着这条消息的自动提交没有发生。现在,重新启动作业将导致使用者再次读取相同的消息,因为它尚未提交。
enable.auto.commit
auto.commit.interval.ms
consumer.commitSync()
commitAsync()
KafkaConsumer
2条答案
按热度按时间sirbozc51#
首先,当你的主题有超过一个分区时,Kafka消费者小组会帮助我们。
请考虑以下内容scenarios:-
分区数-3,耗电元件数-3
Kafka将一个分区分配给一个使用者。除非某些使用者失败并且发生使用者重新平衡(将分区重新分配给使用者),否则所有使用者都将Map到其分区,并按顺序使用与这些分区相关的事件。
分区数-1,用户数-3
如果使用者多于分区数,则kafka没有足够的分区来分配使用者。因此,组中的一个使用者被分配到分区,而组中的其他使用者将处于空闲状态。
分区数-4,耗电元件数-3
在这个场景中,一个使用者获得2个分区,在使用者重新平衡期间,另一个使用者可能获得2个分区。
关于Kafka是否保持某种轨道来维持秩序的问题?是的-在分区级别-它在每个分区中维护提交偏移量并按顺序使用。
否-在主题级别(除非只有一个分区)。
**@mike在上面解释了如何使用commit offset在分区级别维护序列。
kuarbcqp2#
使用者可以提交从主题中读取的消息,以避免再次读取。
这基本上可以通过两种不同的方法实现:
启用
enable.auto.commit
:“如果为true,则使用者的偏移量将定期在后台提交。”这在默认情况下处于启用状态,您可以使用使用者属性auto.commit.interval.ms
更改提交发生的时间。间隔的默认值设置为5秒。关于消费者配置的所有细节都在kafka文档中给出呼叫
consumer.commitSync()
(或commitAsync()
)在你的代码中轮询数据之后。由于一个特定分区最多只能由一个使用者组中的一个使用者使用,因此提交基于使用者组、分区和偏移量。
上面的javadocs
KafkaConsumer
类实际上相当不错,它提供了“自动偏移提交”和“手动偏移控制”的所有细节和示例注意:您的措辞是“Kafka如何确保消息只被处理一次…”
我不确定您是否在这里谈论“精确一次传递语义”,但请记住,如果不付出任何额外的努力,上述方法仍可能导致消费者群体两次消费一条消息。想象一下这个场景:
启用自动提交的时间间隔为5秒
你的Kafka消费者调查数据,你就要处理它
2秒后,处理导致异常,作业失败。这意味着这条消息的自动提交没有发生。
现在,重新启动作业将导致使用者再次读取相同的消息,因为它尚未提交。