Why are multiple consumers in the same group ID assigned the same partition in Kafka?

9nvpjoqh  posted on 2021-07-24  in Java

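According to the documentation, each partition of a topic is assigned to exactly one consumer within a consumer group. That does not seem to hold in my case (Kafka version 2.0.1). I run 3 app servers in cluster mode and created a topic with 20 partitions, so the listener concurrency for this topic on each app server is set to 6. When all 3 app servers first start up, the 20 partitions are spread evenly across them and everything works fine. At some point, though, a rebalance happens (I don't know why), and some partitions end up assigned to more than one consumer at the same time, which causes duplicate-key errors in the DB.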

@KafkaListener(topics = MQConstant.HK_STOCK_DATA_TRADE, concurrency = "#{kafkaUtil.getConcurrency('" + MQConstant.HK_STOCK_DATA_TRADE + "')}", containerFactory = "defaultKafkaListenerContainerFactory")
public void hkTradeReceive0(ConsumerRecord<?, String> record)

public synchronized Integer getConcurrency(String topic) {
    return getPartitionCount(topic) < HA_SERVER_COUNT_IN_CLUSTER ? 1 : getPartitionCount(topic) / HA_SERVER_COUNT_IN_CLUSTER;
}

public synchronized Integer getPartitionCount(String topic) {
    if (!getAllTopics().contains(topic)) {
        throw new InvalidParameterException("without target topic:" + topic);
    }
    return kafkaConsumer.partitionsFor(topic).size();
}

public Map<String, Object> getDefaultCommonPropertis() {
    Map<String, Object> properties = new HashMap<String, Object>(16);
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, defaultConsumerMaxPollRecords);
    properties.put(ConsumerConfig.GROUP_ID_CONFIG, defaultConsumerGroupId);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, defaultConsumerAutoOffsetReset);
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
    properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return properties;
}
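
To make the concurrency arithmetic above concrete, here is a minimal standalone sketch of what getConcurrency computes for my setup (20 partitions, 3 app servers). The class name ConcurrencySketch and the method name concurrencyPerServer are hypothetical and exist only for this illustration; the sketch does not talk to Kafka at all.

```java
// Hypothetical illustration only: reproduces the integer arithmetic of
// getConcurrency() from the question without contacting a broker.
class ConcurrencySketch {

    // Same rule as getConcurrency(): at least 1 consumer per server,
    // otherwise partitions divided by servers using integer division.
    static int concurrencyPerServer(int partitionCount, int serverCount) {
        return partitionCount < serverCount ? 1 : partitionCount / serverCount;
    }

    public static void main(String[] args) {
        int partitions = 20; // partitions in the topic
        int servers = 3;     // app servers in the cluster

        int perServer = concurrencyPerServer(partitions, servers);
        int totalConsumers = perServer * servers;
        int extraPartitions = partitions - totalConsumers;

        System.out.println("consumers per server: " + perServer);       // prints 6
        System.out.println("consumers in group:   " + totalConsumers);  // prints 18
        System.out.println("partitions left over: " + extraPartitions); // prints 2
    }
}
```

So the group should contain 18 consumers for 20 partitions. If the assignment is balanced, I would expect two of those consumers to own two partitions each and the remaining sixteen to own one each, with no partition appearing twice.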

App server 1 log:

[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-4-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-39, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-16]
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-3-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-38, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-15]
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-35, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-12, hk_stock_data_trade-18]
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-2-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-37, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-14]
[stock-market-cal-hk] [2021-01-21 09:43:05.729] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-1-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-36, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-13, hk_stock_data_trade-19]
[stock-market-cal-hk] [2021-01-21 09:43:05.729] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-5-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-40, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-17]

App server 2 log:

[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-0-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-35, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-12, hk_stock_data_trade-0]
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-2-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-37, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-8, hk_stock_data_trade-17]
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-4-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-39, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-10]
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-5-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-40, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-11]
[stock-market-cal-hk] [2021-01-21 09:43:05.724] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-1-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-36, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-1, hk_stock_data_trade-15]
[stock-market-cal-hk] [2021-01-21 09:43:05.725] [INFO] [org.springframework.kafka.KafkaListenerEndpointContainer#11-3-C-1] [o.a.k.c.c.i.ConsumerCoordinator:280]   [Consumer clientId=consumer-38, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-9, hk_stock_data_trade-18]

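I don't know whether I did something wrong in my Kafka configuration, because the server logs clearly show that, for topic hk_stock_data_trade, partitions 17 and 18 were each assigned twice, to different consumers, even though those consumers share the same group ID (data_cal_hk).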
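
To check the overlap mechanically instead of by eye, a small sketch like the following can extract the partitions from the "Setting newly assigned partitions" lines of two servers and intersect them. The class AssignmentOverlapSketch and its methods are hypothetical helpers written for this post, and the sample lines in main are abbreviated copies of four lines from the logs above (prefix fields dropped).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: finds partitions that show up in the assignment
// log lines of two different app servers.
class AssignmentOverlapSketch {

    // Captures the comma-separated list inside the trailing [...] of a
    // ConsumerCoordinator "Setting newly assigned partitions" line.
    private static final Pattern ASSIGNED =
            Pattern.compile("Setting newly assigned partitions \\[([^\\]]*)\\]");

    // Collects every partition name mentioned in the given log lines.
    static Set<String> assignedPartitions(List<String> logLines) {
        Set<String> partitions = new TreeSet<>();
        for (String line : logLines) {
            Matcher m = ASSIGNED.matcher(line);
            if (m.find()) {
                for (String part : m.group(1).split(",")) {
                    String name = part.trim();
                    if (!name.isEmpty()) {
                        partitions.add(name);
                    }
                }
            }
        }
        return partitions;
    }

    // Returns the partitions present in both servers' logs, sorted by name.
    static Set<String> overlap(List<String> serverA, List<String> serverB) {
        Set<String> common = new TreeSet<>(assignedPartitions(serverA));
        common.retainAll(assignedPartitions(serverB));
        return common;
    }

    public static void main(String[] args) {
        // Abbreviated lines taken from the two logs in the question.
        List<String> server1 = Arrays.asList(
                "[Consumer clientId=consumer-35, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-12, hk_stock_data_trade-18]",
                "[Consumer clientId=consumer-40, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-17]");
        List<String> server2 = Arrays.asList(
                "[Consumer clientId=consumer-37, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-8, hk_stock_data_trade-17]",
                "[Consumer clientId=consumer-38, groupId=data_cal_hk] Setting newly assigned partitions [hk_stock_data_trade-9, hk_stock_data_trade-18]");

        // prints [hk_stock_data_trade-17, hk_stock_data_trade-18]
        System.out.println(overlap(server1, server2));
    }
}
```

Feeding the complete log files of each server into overlap would list every partition that both servers believe they own after the same rebalance.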
