我已经在kafka用户邮件列表上发布了这个,但是没有得到任何回复,所以我想我也会在这里尝试。
我目前正在尝试升级我的软件,以使用Kafka0.9从0.8.2。我正在尝试切换到新的consumerapi,以便在集群中添加或删除机器时进行重新平衡。我遇到了这样一个问题:当一台机器被添加到组中时,一个主题上的同一个分区在短时间内被分配给多个使用者。这会导致某些消息被多次处理,而我的目标是只处理一次。我遵循javadocs中的设置说明,并使用外部数据存储在消费和重新平衡时保存偏移量。
在我的测试集群中,我首先使用两台机器和一个生产商。开始时一切正常,每个使用者得到一半的分区。当我添加第三台机器时,它被分配了一部分分区,但是这些分区并没有从两台初始机器中的一台撤销。下面是我程序中的一些日志语句,希望它们能帮助说明我的情况。
分区14最初分配给机器1。在添加机器3之前,机器1读取大量消息。分区14在启动时分配给机器3,但分区14没有从机器1中撤销。然后,在系统重新平衡之前,这两台机器在偏移量3处读取相同的消息,并且都有权访问被撤销的分区14。机器2在从机器1撤销后被分配到分区14,但仍然被分配到机器3。从机器3中撤销后,机器2是唯一可以访问分区14的机器。
机器1(启动时打开)
2016-08-24 14:17:08 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic
assignments to worker with offset 0
2016-08-24 14:18:48 DEBUG KafkaStreamReader:312 - Committing topic assignments partition 14
offset 3
2016-08-24 14:19:38 DEBUG KafkaStreamReader:200 - partition = 14, offset = 3 (Message read
from kafka)
2016-08-24 14:19:38 DEBUG KafkaStreamReader:312 - Committing topic assignments partition 14
offset 4
2016-08-24 14:19:39 DEBUG KafkaStreamReader:338 - REVOKED: Committing for partition 14 of
topic assignments offset 4
机器2(启动时打开)
2016-08-24 14:19:51 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic
assignments to worker with offset 4
机器3(几分钟后打开)
2016-08-24 14:19:21 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic
assignments to worker with offset 3
2016-08-24 14:19:48 DEBUG KafkaStreamReader:200 - partition = 14, offset = 3 (Message read
from kafka - already read by machine 1)
2016-08-24 14:20:00 DEBUG KafkaStreamReader:338 - REVOKED: Committing for partition 14 of
topic assignments offset 4
我的集群正在运行cloudera5.7.0,kafka版本为2.0.1-1.2.0.1.p0.5,对应于kafka版本0.9.0.0+kafka2.0.1+283(https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html)
有人能帮我解释一下我做错了什么吗?如果有任何进一步的信息,我可以提供帮助,请让我知道,我会很高兴提供它,如果我可以。
暂无答案!
目前还没有任何答案,快来回答吧!