Kafka消费群体再平衡和群体协调人死亡

t9eec4r0  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(231)

我和Kafka(1.0.0)玩了几个月,试图了解消费者群体是如何运作的。我有一个单独的代理kafka,我正在使用kafka connect cassandra来使用从主题到数据库表的消息。我有10个主题,所有主题都只有一个分区,我有一个消费者组,其中有10个消费者示例(每个主题一个)。
在运行此设置时,我有时会在kafka connect控制台中看到以下日志:

1:

[Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)

[Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordi
nator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Revoking previously assigned partitions [topic1-0, topic2-0, ....] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Successfully joined group with generation 349 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
[Consumer clientId=consumer-7, groupId=connect-cassandra-sink-casb] Setting newly assigned partitions [topic1-0, topic2-0, ....] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:341)

之后,它开始消耗消息并写入cassandra表。这种情况经常发生在不规则的时间间隔。
但是,有时连接器会停止并关闭。然后它开始并再次消耗消息。这是日志:

INFO [Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Worker clientId=connect-1, groupId=connect-cluster] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Worker clientId=connect-1, groupId=connect-cluster] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)

INFO WorkerSinkTask{id=cassandra-sink-casb-0} Committing offsets asynchronously using sequence number 42: {topic1-0=OffsetAndMetadata{offset=1074, metadata=''}, topic2-0=OffsetAndMetadata{offset=112, metadata=''}, ...}} (org.apache.kafka.connect.runtime.WorkerSinkTask:311)
INFO Rebalance started (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1214)
INFO Stopping connector cassandra-sink-casb (org.apache.kafka.connect.runtime.Worker:304)
INFO Stopping task cassandra-sink-casb-0 (org.apache.kafka.connect.runtime.Worker:464)
INFO Stopping Cassandra sink. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask:79)
INFO Shutting down Cassandra driver session and cluster. (com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter:253)
INFO Stopped connector cassandra-sink-casb (org.apache.kafka.connect.runtime.Worker:320)
INFO Finished stopping tasks in preparation for rebalance (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1244)
INFO [Worker clientId=connect-1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
INFO [Worker clientId=connect-1, groupId=connect-cluster] Successfully joined group with generation 7 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO Joined group and got assignment: Assignment{error=0, leader='connect-1-1dc56cda-ed54-4181-a5f9-d11022d8e8c3', leaderUrl='http://127.0.1.1:8083/', offset=8, connectorIds=[cassandra-sink-casb], taskIds
=[cassandra-sink-casb-0]} (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1192)
INFO Starting connectors and tasks using config offset 8 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:837)
INFO Starting connector cassandra-sink-casb (org.apache.kafka.connect.runtime.distributed.DistributedHerder:890)

2:

org.apache.kafka.clients.consumer.CommitFailedException: 
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum
 size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync(WorkerSinkTask.java:299)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:327)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:398)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:547)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:62)
        at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:618)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:419)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:359)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:410)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:283)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] Marking the coordinator qa-server:9092 (id: 2147483647 rack: null) dead (org.apache.kafka.clients.consumer.internals.AbstractCoordi
nator:341)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] Discovered group coordinator qa-server:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordi
nator:341)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:336)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] Successfully joined group with generation 343 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:341)
INFO [Consumer clientId=consumer-5, groupId=connect-cassandra-sink-casb] Setting newly assigned partitions [topic1-0, topic2-0,...] (org.apache.kafka.cl
ients.consumer.internals.ConsumerCoordinator:341)
INFO WorkerSinkTask{id=cassandra-sink-casb-0} Committing offsets asynchronously using sequence number 155: {topic1-0=OffsetAndMetadata{offset=836, metadata=''}, topic2-0=OffsetAndMetadata{offset=86, metadata=''}, ...}} (org.apache.kafka.connect.runtime.WorkerSinkTask:311)

有时,Kafka连接开始消费信息后,再平衡,有时它关闭。
我有以下问题:

1) Why does Group Coordinator (Kafka Broker) dies?

我正在研究多个Kafka配置来解决这些问题,比如 connections.max.idle.ms , max.poll.records , session.timeout.ms , group.min.session.timeout.ms 以及 group.max.session.timeout .
我不确定什么样的配置才能使事情顺利进行。

2) Why does rebalance occurs?

我知道在添加新任务、更改任务等情况下可能会发生组重新平衡,但我没有更改任何内容。有时,kafka connect框架似乎处理错误有点过于激进,会终止connect任务,而不是继续工作。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题