对于流畅的环境,我试图用上下文信息丰富我们的日志(它们包含IP地址)。为此,我写了一个基于Ruby的fluentd插件。
我有一个包含子网信息的文件,以及JSON格式的每个子网的 meta信息(例如国家和内部site_id)。现在的想法是编写一个过滤器插件,将src和dst IP地址与该文件中的网络进行匹配,并将适当的元数据添加到记录中。
当前的代码看起来像这样(我已经删除了一些注解和错误处理,以使其尽可能简短):
require "fluent/plugin/filter"
require "ipaddr"
module Fluent
module Plugin
class IpaddressFilter < Fluent::Plugin::Filter
Fluent::Plugin.register_filter("ipaddress", self)
config_param :ipaddress_file_path, :string
config_param :source_address_field, :string, :default => "src_ip"
config_param :destination_address_field, :string, :default => "dest_ip"
def initialize
super
@parsed_subnets = []
end
def configure(conf)
super
@ipaddress_file = File.read(@ipaddress_file_path)
@ipaddress_data = JSON.parse(@ipaddress_file)
# For each entry, create IP address object
@ipaddress_data.each do |entry|
# Create IP address object
new_entry = {
"network" => IPAddr.new(entry["network"], family = Socket::AF_INET),
"country" => entry["country"],
"site_id" => entry["site_id"]
}
# Append hash to array
@parsed_subnets << new_entry
end
end
def filter(tag, time, record)
src_ip_obj = IPAddr.new(record[@source_address_field], family = Socket::AF_INET)
dest_ip_obj = IPAddr.new(record[@destination_address_field], family = Socket::AF_INET)
src_found = false
dst_found = false
# Check if IP addresses are in any of the subnets
@parsed_subnets.each do |entry|
# SRC IP
if entry["network"].include?(src_ip_obj)
record["src_country"] = entry["country"]
record["src_site_id"] = entry["site_id"]
src_found = true
end
# DEST IP
if entry["network"].include?(dest_ip_obj)
record["dest_country"] = entry["country"]
record["dest_site_id"] = entry["site_id"]
dst_found = true
end
# Stop loop if both are found
if src_found & dst_found
break
end
end
# Return record
record
end
end
end
end
代码本身运行良好。子网列表有超过20.000个条目,我们每秒处理超过2000个日志条目。当前的解决方案以线性方式O(n)
随子网条目的数量缩放,这远远不是最优的。子网已经总结(python的netaddr
模块)到最大可能的扩展,而不会失去唯一性。
现在的问题是如何提高这个任务的速度?我认为基于树的方法可能会奏效。任何我可以做的前期(而插件加载数据)是一个一次性的成本,这将是绝对首选相比,这样做的每一个消息。
1条答案
按热度按时间jaql4c8m1#
很可能是
@parsed_subnets.each
块降低了代码的速度,因为它迭代所有子网只找到两个匹配项。我建议不要迭代所有子网,而是将所有子网写入一个支持IP地址运算符(如PostgreSQL)的数据库,然后只查询您感兴趣的那两个值。
一个可能的解决方案大致如下所示: