SpringKafka concurrentkafkalistenercontainerfactory的消费者组协调

ffx8fchx  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(421)

关于斯普林Kafka在某些场景中的行为,我有几个问题。任何答案或指针都会很好。
背景:我正在构建一个kafka消费者,它与外部api进行通信并发送应答。我的配置如下:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, this.configuration.getString("kafka-generic.consumer.group.id"));
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "5000000");
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "6000000");

    return props;
}

@Bean
public RetryTemplate retryTemplate() {
    final ExponentialRandomBackOffPolicy backOffPolicy = new ExponentialRandomBackOffPolicy();
    backOffPolicy.setInitialInterval(this.configuration.getLong("retry-exp-backoff-init-interval"));
    final SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
    retryPolicy.setMaxAttempts(this.configuration.getInt("retry-max-attempts"));
    final RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy);
    retryTemplate.setRetryPolicy(retryPolicy);
    return retryTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> retryKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Event> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    factory.setConcurrency(this.configuration.getInt("kafka-concurrency"));
    factory.setRetryTemplate(retryTemplate());
    factory.getContainerProperties().setIdleEventInterval(this.configuration.getLong("kafka-rtm-idle-time"));
    //factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setErrorHandler(kafkaConsumerErrorHandler);
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
    return factory;
}

假设我没有4个分区。我的分区分布是针对kafkalistener的:

@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"0", "1"}),
            containerFactory = "retryKafkaListenerContainerFactory")
public void receive(Event event, Acknowledgment acknowledgment) throws Exception {
    serviceInvoker.callService(event);
    acknowledgment.acknowledge();
}

@KafkaListener(topicPartitions = @TopicPartition(topic = "topic", partitions = {"2", "3"}),
        containerFactory = "retryKafkaListenerContainerFactory")
public void receive1(Event event, Acknowledgment acknowledgment) throws Exception {
    serviceInvoker.callService(event);
    acknowledgment.acknowledge();
}

现在我的问题是:
假设我有两台机器部署了这段代码(使用相同的使用者组id)。如果我理解正确,如果我得到一个分区的事件,其中一台机器的kafkalistener将侦听相应分区的事件,但其他机器的kafkalistener不会侦听此事件。它是?
我的错误处理程序是:

@Named
        public class KafkaConsumerErrorHandler implements ErrorHandler {
          @Inject
          private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

          @Override
          public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
              System.out.println("Shutting down all the containers");
              kafkaListenerEndpointRegistry.stop();
          }
        }

让我们来谈谈一个场景,消费者的Kafka主义者被称为在它调用的地方 serviceInvoker.callService(event); 但是服务中断了 retryKafkaListenerContainerFactory ,它重试3次,然后失败,然后调用errorhandler,从而停止kafkalistenerendpointregistry。这是否会关闭具有相同用户组的所有其他用户或机器,或者仅关闭此用户或机器?
让我们谈谈第二个场景。有没有什么配置我们需要改变,让Kafka知道推迟确认了那么长时间?
我的Kafka制作人每10分钟产生一条信息。我是否需要在我的消费代码中的任何地方配置这10分钟,或者这是不可知的?
在我的kafkalistener注解中,我硬编码了主题名和分区。我可以在运行时更改它吗?
非常感谢您的帮助。提前感谢。:)

z18hc3ub

z18hc3ub1#

对的;只有一个人能得到。
它只会停止本地容器—spring对其他示例一无所知。
既然你有 ackOnError=false ,将不提交偏移量。
消费者不需要知道消息发布的频率。
不能在运行时更改它们,但可以使用属性占位符 ${...} 或spel表达式 #{...} 在应用程序初始化期间设置它们。

相关问题