Kafka流与消费群体怪异行为

myzjeezk  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(358)

我将两个高级问题分解为更多的单个问题,两个高级问题都涉及一个apachekafkastreamsapi正在创建和使用的消费群体。
首先,是 kafka-consumer-group.sh 脚本。我得到一个奇怪的输出,它并没有真正告诉我一个特定的消费者在哪里,尽管他们似乎连接到一个特定的组/主题/分区:

TOPIC    PARTITION    CURRENT-OFFSET    LOG-END-OFFSET    LAG
STANDARD_DATA                  9          11              11              0          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer-4fd9dc15-d8a7-4598-85a9-3761ae6a747b/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-1-consumer
STANDARD_DATA                  0          4               11              7          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer-28e1c7bf-860d-44d6-bf58-5e0ff875587c/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-5-consumer
STANDARD_DATA                  4          -               10              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer
STANDARD_DATA                  5          -               10              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer-a81f1399-1fc4-4579-b24f-fa8fee01fabf/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-3-consumer
STANDARD_DATA                  3          -               12              -          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer-6a83bfcc-2c6e-4e9d-a819-029ac8c6ae17/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-2-consumer
STANDARD_DATA                  8          12              12              0          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer-6d46bed3-70c4-4c7f-8e53-f9591192bc3f/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-4-consumer
STANDARD_DATA                  7          -               11              -          myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer-5313315b-ded9-4fe7-ac9d-d8d5b20dd5b9/1.1.1.1                 myConsumer-13b61e5a-6289-45db-844b-3ef8c5a26782-StreamThread-3-consumer
STANDARD_DATA                  2          10              10              0          myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer-c08a648f-548e-47a8-8bc5-7b6fa3bc1fb5/1.1.1.1                  myConsumer-b9402faf-4b37-479f-82be-a17eaa180c62-StreamThread-1-consumer
STANDARD_DATA                  1          2               10              8          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer-08d99679-d430-4e9f-a3b9-11e558ca34a4/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-2-consumer
STANDARD_DATA                  6          -               12              -          myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer-666040f8-d4d0-49e9-9db6-c6efee49ebe1/1.1.1.1                 myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-5-consumer

当我可以直接查询kafka的api来区分它们实际上被捕获时,为什么一些当前偏移量(第3列)和延迟(第4列)显示为“-”?
(通过golang api查询)

4                      myConsumer-7fc71848-465b-4817-93b3-42b9ba290dcd-StreamThread-4-consumer-a3023af6-eafb-4633-85f1-048c20c4dfb3    OFFSET: 10        LOG-END: 10                LAG: 0

另外,为什么通常情况下,偏移量不会显示在日志中(也就是说,它应该被捕获)?
我的第二个高层次问题是流。我们有一个流进程,即在随机时间(主要是在重新启动期间)重置为特定主题中可用的最早偏移量。在整个代码中没有“重置”,也没有触摸偏移量重置。我也可以确认,我们没有使用'准确一次',所以我不知道确切地说,这些抵消重置发挥作用。
再一次,基本上是:
流处理在数据中翻腾,一些事情发生了,然后我们的偏移量回到地面0,再次处理。在决定重置之前,这种情况可能会持续数天到数周,因此正在提交补偿。

chhkpiq4

chhkpiq41#

关于输出 kafka-consumer-groups.sh :一个 - in current offset表示此分区没有提交的偏移量。这意味着,也无法计算滞后(因此,您将得到一个 - 也在那里)。
如果我正确地阅读了您的语句,如果您使用golang查询偏移量,它将显示分区4位于偏移量10处,而不是 kafka-consumer-groups.sh 肖恩斯——不知道为什么会这样。。。
关于重置的偏移量:您可能需要增加代理配置 offsets.retention.minutes --默认值为24小时(参见。https://docs.confluent.io/current/streams/faq.html#why-是我的应用程序从头开始重新处理数据)。
还请注意,streams api使用默认的重置策略“earliest”(与使用“latest”作为默认值的使用者api不同)。您可以通过在streams api中更改重置策略 StreamsConfig : https://docs.confluent.io/current/streams/developer-guide.html#non-流配置参数

相关问题