我有一个相当直截了当的Kafka消费者:
MessageListener<String, T> messageListener = record -> {
doStuff( record.value()));
};
startConsumer(messageListener);
protected void startConsumer(MessageListener<String, T> messageListener) {
ConcurrentMessageListenerContainer<String, T> container = new ConcurrentMessageListenerContainer<>(
consumerFactory(this.brokerAddress, this.groupId),
containerProperties(this.topic, messageListener));
container.start();
}
我可以毫无问题地使用消息。现在,我有个要求 seek
从基于调用的结果的特定偏移量 offsetsForTimes
关于Kafka消费者。
我明白我可以 seek
使用 ConsumerSeekAware
接口:
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.forEach((t, o) -> callback.seek(t.topic(), t.partition(), ?????));
}
现在的问题是,我没有访问Kafka消费者内部的回调,因此我没有办法调用 offsetsForTimes
.
有没有其他方法可以达到这个目的?
1条答案
按热度按时间ne5o7dgx1#
使用
ConsumerAwareRebalanceListener
进行初始搜索(在2.0中引入)。当前版本为2.2.0。
如何测试consumerawarerebalancelistener?