Azure Event Hubs for Kafka: 2 consumers from the same group rebalance endlessly


I am using Azure Event Hubs as a Kafka broker on the consumer side, with Spring Kafka 1.3.5 (for compatibility reasons). This is my configuration:

@EnableKafka
@Configuration
class EventHubsKafkaConfig(@Value("\${eventhubs.broker}") val eventHubsBroker: String,
                           @Value("\${eventhubs.new-mails.shared-access-key}") val newMailsEventHubSharedKey: String,
                           @Value("\${eventhubs.consumer-group}") val consumerGroup: String) {
    @Bean
    fun kafkaListenerContainerFactory(consumerFactory: ConsumerFactory<Int, NewMailEvent>):
            ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent> {
        val factory = ConcurrentKafkaListenerContainerFactory<Int, NewMailEvent>()
        factory.consumerFactory = consumerFactory
        return factory
    }

    @Bean
    fun consumerFactory(consumerConfigs: Map<String, Any>) =
            DefaultKafkaConsumerFactory<Int, NewMailEvent>(consumerConfigs, IntegerDeserializer(),
                    JsonDeserializer(NewMailEvent::class.java, jacksonObjectMapper()))

    @Bean
    fun consumerConfigs(): Map<String, Any> {
        val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
                "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"

        return mapOf(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
                ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
                CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
                SaslConfigs.SASL_MECHANISM to "PLAIN",
                SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                        "username=\"\$ConnectionString\" password=\"$connectionString\";",
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
        )
    }
}

The consumer part:

@Component
class NewMailEventConsumer {
    @KafkaListener(topics = ["\${eventhubs.new-mails.topic-name}"])
    fun newMails(newMailEvent: NewMailEvent) {
        logger.info { "new mail event: $newMailEvent" }
    }

    companion object : KLogging()
}

data class NewMailEvent(val mailbox: String, val mailUuid: String)

When I start 2 consumer applications with this code, I see strange warnings that never stop:

Successfully joined group offer-application-bff-local with generation 5
web_1  | 2018-07-09 11:20:42.950  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1  | 2018-07-09 11:20:42.983  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[offer-mail-crawler-new-mails-0]
web_1  | 2018-07-09 11:21:28.686  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
web_1  | 2018-07-09 11:21:28.687  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
web_1  | 2018-07-09 11:21:28.687  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1  | 2018-07-09 11:21:28.687  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[offer-mail-crawler-new-mails-0]
web_1  | 2018-07-09 11:21:28.688  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group offer-application-bff-local
web_1  | 2018-07-09 11:21:29.670  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) dead for group offer-application-bff-local
web_1  | 2018-07-09 11:21:43.099  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator bap-event-hubs-dev.servicebus.windows.net:9093 (id: 2147483647 rack: null) for group offer-application-bff-local.
web_1  | 2018-07-09 11:21:43.131  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group offer-application-bff-local
web_1  | 2018-07-09 11:21:43.344  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group offer-application-bff-local with generation 7
web_1  | 2018-07-09 11:21:43.345  INFO 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [offer-mail-crawler-new-mails-0] for group offer-application-bff-local
web_1  | 2018-07-09 11:21:43.375  INFO 1 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned:[offer-mail-crawler-new-mails-0]
web_1  | 2018-07-09 11:21:46.377  WARN 1 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Auto-commit of offsets {offer-mail-crawler-new-mails-0=OffsetAndMetadata{offset=4, metadata=''}} failed for group offer-application-bff-local: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

The following exception appears periodically:

2018-07-09 11:36:21.602  WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.common.protocol.Errors  : Unexpected error code: 60.
web_1  | 2018-07-09 11:36:21.603 ERROR 1 --- [ntainer#0-0-C-1] essageListenerContainer$ListenerConsumer : Container exception
web_1  |
web_1  | org.apache.kafka.common.KafkaException: Unexpected error in join group response: The server experienced an unexpected error when processing the request
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:504) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:455) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:488) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:348) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:168) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) ~[kafka-clients-0.11.0.2.jar!/:na]
web_1  |    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:628) ~[spring-kafka-1.3.5.RELEASE.jar!/:na]
web_1  |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
web_1  |    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]

And periodically this:

Failed to send SSL Close message 

java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[na:1.8.0_162]
    at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[na:1.8.0_162]
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[na:1.8.0_162]
    at sun.nio.ch.IOUtil.write(IOUtil.java:65) ~[na:1.8.0_162]
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[na:1.8.0_162]
    at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:194) ~[kafka-clients-0.11.0.2.jar:na]

With a single consumer it works like a charm: no warnings, nothing. Does anyone know what is going wrong here?


a0x5cqrl1#

You cannot have more consumers with the same group id than the given topic has partitions.
E.g. a topic with 3 partitions can have 1-3 consumers sharing the same group id.
I assume your topic has only one partition, and the two consumers keep fighting over that resource. Either remove one consumer or add an extra partition to the topic.
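The constraint can be illustrated with a small simulation of how a range-style assignor spreads partitions over a consumer group (a simplified sketch only, not Kafka's actual implementation; the consumer names are made up):

```kotlin
// Simplified sketch of range-style partition assignment:
// partitions are split as evenly as possible across the sorted consumers,
// so any consumer beyond the partition count receives nothing.
fun assignPartitions(partitions: Int, consumers: List<String>): Map<String, List<Int>> {
    val sorted = consumers.sorted()
    val perConsumer = partitions / sorted.size
    val remainder = partitions % sorted.size
    val result = mutableMapOf<String, List<Int>>()
    var next = 0
    for ((i, consumer) in sorted.withIndex()) {
        // The first `remainder` consumers get one extra partition.
        val count = perConsumer + if (i < remainder) 1 else 0
        result[consumer] = (next until next + count).toList()
        next += count
    }
    return result
}

fun main() {
    // One partition, two consumers: the second consumer gets no partition at all.
    println(assignPartitions(1, listOf("consumer-a", "consumer-b")))
    // Three partitions, two consumers: both consumers are busy.
    println(assignPartitions(3, listOf("consumer-a", "consumer-b")))
}
```

With a single partition the second group member is idle by design, which is why scaling consumers past the partition count buys nothing.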


57hvy0tb2#

Eventually I found the problem. As you can see in the code above, I did not specify the `client.id` property for the Kafka consumer. This turns out to be crucial for Spring Kafka, because it tried to use an auto-generated `client.id = consumer-0` for both consumers in the consumer group. That caused an endless rebalancing of the partition between the two identically named consumers. I had to set it to a partially random string, `ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}"`, to make it work:

@Bean
fun consumerConfigs(): Map<String, Any> {
    val connectionString = "Endpoint=sb://${eventHubsBroker}/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=${newMailsEventHubSharedKey}"

    return mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "${eventHubsBroker}:9093",
            ConsumerConfig.CLIENT_ID_CONFIG to "bff-${UUID.randomUUID()}",
            ConsumerConfig.GROUP_ID_CONFIG to consumerGroup,
            CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to "SASL_SSL",
            SaslConfigs.SASL_MECHANISM to "PLAIN",
            SaslConfigs.SASL_JAAS_CONFIG to "org.apache.kafka.common.security.plain.PlainLoginModule required " +
                    "username=\"\$ConnectionString\" password=\"$connectionString\";",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to IntegerDeserializer::class.java,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java
    )
}
