logstash每30秒创建一个新文件

rvpgvaaj  于 2022-12-16  发布在  Logstash
关注(0)|答案(1)|浏览(221)

我的logstatsh管道中有以下过滤器配置。它的作用是,在事件开始时,第一个过滤器创建一个带有标题的CSV文件,并将文件名设置为元数据。第二个过滤器将输出写入上述CSV。
我的挑战(或)要求是:每X秒,我们需要创建一个新的CSV文件,并写入该文件。我不是rubyMaven,无法从谷歌搜索得到任何线索。有人能给我建议吗?

filter {
    ruby {
       init => "
            begin
                randval = (0...8).map { (65 + rand(26)).chr }.join      
                @csv_file = 'output'+randval+'.csv'
                csv_headers = ['YYYY-MM-ddTHH:mm:ss.SSSZ','Log Level','Event ID']
                if File.zero?(@csv_file) || !File.exist?(@csv_file)
                    CSV.open(@csv_file, 'w') do |csv|
                        csv << csv_headers
                    end
                    
                end
            end
        "
        code => '
                event.set("[@metadata][suffix]",@csv_file)
        '
    }
}

output {
   file {
      path => "output.log"
   }    
   csv {
       fields => [ "created", "level", "code"]
       path => "%{[@metadata][suffix]}"
    }
}
vohkndzv

vohkndzv1#

有趣的是,昨天有人在discuss.elastic.co上问了这个问题,还使用了同样不必要的init选项,所以我碰巧知道答案是

ruby {
    code => '
            event.set("[@metadata][suffix]", 'output' + (Time.now.to_i / 30).to_s + '.csv')
    '
}

这将导致文件输出将在30秒间隔内到达的任何事件写入不同的文件。
我不知道有什么方法可以只为输出写入文件的第一个事件添加标头。csv输出可以为每一行添加标头。csv编解码器可以写一次标头,但当文件名更改时,不会再写一次。
也就是说,如果你只是写一个文件,你可以在ruby过滤器中写,并跟踪是否已经为www.example.com _i / 30的当前值写了一个头Time.now.to。你可以使用类似于下面的代码来完成这一任务。我重新使用了csv output中的一些代码。

input { heartbeat { interval => 5 message => '{ "foo": 1, "bar": 2, "baz": 3 }' } }
filter {
    json { source => "message" target => "data" remove_field => [ "message" ] }
    ruby {
        init => '
            @fields = [ "[data][bar]", "[data][baz]" ]
            @csv_options = Hash.new
            @spreadsheet_safe = true
        '
        code => '
            def event_to_csv(event)
                csv_values = @fields.map {|name| get_value(name, event)}
                csv_values.to_csv(@csv_options)
            end

            def get_value(name, event)
                val = event.get(name)
                val.is_a?(Hash) ? LogStash::Json.dump(val) : escape_csv(val)
            end

            def escape_csv(val)
                (@spreadsheet_safe && val.is_a?(String) && val.start_with?("=")) ? "\'#{val}" : val
            end

            id = Time.now.to_i / 20
            file = "output" + id.to_s + ".csv"

            fd = open("/tmp/#{file}", "a")
            if id != @last_id
                chunk = "bar,baz\n"
                fd.write(chunk)
            end

            chunk = event_to_csv(event)
            fd.write(chunk)
            fd.close

            @last_id = id
            event.cancel
        '
    }
}

相关问题