我试图解析suricata,logstash和opensearch日志与字典过滤器
下面是我配置的一部分
input {
file {
path =\> "/opt/logs/opensearchTest/opensearch_server.json"
codec =\> "json"
type =\> "opensearch_log"
}
}
filter {
if \[type\] == "opensearch_log" {
if \[level\] == "ERROR" {
mutate {
add_field =\> { "translated_values" =\> "\*\*\*\*\*\*\*\*\*\* " }
}
} else {
translate {
source =\> "message"
target =\> "translated_field"
dictionary =\> {
"***" =\> "***"
"**" =\> "**"
"***" =\> "***\*\*\*\*\*\*\*"
}
fallback =\> ""
}
mutate {
add_field => { "translated_values" => "%{translated_field}" }
remove_field => [ "message", "translated_field" ]
}
}
}
}
output {
else if \[type\] == "logstash_log" or \[type\] == "suricata_log" or \[type\] == "opensearch_log" {
if \[translated_values\] {
http {
http_method =\> "post"
url =\> "http://\*\*\*\*\*\*\*\*\*\*\*\*/"
format =\> "form"
headers => {
"Authorization" => "Token *********"
"Content-Type" => "application/json"
}
mapping => ["action_result", "True", "action_type", "add_audit", "comment", "%{translated_values}"]
}
}
}
}
我已经为logstash和suricata使用了相同的模式,这完全可以工作,但是对于opensearch日志,我收到了这个错误:JSON parse error, original data now in message field {:message=>"incompatible json object type=java.lang.String , only hash map or arrays are supported", :exception=>LogStash::Json::ParserError, :data=>"\"type\": \"server\", \"timestamp\": \"2023-10-19T00:00:00,834+03:00\", \"level\": \"INFO\", \"component\": \"o.o.c.m.MetadataUpdateSettingsService\", \"cluster.name\": \"opensearch\", \"node.name\": \"astra-altar-wazuh\", \"message\": \"updating number_of_replicas to [0] for indices [wazuh-monitoring-2023.42w]\", \"cluster.uuid\": \"7wgF5XaTRuqjNBptO52m-g\", \"node.id\": \"JBRbEq1VTey6vZjMxcWPfQ\"
以下是opensearch日志示例:{"type": "server", "timestamp": "2023-10-19T11:33:04,042+03:00", "level": "INFO", "component": "o.o.p.PluginsService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "PluginService:onIndexModule index:[wazuh-monitoring-2023.37w/mbsyMgM7STWGQ6zkR6FdHw]", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:33:04,045+03:00", "level": "INFO", "component": "o.o.p.PluginsService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "PluginService:onIndexModule index:[.kibana_1/0XetZAI8QqSmtSAOuOrsxg]", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:33:04,123+03:00", "level": "INFO", "component": "o.o.c.r.a.AllocationService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[.kibana_1][0]]]).", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:34:25,979+03:00", "level": "INFO", "component": "o.o.n.Node", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "stopping ...", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" }
1条答案
按热度按时间krcsximq1#
您遇到的错误消息表明Logstash试图解析JSON的数据不是有效的JSON。
在您的示例中,您所显示的OpenSearch日志似乎是每行多个JSON对象,这不是有效的JSON格式。
Logstash的JSON编解码器期望每行包含一个JSON对象。您需要调整输入,以便OpenSearch日志的每一行都包含一个JSON对象。
实现此目的的一种方法是在Logstash中使用多行过滤器来合并合并行,直到形成完整的JSON对象,然后使用JSON编解码器。
您可以使用以下配置:
在此配置中,多行编解码器用于将以“{"(指示JSON对象的开始)开头的行组合为单个事件,然后应用JSON编解码器来解析组合的事件。