我尝试只为德国(de)过滤kafka json消息。要做到这一点,我必须写一个grok表达式。有人能帮我为这个json写一个grok模式吗?
{"table":"ORDERS","type":"I","payload":{"ID":"28112","COUNTRY":"DE","AMT":15.36}}
{"table":"ORDERS","type":"I","payload":{"ID":"28114","COUNTRY":"US","AMT":25.75}}
抱歉,我对这些技术还不熟悉。下面是我的logstash.conf的样子:
input {
kafka {topics => [ "test" ] auto_offset_reset => "earliest" }
}
filter {
grok {
match => { "message" => "?????????" }
if [message] =~ "*COUNTRY*DE*" {
drop{}
}
}
}
output { file { path => "./test.txt" } }
最后我只想向德国提交订单。希望能得到一些帮助,谢谢!
1条答案
按热度按时间2w3kk1z51#
你需要用logstash吗?如果不是,我建议使用一个简单的ksql语句
这将创建一个Kafka主题,该主题从第一个主题流式传输,并且只包含您想要的数据。在kafka主题中,如果希望将其作为处理管道的一部分,可以使用kafka connect将其连接到文件。
在这里阅读一个使用ksql的示例,并在这里进行尝试