java—使用ApacheKafka使用者获取大量重复记录

dwbf0jvd  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(290)

我已经设置了一个带有三个主机的apachekafka集群。
我用复制因子3和分区计数8创建了一个主题。
我已经在8个分区中平均分配了数据。
我有8个单线程的消费者,我离开Kafka做分区分配给每个消费者。
我使用enable auto commit作为false,ackmode作为record。
问题:
当我停下来启动消费者时。重复邮件计数为数百。我期望计数不超过8,因为我有8个消费者和ackmode是记录。
请帮忙!!!
这是我的消费者配置。

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.market.persist.consumergroup}")
    private String marketDataPersistConsumer;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Quote> marketDataKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Quote> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(marketDataPersistConsumer));
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Quote> consumerFactory(String groupId) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(groupId), new StringDeserializer(), new JsonDeserializer<>(Quote.class));
    }

    @Bean
    public Map<String, Object> consumerConfigs(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }
}

这是我的留言列表:

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class MessageListener {

    private static final Logger LOG = Logger.getLogger("debugLogger");

    private void persistQuote(Quote quote, int partition) {
        save(quote);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerZero(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerOne(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerTwo(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerThree(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerFour(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerFive(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerSix(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerSeven(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }
}

kakfa调试日志
11:32:26调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交>列表:{}11:32:27调试kafkamessagelistenercontainer$listenerconsumer:450 - >>提交:{market-data-topic-dev3-2=offsetandmetadata{offset=15649,metadata=''},market-data-topic-dev3-3=offsetandmetadata{offset=15408,metadata=''}}11:32:27调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-0=offsetandmetadata{offset=15648,metadata=''},market-data-topic-dev3-1=offsetandmetadata{offset=15269,metadata=''}}11:32:27调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-4=offsetandmetadata{offset=15631,metadata=''},market-data-topic-dev3-5=offsetandmetadata{offset=15515,metadata=''}}11:32:27调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-6=offsetandmetadata{offset=15657,metadata='},market-data-topic-dev3-7=offsetandmetadata{offset=15631,metadata='}}11:32:27信息kafkamessagelistenercontainer:247 - 分配的分区:[market-data-topic-dev3-0,market-data-topic-dev3-1]11:32:27信息kafkamesassagelistenercontainer:247 - 分配的分区:[market-data-topic-dev3-2,market-data-topic-dev3-3]11:32:27信息kafkamessagelistenercontainer:247 - 分配的分区:[market-data-topic-dev3-4,market-data-topic-dev3-5]11:32:27信息kafkamesassagelistenercontainer:247 - 分配的分区:[market-data-topic-dev3-6,market-data-topic-dev3-7]11:32:27调试kafkamessagelistenercontainer$listenerconsumer:560 - 收到:0条记录11:32:27调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交列表:{}11:32:29调试recordmessagingmessagelisteneradapter:70 - 正在处理[genericmessage[payload=quote[riccode]=sequence:15534 - partition:6,lastprice=6,systemtime=null,ticktime=null,localticktime=null,localticktimeepoch=null,closeprice=15534,closetime=null,localclosetime=null,localclosetimeepoch=null,messagestate=null,logmessage=null,tradevolume=null],headers={kafka\u offset=15657,kafka\u receivedmessagekey=null,kafka\u receivedpartitionid=6,kafka\u receivedtopic=market-data-topic-dev3}]]11:32:39调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交列表:{market-data-topic-dev3-2=offsetandmetadata{offset=15775,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:965 - 提交:{market-data-topic-dev3-2=offsetandmetadata{offset=15775,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交列表:{market-data-topic-dev3-4=offsetandmetadata{offset=15755,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:965 - 提交:{market-data-topic-dev3-4=offsetandmetadata{offset=15755,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交列表:{market-data-topic-dev3-6=offsetandmetadata{offset=15783,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:965 - 提交:{market-data-topic-dev3-6=offsetandmetadata{offset=15783,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交列表:{market-data-topic-dev3-0=offsetandmetadata{offset=15776,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:965 - 提交:{market-data-topic-dev3-0=offsetandmetadata{offset=15776,metadata=''}}8月30日,2017年11:32:39 am org.apache.kafka.clients.consumer.internals.consumercoordinator onjoinprepare info:为group market-data-persist-cg-dev3 2017-08-30 11:32:39 debug kafkamessagelistenercontainer$listene撤销先前分配的分区[market-data-topic-dev3-2,market-data-topic-dev3]rconsumer:1063 - 停止调用程序8月30日,2017年11:32:39 am org.apache.kafka.clients.consumer.internals.consumercoordinator onjoinprepare info:撤销先前分配的分区[market-data-topic-dev3-4,market-data-topic-dev3-5]对于组market-data-persist-cg-dev3 2017-08-30 11:32:39调试kafkamessagelistener容器$listenerconsumer:1063 - 停止调用程序2017-08-30 11:32:39调试recordmessagingmessagelisteneradapter:70 - 正在处理[genericmessage[payload=quote[riccode]=sequence:16440 - partition:0,lastprice=0,systemtime=null,ticktime=null,localticktime=null,localticktimeepoch=null,closeprice=16440,closetime=null,localclosetime=null,localclosetimeepoch=null,messagestate=null,logmessage=null,tradevolume=null],headers={kafka\u offset=15777,kafka\u receivedmessagekey=null,kafka\u receivedpartitionid=0,kafka\u receivedtopic=market-data-topic-dev3}]]8月30日,2017 11:32:39 am org.apache.kafka.clients.consumer.internals.consumercoordinator onjoinprepare info:撤销先前分配的分区[market-data-topic-dev3-6,market-data-topic-dev3-7]对于组market-data-persist-cg-dev3 2017-08-30 11:32:39调试kafkamessagelistener容器$listenerconsumer:1063 - 停止调用程序2017-08-30 11:32:39调试kafkamessagelistenercontainer$listenerconsumer:1082 - 调用程序停止2017-08-30 11:32:39信息kafkamessagelistenercontainer:242 - 分区撤销:[market-data-topic-dev3-6,market-data-topic-dev3-7]2017-08-30 11:32:39调试kafkamessagelistener容器$listenerconsumer:961 - 提交列表:{market-data-topic-dev3-6=offsetandmetadata{offset=15784,metadata=''}}8月30日,2017年11:32:39 am org.apache.kafka.clients.consumer.internals.consumercoordinator onjoinprepare info:撤销先前分配的分区[market-data-topic-dev3-0,market-data-topic-dev3-1]对于组market-data-persist-cg-dev3 2017-08-30 11:32:39调试kafkamessagelistener容器$listenerconsumer:965 - 提交:{market-data-topic-dev3-6=offsetandmetadata{offset=15784,metadata=''}}2017-08-30 11:32:39调试kafkamessagelistenercontainer$listenerconsumer:1063 - 停止调用程序8月30日,2017 11:32:39 am org.apache.kafka.clients.consumer.internals.abstractcoordinator sendjoingrouprequest信息:(re-)加入组market-data-persist-cg-dev3 11:32:39调试kafkamessagelistenercontainer$listenerconsumer:1082 - 调用程序已停止11:32:39信息kafkamessagelistenercontainer:242 - 分区:[market-data-topic-dev3-2,market-data-topic-dev3-3]11:32:39调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交列表:{market-data-topic-dev3-2=offsetandmetadata{offset=15777,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:965 - 提交:{market-data-topic-dev3-2=offsetandmetadata{offset=15777,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:1082 - 调用程序已停止11:32:39信息kafkamessagelistenercontainer:242 - 分区:[market-data-topic-dev3-4,market-data-topic-dev3-5]11:32:39调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交列表:{market-data-topic-dev3-4=offsetandmetadata{offset=15757,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:965 - 提交:{market-data-topic-dev3-4=offsetandmetadata{offset=15757,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:1082 - 调用程序已停止11:32:39信息kafkamessagelistenercontainer:242 - 分区:[market-data-topic-dev3-0,market-data-topic-dev3-1]11:32:39调试kafkamessagelistenercontainer$listenerconsumer:961 - 提交列表:{market-data-topic-dev3-0=offsetandmetadata{offset=15778,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:965 - 提交:{market-data-topic-dev3-0=offsetandmetadata{offset=15778,metadata=''}}8月30日,2017年11:32:39 am org.apache.kafka.clients.consumer.internals.consumercoordinator onjoincomplete信息:为组market-data-persist-cg-dev3 8月30日设置新分配的分区[market-data-topic-dev3-2],2017年11:32:39 am org.apache.kafka.clients.consumer.internals.consumercoordinator onjoincomplete信息:为组market-data-persist-cg-dev3 11:32:39调试kafkamessagelistenercontainer$listene设置新分配的分区[market-data-topic-dev3-6]rconsumer:450 - 提交:{market-data-topic-dev3-1=offsetandmetadata{offset=15269,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-0=offsetandmetadata{offset=15778,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-3=offsetandmetadata{offset=15408,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-7=offsetandmetadata{offset=15631,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-2=offsetandmetadata{offset=15777,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-4=offsetandmetadata{offset=15757,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-6=offsetandmetadata{offset=15784,metadata=''}}11:32:39调试kafkamessagelistenercontainer$listenerconsumer:450 - 提交:{market-data-topic-dev3-5=offsetandmetadata{offset=15515,元数据=''}}11:32:39信息kafkamessagelistenercontainer:247 - 分配的分区:[market-data-topic-dev3-1]11:32:39信息kafkamessagelistenercontainer:247 - 分配的分区:[market-data-topic-dev3-0]11:32:39信息kafkamessagelistenercontainer:247 - 分配的分区:[market-data-topic-dev3-3]11:32:39信息Kafka马斯container:247 - 分配的分区:[market-data-topic-dev3-7]11:32

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题