我正试图在springboot平台上用java编写一个kafka消费者应用程序。之前,我用纯java编写了代码,但现在转换成springkafka,因为它比纯java有一些优势。我确实有一些问题,我正在努力理解。
似乎我不必在SpringKafka中显式地执行poll()循环,它将由@kafkalistener自动处理?
我已经设置了enable.auto.commit='false',因为在提交偏移量之前必须进行一些处理,所以如何在SpringKafka中执行commitasync()?
consumerconfig.java用户配置:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${app.kafka_brokers}")
private String KAFKA_BROKERS;
@Value("${app.topic}")
private String KAFKA_TOPIC;
@Value("${app.group_id_config}")
private String GROUP_ID_CONFIG;
@Value("${app.schema_registry_url}")
private String SCHEMA_REGISTRY_URL;
@Value("${app.offset_reset}")
private String OFFSET_RESET;
@Value("${app.max_poll_records}")
private String MAX_POLL_RECORDS;
@Value("${app.security.protocol}")
private String SSL_PROTOCOL;
@Value("${app.ssl.truststore.password}")
private String SSL_TRUSTSTORE_SECURE;
@Value("${app.ssl.keystore.password}")
private String SSL_KEYSTORE_SECURE;
@Value("${app.ssl.key.password}")
private String SSL_KEY_SECURE;
@Value("${app.ssl.truststore.location}")
private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;
@Value("${app.ssl.keystore.location}")
private String SSL_KEYSTORE_LOCATION_FILE_NAME;
@Bean
public ConsumerFactory<String, String> consumerFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
return factory;
}
}
kafkaconsumer.java文件:
@Component
public class KafkaConsumer {
@KafkaListener(topics = "topic", groupId = "group")
public void run(ConsumerRecord<String, GenericRecord> record) {
System.out.println(record);
<-- how to asyncCommit()-->
}
}
2条答案
按热度按时间jecbmhm31#
侦听器容器将在侦听器正常退出时提交偏移量,具体取决于容器的
AckMode
财产;AckMode.BATCH
(默认)表示轮询返回的所有记录的偏移量都将在处理完之后提交,AckMode.RECORD
意味着一旦侦听器退出,就会提交每个偏移量。sync
与。async
是由syncCommits
容器属性。xmjla07d2#
首先,我建议您使用springkafka设置的属性和自动配置,而不是创建自己的属性和自动配置,因为它遵循干燥的原则:不要重复自己。
这个
AckMode
文件:https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/containerproperties.ackmode.html基本上,
manual
是异步确认,而manual_immediate
是同步的。然后在你的
@KafkaListener
可以注入的组件org.springframework.kafka.support.Acknowledgment
对象确认您的消息。以下是可以注入到
@KafkaListener
方法:https://docs.spring.io/spring-kafka/reference/html/#message-听众