In my current project I use Spring Kafka to send, transform, and consume messages. A producer sends messages of type A to the Kafka topic topic-a. A Kafka Streams instance then reads the type-A messages from that topic, maps them to type B, and writes the mapped messages to the Kafka topic topic-b. Finally, a Kafka listener listens on topic-b and persists those messages to an Elasticsearch instance.
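For context, the mapping step is an ordinary Kafka Streams topology along these lines (a minimal sketch, not my production code; A, B, their Serdes, and mapToB() are stand-ins for my actual classes):

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class MappingTopology {

    // aSerde/bSerde and mapToB() are placeholders for the real Serdes and mapper.
    public Topology build(Serde<A> aSerde, Serde<B> bSerde) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("topic-a", Consumed.with(Serdes.String(), aSerde))
               // map each type-A message to type B
               .mapValues(this::mapToB)
               .to("topic-b", Produced.with(Serdes.String(), bSerde));
        return builder.build();
    }

    private B mapToB(A a) {
        // conversion logic omitted
        return null;
    }
}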
The listener works in batch mode and is configured for manual acknowledgment. A batch is acknowledged once the messages it contains have been successfully persisted to Elasticsearch.
This works very well, except for one edge case: after the producer has finished sending messages (this is a test setup) and the listener has retrieved, persisted, and acknowledged the last batch, the consumers in the consumer group still show lag:
+-----------------+---------+-----------+----------------+----------------+-----+-------------+-------------+-----------+
| group           | topic   | partition | current-offset | log-end-offset | lag | consumer-id | host        | client-id |
+-----------------+---------+-----------+----------------+----------------+-----+-------------+-------------+-----------+
| consumer-group1 | topic-B | 2         | 335681         | 335690         | 9   | consumer2   | /172.18.0.1 | client2   |
| consumer-group1 | topic-B | 9         | 0              | 0              | 0   | consumer9   | /172.18.0.1 | client9   |
| consumer-group1 | topic-B | 6         | 537082         | 537085         | 3   | consumer6   | /172.18.0.1 | client6   |
| consumer-group1 | topic-B | 8         | 394747         | 394752         | 5   | consumer8   | /172.18.0.1 | client8   |
| consumer-group1 | topic-B | 7         | 584215         | 584225         | 10  | consumer7   | /172.18.0.1 | client7   |
| consumer-group1 | topic-B | 1         | 0              | 0              | 0   | consumer10  | /172.18.0.1 | client10  |
| consumer-group1 | topic-B | 5         | 0              | 0              | 0   | consumer5   | /172.18.0.1 | client5   |
| consumer-group1 | topic-B | 0         | 1089342        | 1089350        | 8   | consumer1   | /172.18.0.1 | client1   |
| consumer-group1 | topic-B | 4         | 0              | 0              | 0   | consumer4   | /172.18.0.1 | client4   |
| consumer-group1 | topic-B | 3         | 0              | 0              | 0   | consumer3   | /172.18.0.1 | client3   |
+-----------------+---------+-----------+----------------+----------------+-----+-------------+-------------+-----------+
I tested this with both the MANUAL_IMMEDIATE and MANUAL ack modes, and the result is the same: after the listener has retrieved the last batch, the consumer lag never goes down.
In my tests I also tried the same thing with a vanilla Kafka consumer. With that approach, the consumer lag after committing the last batch is as expected (= 0).
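The vanilla consumer test looked roughly like this (a simplified sketch; B and BDeserializer are the same types as in my Spring setup below, and saveData() stands in for the Elasticsearch persistence):

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class VanillaConsumerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "b-consumer-group-local");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, B> consumer =
                     new KafkaConsumer<>(props, new StringDeserializer(), new BDeserializer())) {
            consumer.subscribe(List.of("topic-B"));
            while (true) {
                ConsumerRecords<String, B> records = consumer.poll(Duration.ofMillis(500));
                if (!records.isEmpty() && saveData(records)) {
                    // synchronous commit after each successfully persisted batch;
                    // with this approach the lag reaches 0 after the last batch
                    consumer.commitSync();
                }
            }
        }
    }

    private static boolean saveData(ConsumerRecords<String, B> records) {
        // Elasticsearch persistence omitted
        return true;
    }
}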
This is very confusing, because I use the same Kafka configuration for both approaches, so I suspect it is related to Spring Kafka. Has anyone run into the same problem, and if so, how did you solve it? Any help is greatly appreciated!
Configuration
spring:
  kafka:
    listener:
      ack-mode: MANUAL # MANUAL_IMMEDIATE
      concurrency: 10
    consumer:
      client-id: "b-consumer-client-local"
      group-id: "b-consumer-group-local"
      bootstrap-servers: "localhost:9092"
      # key-deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
      # value-deserializer: "org.myproject.serialization.BDeserializer"
      # auto-commit-interval: 1000
      # auto-offset-reset: "earliest"
      enable-auto-commit: false
      # fetch-max-wait: 500
      # fetch-min-size: 1
      # heartbeat-interval: 3000
      # max-poll-records: 5000
Listener configuration
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class ListenerConfiguration {

    private final KafkaProperties kafkaProperties;

    @Autowired
    public ListenerConfiguration(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Bean
    public ConsumerFactory<String, B> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new BDeserializer());
    }

    public Map<String, Object> consumerConfigs() {
        // start from the consumer properties bound from application.yml
        return new HashMap<>(kafkaProperties.buildConsumerProperties());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, B>
            kafkaListenerContainerFactory(ConsumerFactory<String, B> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, B> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        factory.setConsumerFactory(consumerFactory);
        // manual ack mode taken from spring.kafka.listener.ack-mode
        factory.getContainerProperties().setAckMode(kafkaProperties.getListener().getAckMode());
        factory.setBatchListener(true);
        return factory;
    }
}
Listener
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.BatchAcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class TopicListener implements BatchAcknowledgingMessageListener<String, B> {

    private final String topic;

    TopicListener(TopicsConfiguration topicsConfiguration) {
        this.topic = topicsConfiguration.getInput();
    }

    // resolved by the SpEL expression #{__listener.getTopic()} below
    public String getTopic() {
        return topic;
    }

    @Override
    @KafkaListener(id = "batch-listener", topics = "#{__listener.getTopic()}")
    public void onMessage(List<ConsumerRecord<String, B>> list, Acknowledgment acknowledgment) {
        // acknowledge the batch only after it has been persisted successfully
        if (saveData(list)) {
            acknowledgment.acknowledge();
        }
    }

    private boolean saveData(List<ConsumerRecord<String, B>> list) {
        ...
    }
}