有一个批处理主题( Json
内容)在Kafka,必须通过消费 NiFi(version 1.8)
. 我可以使用 consumekafkarecord
处理器,但希望根据属性值进行筛选,因为我不需要该主题中的所有记录。
在使用Kafka主题时可以进行过滤吗 even before getting the records into NiFi
? 最好的方法是什么 Processors
或者 Scripts
使用?
我只想根据其中一个属性值过滤掉大量的记录,因为它们是不需要的。
1条答案
按热度按时间ryoqjall1#
据我所知,没有一种方法可以过滤consumekafkarecord中的记录,但在使用该处理器之后,您就可以很容易地做到这一点。
一种方法是将它连接到queryrecord处理器,并编写一个sql语句来选择您感兴趣的记录。
第二个选项是使用partitionrecord,它允许您基于记录路径表达式对记录进行分区。所以你可以说field1上的partition,如果有两个值,比如a和b,那么它会产生两个流文件,一个包含所有a的记录,一个包含所有b的记录,然后你只需要路由你感兴趣的一个,然后把另一个发送到死胡同。