下面是我的JSON数据,我正尝试通过Filebeat将其发送到OpenSearch中的摄取管道“logpipeline.json”。
json数据
{
"@timestamp":"2022-11-08T10:07:05+00:00",
"client":"10.x.x.x",
"server_name":"example.stack.com",
"server_port":"80",
"server_protocol":"HTTP/1.1",
"method":"POST",
"request":"/example/api/v1/",
"request_length":"200",
"status":"500",
"bytes_sent":"598",
"body_bytes_sent":"138",
"referer":"",
"user_agent":"Java/1.8.0_191",
"upstream_addr":"10.x.x.x:10376",
"upstream_status":"500",
"gzip_ratio":"",
"content_type":"application/json",
"request_time":"6.826",
"upstream_response_time":"6.826",
"upstream_connect_time":"0.000",
"upstream_header_time":"6.826",
"remote_addr":"10.x.x.x",
"x_forwarded_for":"10.x.x.x",
"upstream_cache_status":"",
"ssl_protocol":"TLSv",
"ssl_cipher":"xxxx",
"ssl_session_reused":"r",
"request_body":"{\"date\":null,\"sourceType\":\"BPM\",\"processId\":\"xxxxx\",\"comment\":\"Process status: xxxxx: \",\"user\":\"xxxx\"}",
"response_body":"{\"statusCode\":500,\"reasonPhrase\":\"Internal Server Error\",\"errorMessage\":\"xxxx\"}",
"limit_req_status":"",
"log_body":"1",
"connection_upgrade":"close",
"http_upgrade":"",
"request_uri":"/example/api/v1/",
"args":""
}
Filebeat到Opensearch日志传送
# ---------------------------- Elasticsearch Output ----------------------------
# Sends events through the Elasticsearch-compatible output. All settings below
# must be nested under `output.elasticsearch`; left unindented they become
# unrelated top-level keys and the output section is effectively empty.
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["192.168.29.117:9200"]
  # Ingest pipeline that each event is routed through before it is indexed.
  pipeline: logpipeline
  #index: "filebeatelastic-%{[agent.version]}-%{+yyyy.MM.dd}"
  # NOTE(review): overriding the default index name also requires
  # `setup.template.name` and `setup.template.pattern` to be set so that an
  # index template (and therefore the field mappings) applies to this index
  # -- confirm against the Filebeat version in use.
  index: "nginx_dev-%{+yyyy.MM.dd}"
  # Protocol - either `http` (default) or `https`.
  protocol: "https"
  ssl.enabled: true
  # NOTE(review): `none` skips server certificate verification; acceptable for
  # a test setup only.
  ssl.verification_mode: none
  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: "filebeat"
  password: "filebeat"
我在摄取管道中通过类型转换对“data”下的一些字段进行了转换,效果很好。我遇到的唯一问题出在“@timestamp”上。
“@timestamp”本应是“date”类型。json数据经过管道时,我把消息中的json数据映射到名为“data”的根级json对象。但在转换后的数据中,“data.@timestamp”显示为“string”类型,尽管我并没有对它做任何转换。
OpenSearch摄取管道 - logpipeline.json
{
  "description": "Logging Pipeline",
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "data"
      }
    },
    {
      "date": {
        "field": "data.@timestamp",
        "formats": ["ISO8601"]
      }
    },
    {
      "convert": {
        "field": "data.body_bytes_sent",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert": {
        "field": "data.bytes_sent",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert": {
        "field": "data.request_length",
        "type": "integer",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert": {
        "field": "data.request_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert": {
        "field": "data.upstream_connect_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert": {
        "field": "data.upstream_header_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    },
    {
      "convert": {
        "field": "data.upstream_response_time",
        "type": "float",
        "ignore_missing": true,
        "ignore_failure": true
      }
    }
  ]
}
有没有办法让“@timestamp”字段在经过摄取管道的转换之后仍然保持“date”类型?
索引文档图像:
Edit 1:更新摄取管道模拟结果
{
"docs" : [
{
"doc" : {
"_index" : "_index",
"_id" : "_id",
"_source" : {
"index_date" : "2022.11.08",
"@timestamp" : "2022-11-08T12:07:05.000+02:00",
"message" : """
{ "@timestamp": "2022-11-08T10:07:05+00:00", "client": "10.x.x.x", "server_name": "example.stack.com", "server_port": "80", "server_protocol": "HTTP/1.1", "method": "POST", "request": "/example/api/v1/", "request_length": "200", "status": "500", "bytes_sent": "598", "body_bytes_sent": "138", "referer": "", "user_agent": "Java/1.8.0_191", "upstream_addr": "10.x.x.x:10376", "upstream_status": "500", "gzip_ratio": "", "content_type": "application/json", "request_time": "6.826", "upstream_response_time": "6.826", "upstream_connect_time": "0.000", "upstream_header_time": "6.826", "remote_addr": "10.x.x.x", "x_forwarded_for": "10.x.x.x", "upstream_cache_status": "", "ssl_protocol": "TLSv", "ssl_cipher": "xxxx", "ssl_session_reused": "r", "request_body": "{\"date\":null,\"sourceType\":\"BPM\",\"processId\":\"xxxxx\",\"comment\":\"Process status: xxxxx: \",\"user\":\"xxxx\"}", "response_body": "{\"statusCode\":500,\"reasonPhrase\":\"Internal Server Error\",\"errorMessage\":\"xxxx\"}", "limit_req_status": "", "log_body": "1", "connection_upgrade": "close", "http_upgrade": "", "request_uri": "/example/api/v1/", "args": ""}
""",
"data" : {
"server_name" : "example.stack.com",
"request" : "/example/api/v1/",
"referer" : "",
"log_body" : "1",
"upstream_addr" : "10.x.x.x:10376",
"body_bytes_sent" : 138,
"upstream_header_time" : 6.826,
"ssl_cipher" : "xxxx",
"response_body" : """{"statusCode":500,"reasonPhrase":"Internal Server Error","errorMessage":"xxxx"}""",
"upstream_status" : "500",
"request_time" : 6.826,
"upstream_cache_status" : "",
"content_type" : "application/json",
"client" : "10.x.x.x",
"user_agent" : "Java/1.8.0_191",
"ssl_protocol" : "TLSv",
"limit_req_status" : "",
"remote_addr" : "10.x.x.x",
"method" : "POST",
"gzip_ratio" : "",
"http_upgrade" : "",
"bytes_sent" : 598,
"request_uri" : "/example/api/v1/",
"x_forwarded_for" : "10.x.x.x",
"args" : "",
"@timestamp" : "2022-11-08T10:07:05+00:00",
"upstream_connect_time" : 0.0,
"request_body" : """{"date":null,"sourceType":"BPM","processId":"xxxxx","comment":"Process status: xxxxx: ","user":"xxxx"}""",
"request_length" : 200,
"ssl_session_reused" : "r",
"server_port" : "80",
"upstream_response_time" : 6.826,
"connection_upgrade" : "close",
"server_protocol" : "HTTP/1.1",
"status" : "500"
}
},
"_ingest" : {
"timestamp" : "2023-01-18T08:06:35.335066236Z"
}
}
}
]
}
1条答案
按热度按时间m1m5dgzv1#
终于解决了我的问题。我按如下方式更新了filebeat.yml。之前使用的模板名称和模式与默认值不同,而改用默认的模板名称“filebeat”和模式“filebeat”之后,似乎就正常了。
至
不过我仍然需要弄清楚模板究竟是如何工作的。