菲利贝特对Kafka说:泰格并没有像预期的那样出现

eqqqjvef  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(292)

我试图在filebeat和logstash之间实现Kafka。
当把filebeat发送给kafka(它是文本格式的)和logstash时。为此,我假设,logstash无法处理输入。
当数据来自Kafka,直接从filebeat到logstash时,情况就不同了。
来自Kafka:

{
       "message" => "nice",
          "tags" => [
        [0] "kafka-stream"
    ],
    "@timestamp" => 2020-06-30T08:29:29.071Z,
      "@version" => "1"
}
{
       "message" => "{\"@timestamp\":\"2020-06-30T08:34:28.178Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"_doc\",\"version\":\"7.8.0\"},\"agent\":{\"hostname\":\"Smits-MacBook-Pro.local\",\"ephemeral_id\":\"b9779246-3cc9-408b-83ac-e69eeef3cd28\",\"id\":\"864be1a9-e233-4d41-8624-cf94e916a0b7\",\"name\":\"Smits-MacBook-Pro.local\",\"type\":\"filebeat\",\"version\":\"7.8.0\"},\"log\":{\"offset\":11341,\"file\":{\"path\":\"/Users/Smit/Downloads/chrome/observability/spring_app_log_file.log\"}},\"message\":\"2020-06-30 16:34:20.328  INFO 63741 --- [http-nio-8080-exec-7] c.e.o.controller.HomeController          : AUDIT_LOG >> customer id a8703\",\"tags\":[\"observability\",\"audit\"],\"input\":{\"type\":\"log\"},\"ecs\":{\"version\":\"1.5.0\"},\"host\":{\"name\":\"Smits-MacBook-Pro.local\"}}",
          "tags" => [
        [0] "kafka-stream"
    ],
    "@timestamp" => 2020-06-30T08:34:29.222Z,
      "@version" => "1"
}

来自filebeat:

{
          "type" => "log",
    "@timestamp" => 2020-06-30T04:37:18.935Z,
      "@version" => "1",
           "log" => {
          "file" => {
            "path" => "/Users/Smit/Downloads/chrome/observability/spring_app_log_file.log"
        },
        "offset" => 10846
    },
         "input" => {
        "type" => "log"
    },
           "ecs" => {
        "version" => "1.5.0"
    },
       "message" => "2020-06-30 12:37:16.900  INFO 63741 --- [http-nio-8080-exec-3] c.e.o.controller.HomeController          : AUDIT_LOG >> customer id d6ebe",
          "tags" => [
        [0] "observability",
        [1] "audit",
        [2] "beats",
        [3] "beats_input_codec_plain_applied"
    ],
      "hostname" => {
        "name" => "Smits-MacBook-Pro.local"
    },
         "agent" => {
                "type" => "filebeat",
             "version" => "7.8.0",
                "name" => "Smits-MacBook-Pro.local",
            "hostname" => "Smits-MacBook-Pro.local",
        "ephemeral_id" => "1ca4e838-eeaa-4b87-b52a-89fa385865b8",
                  "id" => "864be1a9-e233-4d41-8624-cf94e916a0b7"
    }
}

现在,当我在kibana中可视化数据时:
以下是日志从filebeat直接发送到logstash时的输出:

以下是日志从filebeat到kafka再到logstash时的输出:

如果你需要更多的信息,请告诉我。
elk中每个产品的配置如下:https://github.com/shah-smit/observability-spring-demo

23c0lvtd

23c0lvtd1#

当您将日志直接发送到logstash时,您使用beats协议,该协议为您的事件添加了一些“额外”字段:
https://www.elastic.co/guide/en/beats/filebeat/current/exported-fields-beat-common.html
我不确定kafka输出使用哪种协议,但可以肯定的是,这不会添加额外的字段。
因此,当您将日志发送到kafka,然后从logstash中读取它们时,字段就更少了。

相关问题