@KafkaListener: read from the beginning every time

Posted by uqjltbpv on 2021-06-07 in Kafka

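I am using the example below to read messages with a Spring Kafka consumer. My use case requires the listener to read from the beginning every time a message is produced.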

@KafkaListener(
    id = "grouplistener",
    topicPartitions = { 
        @TopicPartition(
            topic = "mycompactedtopic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")
        )
    }
)

public void onReceiving(
    String payload, @Header(KafkaHeaders.OFFSET) long offset,
    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic
) {
    log.info(
        "Processing topic = {}, partition = {}, offset = {}, payload= {}",
        topic, partition, offset, payload
    );
}

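I only seem to be able to get it to read from the beginning when the application starts; after that it only consumes new messages as they arrive.
Is there a way to force it to start from the beginning every time?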

Answer #1 (yqhsw0fo)

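Here is how I would implement it. You need to implement the ConsumerSeekAware interface and put some logic in its onPartitionsAssigned method. You could also do the seekToBeginning on demand, for example only when an environment variable is passed in when the application is restarted. I haven't implemented that part yet, though!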

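import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
@EnableKafka
public class ReplayingListenerService implements ConsumerSeekAware {

    // System property holding the topic to rewind, e.g. -DtopicName=mycompactedtopic
    private static final String TOPIC_NAME = "topicName";

    // Batch listener: needs a container factory with batch listening enabled;
    // in batch mode each header arrives as a list with one entry per record.
    @KafkaListener(topics = "${topicName}", groupId = "${groupId}")
    public void listen(@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                       @Header(KafkaHeaders.RECEIVED_TIMESTAMP) List<Long> timestamps,
                       @Payload List<String> messageBatch) {
        // do a bunch of stuff
    }

    // Called when partitions are (re)assigned: at startup and after each rebalance, not per message.
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        String topic = Optional.ofNullable(System.getProperty(TOPIC_NAME))
                .orElseThrow(() -> new IllegalStateException("topicName needs to be set"));
        assignments.keySet().stream()
                .filter(partition -> topic.equals(partition.topic()))
                .forEach(partition -> callback.seekToBeginning(partition.topic(), partition.partition()));
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {}

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {}
}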
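
The answer mentions, without implementing it, making the rewind conditional on a flag passed in when the application restarts. A minimal sketch of that decision logic in plain Java follows. It assumes a hypothetical environment variable named REPLAY_FROM_START, and AssignedPartition is a hypothetical stand-in for Kafka's TopicPartition so that the sketch runs without Kafka on the classpath. Inside onPartitionsAssigned you would then call callback.seekToBeginning(topic, partition) for each partition number it returns.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class AssignedPartition {
    final String topic;
    final int partition;

    AssignedPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

final class RewindPolicy {
    // Hypothetical flag name, read from the environment at startup.
    static final String REPLAY_FLAG = "REPLAY_FROM_START";

    /** True only when the flag is present and equals "true" (case-insensitive). */
    static boolean replayRequested(Map<String, String> env) {
        return Boolean.parseBoolean(env.get(REPLAY_FLAG));
    }

    /** Sorted partition numbers of {@code topic} to rewind; empty when replay is off. */
    static List<Integer> partitionsToRewind(Set<AssignedPartition> assigned, String topic, boolean replay) {
        if (!replay) {
            return List.of();
        }
        return assigned.stream()
                .filter(p -> topic.equals(p.topic))
                .map(p -> p.partition)
                .sorted()
                .collect(Collectors.toList());
    }
}
```

For example, RewindPolicy.partitionsToRewind(assigned, topic, RewindPolicy.replayRequested(System.getenv())) returns an empty list on a normal restart and every assigned partition of the topic when the variable is set to true.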

Answer #2 (pkbketx9)

@KafkaListener(
        topicPartitions = @TopicPartition(topic = "test",
                partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0") }),
        groupId = "foo",
        containerFactory = "kafkaListenerContainerFactory")
public void listenAllMsg(@Payload String message,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println("All msg: received message in group 'foo': " + message
            + ", RECEIVED_PARTITION_ID - " + partition);
}

Kafka 2.3.1

Answer #3 (wqsoz72f)

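You are using a compacted topic with a single partition to hold a list of configurations, and a REST endpoint then needs to return the complete, deduplicated list of those configurations.
The way to build that is with Kafka Streams and a KTable, with Interactive Queries behind the REST layer, not with a standard consumer that has to rewind itself to get the latest state of the system.
Examples of this already exist. Kafka Connect has a config topic that you only access through GET /connectors/name/config, and it consumes all of the topic's messages again only when it is restarted or scaled out to more instances. Schema Registry is another example: it keeps an internal hashmap of all the schemas in the _schemas topic and exposes a REST API for reading, inserting and deleting them.
Essentially, when you receive a new config for a given key, you can either "replace" the old value for that key with the entirely new one, or "merge" the old value with the new data in some way.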
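
The "replace" versus "merge" choice can be illustrated with a plain-Java model of what a table built from a compacted topic does: fold every record, in offset order, into a map keyed by record key, treating a null value as a tombstone that deletes the key. This is a model of the idea, not Kafka Streams code; the ConfigRecord type and the map-of-properties value format are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record shape: a key plus config properties (null value = tombstone).
final class ConfigRecord {
    final String key;
    final Map<String, String> value;

    ConfigRecord(String key, Map<String, String> value) {
        this.key = key;
        this.value = value;
    }
}

final class ConfigTable {
    /** Replace: each new record overwrites the previous value for its key. */
    static Map<String, Map<String, String>> replace(List<ConfigRecord> log) {
        Map<String, Map<String, String>> table = new LinkedHashMap<>();
        for (ConfigRecord r : log) {
            if (r.value == null) {
                table.remove(r.key);                     // tombstone deletes the key
            } else {
                table.put(r.key, new HashMap<>(r.value)); // latest value wins
            }
        }
        return table;
    }

    /** Merge: new properties are layered on top of the existing ones for that key. */
    static Map<String, Map<String, String>> merge(List<ConfigRecord> log) {
        Map<String, Map<String, String>> table = new LinkedHashMap<>();
        for (ConfigRecord r : log) {
            if (r.value == null) {
                table.remove(r.key);
            } else {
                table.computeIfAbsent(r.key, k -> new HashMap<>()).putAll(r.value);
            }
        }
        return table;
    }
}
```

In Kafka Streams terms, the replace behaviour is roughly what a KTable built with StreamsBuilder.table(...) gives you, while merge would need an aggregation over the grouped stream.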

Answer #4 (g6baxovj)

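I think you could try writing a ConsumerSeekAware listener that sets the offset back to 0 every time you read a message. It sounds like a crazy workaround, but it might help. Hope this helps :-)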

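import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

class Listener implements ConsumerSeekAware {

    // One seek callback per consumer thread, registered by the container when the thread starts.
    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallBack.set(callback);
    }

    // Override the other ConsumerSeekAware methods as needed.

    @KafkaListener(...)
    public void listen(@Payload String message,
                       @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                       @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
        // Rewind this partition to offset 0 after every message.
        this.seekCallBack.get().seek(topic, partition, 0);
    }
}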
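
One caveat with seeking to 0 after every message: the consumer never makes forward progress. Assuming, as a simplification, that a seek requested from the listener is applied before the next poll, the consumer keeps re-reading the first batch of the partition and never reaches later records. The plain-Java model below (no Kafka classes; the class name and the maxPollRecords parameter are hypothetical) makes that visible by collecting the distinct offsets delivered within a bounded number of polls.

```java
import java.util.Set;
import java.util.TreeSet;

final class SeekToZeroModel {
    /**
     * Simulates a single-partition consumer over a log of {@code logSize} records.
     * Each poll returns up to {@code maxPollRecords} records from the current position;
     * when {@code seekToZeroAfterEachPoll} is true, the position is reset to 0 before
     * the next poll (modelling a seek requested inside the listener).
     * Returns the distinct offsets delivered across {@code polls} polls.
     */
    static Set<Long> deliveredOffsets(long logSize, int maxPollRecords, int polls,
                                      boolean seekToZeroAfterEachPoll) {
        Set<Long> seen = new TreeSet<>();
        long position = 0;
        for (int i = 0; i < polls; i++) {
            long end = Math.min(position + maxPollRecords, logSize);
            for (long offset = position; offset < end; offset++) {
                seen.add(offset); // "process" the record
            }
            position = seekToZeroAfterEachPoll ? 0 : end;
        }
        return seen;
    }
}
```

With a 10-record log and 3 records per poll, five polls without the seek deliver all 10 offsets, while with the seek they only ever deliver offsets 0 to 2.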

Answer #5 (irtuqstp)

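So this is a plain Java implementation (using KafkaConsumer directly). I'm not sure it fits your needs. Basically, I commit an offset of 0, meaning that even after I have read from the Kafka topic, the committed offset goes back to the beginning. I don't know whether you have considered this approach, but let me know if it's what you are looking for.
Ignore commitCountobj; it's not something you need. By default, offsetMap would hold the offset of the next record, like this:
offsetMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "some commit success message"));
But for your use case I made a change, and it works fine while the consumer is not restarted:
offsetMap.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(0, "no metadata/offset committed"));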

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
// Assumed to be Spring Kafka's JsonDeserializer; FeedBackConsumerClass is the author's payload type.
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class KafkaConsumerClass {

    private static final org.apache.log4j.Logger log = org.apache.log4j.Logger.getLogger(KafkaConsumerClass.class);

    // Minimal counter so the example compiles; used to commit every 1000 records.
    static class CommitCountClass {
        private long count;
        long getCount() { return count; }
        void setCount(long count) { this.count = count; }
    }

    private final CommitCountClass commitCountobj = new CommitCountClass();

    public Consumer<String, List<FeedBackConsumerClass>> createConsumer() {
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:7070,localhost:7072");
        consumerProps.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 50000);
        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "first_group-client1");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "first_group");
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        // Offsets are committed manually below, so auto-commit is off
        // (the auto-commit interval is then ignored).
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProps.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1500);
        // Only used when the group has no valid committed offset.
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        return new KafkaConsumer<>(consumerProps);
    }

    public void consumeRecord() {
        log.info("Coming inside consumer");
        List<String> topicList = Collections.singletonList("topic1");
        commitCountobj.setCount(0);
        Consumer<String, List<FeedBackConsumerClass>> kafkaConsumer = createConsumer();
        kafkaConsumer.subscribe(topicList);
        log.info("after subscribing");

        Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();

        while (true) {
            // Blocks until records arrive (same effect as the deprecated poll(Long.MAX_VALUE)).
            ConsumerRecords<String, List<FeedBackConsumerClass>> recordList =
                    kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            // kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());

            log.info("Inside while loop: " + recordList.count() + " records");
            for (ConsumerRecord<String, List<FeedBackConsumerClass>> record : recordList) {
                System.out.println(record);
                // Process the record here (call an API, write to a DB, ...), then commit.
                // Auto-commit is off, so committing is the developer's responsibility.
                // Normally you would store record.offset() + 1 (the next offset to read);
                // storing 0 moves the group's committed position back to the start.
                offsetMap.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(0, "no metadata/offset committed"));
                if (commitCountobj.getCount() % 1000 == 0) {
                    kafkaConsumer.commitAsync(offsetMap, (offsets, exception) -> {
                        if (exception != null) {
                            // Retry synchronously; this can fail as well, so a real consumer
                            // should catch that exception and shut down gracefully.
                            log.error(exception.getMessage());
                            kafkaConsumer.commitSync(offsets);
                        }
                    });
                }
                commitCountobj.setCount(commitCountobj.getCount() + 1);
            }
        }
    }
}
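
The difference between the two OffsetAndMetadata values above comes down to what a committed offset means: it is the offset of the next record the group will read when a consumer is (re)assigned the partition, which is why the usual commit is record.offset() + 1. Committing 0 therefore makes a restarted consumer replay the partition from the start (if offset 0 is no longer in the log, auto.offset.reset decides instead). A consumer that keeps running is not affected, because it continues from its in-memory position rather than re-reading its committed offset. The plain-Java model below illustrates this; OffsetStore is a hypothetical stand-in for the group's committed offsets, not a Kafka API.

```java
import java.util.HashMap;
import java.util.Map;

final class OffsetStore {
    // Hypothetical model of a consumer group's committed offsets, keyed by "topic-partition".
    private final Map<String, Long> committed = new HashMap<>();

    void commit(String topicPartition, long nextOffsetToRead) {
        committed.put(topicPartition, nextOffsetToRead);
    }

    /** Where a newly assigned consumer starts: the committed offset, else the reset position. */
    long startPosition(String topicPartition, long resetPosition) {
        return committed.getOrDefault(topicPartition, resetPosition);
    }

    /** Conventional commit after processing a record: the offset of the next record. */
    static long conventionalCommit(long processedOffset) {
        return processedOffset + 1;
    }
}
```

After processing offset 41, a conventional commit makes a restarted consumer resume at 42, whereas committing 0 sends it back to the first record.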
