Kafka一个特殊的消费者群体正在重新平衡

oug3syen  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(1368)

几个月前,我在我的Kafka链中创建了两个消费者组(每个组有3个消费者):
第一组:从外部获取数据并发送到我的主题1
第二组:从主题1消费
这没有问题。
最近,我添加了第三个组,将parralelisation添加到我的链中:
对第一组的影响:同时发送到主题3(在发送到主题1的基础上)
新的第三组(还有3个消费者):从topic3消费
问题:newtopic3没有被真正使用(每个主题中只有很少的20或50条消息在许多小时后被使用……)=>第三组的3个分区的eahc上有巨大的“延迟”(从未被使用)。
在kafka日志中,我可以看到许多只针对第三组的重新平衡(似乎达到5分钟默认最大poll.interval.ms,但我不确定:kafka消息没有明确指定:准备重新平衡组group3…)。我试图修改poll选项,减少max.poll.records等的数量。。情况有所好转,但我继续得到平衡。。。我试着添加更多的cpu核心(我想很多消费者都在我的服务器上使用cpu),从1个增加到2个,最后增加4个vCore:没有什么变化,htop没有显示cpu核心的巨大利用率。。。
我只会遇到新消费群体的滞后/再平衡问题,消费代码和其他消费者一样。。。
我不知道该怎么办。。。关于Kafka的配置,Kafkacpu的利用率或者别的什么有什么建议吗?
注意:所有堆栈都在docker容器下
谢谢您!
编辑:kafka日志的示例1(不重置为0的生成循环):

...
[2020-07-15 10:52:32,029] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 365 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-319d2d85-ad11-4012-9e30-d8c94f706bbe with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:52:32,036] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 366 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:52:32,040] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 366 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:54:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 10:57:32,144] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-319d2d85-ad11-4012-9e30-d8c94f706bbe] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:57:32,149] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 366 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-319d2d85-ad11-4012-9e30-d8c94f706bbe on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 10:57:32,157] INFO [GroupCoordinator 1001]: Group group3 with generation 367 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:04:00,553] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 2 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:06:14,173] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-9eae5a06-c39f-4b66-b9c6-e2e520d787c1 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:06:14,182] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 367 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-9eae5a06-c39f-4b66-b9c6-e2e520d787c1 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:06:14,205] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 368 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:06:14,210] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 368 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:11:14,233] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-9eae5a06-c39f-4b66-b9c6-e2e520d787c1] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:11:14,234] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 368 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-9eae5a06-c39f-4b66-b9c6-e2e520d787c1 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:11:14,235] INFO [GroupCoordinator 1001]: Group group3 with generation 369 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:14:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:15:42,480] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-560ebdf5-4c6f-4f12-85d5-bf6e7194b712 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:15:42,483] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 369 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-560ebdf5-4c6f-4f12-85d5-bf6e7194b712 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:15:42,489] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 370 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:15:42,505] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 370 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:20:42,655] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-560ebdf5-4c6f-4f12-85d5-bf6e7194b712] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:20:42,657] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 370 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-560ebdf5-4c6f-4f12-85d5-bf6e7194b712 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:20:42,657] INFO [GroupCoordinator 1001]: Group group3 with generation 371 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:24:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:34:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:36:26,840] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-6c853061-c4e8-4066-8940-2c7173709a96 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:36:26,860] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 371 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-6c853061-c4e8-4066-8940-2c7173709a96 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:36:26,869] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 372 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:36:26,879] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 372 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:41:26,949] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-6c853061-c4e8-4066-8940-2c7173709a96] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:41:26,952] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 372 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-6c853061-c4e8-4066-8940-2c7173709a96 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:41:26,953] INFO [GroupCoordinator 1001]: Group group3 with generation 373 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:44:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:49:45,548] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-834458a8-67f7-4a0d-9056-52b6b61e79db for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:49:45,576] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 373 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-834458a8-67f7-4a0d-9056-52b6b61e79db with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:49:45,580] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 374 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:49:45,586] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 374 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:54:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 11:54:45,676] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-834458a8-67f7-4a0d-9056-52b6b61e79db] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:54:45,685] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 374 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-834458a8-67f7-4a0d-9056-52b6b61e79db on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:54:45,686] INFO [GroupCoordinator 1001]: Group group3 with generation 375 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:59:01,403] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-f8f4d080-73df-43bd-b49b-5b5403d15e00 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:59:01,432] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 375 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-f8f4d080-73df-43bd-b49b-5b5403d15e00 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:59:01,471] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 376 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 11:59:01,481] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 376 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:04:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:04:01,515] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-f8f4d080-73df-43bd-b49b-5b5403d15e00] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:04:01,518] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 376 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-f8f4d080-73df-43bd-b49b-5b5403d15e00 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:04:01,519] INFO [GroupCoordinator 1001]: Group group3 with generation 377 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:14:00,552] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:21:11,498] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-43259283-cdeb-4738-aaa4-44e83542c7dc for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:21:11,502] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 377 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-43259283-cdeb-4738-aaa4-44e83542c7dc with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:21:11,505] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 378 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:21:11,508] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 378 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:24:00,554] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 2 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:26:11,680] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-43259283-cdeb-4738-aaa4-44e83542c7dc] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:26:11,811] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 378 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-43259283-cdeb-4738-aaa4-44e83542c7dc on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:26:11,812] INFO [GroupCoordinator 1001]: Group group3 with generation 379 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:34:00,553] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:34:30,547] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-0775e634-fe08-40e9-9ae3-7080ec21dd8a for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:34:30,597] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 379 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-0775e634-fe08-40e9-9ae3-7080ec21dd8a with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:34:30,613] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 380 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:34:30,628] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 380 (kafka.coordinator.group.GroupCoordinator)

