JSON parse error when parsing OpenSearch logs

qacovj5a · posted 2023-10-21

I am trying to parse Suricata, Logstash, and OpenSearch logs with a dictionary-based translate filter.
Below is part of my configuration:

input {
  file {
    path => "/opt/logs/opensearchTest/opensearch_server.json"
    codec => "json"
    type => "opensearch_log"
  }
}

filter {
  if [type] == "opensearch_log" {
    if [level] == "ERROR" {
      mutate {
        add_field => { "translated_values" => "********** " }
      }
    } else {
      translate {
        source => "message"
        target => "translated_field"
        dictionary => {
          "***" => "***"
          "**" => "**"
          "***" => "**********"
        }
        fallback => ""
      }

      mutate {
        add_field => { "translated_values" => "%{translated_field}" }
        remove_field => [ "message", "translated_field" ]
      }
    }
  }
}

output {
  else if [type] == "logstash_log" or [type] == "suricata_log" or [type] == "opensearch_log" {
    if [translated_values] {
      http {
        http_method => "post"
        url => "http://************/"
        format => "form"
        headers => {
          "Authorization" => "Token *********"
          "Content-Type" => "application/json"
        }
        mapping => ["action_result", "True", "action_type", "add_audit", "comment", "%{translated_values}"]
      }
    }
  }
}

I have used the same pattern for the Logstash and Suricata logs and it works perfectly, but for the OpenSearch logs I get this error:
JSON parse error, original data now in message field {:message=>"incompatible json object type=java.lang.String , only hash map or arrays are supported", :exception=>LogStash::Json::ParserError, :data=>"\"type\": \"server\", \"timestamp\": \"2023-10-19T00:00:00,834+03:00\", \"level\": \"INFO\", \"component\": \"o.o.c.m.MetadataUpdateSettingsService\", \"cluster.name\": \"opensearch\", \"node.name\": \"astra-altar-wazuh\", \"message\": \"updating number_of_replicas to [0] for indices [wazuh-monitoring-2023.42w]\", \"cluster.uuid\": \"7wgF5XaTRuqjNBptO52m-g\", \"node.id\": \"JBRbEq1VTey6vZjMxcWPfQ\"
Here is a sample of the OpenSearch log:
{"type": "server", "timestamp": "2023-10-19T11:33:04,042+03:00", "level": "INFO", "component": "o.o.p.PluginsService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "PluginService:onIndexModule index:[wazuh-monitoring-2023.37w/mbsyMgM7STWGQ6zkR6FdHw]", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:33:04,045+03:00", "level": "INFO", "component": "o.o.p.PluginsService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "PluginService:onIndexModule index:[.kibana_1/0XetZAI8QqSmtSAOuOrsxg]", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:33:04,123+03:00", "level": "INFO", "component": "o.o.c.r.a.AllocationService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[.kibana_1][0]]]).", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:34:25,979+03:00", "level": "INFO", "component": "o.o.n.Node", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "stopping ...", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" }

Answer from krcsximq:

The error message you are getting indicates that the data Logstash is trying to parse as JSON is not valid JSON.
In your example, the OpenSearch log you show appears to contain multiple JSON objects per line, which is not a valid JSON document.
Logstash's json codec expects each line to contain a single JSON object, so you need to adjust the input so that every line of the OpenSearch log holds exactly one object.
One way to achieve this is to use the multiline codec in Logstash to combine lines until a complete JSON object is formed, and then parse the result with a json filter.
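As a quick sanity check, you can first confirm this by trying to parse each line of the file on its own. The following is a minimal Python sketch (the path is taken from the file input in your config; the script is only an illustration, not part of the Logstash pipeline). Any line that fails to parse, or that parses to something other than a single object, is a line the json codec will reject.

import json

# Path copied from the file input in the question's configuration
path = "/opt/logs/opensearchTest/opensearch_server.json"

with open(path, encoding="utf-8") as f:
    for lineno, line in enumerate(f, start=1):
        line = line.strip()
        if not line:
            continue
        try:
            obj = json.loads(line)
        except json.JSONDecodeError as err:
            # e.g. "Extra data" when several JSON objects share one physical line
            print(f"line {lineno}: not valid JSON ({err})")
            continue
        if not isinstance(obj, dict):
            print(f"line {lineno}: parses to {type(obj).__name__}, not a JSON object")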
You could then use a configuration like the following:

input {
  file {
    path => "/opt/logs/opensearchTest/opensearch_server.json"
    type => "opensearch_log"
    codec => multiline {
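      # lines that do NOT start with "{" are merged into the previous line,
      # so each event ends up holding one complete JSON object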
      pattern => "^\{"
      negate => true
      what => "previous"
    }
  }
}

filter {
  if [type] == "opensearch_log" {
    json {
      source => "message"
      target => "parsed_log"
    }
    if [level] == "ERROR" {
      mutate {
        add_field => { "translated_values" => "*********" }
      }
    } else {
      translate {
        source => "[parsed_log][message]"
        target => "translated_field"
        dictionary => {
          "***" => "***"
          "**" => "**"
          "***" => "*** *******"
        }
        fallback => ""
      }
      mutate {
        add_field => { "translated_values" => "%{translated_field}" }
        remove_field => [ "message", "translated_field", "parsed_log" ]
      }
    }
  }
}

output {
  if [type] == "opensearch_log" {
    if [translated_values] {
      http {
        http_method => "post"
        url => "http://*********/"
        format => "form"
        headers => {
          "Authorization" => "Token *********"
          "Content-Type" => "application/json"
        }
        mapping => ["action_result", "True", "action_type", "add_audit", "comment", "%{translated_values}"]
      }
    }
  }
}

In this configuration, the multiline codec groups lines into a single event, starting a new event whenever a line begins with "{" (the start of a JSON object); the json filter then parses the combined event.
