上下文
我们使用kafka来处理大消息,偶尔高达10mb,但大多在500kb范围内。处理一条消息可能需要大约30秒,但有时需要一分钟。
问题
使用较少的使用者(最多50个左右)处理数据不会导致代理重复进行重新平衡,处理工作正常。这种规模的任何重新平衡也相当快,根据经纪人的日志,大多不到一分钟。
一旦耗电元件被缩放到100或200,耗电元件就会不断地重新平衡,时间间隔可达5分钟左右。这将导致5分钟的工作/消耗,然后是5分钟的重新平衡,然后再次相同。消费者服务并没有失败,只是无缘无故地重新平衡。这会导致在扩展用户时吞吐量降低。
当扩展到2oo消费者时,处理的平均速率为每个消费者每分钟2条消息。单个使用者在不重新平衡时的处理速度约为每分钟6条消息。
我不怀疑数据中心的网络是一个问题,因为我们有一些消费者对消息执行不同类型的处理,他们每分钟传递100到1000条消息没有问题。
其他人是否经历过这种模式并找到了简单的解决方案,例如更改特定的配置参数?
附加信息
Kafka代理是2.0版本,在不同的数据中心有10个。复制设置为3。这个主题的分区是500。特定代理配置的摘录,以更好地处理大型消息:
压缩类型=lz4
message.max.bytes=10000000#10 mb
replica.fetch.max.bytes=10000000#10 mb
group.max.session.timeout.ms=1320000#22分钟
offset.retention.minutes=10080#7天
在使用者端,我们使用java客户机和一个重新平衡的监听器,该监听器清除来自已撤销分区的所有缓冲消息。这个缓冲区有10条消息大。客户机运行的是客户机api版本2.1,java客户机从2.0升级到2.1似乎大大减少了这些较大客户机上的以下类型的代理日志(我们以前几乎为每个客户机和每次重新平衡都获得了这些日志):
INFO [GroupCoordinator 2]: Member svc002 in group my_group has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
消费者与代理处于不同的数据中心。偏移量的提交是异步执行的。循环轮询在一个线程中执行,该线程以15秒的超时时间填充缓冲区;一旦缓冲区满了,线程就会休眠几秒钟,并且只在缓冲区有空闲空间时进行轮询。较大消息用例的配置摘录:
max.partition.fetch.bytes.config=200000000#200 mb
max.poll.records.config=2
session.timeout.ms.config=1200000#20分钟
日志文件
下面是代理日志文件的摘录,它在30分钟的时间范围内管理这个特定组。命名减少到我的组和我的主题。还有一些无关主题的条目。
19:47:36,786] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:47:36,810] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3213 (kafka.coordinator.group.GroupCoordinator)
19:47:51,788] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3213 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,851] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:48:46,902] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3214 (kafka.coordinator.group.GroupCoordinator)
19:50:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
19:54:29,365] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3214 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:57:22,161] INFO [ProducerStateManager partition=unrelated_topic-329] Writing producer snapshot at offset 88002 (kafka.log.ProducerStateManager)
19:57:22,162] INFO [Log partition=unrelated_topic-329, dir=/kafkalog] Rolled new log segment at offset 88002 in 11 ms. (kafka.log.Log)
19:59:14,022] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
19:59:14,061] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3215 (kafka.coordinator.group.GroupCoordinator)
20:00:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:02:57,821] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3215 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,360] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:06:51,391] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3216 (kafka.coordinator.group.GroupCoordinator)
20:10:10,846] INFO [GroupMetadataManager brokerId=2] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
20:10:46,863] INFO [ReplicaFetcher replicaId=2, leaderId=8, fetcherId=0] Node 8 was unable to process the fetch request with (sessionId=928976035, epoch=216971): FETCH_SESSION_ID_NOT_FOUND. (org.apache.kafka.clients.FetchSessionHandler)
20:11:19,236] INFO [GroupCoordinator 2]: Preparing to rebalance group my_group with old generation 3216 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:13:54,851] INFO [ProducerStateManager partition=mytopic-321] Writing producer snapshot at offset 123640 (kafka.log.ProducerStateManager)
20:13:54,851] INFO [Log partition=mytopic-321, dir=/kafkalog] Rolled new log segment at offset 123640 in 14 ms. (kafka.log.Log)
20:14:30,686] INFO [ProducerStateManager partition=mytopic-251] Writing producer snapshot at offset 133509 (kafka.log.ProducerStateManager)
20:14:30,686] INFO [Log partition=mytopic-251, dir=/kafkalog] Rolled new log segment at offset 133509 in 1 ms. (kafka.log.Log)
20:16:01,892] INFO [GroupCoordinator 2]: Stabilized group my_group generation 3217 (__consumer_offsets-18) (kafka.coordinator.group.GroupCoordinator)
20:16:01,938] INFO [GroupCoordinator 2]: Assignment received from leader for group my_group for generation 3217 (kafka.coordinator.group.GroupCoordinator)
非常感谢您对这个问题的帮助。
1条答案
按热度按时间63lcw9qa1#
经过进一步的设计和微调,我们成功地控制了问题。
首先,似乎有些服务的处理仍然超出了限制,这导致它们很少失败。接下来的离开导致了一个重新平衡,接着在大约6-7分钟后加入,这也导致了一个重新平衡。我们通过在吞吐量方面优化我们的服务进一步减少了这一点。
第二个因素是我们用来扩展服务的底层docker网络。默认情况下,心跳间隔非常短(5秒),因此消费者节点上的任何艰苦工作和网络负载都可能在非常短的间隔内将其从docker swarm中移除。docker通过将这些服务移动到其他节点(重新平衡)来响应此中断,然后在节点恢复联机时重新平衡。由于服务的启动时间很长,只有5-7分钟,因此在每个事件上都需要重新平衡几次。
第三个因素是消费服务中的错误,导致其中一个偶尔崩溃,比如每小时1%。这再次导致两个重新平衡,一个离开,一个加入。
总的来说,这些问题的结合导致了观察到的问题。最新的kafka版本似乎也输出了更多关于服务为什么离开消费者群体的信息。如果kafka能继续向仍然稳定的用户提供数据,那就太好了,我可能会为此添加一个特性请求。尽管如此,我们有它运行稳定,现在有体面的表现。