示例2(重置为0的生成循环)kafka日志:

...
[2020-07-15 12:10:02,637] INFO [GroupMetadataManager brokerId=1001] Group group3 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:10:02,637] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:19:22,210] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-4ecb3653-0db8-4233-9842-710db5270491 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:19:22,211] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 0 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-4ecb3653-0db8-4233-9842-710db5270491 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:19:22,212] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 1 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:19:22,214] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:20:02,637] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:24:22,271] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-4ecb3653-0db8-4233-9842-710db5270491] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:24:22,272] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 1 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-4ecb3653-0db8-4233-9842-710db5270491 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:24:22,272] INFO [GroupCoordinator 1001]: Group group3 with generation 2 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:25:29,398] INFO [GroupCoordinator 1001]: Dynamic Member with unknown member id joins group group3 in Empty state. Created a new member id kafka-python-2.0.1-d927d9d4-1885-4430-a84b-4cc92cb7a04e for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:25:29,398] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 2 (__consumer_offsets-38) (reason: Adding new member kafka-python-2.0.1-d927d9d4-1885-4430-a84b-4cc92cb7a04e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:25:29,399] INFO [GroupCoordinator 1001]: Stabilized group group3 generation 3 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:25:29,401] INFO [GroupCoordinator 1001]: Assignment received from leader for group group3 for generation 3 (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:30:02,637] INFO [GroupMetadataManager brokerId=1001] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-15 12:30:29,420] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id kafka-python-2.0.1-d927d9d4-1885-4430-a84b-4cc92cb7a04e] in group group3 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:30:29,420] INFO [GroupCoordinator 1001]: Preparing to rebalance group group3 in state PreparingRebalance with old generation 3 (__consumer_offsets-38) (reason: removing member kafka-python-2.0.1-d927d9d4-1885-4430-a84b-4cc92cb7a04e on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
[2020-07-15 12:30:29,421] INFO [GroupCoordinator 1001]: Group group3 with generation 4 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)

edit2:添加消费者零件代码

self.consumer = KafkaConsumer('topic1',
                    bootstrap_servers=brokers_env_var,
                    auto_offset_reset='earliest',
                    enable_auto_commit=True,
                    session_timeout_ms=14000,
                    heartbeat_interval_ms=2000,
                    group_id='group3',
                    max_poll_records=100,
                    key_deserializer=lambda x: loads(x.decode('utf-8')),
                    value_deserializer=lambda x: loads(x.decode('utf-8')))

try:
  for message in self.consumer:
      self.consumeMessage(message.value)

except Exception as error:
  print('Run error - error : ' + str(error))
ua4mk5z4

ua4mk5z41#

消费者组只消费来自Kafka的消息,所以实际上有一个生产者向topic1、topic3发送消息,还有两个消费者组-一个消费来自topic1,另一个消费来自topic3。
将相同的数据发送到多个主题是没有意义的,我建议您只将这些消息发送到topic1,并使用两个消费者组:
3名消费者来自topic1。
另外3位消费者来自topic1。
kafka中的不同使用者组独立地管理偏移量,这就是为什么您可以对数据使用单个主题,并创建多个使用者组来使用它,否则具有相同数据的两个主题将占用两倍的存储空间,并且可能需要cpu开销来管理它们。
请确保同一组中的消费者拥有相同的 group.id 参数。
例如,6个消费者-其中3个拥有组id g1 另外3个有组id g2 .

相关问题