我在寻找Kafka流事件。我尝试为Kakfa-Streams添加一个拦截器(针对消费者)。
我添加了一个RecordInterceptor如下:
configMap.put(consumerPrefix(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG), "com.package.to.interceptor.MyCustomRecordInterceptor");
但是在启动过程中出现了一个错误:
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
Caused by: java.lang.ClassCastException: class com.package.to.interceptor.MyCustomRecordInterceptor
如果我添加一个拦截器,org.apache.kafka.clients.consumer.ConsumerInterceptor
。
但我需要一个记录拦截器。
我的问题是,有没有一种方法可以将RecordInterceptor实现作为消费者拦截器添加到Kafka-streams?任何帮助都非常感谢。
2条答案
按热度按时间whlutmcx1#
我需要一个RecordInterceptor
RecordInterceptor是一个spring-kafka接口,而不是普通的Kafka API
消费者将只接受ConsumerInterceptor的实现,这可能是铸造失败的原因。你的代码是正确的,否则。
Producers只接受ProducerInterceptor,您可以在Streams config/map中使用producerPrefix
34gzjxbg2#
Kafa Stream在内部使用生产者和消费者,所以你必须给属性加上前缀。
消费
生产者