poll() and commitAsync() in Spring Kafka

sycxhyv7  posted 2021-06-04 in Kafka

I am trying to write a Kafka consumer application in Java on the Spring Boot platform. I previously wrote the code in plain Java, but I am now converting it to Spring Kafka because it offers some advantages over plain Java. There are a few things I am struggling to understand.
It seems I don't have to run an explicit poll() loop in Spring Kafka; it is handled automatically by @KafkaListener?
I have set enable.auto.commit='false' because some processing must happen before the offsets are committed, so how do I perform a commitAsync() in Spring Kafka?
KafkaConsumerConfig.java (consumer configuration):

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${app.kafka_brokers}")
    private String KAFKA_BROKERS;

    @Value("${app.topic}")
    private String KAFKA_TOPIC;

    @Value("${app.group_id_config}")
    private String GROUP_ID_CONFIG;

    @Value("${app.schema_registry_url}")
    private String SCHEMA_REGISTRY_URL;

    @Value("${app.offset_reset}")
    private String OFFSET_RESET;

    @Value("${app.max_poll_records}")
    private String MAX_POLL_RECORDS;

    @Value("${app.security.protocol}")
    private String SSL_PROTOCOL;

    @Value("${app.ssl.truststore.password}")
    private String SSL_TRUSTSTORE_SECURE;

    @Value("${app.ssl.keystore.password}")
    private String SSL_KEYSTORE_SECURE;

    @Value("${app.ssl.key.password}")
    private String SSL_KEY_SECURE;

    @Value("${app.ssl.truststore.location}")
    private String SSL_TRUSTSTORE_LOCATION_FILE_NAME;

    @Value("${app.ssl.keystore.location}")
    private String SSL_KEYSTORE_LOCATION_FILE_NAME;

    @Bean
    public ConsumerFactory<String, String> consumerFactory(){

        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,KAFKA_BROKERS);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OFFSET_RESET);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID_CONFIG);
        props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, SCHEMA_REGISTRY_URL);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SSL_PROTOCOL);
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,SSL_TRUSTSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, SSL_TRUSTSTORE_SECURE);
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,SSL_KEYSTORE_LOCATION_FILE_NAME);
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEYSTORE_SECURE);
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, SSL_KEY_SECURE);

        return new DefaultKafkaConsumerFactory<>(props);

    }

    @Bean
    ConcurrentKafkaListenerContainerFactory<String, String> 
    kafkaListenerContainerFactory() {

      ConcurrentKafkaListenerContainerFactory<String, String> factory =
                            new ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(consumerFactory());
      factory.setConcurrency(3);
      return factory;
  }

}

KafkaConsumer.java:

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record) {

        System.out.println(record);

    <-- how to commitAsync()? -->
    }

}

jecbmhm3 #1

The listener container will commit the offsets when the listener exits normally, depending on the container's AckMode property: AckMode.BATCH (the default) means the offsets for all records returned by the poll are committed after they have been processed, while AckMode.RECORD means each record's offset is committed as soon as the listener exits for that record. Whether the commit is sync or async is controlled by the syncCommits container property.
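As a sketch of what that looks like (assuming the same consumerFactory() bean from the question's KafkaConsumerConfig), the two properties mentioned above can be set on the container factory like this:

```java
// Sketch only: configuring the container's commit behavior.
// Assumes the consumerFactory() bean defined in the question's configuration.
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    // Commit each record's offset as soon as the listener returns for it...
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD);
    // ...and have the container use commitAsync() rather than commitSync().
    factory.getContainerProperties().setSyncCommits(false);
    return factory;
}
```

With this setup the listener method itself does not need to commit anything; the container performs the asynchronous commits on your behalf.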


xmjla07d #2

First, I suggest you use Spring Kafka's own properties and auto-configuration instead of creating your own consumer factory, since it follows the DRY principle: Don't Repeat Yourself.

spring:
  kafka:
    bootstrap-servers: ${app.kafka_brokers}
    consumer:
      auto-offset-reset: ${app.offset_reset}
      enable-auto-commit: false   # <---- disable auto-committing
    ssl:
      protocol: ${app.security.protocol}
      key-store-location: ${app.ssl.keystore.location}
      key-store-password: ${app.ssl.keystore.password}
      trust-store-location: ${app.ssl.truststore.location}
      trust-store-password: ${app.ssl.truststore.password}
    # ... and other properties
    listener:
      ack-mode: manual   # this is what you need

The AckMode documentation: https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/ContainerProperties.AckMode.html
Basically, manual is asynchronous acknowledgment, while manual_immediate is synchronous.
Then, in your @KafkaListener component, you can inject an org.springframework.kafka.support.Acknowledgment object to acknowledge your message.

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgment) {

        System.out.println(record);

        acknowledgment.acknowledge();
    }

}

Here is the list of arguments that can be injected into @KafkaListener methods: https://docs.spring.io/spring-kafka/reference/html/#message-listeners
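One of those injectable arguments is the underlying Consumer itself, so as an alternative sketch (the System.out.println is a placeholder for real processing, and the container's AckMode should then be configured so the container does not also commit), you can call commitAsync() on it directly:

```java
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "topic", groupId = "group")
    public void run(ConsumerRecord<String, GenericRecord> record,
                    Consumer<?, ?> consumer) {

        System.out.println(record);  // placeholder for real processing

        // Explicit asynchronous commit; this is safe here because the
        // listener method runs on the consumer thread.
        consumer.commitAsync();
    }
}
```

Note that the injected Consumer must only be used from the listener thread; Kafka consumers are not thread-safe.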
