Following https://stackoverflow.com/a/36009859/9911256, a Kafka commit/auto-commit can fail if the consumer dies abruptly. Indeed, my Kafka application works fine in production, but during testing I sometimes hit this recurring problem (until Kafka is restarted): the offset stays the same.
My unit test (a Java producer sending 10 packets to a Java consumer, over one broker, one topic, one partition, one group) launches 10 packets, and I check them starting from the first one:
SENT: (0) NAME:person-001; UUID:352c1f8e-c141-4446-8ac7-18eb044a6b92
SENT: (1) NAME:person-001; UUID:81681a30-83e1-4f85-b07f-da140cfdb874
SENT: (2) NAME:person-001; UUID:3b9db497-460a-4a1c-86b9-f724af1a0449
SENT: (3) NAME:person-001; UUID:63c0edf9-ec00-4ef7-b81a-4b1b8919a42d
SENT: (4) NAME:person-001; UUID:346f265c-1964-4460-97de-1a7b43285c06
SENT: (5) NAME:person-001; UUID:2d1bb49c-03ce-4762-abb3-2bbb963e87d1
SENT: (6) NAME:person-001; UUID:3c8ddda0-6cb8-45b4-b1d2-3a99ba57a48a
SENT: (7) NAME:person-001; UUID:3f819408-41d5-4cad-ad39-322616a86b99
SENT: (8) NAME:person-001; UUID:1db09bc1-4c90-4a0d-8efc-d6ea8a791985
SENT: (9) NAME:person-001; UUID:705a3a3c-fd15-45a9-a96c-556350f1f79a
Exception in thread "Thread-2" org.opentest4j.AssertionFailedError: expected: <352c1f8e-c141-4446-8ac7-18eb044a6b92> but was: <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12>
If I run the test again:
SENT: (0) NAME:person-001; UUID:d171e7ee-fa73-4cb4-826e-f7bffdef9e92
SENT: (1) NAME:person-001; UUID:25da6b6e-57e9-4f8a-a3ff-1099f94fcaf5
SENT: (2) NAME:person-001; UUID:d05b4693-ba60-4db2-a5ae-30dcd44ce5b7
SENT: (3) NAME:person-001; UUID:fbd75ee7-6f34-4ab1-abda-d31ee91d0ff8
SENT: (4) NAME:person-001; UUID:798fe246-f10e-4fc3-90c9-df3e181bb641
SENT: (5) NAME:person-001; UUID:26b33a19-7e65-49ec-b54d-3379ef76b797
SENT: (6) NAME:person-001; UUID:45ecef46-69f5-4bff-99b5-c7c2dce67ec8
SENT: (7) NAME:person-001; UUID:464df926-cd66-4cfa-b282-36047522dfe8
SENT: (8) NAME:person-001; UUID:982c82c0-c669-400c-a70f-62c57e3552a4
SENT: (9) NAME:person-001; UUID:ecdbfce6-d378-496d-9e0b-30f16b7cf484
Exception in thread "Thread-2" org.opentest4j.AssertionFailedError: expected: <d171e7ee-fa73-4cb4-826e-f7bffdef9e92> but was: <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12>
Note that the message <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12> shows up again on every attempt, even though these are separate launches that I execute manually.
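For context, the producer side of the test looks roughly like the following minimal sketch (not the exact test code; the broker address and serializer choices are placeholders, while the topic t0001 is the one shown in the consumer-group output further down):

import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                String value = "NAME:person-001; UUID:" + UUID.randomUUID();
                System.out.println("SENT: (" + i + ") " + value);
                producer.send(new ProducerRecord<>("t0001", value));
            }
            producer.flush(); // make sure all ten records actually reach the broker
        }
    }
}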
On the consumer side I use:
properties.put("auto.offset.reset", "latest");
I have already tried the autocommit option as well, with equivalent results.
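Concretely, the consumer configuration looks roughly like this (the broker address is a placeholder; the group id "0" matches the broker logs below):

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092"); // placeholder
properties.put("group.id", "0"); // group "0", as seen in the broker logs below
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.offset.reset", "latest");
properties.put("enable.auto.commit", "true"); // also tried "false", equivalent results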
The worst part is what happens each time I restart the Kafka server: sometimes I restart it and the tests run fine, even if I repeat them many times; sometimes it seems to enter this failure state, and then it always fails.
As mentioned before, when the consumer does not die and a heavy volume of messages is being handled, the problem does not appear.
I have also noticed that if the server has been restarted recently and all of Kafka's log and data directories have been deleted, the first test may be delayed and fail.
My logs show:
2019-01-25T12:20:02.874119+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:02,850] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 26 (__consumer_offsets-48) (reason: Adding new member consumer-1-a0b94a2a-0cae-4ba8-85f0-9a84030f4beb) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:02.874566+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:02,851] INFO [GroupCoordinator 1001]: Stabilized group 0 generation 27 (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:02.874810+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:02,858] INFO [GroupCoordinator 1001]: Assignment received from leader for group 0 for generation 27 (kafka.coordinator.group.GroupCoordinator)
13 seconds after the test:
2019-01-25T12:20:15.894185+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:15,871] INFO [GroupCoordinator 1001]: Member consumer-2-79f97c80-294c-438b-8a8a-3745f4a57010 in group 0 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:15.894522+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:15,871] INFO [GroupCoordinator 1001]: Preparing to rebalance group 0 in state PreparingRebalance with old generation 27 (__consumer_offsets-48) (reason: removing member consumer-2-79f97c80-294c-438b-8a8a-3745f4a57010 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:17.897272+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:17,865] INFO [GroupCoordinator 1001]: Member consumer-1-a0b94a2a-0cae-4ba8-85f0-9a84030f4beb in group 0 has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
2019-01-25T12:20:17.897579+01:00 TLS dockcompose: kafka_1 | [2019-01-25 11:20:17,866] INFO [GroupCoordinator 1001]: Group 0 with generation 28 is now empty (__consumer_offsets-48) (kafka.coordinator.group.GroupCoordinator)
What I can conclude from reading the topic linked above is that Kafka works fine as long as the consumer stays permanently connected (production). But that is not possible in tests! What is the problem here?
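Based on the "heartbeat expiration" messages above, my understanding is that the test consumer never leaves the group cleanly, so whatever it was about to commit can be lost. This is a sketch of what I believe a clean shutdown should look like, using the configuration shown earlier but with enable.auto.commit=false (assumes kafka-clients 2.x for poll(Duration); the loop and assertions are simplified):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestConsumer {
    // 'properties' is the consumer configuration shown earlier,
    // with enable.auto.commit=false
    static void consumeTenAndClose(Properties properties) {
        // try-with-resources guarantees close(), so the consumer leaves the
        // group explicitly instead of being expelled on heartbeat expiration
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList("t0001"));
            int received = 0;
            while (received < 10) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("RECEIVED: " + record.value()); // assertions go here
                    received++;
                }
                consumer.commitSync(); // commit while the consumer is still alive
            }
        }
    }
}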
UPDATE: I have found that, in fact, in some cases the current offset does not change (which is odd, because I keep receiving the message <6785fa5d-ef63-4fe6-85c5-c525bfc4ee12>); the log-end offset increases, of course, but the whole thing is completely erratic...
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t0001 0 160 1085 925 consumer-2-1a2cce59-c449-471e-bad0-3c3335f44e26 /10.42.0.105 consumer-2
After launching the test again:
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
t0001 0 160 1153 993 consumer-2-dff28a54-b4e8-464a-a5e7-67c8cbad749f /10.42.0.105 consumer-2
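For completeness, the committed offset for the group can also be inspected programmatically via AdminClient (a minimal sketch assuming kafka-clients 2.0+; the broker address is a placeholder):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;

public class ShowCommittedOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // group "0", as in the output above
            admin.listConsumerGroupOffsets("0")
                 .partitionsToOffsetAndMetadata()
                 .get()
                 .forEach((tp, om) -> System.out.println(tp + " committed=" + om.offset()));
        }
    }
}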