I am using Kafka Connect from Confluent 3.0.1. I created a new group named "new-group" with roughly 20 topics in it, and most of those topics are busy. Unfortunately, once I start the Connect framework it never stops rebalancing: a rebalance across all topics happens roughly every 2 minutes, and I cannot figure out why. Part of the error output is:
[2017-01-03 21:43:57,718] ERROR Commit of WorkerSinkTask{id=new-connector-0} offsets threw an unexpected exception: (org.apache.kafka.connect.runtime.WorkerSinkTask:180)
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:578)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:519)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:404)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1058)
at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:293)
at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:421)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:54)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:465)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:283)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:212)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:222)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
I am not sure whether this exception is related to the continuous rebalancing.
I do know that if KafkaConsumer.poll() takes longer than the configured timeout, Kafka revokes the partitions and triggers a rebalance, but I am fairly sure no single poll takes that long. Can anyone give me a clue?
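For reference, here is a minimal sketch of the consumer knobs I understand are available on 0.10.0.x through the Connect worker properties (keys prefixed with consumer. are forwarded to the consumers backing the sink tasks); the values below are only illustrative, not my actual config:

# Connect worker properties (Kafka 0.10.0.x / Confluent 3.0.1) -- illustrative values only
# "consumer."-prefixed keys are passed through to the sink task consumers.
consumer.session.timeout.ms=30000   # on 0.10.0 heartbeats are sent from poll(), so a slow poll loop exceeds this
consumer.max.poll.records=500       # upper bound on records returned by a single poll()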
2 Answers
Answer 1
Consider upgrading to 0.10.1 or later, because those releases enhance the consumer so that it copes better with longer gaps between calls to poll().

You can increase the new max.poll.interval.ms parameter if writing the results to HDFS takes longer than 5 minutes. That will keep your consumer from being kicked out of the consumer group for not making progress. From the 0.10.1 release notes:

The new Java Consumer now supports heartbeating from a background thread. There is a new configuration max.poll.interval.ms which controls the maximum time between poll invocations before the consumer will proactively leave the group (5 minutes by default). The value of the configuration request.timeout.ms must always be larger than max.poll.interval.ms because this is the maximum time that a JoinGroup request can block on the server while the consumer is rebalancing, so its default value has been changed to just above 5 minutes. Finally, the default value of session.timeout.ms has been adjusted down to 10 seconds, and the default value of max.poll.records has been changed to 500.
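As a rough sketch (not part of the original answer) of how this could be applied after upgrading to Kafka 0.10.1+ / Confluent 3.1+, using the consumer. prefix that the Connect worker forwards to its sink task consumers; the values are examples only:

# Connect worker properties, Kafka 0.10.1+ only -- illustrative values
consumer.max.poll.interval.ms=600000   # allow up to 10 minutes between poll() calls while flushing to HDFS
consumer.request.timeout.ms=610000     # must always be larger than max.poll.interval.ms
consumer.session.timeout.ms=10000      # liveness is now covered by the background heartbeat thread
consumer.max.poll.records=500          # the new default; lower it to shorten each processing cycle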
Answer 2
I think you can tune max.poll.records to adjust how many records have to be handled in each loop iteration; in 0.10 max.poll.records puts an upper bound on the number of records returned per call. Also, according to Confluent, consumer.poll() should be given a fairly high session timeout, for example 30 to 60 seconds.

You may also want to fine-tune:
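The answer breaks off at this point; purely as an illustration of the two settings it mentions (not the author's original list), the corresponding consumer overrides in the Connect worker properties might look like this, with example values:

# Illustrative consumer overrides in the Connect worker properties (0.10.x)
consumer.max.poll.records=100        # smaller batches so each poll loop finishes well within the session timeout
consumer.session.timeout.ms=60000    # in the 30-60 s range suggested above; must not exceed the broker's group.max.session.timeout.ms
consumer.heartbeat.interval.ms=20000 # conventionally around one third of session.timeout.ms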