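I am trying to use a Kafka consumer library that was written in-house at my organization. It reads JSON data from a Kafka topic and stores it in a Mongo database. I can't post the code, but the architecture is very simple: an Apache Camel route consumes the messages, and the Spring Boot Mongo dependency stores them in Mongo.
When the application is deployed to OpenShift and scaled to more than 1 pod, it logs the exception below and then hangs, with no further input or processing. I believe the failure occurs in the logic of the Kafka client library.
I have tried running two instances of the application locally on different ports, and that works without errors. I have also tried setting the heartbeat interval, session timeout, batch size, max fetch bytes, number of concurrent consumers, SEDA mode on/off, and request timeout. Whether I raise those Kafka settings, lower them, turn them on or off, or leave them undefined, the problem persists.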
2019-05-23 16:15:51 [Camel (camel-1) thread #1 - KafkaConsumer[mytopic]] ERROR o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group mytopic-status
2019-05-23 16:15:51 [Camel (camel-1) thread #7 - KafkaConsumer[mytopic]] ERROR o.a.k.c.c.i.ConsumerCoordinator - Error UNKNOWN_MEMBER_ID occurred while committing offsets for group mytopic-status
2019-05-23 16:15:51 [Camel (camel-1) thread #7 - KafkaConsumer[mytopic]] WARN o.a.c.component.kafka.KafkaConsumer - Error consuming mytopic-Thread 0 from kafka topic. Caused by: [org.apache.kafka.clients.consumer.CommitFailedException - Commit cannot be completed due to group rebalance]
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed due to group rebalance
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:552)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:493)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:665)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:644)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:380)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:274)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:320)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:213)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:193)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:358)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:968)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:936)
at org.apache.camel.component.kafka.KafkaConsumer$KafkaFetchRecords.run(KafkaConsumer.java:132)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
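For reference, the kind of consumer settings I have been varying can be sketched as below. This is only an illustration and not the in-house library's code: the class and method names are hypothetical, and every value is a placeholder rather than what is actually deployed. The keys are standard Kafka consumer configuration names, and the check reflects the Kafka documentation's guidance that the heartbeat interval should typically be no more than one third of the session timeout.

```java
import java.util.Properties;

// Hypothetical sketch: not the actual library code, values are placeholders.
public class ConsumerSettingsSketch {

    // Builds consumer settings as plain key/value pairs.
    // Keys are standard Kafka consumer configuration names.
    public static Properties build(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        props.setProperty("session.timeout.ms", "30000");
        props.setProperty("heartbeat.interval.ms", "10000");
        props.setProperty("request.timeout.ms", "40000");
        props.setProperty("max.partition.fetch.bytes", "1048576");
        return props;
    }

    // Returns true when the heartbeat interval is at most one third
    // of the session timeout.
    public static boolean heartbeatFitsSession(Properties props) {
        long session = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeat = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeat * 3 <= session;
    }

    public static void main(String[] args) {
        // Group name taken from the log output above.
        Properties props = build("mytopic-status");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
        System.out.println("heartbeat fits session: " + heartbeatFitsSession(props));
    }
}
```

How these keys map onto Camel endpoint options depends on the camel-kafka version in use, so the sketch keeps to the raw Kafka property names.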