elasticsearch Logstash -解析嵌套KV日志

vu8f3i0k  于 2023-05-06  发布在  ElasticSearch
关注(0)|答案(2)|浏览(149)

我对这种日志有一个问题:

<183>Apr 26 12:53:53 host.example tag_audit_log: type=USER_ACCT msg=audit(1989212.965:15124458): pid=20982 uid=0 auid=427890281 ses=4294967295 subj=system_u:system_r:init_t:s0 msg='op=PAM:accounting grantors=pam_unix acct="root" exe="/usr/lib/systemd/systemd" hostname=? addr=? terminal=? res=success'

我认为最简单的解析方法是使用grok,然后将其传递给KV filter

filter {

  grok {
    match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP:date} %{NOTSPACE:hostname} %{NOTSPACE:location}: %{GREEDYDATA:kv_message}" }
    overwrite => [ message ]
  }

  kv {
    source => [ "kv_message" ]
  }
}

除了msg字段中的日志之外,所有内容都解析得很好。KV创建了一个数组:

"msg" => [
[0] "audit(1989212.965:15124458):",
[1] "\\'op=PAM:accounting"
  ]

我想知道如何解析msg[1]内部的进一步消息。我尝试了field_split_pattern,但我不确定如何正确地 Package 它。有些日志不包含这些嵌套字段,因此也必须考虑。

ibrsph3r

ibrsph3r1#

我建议配置kv过滤器以删除单引号,并使用以下配置递归解析第二个msg字段中的值:

kv {
  source => [ "kv_message" ]
  remove_char_value => "\\'"
  recursive => true
}

此外,还可以重命名msg数组元素

mutate {
  rename => {
    "[msg][0]" => "[msg]"
    "[msg][1]" => "[sub_msg]"
  }
}

你会得到这样的东西:

{
    "@timestamp": "2023-05-05T09:18:08.410Z",
    "@version": "1",
    "auid": "427890281",
    "date": "Apr 26 12:53:53",
    "host": "iMac.local",
    "hostname": "host.example",
    "location": "tag_audit_log",
    "msg": "audit(1989212.965:15124458):",
    "pid": "20982",
    "ses": "4294967295",
    "sub_msg": {
        "acct": "root",
        "addr": "?",
        "exe": "/usr/lib/systemd/systemd",
        "grantors": "pam_unix",
        "hostname": "?",
        "op": "PAM:accounting",
        "res": "success",
        "terminal": "?"
    },
    "subj": "system_u:system_r:init_t:s0",
    "syslog5424_pri": "183",
    "type": "USER_ACCT",
    "uid": "0"
}
72qzrwbm

72qzrwbm2#

Tldr;

它看起来像unix审计日志,前面添加了一些字段。
您可能需要在线查看现有的logstash过滤器

溶液

基于您提供的单个示例:

filter {

  grok {
    match => { "message" => "%{SYSLOG5424PRI}%{SYSLOGTIMESTAMP:date} %{NOTSPACE:hostname} %{NOTSPACE:location}: type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} ses=%{NUMBER:audit_sesid} subj=%{GREEDYDATA:audit_subject} msg='%{GREEDYDATA:audit_message}'" }
    overwrite => [ message ]
  }

  kv {
    source => [ "audit_message" ]
  }
}

相关问题