I am using Logstash 2.4 to read JSON messages from a Kafka topic and send them to an Elasticsearch index. The JSON format is as follows --
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "reloadID"
      },
      {
        "type": "string",
        "optional": false,
        "field": "externalAccountID"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "reloadDate"
      },
      {
        "type": "int32",
        "optional": false,
        "field": "reloadAmount"
      },
      {
        "type": "string",
        "optional": true,
        "field": "reloadChannel"
      }
    ],
    "optional": false,
    "name": "reload"
  },
  "payload": {
    "reloadID": "328424295",
    "externalAccountID": "9831200013",
    "reloadDate": 1446242463000,
    "reloadAmount": 240,
    "reloadChannel": "C1"
  }
}
Without any filter in my config file, the target document in the ES index looks like this --
{
"_index" : "kafka_reloads",
"_type" : "logs",
"_id" : "AVfcyTU4SyCFNFP2z5-l",
"_score" : 1.0,
"_source" : {
"schema" : {
"type" : "struct",
"fields" : [ {
"type" : "string",
"optional" : false,
"field" : "reloadID"
}, {
"type" : "string",
"optional" : false,
"field" : "externalAccountID"
}, {
"type" : "int64",
"optional" : false,
"name" : "org.apache.kafka.connect.data.Timestamp",
"version" : 1,
"field" : "reloadDate"
}, {
"type" : "int32",
"optional" : false,
"field" : "reloadAmount"
}, {
"type" : "string",
"optional" : true,
"field" : "reloadChannel"
} ],
"optional" : false,
"name" : "reload"
},
"payload" : {
"reloadID" : "155559213",
"externalAccountID" : "9831200014",
"reloadDate" : 1449529746000,
"reloadAmount" : 140,
"reloadChannel" : "C1"
},
"@version" : "1",
"@timestamp" : "2016-10-19T11:56:09.973Z"
}
}
However, I only want the value part of the "payload" field to be indexed into ES as the body of the target JSON. So I tried using the "mutate" filter in the config file, as shown below --
input {
  kafka {
    zk_connect => "zksrv-1:2181,zksrv-2:2181,zksrv-4:2181"
    group_id => "logstash"
    topic_id => "reload"
    consumer_threads => 3
  }
}
filter {
  mutate {
    remove_field => [ "schema", "@version", "@timestamp" ]
  }
}
output {
  elasticsearch {
    hosts => ["datanode-6:9200","datanode-2:9200"]
    index => "kafka_reloads"
  }
}
With this filter, the ES documents now look like this --
{
"_index" : "kafka_reloads",
"_type" : "logs",
"_id" : "AVfch0yhSyCFNFP2z59f",
"_score" : 1.0,
"_source" : {
"payload" : {
"reloadID" : "850846698",
"externalAccountID" : "9831200013",
"reloadDate" : 1449356706000,
"reloadAmount" : 30,
"reloadChannel" : "C1"
}
}
}
But what I actually want is this --
{
"_index" : "kafka_reloads",
"_type" : "logs",
"_id" : "AVfch0yhSyCFNFP2z59f",
"_score" : 1.0,
"_source" : {
"reloadID" : "850846698",
"externalAccountID" : "9831200013",
"reloadDate" : 1449356706000,
"reloadAmount" : 30,
"reloadChannel" : "C1"
}
}
Is there a way to do this? Can anyone help me?
I also tried the filter below --
filter {
  json {
    source => "payload"
  }
}
But that gives me an error like this --
Error parsing json {:source=>"payload", :raw=>{"reloadID"=>"572584696", "externalAccountID"=>"9831200011", "reloadDate"=>1449093851000, "reloadAmount"=>180, "reloadChannel"=>"C1"}, :exception=>java.lang.ClassCastException: org.jruby.RubyHash cannot be cast to org.jruby.RubyIO, :level=>:warn}
Any help would be much appreciated.
Thanks, Gautam Ghosh
2 Answers
wfypjpf41#
You can achieve what you want with the following ruby filter. What it does is:
1. remove all fields except payload
2. copy all of payload's inner fields to the root level
3. remove the payload field itself
and you'll get what you need.
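The three steps above can be simulated with a plain Ruby Hash standing in for the Logstash event (the field values come from the sample document in the question; treating the event as a hash mirrors the Logstash 2.x ruby filter, where the event is hash-like, but is an assumption here):

```ruby
# A plain Hash standing in for the Logstash event after JSON decoding.
event = {
  "schema"     => { "type" => "struct", "name" => "reload" },
  "@version"   => "1",
  "@timestamp" => "2016-10-19T11:56:09.973Z",
  "payload"    => {
    "reloadID"          => "850846698",
    "externalAccountID" => "9831200013",
    "reloadDate"        => 1449356706000,
    "reloadAmount"      => 30,
    "reloadChannel"     => "C1"
  }
}

# 1. remove every field except "payload"
event.delete_if { |k, _| k != "payload" }
# 2. copy payload's inner fields to the root level
event.update(event["payload"])
# 3. drop the now-redundant "payload" wrapper
event.delete("payload")
```

In a Logstash config the same logic would go inside a `ruby` filter's `code` string, operating on `event.to_hash` (an untested sketch): first `event.to_hash.delete_if { |k, v| k != 'payload' }`, then `event.to_hash.update(event['payload'].to_hash)`, and finally deleting `'payload'` from the hash.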
5kgi1eie2#
It's been a while, but here is a workaround that works; I hope it will be useful.
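One workaround along these lines can be sketched for newer Logstash releases, where the event is accessed through `event.get`/`event.set` instead of direct hash access -- a hedged, untested sketch assuming the 5.x+ event API:

```
filter {
  ruby {
    code => '
      payload = event.get("payload")
      payload.each { |k, v| event.set(k, v) } if payload.is_a?(Hash)
    '
  }
  mutate {
    remove_field => [ "schema", "payload", "@version" ]
  }
}
```

The `ruby` filter promotes each inner field of `payload` to the event root, and the `mutate` filter then drops the original wrapper fields.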