我正在使用kafka connect将数据从kafka接收到mongo db。在这个过程中,我尝试使用过滤器过滤一些记录(https://docs.confluent.io/current/connect/transforms/filter-confluent.html). 对于这个过滤器,我们需要给出一个 predicate ,指定json path作为过滤来自kafka的数据的条件。
下面是我从Kafka那里收到的json数据
**Input json:**
{
"EventMetadata": {
"ColumnsUpdated": "FirstName,LastName,Age"
},
"TotalBooking__c": 1
}
我有一个要求,它应该满足下面的条件,如果下面的条件是匹配的Kafka连接应该允许这样的记录插入mongo数据库。
条件:如果totalbooking\uu c>0且eventmetadata.columnsupdated包含任何“firstname”或“lastname”值,则应允许在mongo db中插入此类记录。
我试着使用下面的kafka连接过滤器操作符
[?(@.TotalBooking__c > 0 && @.EventMetadata.ColumnsUpdated =~ /(?=\b\b)\b(FirstName|LastName)\b/)]
但是,只有当输入eventmetadata.columnsupdated值只有“firstname”或“lastname”时,这才有效。如果该值用逗号分隔(“firstname,lastname,age”,如上面的输入json所示),那么它就不起作用了。
请在这个问题上帮助我。
1条答案
按热度按时间vm0i2vca1#
链接的汇合文档揭示了jayway的jsonpath实现在这里起作用,这是一个很好的消息,因为在特性和过滤器语法方面或多或少存在一些细微的差异。
从您的需求来看,我认为您使用regex模式是正确的。
但是,模式需要匹配整个字符串(值),因为
=~
正则表达式与正则表达式查找不匹配。换句话说,我们需要在开头和结尾使用可选文本.*
(或更具体的模式,如果需要):