kafka生产者拦截器

mkshixfv  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(415)

我正在尝试添加一个拦截器来验证生产者发布到Kafka主题的消息。除了由kafka主题执行的模式验证之外,我还需要做一些验证。我遵循的步骤如下。
我已经编写了一个java类来扩展producerinterceptor接口。
编译类并创建一个jar文件,该文件放在类路径中包含的文件夹中。
将intercetors.classes=classname添加到kafka安装中的producer.properties中。
但当我将消息发布到主题时,不会调用我编写的自定义拦截器类我没有得到任何错误也。消息被发布到主题中)。
我已经提到https://cwiki.apache.org/confluence/display/kafka/kip-42%3a+add+producer+and+consumer+interceptors
请给我一些建议。

fiei3ece

fiei3ece1#

属性名是interceptor.classes,而不是interceptors.classes

t1qtbnec

t1qtbnec2#

这个问题已经很老了,所以我想你同时也找到了解决办法。然而,为了以防万一,我发现我的 ProducerInterceptor 类,它根据消息的内容将消息分派到不同的主题,除非我的流已经有了指定的输出,否则不会被调用。
我的第一次尝试看起来像这样,因为我认为我不需要指定输出主题。这不起作用:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

但事实上:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic").through("dummy-output-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

值得一提的是,没有什么可以发表的 dummy-output-topic 在第二个例子中,使用 to 而不是 through 似乎也是这样。
就我而言,我是在 map 在使用拦截器将记录分派到不同的主题之前更改记录,因此我的代码实际上更像这样:

val builder: KStreamBuilder = new KStreamBuilder
val input = builder.stream("input-topic")
    .map(new CustomKeyValueMapper)
    .through("dummy-output-topic")

val stream: KafkaStreams = new KafkaStreams(builder, streamsConfigWithProducerInterceptor)
stream.start()

我希望这些例子能帮助任何与我一起工作的人 ProducerInterceptor 他犯了和我一样的错误。

相关问题