使用logstash grok过滤kafka json消息

kyxcudwk  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(500)

我尝试只为德国(de)过滤kafka json消息。要做到这一点,我必须写一个grok表达式。有人能帮我为这个json写一个grok模式吗?

{"table":"ORDERS","type":"I","payload":{"ID":"28112","COUNTRY":"DE","AMT":15.36}}
{"table":"ORDERS","type":"I","payload":{"ID":"28114","COUNTRY":"US","AMT":25.75}}

抱歉,我对这些技术还不熟悉。下面是我的logstash.conf的样子:

input { 
  kafka {topics => [ "test" ] auto_offset_reset => "earliest" } 
} 

filter { 
  grok {
    match => { "message" => "?????????" }

  if [message] =~ "*COUNTRY*DE*" { 
    drop{}
  }
}      
}

output { file { path => "./test.txt"  } }

最后我只想向德国提交订单。希望能得到一些帮助,谢谢!

2w3kk1z5

2w3kk1z51#

你需要用logstash吗?如果不是,我建议使用一个简单的ksql语句

CREATE STREAM GERMAN_ORDERS AS SELECT * FROM ORDERS WHERE COUNTRY='DE';

这将创建一个Kafka主题,该主题从第一个主题流式传输,并且只包含您想要的数据。在kafka主题中,如果希望将其作为处理管道的一部分,可以使用kafka connect将其连接到文件。
在这里阅读一个使用ksql的示例,并在这里进行尝试

相关问题