用于Kafka-Streams消费者的RecordInterceptor

ohfgkhjo  于 2023-10-15  发布在  Apache
关注(0)|答案(2)|浏览(123)

我在寻找Kafka流事件。我尝试为Kakfa-Streams添加一个拦截器(针对消费者)。
我添加了一个RecordInterceptor如下:

configMap.put(consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.package.to.interceptor.MyCustomRecordInterceptor");

但是在启动过程中出现了一个错误:

Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: java.lang.ClassCastException: class com.package.to.interceptor.MyCustomRecordInterceptor

如果我添加一个拦截器,
org.apache.kafka.clients.consumer.ConsumerInterceptor
但我需要一个记录拦截器。
我的问题是,有没有一种方法可以将RecordInterceptor实现作为消费者拦截器添加到Kafka-streams?任何帮助都非常感谢。

whlutmcx

whlutmcx1#

我需要一个RecordInterceptor
RecordInterceptor是一个spring-kafka接口,而不是普通的Kafka API
消费者将只接受ConsumerInterceptor的实现,这可能是铸造失败的原因。你的代码是正确的,否则。
Producers只接受ProducerInterceptor,您可以在Streams config/map中使用producerPrefix

34gzjxbg

34gzjxbg2#

Kafa Stream在内部使用生产者和消费者,所以你必须给属性加上前缀。
消费

configMap.put(StreamsConfig.consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG),"com.package.to.interceptor.MyCustomRecordInterceptor")

生产者

configMap.put(StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG),"com.package.to.interceptor.MyCustomRecordProducerInterceptor")

相关问题