kafka consumerinterceptor所需的配置

dldeef67  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(890)

我是Kafka的新人。我已经建立了我的环境来生成和使用记录。但是,我的目标是截取记录并在将其发送给目标消费者之前修改其值。现在,为了确保环境被很好地设置为拦截记录,我正在编写一个简单的consumerinterceptor,它将拦截记录并打印它们的值。configure()方法应该实现什么来启用consumerinterceptor?我应该添加/修改哪些其他配置以及在哪里?

public class SimpleConsumerInterceptors<K, V> implements ConsumerInterceptor<K, V>{
    private String clientId;

    public void configure(final Map<String, ?> configs) {

// What configurations required to enable my consumerInterceptor?

    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        ConsumerRecords<K, V> interceptRecords = records;
        for (TopicPartition partition : records.partitions()) {
            String topic = partition.topic();
            List<ConsumerRecord<K, V>> recordsInPartition = records.records(partition);
            for (ConsumerRecord<K, V> record : recordsInPartition) {
                System.out.println("onConsume:");
                System.out.println(record.value());
            }
        }

        return interceptRecords;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("onCommit")
    }

    @Override
    public void close() {
        System.out.println("close")
        this.close();
    }
}
mccptt67

mccptt671#

只需将类名添加到 interceptor.classes 消费者财产, ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG .
你只需要实现 configure() 如果您想在kafka创建示例之后配置拦截器。
传递到方法中的Map是使用者属性的Map。
所以,假设您想使日志记录成为可选的;添加 my.interceptor.logging.enabled=true 并在 configure() 方法来配置是否记录。

相关问题