spring kafka-如何用组id将偏移量重置为最新的?

sz81bmfz  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(515)

我目前正在使用spring集成kafka进行实时统计。不过,组名使kafka搜索侦听器以前没有读取的所有值。

@Value("${kafka.consumer.group.id}")
private String consumerGroupId;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}

public Map<String, Object> getDefaultProperties() {
    Map<String, Object> properties = new HashMap<>();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);

    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    return properties;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

@Bean
public KafkaMessageListener listener() {
    return new KafkaMessageListener();
}

我想开始最新的抵消,而不是被旧的价值观困扰。是否有可能重置组的偏移量?

2ledvvac

2ledvvac1#

对于Kafka中没有初始偏移的新消费群体,可以设置 AUTO_OFFSET_RESET_CONFIG :

properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group-id");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

对于现有消费群体,您可以:
更改组id以显示为新的,即。 consumer-group-id-v2 实施 ConsumerSeekAware 因此,您可以在初始化过程中寻找所需的偏移量

nhaq1z21

nhaq1z212#

可以使用partitionoffsets注解以精确偏移开始,例如:

@KafkaListener(id = "bar", topicPartitions =
    { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
      @TopicPartition(topic = "topic2", partitions = "0",
         partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    })public void listen(ConsumerRecord<?, ?> record) {
     }
ma8fv8wu

ma8fv8wu3#

你可以设置一个 ConsumerRebalanceListener 对于kafka消费者,当您订阅某些主题时,您可以通过 KafkaConsumer.endOffsets() 方法,并将其设置为consumer by KafkaConsumer.seek() 方法,如下所示:

kafkaConsumer.subscribe(Collections.singletonList(topics),
    new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            //do nothing
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            //get and set the lastest offset for each partiton
            kafkaConsumer.endOffsets(partitions) 
                .forEach((partition, offset) -> kafkaConsumer.seek(partition, offset));
        }
    }
);
uurv41yg

uurv41yg4#

因为我没有看到任何这样的例子,我要解释一下我是怎么做到的。
你的班级 @KafkaListener 必须实施 ConsumerSeekAware 类,该类将允许侦听器在对分区进行属性化时控制偏移量查找(资料来源:https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )

public class KafkaMessageListener implements ConsumerSeekAware {
    @KafkaListener(topics = "your.topic")
    public void listen(byte[] payload) {
        // ...
    }

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {

    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.forEach((t, o) -> callback.seekToEnd(t.topic(), t.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {

    }
}

在这里,在重新平衡中,我们使用给定的回调来寻找所有给定主题的最后一个偏移量。多亏了阿尔特姆比兰(https://stackoverflow.com/users/2756547/artem-bilan )为了指引我找到答案。

相关问题