下载地址:http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
云盘链接:https://pan.baidu.com/s/1dFJ6uhgJcSUGwAHiIOrCDA
提取码:ycjf
下载好,上传到虚拟机节点,进行解压。先创建目录。
# 创建目录
mkdir -p /usr/flume
cd /usr/flume
# 解压到flume目录
tar -zxvf ./apache-flume-1.7.0-bin.tar.gz -C /usr/flume/
# cd /usr/flume/,重命名一下
mv apache-flume-1.7.0-bin/ ./flume-1.7.0
cd /usr/flume/flume-1.7.0/conf
ls
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
## 添加jdk路径:
export JAVA_HOME=/usr/java/jdk1.8.0_171
在 /etc/profile 配置Flume环境变量。
vi /etc/profile
# flume
export FLUME_HOME=/usr/flume/flume-1.7.0
export PATH=$PATH:$FLUME_HOME/bin
export FLUME_CONF_DIR=$FLUME_HOME/conf
source /etc/profile
通过netcat作为source, sink为logger的方式,配置example.conf。
# 1、通过netcat作为source, sink为logger的方式
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
cd /usr/flume/flume-1.7.0
,在此目录下启动Flume。
/usr/flume/flume-1.7.0/bin/flume-ng agent --conf conf --conf-file /usr/flume/flume-1.7.0/conf/example.conf -name a1 -Dflume.root.logger=INFO,console
再启动master节点,输入 telent localhost 44444
输入数据之后,另一个master就会接收到数据并提示!
当关闭flume,telnet也会自动断开。
scp -r /usr/flume/ root@slave1:/usr/
scp -r /usr/flume/ root@slave2:/usr/
通过netcat作为source, sink为logger的方式,现在我之关心字母,过滤掉数字。
重新编辑 example.conf,添加正则表达。
# 添加source定义正则匹配规则
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents =true
保存退出,再启动flume。
./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf -name a1 -Dflume.root.logger=INFO,console
telnet localhost 44444
,输入数据 12,11,bad,hello java。
可以发现接收的数据是过滤了数字!!。
编辑:vi examp.conf
# Name the components on this agent 针对agent重命名为a1
a1.sources = r1 # source别名为 r1
a1.sinks = k1 # sinks别名为 k1
a1.channels = c1 # channel别名为 c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# source定义正则匹配规则
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents =true
# Describe the sink
# a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
# sink为hdfs
a1.sinks.k1.type = hdfs
# 数据存储到hdfs的文件路径
a1.sinks.k1.hdfs.path = hdfs:/flume
# 表示最终的文件前缀
a1.sinks.k1.hdfs.filePrefix = events
# 表示到了需要触发的时间时,是否要更新文件夹,true:表示是
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
# 表示切换时间的单位是分钟
a1.sinks.k1.hdfs.roundUnit = minute
# 表示过了一分钟生成一个文件
a1.sinks.k1.hdfs.roundInterval = 60
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in
# memory channel配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 将source channel sink进行串联起来
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume,在example.conf配置中,我把每隔一分钟输入的数据生成一个文件到HDFS的flume目录下,文件前缀名有event。
./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf -name a1 -Dflume.root.logger=INFO,console
可以看到一分钟内它还是个临时文件tmp,等一分钟后就变成一个文件在HDFS了。
查看HDFS是否有flume保存的文件。
# 如果事先HDFS已经创建了flume,要先删除
hadoop fs -ls /flume
hadoop fs -cat /flume/events.1638113932881
可以发现,我们是把sink的数据存到HDFS了。
我们有发现一个问题:就是假如一分钟内输入的flume数据过小,那就极大浪费存储空间,我们需要限定一下,设置flume防止小文件。
# 1、限定一个文件的文件数据大小
a1.sinks.k1.hdfs.rollSize = 200*1024*1024
# 2、限定文件可以存储多少个event
a1.sinks.k1.hdfs.rollCount = 10000
创建http.conf, 在/usr/flume/flume-1.7.0/conf,vi http.conf
。
# HTTP.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind = master
a1.sources.r1.port = 50020
#a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置完成,启动Flume。在 /usr/flume/flume-1.7.0 目录。
./bin/flume-ng agent --conf conf --conf-file ./conf/http.conf -name a1 -Dflume.root.logger=INFO,console
输入数据:
curl -X POST -d '[{"headers" : {"timestamp" : "434324343","host" : "random_host.example.com"},"body" : "random_body"},{"headers" : {"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "badou,badou"}]' master:50020
这个命令也可以在其他节点执行。http是允许在不同结点发送消息给source的。
在slave1 编辑文件pull.conf
vi pull.conf
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
#Describe/configure the source
a2.sources.r1.type= avro
a2.sources.r1.channels= c1
a2.sources.r1.bind= slave1
a2.sources.r1.port= 44444
#Describe the sink
a2.sinks.k1.type= logger
a2.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a2.channels.c1.type= memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
在master节点编辑 vi push.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= netcat
a1.sources.r1.bind= localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels= c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000
#Describe/configure the source
a1.sinks.k1.type= avro
a1.sinks.k1.channel= c1
a1.sinks.k1.hostname= slave1
a1.sinks.k1.port= 44444
# 1、slave1启动:
./bin/flume-ng agent -c conf -f conf/pull.conf -n a2 -Dflume.root.logger=INFO,console
# 2、master启动:
./bin/flume-ng agent -c conf -f conf/push.conf -n a1 -Dflume.root.logger=INFO,console
在master上执行 telnet localhost 44444
可以发现我们在master输入的数据,在salve1 进行显示。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_44775255/article/details/121599487
内容来源于网络,如有侵权,请联系作者删除!