csv Logstash将冒号替换为箭头

kg7wmglp  于 2024-01-03  发布在  Logstash
关注(0)|答案(3)|浏览(182)

我的目标是从Kinesis读取数据并在S3上上传CSV文件,但在CSV文件中,箭头(=>)是JSON数据,而不是冒号(:)
Logstash配置文件

input {
  kinesis {
    application_name => "logstash"
    kinesis_stream_name => "events"
    type => "kinesis"
    region => "us-east-1"
    profile => "default"
    metrics => "cloudwatch"
    codec => "json"
  }
}

filter {
    grok {
      match => { "[data][ti]" => "%{YEAR:event_year}-%{MONTHNUM2:event_month}-%{MONTHDAY:event_day}" }
   }
}

output {
  s3 {
    bucket => "test-logstash"
    region => "us-east-1"
    prefix => "%{event_year}/%{event_month}/%{event_day}/%{[data][brandid]}/%{[data][event_name]}"
    encoding => "none"
    codec => csv {
        separator => "␁"
    }
    size_file => 200000000
    time_file => 60
  }
}

字符串

0lvr5msh

0lvr5msh1#

您可以使用Logstash中的mutate过滤器将“arrow(=>)”替换为“colon(:)"。

input {
  kinesis {
    application_name => "logstash"
    kinesis_stream_name => "events"
    type => "kinesis"
    region => "us-east-1"
    profile => "default"
    metrics => "cloudwatch"
    codec => "json"
  }
}

filter {
  # Use the mutate filter to replace "arrow (=>)" with "colon (:)"
  mutate {
    gsub => [
      "message", "=>", ":"
    ]
  }

  grok {
    match => { "[data][ti]" => "%{YEAR:event_year}-%{MONTHNUM2:event_month}-%{MONTHDAY:event_day}" }
  }
}

output {
  s3 {
    bucket => "test-logstash"
    region => "us-east-1"
    prefix => "%{event_year}/%{event_month}/%{event_day}/%{[data][brandid]}/%{[data][event_name]}"
    encoding => "none"
    codec => csv {
      separator => "␁"
    }
    size_file => 200000000
    time_file => 60
  }
}

字符串
在filter部分,mutate filter用于执行全局替换(gsub),将“arrow(=>)”替换为“colon(:)"。这应该确保您的JSON数据按预期正确处理。
如果JSON数据位于不同的字段中,请确保调整mutate过滤器中的字段名称(消息)。

yptwkmov

yptwkmov2#

将Hash写入CSV文件时,使用as_json将其转换为JSON或使用to_json将其转换为JSON字符串。

eoxn13cs

eoxn13cs3#

下面是我的工作解决方案,我使用ruby过滤器来字符串化json属性

filter {
    grok {
      match => { "[data][ti]" => "%{YEAR:event_year}-%{MONTHNUM2:event_month}-%{MONTHDAY:event_day}" }
   }

    ruby {
      code => "
        require 'json'
        begin
          event.set('[data][attributes1]', event.get('[data][attributes1]').to_json)
          event.set('[data][attributes2]', event.get('[data][attributes]').to_json)
        rescue Exception => e
          @logger.error('Ruby exception occurred: #{e}')
          raise 'error'
        end
      "
    }
}

字符串

相关问题