我是Kafka的新人。我已经建立了我的环境来生成和使用记录。但是,我的目标是截取记录并在将其发送给目标消费者之前修改其值。现在,为了确保环境被很好地设置为拦截记录,我正在编写一个简单的consumerinterceptor,它将拦截记录并打印它们的值。configure()方法应该实现什么来启用consumerinterceptor?我应该添加/修改哪些其他配置以及在哪里?
public class SimpleConsumerInterceptors<K, V> implements ConsumerInterceptor<K, V>{
private String clientId;
public void configure(final Map<String, ?> configs) {
// What configurations required to enable my consumerInterceptor?
}
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
ConsumerRecords<K, V> interceptRecords = records;
for (TopicPartition partition : records.partitions()) {
String topic = partition.topic();
List<ConsumerRecord<K, V>> recordsInPartition = records.records(partition);
for (ConsumerRecord<K, V> record : recordsInPartition) {
System.out.println("onConsume:");
System.out.println(record.value());
}
}
return interceptRecords;
}
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
System.out.println("onCommit")
}
@Override
public void close() {
System.out.println("close")
this.close();
}
}
1条答案
按热度按时间mccptt671#
只需将类名添加到
interceptor.classes
消费者财产,ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG
.你只需要实现
configure()
如果您想在kafka创建示例之后配置拦截器。传递到方法中的Map是使用者属性的Map。
所以,假设您想使日志记录成为可选的;添加
my.interceptor.logging.enabled=true
并在configure()
方法来配置是否记录。