我们开始通过将消息发布到kafka主题来整合应用程序中的事件日志数据。虽然我们可以直接从应用程序向kafka写入,但我们选择将其作为一个一般问题来处理,并使用flume代理。这提供了一些灵活性:如果我们想从服务器捕获其他内容,我们可以跟踪不同的源并发布到不同的kafka主题。
我们创建了一个flume agent conf文件来跟踪日志并发布到kafka主题:
tier1.sources = source1
tier1.channels = channel1
tier1.sinks = sink1
tier1.sources.source1.type = exec
tier1.sources.source1.command = tail -F /var/log/some_log.log
tier1.sources.source1.channels = channel1
tier1.channels.channel1.type = memory
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 1000
tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.topic = some_log
tier1.sinks.sink1.brokerList = hadoop01:9092,hadoop02.com:9092,hadoop03.com:9092
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.batchSize = 20
不幸的是,消息本身并没有指定生成它们的主机。如果有一个应用程序在多个主机上运行,并且发生了错误,那么就无法确定是哪个主机生成了消息。
我注意到,如果flume直接写入hdfs,我们可以使用flume拦截器写入特定的hdfs位置。虽然我们可以用kafka做一些类似的事情,即为每台服务器创建一个新的主题,但这可能会变得很难处理。我们会有成千上万的主题。
flume在发布到kafka主题时是否可以附加/包括发起主机的主机名?
2条答案
按热度按时间5fjcxozz1#
您可以创建一个自定义tcp源,它读取客户机地址并将其添加到头中。
flume-conf.properties可以配置为:
我发送了一条测试消息来测试这个,它看起来像:
我已经在github上传了这个项目
sy5wg1nm2#
如果你用的是
exec
没有什么可以阻止您运行智能命令,将主机名作为日志文件内容的前缀。注意:如果命令使用管道之类的东西,还需要如下指定shell:
消息如下所示:
... 哪里
frb.hi.inet
告诉我们主人的名字。