Using Logstash filters to manipulate JSON messages from a Kafka topic

klsxnrf1 posted on 2021-06-07 in Kafka

I'm using Logstash 2.4 to read JSON messages from a Kafka topic and send them to an Elasticsearch index.
The JSON format is as follows:

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "string",
        "optional": false,
        "field": "reloadID"
      },
      {
        "type": "string",
        "optional": false,
        "field": "externalAccountID"
      },
      {
        "type": "int64",
        "optional": false,
        "name": "org.apache.kafka.connect.data.Timestamp",
        "version": 1,
        "field": "reloadDate"
      },
      {
        "type": "int32",
        "optional": false,
        "field": "reloadAmount"
      },
      {
        "type": "string",
        "optional": true,
        "field": "reloadChannel"
      }
    ],
    "optional": false,
    "name": "reload"
  },
  "payload": {
    "reloadID": "328424295",
    "externalAccountID": "9831200013",
    "reloadDate": 1446242463000,
    "reloadAmount": 240,
    "reloadChannel": "C1"
  }
}

With no filter in my config file, the resulting document in the ES index looks like this:

{
  "_index" : "kafka_reloads",
  "_type" : "logs",
  "_id" : "AVfcyTU4SyCFNFP2z5-l",
  "_score" : 1.0,
  "_source" : {
    "schema" : {
      "type" : "struct",
      "fields" : [ {
        "type" : "string",
        "optional" : false,
        "field" : "reloadID"
      }, {
        "type" : "string",
        "optional" : false,
        "field" : "externalAccountID"
      }, {
        "type" : "int64",
        "optional" : false,
        "name" : "org.apache.kafka.connect.data.Timestamp",
        "version" : 1,
        "field" : "reloadDate"
      }, {
        "type" : "int32",
        "optional" : false,
        "field" : "reloadAmount"
      }, {
        "type" : "string",
        "optional" : true,
        "field" : "reloadChannel"
      } ],
      "optional" : false,
      "name" : "reload"
    },
    "payload" : {
      "reloadID" : "155559213",
      "externalAccountID" : "9831200014",
      "reloadDate" : 1449529746000,
      "reloadAmount" : 140,
      "reloadChannel" : "C1"
    },
    "@version" : "1",
    "@timestamp" : "2016-10-19T11:56:09.973Z"
  }
}

However, I want only the value of the "payload" field to be sent to the ES index as the document body. So I tried the "mutate" filter in my config file, as follows:

input {
   kafka {
            zk_connect => "zksrv-1:2181,zksrv-2:2181,zksrv-4:2181"
            group_id => "logstash"
            topic_id => "reload"
            consumer_threads => 3
   }
}
filter {
  mutate {
     remove_field => [ "schema","@version","@timestamp" ]
  }
}
output {
   elasticsearch {
                    hosts => ["datanode-6:9200","datanode-2:9200"]
                    index => "kafka_reloads"
   }
}

With this filter, the ES document now looks like this:

{
      "_index" : "kafka_reloads",
      "_type" : "logs",
      "_id" : "AVfch0yhSyCFNFP2z59f",
      "_score" : 1.0,
      "_source" : {
        "payload" : {
          "reloadID" : "850846698",
          "externalAccountID" : "9831200013",
          "reloadDate" : 1449356706000,
          "reloadAmount" : 30,
          "reloadChannel" : "C1"
        }
      }
}
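This result is expected: `remove_field` only deletes the named top-level fields; it never lifts the keys of the nested `payload` object up to the root. A minimal plain-Ruby sketch of the same effect, assuming an ordinary Hash as a stand-in for the decoded event (illustration only, not the Logstash Event API; the schema is abbreviated):

```ruby
# A plain Hash standing in for the decoded Logstash event (illustration only).
event = {
  "schema"     => { "type" => "struct", "name" => "reload" }, # abbreviated
  "payload"    => { "reloadID" => "850846698", "externalAccountID" => "9831200013",
                    "reloadDate" => 1449356706000, "reloadAmount" => 30,
                    "reloadChannel" => "C1" },
  "@version"   => "1",
  "@timestamp" => "2016-10-19T11:56:09.973Z"
}

# Roughly what mutate { remove_field => [...] } does: delete top-level keys only.
%w[schema @version @timestamp].each { |field| event.delete(field) }

p event.keys # prints ["payload"]
```

The values are still one level down, under `payload`, which is exactly the document shown above.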

But it should actually look like this:

{
      "_index" : "kafka_reloads",
      "_type" : "logs",
      "_id" : "AVfch0yhSyCFNFP2z59f",
      "_score" : 1.0,
      "_source" : {
          "reloadID" : "850846698",
          "externalAccountID" : "9831200013",
          "reloadDate" : 1449356706000,
          "reloadAmount" : 30,
          "reloadChannel" : "C1"
      }
}

Is there a way to do this? Can anyone help?
I also tried the following filter:

filter {
   json {
      source => "payload"
   }
}

But that gives me errors like this:
Error parsing json {:source=>"payload", :raw=>{"reloadID"=>"572584696", "externalAccountID"=>"9831200011", "reloadDate"=>1449093851000, "reloadAmount"=>180, "reloadChannel"=>"C1"}, :exception=>java.lang.ClassCastException: org.jruby.RubyHash cannot be cast to org.jruby.rubyo, :level=>:warn}
Any help would be greatly appreciated.
Thanks, Gautam Ghosh
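The warning points at the cause: its `:raw` value is already a hash (`RubyHash`), not a JSON string. The indexed document without filters shows the Kafka message had already been decoded into fields before reaching the filter stage, so `payload` is an object, while the `json` filter parses string fields. A minimal sketch of the same mismatch using Ruby's standard `json` library (an assumption for illustration; Logstash uses its own JSON wrapper internally, so the exception class differs):

```ruby
require "json"

# What the json filter expects: a field whose value is a JSON *string*.
payload_string = '{"reloadID":"572584696","reloadAmount":180,"reloadChannel":"C1"}'
parsed = JSON.parse(payload_string)
puts parsed["reloadAmount"] # prints 180

# What the event actually holds: payload is already a Hash.
payload_hash = { "reloadID" => "572584696", "reloadAmount" => 180, "reloadChannel" => "C1" }
begin
  JSON.parse(payload_hash)
  hash_rejected = false
rescue TypeError, JSON::ParserError
  # Parsing a Hash fails: there is no string to parse.
  hash_rejected = true
end
puts hash_rejected # prints true
```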

wfypjpf4, answer #1

You can achieve what you want with the following ruby filter:

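ruby {
  # Logstash 2.x: event.to_hash returns the event's underlying (mutable) hash,
  # so changes made through it modify the event itself.
  code => "
    event.to_hash.delete_if {|k, v| k != 'payload'}
    event.to_hash.update(event['payload'].to_hash)
    event.to_hash.delete_if {|k, v| k == 'payload'}
  "
}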

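What it does:
- deletes every field except payload
- copies all of payload's inner fields to the root level
- deletes the payload field itself
That leaves you with exactly what you need.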
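The three steps can be reproduced outside Logstash on a plain Hash. This sketch assumes, as the answer's code does, that in Logstash 2.x `event.to_hash` exposes the event's own mutable hash; from Logstash 5.0 onward the Event API changed (`event.get`/`event.set` replace `event['field']`), so the filter code is specific to 2.x. The variable `data` is a hypothetical stand-in for `event.to_hash`:

```ruby
# Stand-in for event.to_hash: the decoded event as a mutable Hash (illustration only).
data = {
  "schema"     => { "type" => "struct", "name" => "reload" }, # abbreviated
  "payload"    => { "reloadID" => "850846698", "externalAccountID" => "9831200013",
                    "reloadDate" => 1449356706000, "reloadAmount" => 30,
                    "reloadChannel" => "C1" },
  "@version"   => "1",
  "@timestamp" => "2016-10-19T11:56:09.973Z"
}

# Step 1: delete every top-level field except payload.
data.delete_if { |k, _v| k != "payload" }
# Step 2: copy payload's inner fields up to the root level.
data.update(data["payload"])
# Step 3: delete the payload field itself.
data.delete("payload")

p data.keys
# prints ["reloadID", "externalAccountID", "reloadDate", "reloadAmount", "reloadChannel"]
```

Note that step 1 also drops `@timestamp` and `@version`, which matches the desired document in the question.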

5kgi1eie, answer #2

It's been a while, but here is a workaround that works; hopefully it's useful.

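# Serialize the nested object held in the "json" field into a string field
# (use the field that holds your nested object, e.g. "payload" in the question).
json_encode {
  source => "json"
  target => "json_string"
}

# Parse that string; with no target set, its keys are written to the event root.
json {
  source => "json_string"
}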
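The workaround is a round trip: `json_encode` turns the nested object into a string field, and the `json` filter, given no `target`, parses that string and places its keys at the root of the event. The plain-Ruby sketch below mirrors that round trip with the standard `json` library; merging into a Hash is an approximation of the filters' behavior, not their implementation. The original nested field and `json_string` remain on the event unless removed (for example with the common `remove_field` option), so the sketch deletes them explicitly:

```ruby
require "json"

# Hash standing in for the event; "payload" holds the nested object (illustration only).
event = {
  "payload" => { "reloadID" => "850846698", "reloadAmount" => 30, "reloadChannel" => "C1" }
}

# json_encode step: serialize the nested object into a string field.
event["json_string"] = JSON.generate(event["payload"])

# json step without a target: parse the string and place its keys at the root.
event.merge!(JSON.parse(event["json_string"]))

# Clean up the intermediate fields so only the payload values remain.
event.delete("payload")
event.delete("json_string")

p event.keys # prints ["reloadID", "reloadAmount", "reloadChannel"]
```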
