访问偏移量从特定偏移量开始消耗的时间

qf9go6mv  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(471)

我有一个相当直截了当的Kafka消费者:

MessageListener<String, T> messageListener = record -> {

    doStuff( record.value()));
  };

  startConsumer(messageListener);

protected void startConsumer(MessageListener<String, T> messageListener) {
ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
    consumerFactory(this.brokerAddress, this.groupId),
    containerProperties(this.topic, messageListener));

   container.start();
}

我可以毫无问题地使用消息。现在,我有个要求 seek 从基于调用的结果的特定偏移量 offsetsForTimes 关于Kafka消费者。
我明白我可以 seek 使用 ConsumerSeekAware 接口:

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
        ConsumerSeekCallback callback) {

      assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
}

现在的问题是,我没有访问Kafka消费者内部的回调,因此我没有办法调用 offsetsForTimes .
有没有其他方法可以达到这个目的?

ne5o7dgx

ne5o7dgx1#

使用 ConsumerAwareRebalanceListener 进行初始搜索(在2.0中引入)。
当前版本为2.2.0。
如何测试consumerawarerebalancelistener?

相关问题