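I have set up an Apache Kafka cluster with three hosts.
I created a topic with replication factor 3 and partition count 8.
The data is distributed evenly across the 8 partitions.
I have 8 single-threaded consumers, and I let Kafka assign the partitions to each consumer.
I have enable.auto.commit set to false and AckMode set to RECORD.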
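To illustrate the partition assignment I am relying on, here is a rough sketch of a contiguous, range-style split of partitions across group members. It is plain Java with no Kafka dependency; the class and method names are made up for illustration, and whether my group actually uses a range-style assignor is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

public class RangeAssignmentSketch {

    /** Returns, for each consumer index, the partition numbers it would own. */
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> result = new ArrayList<>();
        int perConsumer = partitions / consumers; // base share for every member
        int extra = partitions % consumers;       // the first 'extra' members get one more
        int next = 0;
        for (int c = 0; c < consumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(8, 4)); // [[0, 1], [2, 3], [4, 5], [6, 7]]
        System.out.println(assign(8, 8)); // one partition per consumer
    }
}
```

With 8 partitions, four members get two partitions each and eight members get one each, so the assignment changes every time a consumer joins or leaves the group.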
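Problem:
When I stop and start the consumers, the duplicate message count is in the hundreds. I expected the count to be no more than 8, since I have 8 consumers and the AckMode is RECORD.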
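The arithmetic behind that expectation can be sketched roughly as follows. This is plain Java with hypothetical offsets, not values from my logs; the class and method names are made up. It assumes a committed offset is the position the group resumes from, so a record is redelivered only if it was processed but its commit was never sent.

```java
public class DuplicateEstimate {

    /**
     * committed[p]     = offset the group would resume from for partition p
     * lastProcessed[p] = offset of the last record the listener finished for p
     */
    static long redelivered(long[] committed, long[] lastProcessed) {
        long total = 0;
        for (int p = 0; p < committed.length; p++) {
            long processedUpTo = lastProcessed[p] + 1; // first offset not yet processed
            total += Math.max(0, processedUpTo - committed[p]);
        }
        return total;
    }

    public static void main(String[] args) {
        long[] committed = new long[8];
        long[] lastProcessed = new long[8];
        for (int p = 0; p < 8; p++) {
            committed[p] = 100;     // hypothetical: offsets 0..99 already committed
            lastProcessed[p] = 100; // hypothetical: record 100 processed, commit not sent
        }
        System.out.println(redelivered(committed, lastProcessed)); // prints 8
    }
}
```

With one uncommitted record per partition the total is 8. Hundreds of duplicates would mean the committed offsets are lagging the processed offsets by far more than one record per partition.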
Please help!
Here is my consumer configuration.
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.support.serializer.JsonDeserializer;
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Value(value = "${kafka.market.persist.consumergroup}")
    private String marketDataPersistConsumer;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Quote> marketDataKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Quote> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(marketDataPersistConsumer));
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Quote> consumerFactory(String groupId) {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(groupId), new StringDeserializer(), new JsonDeserializer<>(Quote.class));
    }

    @Bean
    public Map<String, Object> consumerConfigs(String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return props;
    }

    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }
}
Here is my message listener:
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class MessageListener {

    private static final Logger LOG = Logger.getLogger("debugLogger");

    private void persistQuote(Quote quote, int partition) {
        save(quote);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerZero(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerOne(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerTwo(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerThree(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerFour(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerFive(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerSix(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }

    @KafkaListener(topics = "${kafka.market.topic.name}", containerFactory = "marketDataKafkaListenerContainerFactory", containerGroup = "${kafka.market.persist.consumergroup}")
    public void marketDataPersistConsumerSeven(@Payload Quote quote, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        persistQuote(quote, partition);
        LOG.info("Consumed Quote:" + quote + " from partition:" + partition);
    }
}
Kafka debug logs:
11:32:26 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {}
11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15649, metadata=''}, market-data-topic-dev3-3=OffsetAndMetadata{offset=15408, metadata=''}}
11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15648, metadata=''}, market-data-topic-dev3-1=OffsetAndMetadata{offset=15269, metadata=''}}
11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15631, metadata=''}, market-data-topic-dev3-5=OffsetAndMetadata{offset=15515, metadata=''}}
11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15657, metadata=''}, market-data-topic-dev3-7=OffsetAndMetadata{offset=15631, metadata=''}}
11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-0, market-data-topic-dev3-1]
11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-2, market-data-topic-dev3-3]
11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-4, market-data-topic-dev3-5]
11:32:27 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-6, market-data-topic-dev3-7]
11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:560 - Received: 0 records
11:32:27 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {}
11:32:29 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=quote[riccode]=sequence:15534 - partition:6,lastprice=6,systemtime=null,ticktime=null,localticktime=null,localticktimeepoch=null,closeprice=15534,closetime=null,localclosetime=null,localclosetimeepoch=null,messagestate=null,logmessage=null,tradevolume=null], headers={kafka_offset=15657, kafka_receivedMessageKey=null, kafka_receivedPartitionId=6, kafka_receivedTopic=market-data-topic-dev3}]]
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15775, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15775, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15755, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15755, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15783, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15783, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15776, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15776, metadata=''}}
Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-2, market-data-topic-dev3] for group market-data-persist-cg-dev3
2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker
Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-4, market-data-topic-dev3-5] for group market-data-persist-cg-dev3
2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker
2017-08-30 11:32:39 DEBUG RecordMessagingMessageListenerAdapter:70 - Processing [GenericMessage [payload=quote[riccode]=sequence:16440 - partition:0,lastprice=0,systemtime=null,ticktime=null,localticktime=null,localticktimeepoch=null,closeprice=16440,closetime=null,localclosetime=null,localclosetimeepoch=null,messagestate=null,logmessage=null,tradevolume=null], headers={kafka_offset=15777, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=market-data-topic-dev3}]]
Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-6, market-data-topic-dev3-7] for group market-data-persist-cg-dev3
2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker
2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped
2017-08-30 11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-6, market-data-topic-dev3-7]
2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}}
Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare INFO: Revoking previously assigned partitions [market-data-topic-dev3-0, market-data-topic-dev3-1] for group market-data-persist-cg-dev3
2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}}
2017-08-30 11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1063 - Stopping invoker
Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendJoinGroupRequest INFO: (Re-)joining group market-data-persist-cg-dev3
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped
11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-2, market-data-topic-dev3-3]
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped
11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-4, market-data-topic-dev3-5]
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:1082 - Invoker stopped
11:32:39 INFO KafkaMessageListenerContainer:242 - partitions revoked:[market-data-topic-dev3-0, market-data-topic-dev3-1]
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:961 - Commit list: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:965 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}}
Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete INFO: Setting newly assigned partitions [market-data-topic-dev3-2] for group market-data-persist-cg-dev3
Aug 30, 2017 11:32:39 AM org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete INFO: Setting newly assigned partitions [market-data-topic-dev3-6] for group market-data-persist-cg-dev3
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-1=OffsetAndMetadata{offset=15269, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-0=OffsetAndMetadata{offset=15778, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-3=OffsetAndMetadata{offset=15408, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-7=OffsetAndMetadata{offset=15631, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-2=OffsetAndMetadata{offset=15777, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-4=OffsetAndMetadata{offset=15757, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-6=OffsetAndMetadata{offset=15784, metadata=''}}
11:32:39 DEBUG KafkaMessageListenerContainer$ListenerConsumer:450 - Committing: {market-data-topic-dev3-5=OffsetAndMetadata{offset=15515, metadata=''}}
11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-1]
11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-0]
11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-3]
11:32:39 INFO KafkaMessageListenerContainer:247 - partitions assigned:[market-data-topic-dev3-7]
11:32