我有一个Kafka主题,有40个分区。在Kubernetes星系群中。我还有一个微服务,使用这个主题。
有时,在批处理过程中,当大多数分区都已完成时,有些分区还保留着未处理的数据。使用 kafka-consumer-groups.sh
这看起来像这样:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
- - - - - kafka-python-2.0.1-f1259971-c8ed-4d98-ba37-40f263b14a78/10.44.2.119 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-328f6a97-22ea-4f59-b702-4173feb9f025/10.44.0.29 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-9a2ea04e-3bf1-40f4-9262-6c14d0791dfc/10.44.7.35 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-81f5be15-535c-436c-996e-f8098d0613a1/10.44.4.26 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-ffcf76e2-f0ed-4894-bc70-ee73220881db/10.44.14.2 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-fc5709a0-a0b5-4324-92ff-02b6ee0f1232/10.44.2.123 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-c058418c-51ec-43e2-b666-21971480665b/10.44.15.2 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-0c14afab-af2a-4668-bb3c-015932fbfd13/10.44.14.5 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-1cb308f0-203f-43ae-9252-e0fc98eb87b8/10.44.14.4 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-42753a7f-80d0-481e-93a6-67445cb1bb5e/10.44.14.6 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-63e97395-e1ec-4cab-8edc-c5dd251932af/10.44.2.122 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-7116fdc2-809f-4f99-b5bd-60fbf2aba935/10.44.1.37 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-f5ef8ff1-f09c-498e-9b27-1bcac94b895b/10.44.2.125 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-8feec117-aa3a-42c0-91e8-0ccefac5f134/10.44.2.121 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-45cc5605-d3c8-4c77-8ca8-88afbde81a69/10.44.14.3 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-9a575ac4-1531-4b2a-b516-12ffa2496615/10.44.5.32 kafka-python-2.0.1
- - - - - kafka-python-2.0.1-d33e112b-a1f4-4699-8989-daee03a5021c/10.44.14.7 kafka-python-2.0.1
my-topic 20 890 890 0 - - -
my-topic 38 857 857 0 - - -
my-topic 28 918 918 0 - - -
my-topic 23 66 909 843 - - -
my-topic 10 888 888 0 - - -
my-topic 2 885 885 0 - - -
my-topic 7 853 853 0 - - -
my-topic 16 878 878 0 - - -
my-topic 15 47 901 854 - - -
my-topic 26 934 934 0 - - -
my-topic 32 898 898 0 - - -
my-topic 21 921 921 0 - - -
my-topic 13 933 933 0 - - -
my-topic 5 879 879 0 - - -
my-topic 12 945 945 0 - - -
my-topic 4 918 918 0 - - -
my-topic 29 924 924 0 - - -
my-topic 39 895 895 0 - - -
my-topic 25 30 926 896 - - -
my-topic 9 915 915 0 - - -
my-topic 35 31 890 859 - - -
my-topic 3 69 897 828 - - -
my-topic 1 911 911 0 - - -
my-topic 6 22 901 879 - - -
my-topic 14 41 881 840 - - -
my-topic 30 900 900 0 - - -
my-topic 22 847 847 0 - - -
my-topic 8 919 919 0 - - -
my-topic 0 902 902 0 - - -
my-topic 18 924 924 0 - - -
my-topic 36 864 864 0 - - -
my-topic 34 929 929 0 - - -
my-topic 24 864 864 0 - - -
my-topic 19 937 937 0 - - -
my-topic 27 859 859 0 - - -
my-topic 11 838 838 0 - - -
my-topic 31 49 922 873 - - -
my-topic 37 882 882 0 - - -
my-topic 17 942 942 0 - - -
my-topic 33 928 928 0 - - -
它进一步指出,消费者群体是 rebalancing
. 这里要注意的一点是 CONSUMER-ID
有更少的消费者声称应该有。它应该是20个消费者,但在这个输出中,只有17个显示,即使所有的豆荚运行。这个数字各不相同,我不确定这是一个输出问题,还是他们真的不存在。这也让我困惑,因为当我最初开始(所有新的kafka和消费者部署)时,这并没有发生。因此,这似乎真的与用户部署被扩展或被扼杀有关。
然后在很短的一段时间内,消费者被分配到一个地方,大约半分钟后,与上面相同的图片再次显示了消费者群体正在重新平衡的地方。
当我缩小比例时也会发生这种情况。e、 当我只有4个消费者的时候。我不知道这里发生了什么。这些pod都在运行,我在其他微服务中使用了相同的基本代码和模式,在这些微服务中,它似乎工作得很好。
我怀疑这与一个消费者pod被杀有关,因为,正如我所说的,在一个新的部署中,它最初起作用。这一批也比我的其他批要长一点,所以在运行期间更有可能杀死豆荚。我也不确定它是否与大多数已经完成的分区有关,这也可能只是我用例的一个怪癖。
我认识到这一点,因为处理过程似乎需要很长时间,但新的数据仍在处理中。因此,我认为发生的是,在分配给消费者的短暂时刻,他们处理数据,但在重新平衡之前,他们从不提交偏移量,而将它们留在一个无限循环中。我发现唯一一个稍微相关的是这个问题,但它是从相当多的版本之前,并没有完全描述我的情况。
我使用 kafka-python
客户和我用Kafka的形象 confluentinc/cp-kafka:5.0.1
.
我使用管理客户端创建主题 NewTopic(name='my-topic', num_partitions=40, replication_factor=1)
这样创建客户机:
consumer = KafkaConsumer(consume_topic,
bootstrap_servers=bootstrap_servers,
group_id=consume_group_id,
value_deserializer=lambda m: json.loads(m))
for message in consumer:
process(message)
这里出了什么问题?我有一些配置错误吗?
非常感谢您的帮助。
1条答案
按热度按时间bvuwiixz1#
问题是心跳配置。事实证明,虽然大多数消息只需要几秒钟就可以处理,但很少有消息需要很长时间才能处理。在这些特殊情况下,heartbeat更新花费了太长时间,导致代理假设消费者停机并开始重新平衡。
我假设接下来发生的事情是,消费者被重新分配到同一条消息,再次处理它花费了太长时间,并触发了另一个重新平衡。导致了无休止的循环。
我最终通过增加两者来解决这个问题
session_timeout_ms
以及heartbeat_interval_ms
在消费者中(此处记录)。我还减小了批处理大小,以便心跳更新更加有规律。