Flume: Setup and Configuration — netcat and HTTP Sources, Sinks to the Local Log and HDFS


1. Download and Extract Flume

Download: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

Cloud drive link: https://pan.baidu.com/s/1dFJ6uhgJcSUGwAHiIOrCDA
Extraction code: ycjf

After downloading, upload the tarball to a virtual machine node and extract it. Create the target directory first.

# Create the directory
mkdir -p /usr/flume
cd /usr/flume
# Extract into the flume directory
tar -zxvf ./apache-flume-1.7.0-bin.tar.gz -C /usr/flume/
# Still in /usr/flume/, rename the extracted directory
mv apache-flume-1.7.0-bin/ ./flume-1.7.0
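
A quick listing confirms the layout; the 1.7.0 binary tarball ships with bin, conf, docs, and lib subdirectories, among others (the listing below is indicative):

# Verify the extracted directory structure
ls /usr/flume/flume-1.7.0
# bin  conf  docs  lib  tools  ...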

2. Configuration Files

2.1 Configure flume-env.sh

cd /usr/flume/flume-1.7.0/conf
ls
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
## Add the JDK path:
export JAVA_HOME=/usr/java/jdk1.8.0_171

2.2 Configure the Flume Environment Variables (all three nodes)

Add the Flume environment variables to /etc/profile.

vi /etc/profile

# flume
export FLUME_HOME=/usr/flume/flume-1.7.0
export PATH=$PATH:$FLUME_HOME/bin
export FLUME_CONF_DIR=$FLUME_HOME/conf

source /etc/profile
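
After reloading the profile, you can sanity-check the installation with the version subcommand (the exact output is indicative, but it should report 1.7.0):

# Confirm flume-ng is on the PATH and reports the expected release
flume-ng version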

2.3 Configure example.conf

Configure example.conf with netcat as the source and logger as the sink.

  • agent name: a1
  • sources: r1
  • sinks: k1
  • channels: c1
# 1. netcat as the source, logger as the sink
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

cd to /usr/flume/flume-1.7.0 and start Flume from that directory.

/usr/flume/flume-1.7.0/bin/flume-ng agent --conf conf --conf-file /usr/flume/flume-1.7.0/conf/example.conf --name a1 -Dflume.root.logger=INFO,console

Then open another session on the master node and run telnet localhost 44444.

After you type data into the telnet session, the other session on master (the one running Flume) receives the events and logs them.

When Flume is shut down, the telnet connection is closed automatically.
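
If telnet is not installed on the node, a quick alternative (assuming the nc netcat utility is available) is to pipe data directly into the source port:

# Send a single line of text to the netcat source on port 44444
echo "hello flume" | nc localhost 44444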

2.4 Distribute the Files

scp -r /usr/flume/ root@slave1:/usr/
scp -r /usr/flume/ root@slave2:/usr/
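
Remember that the /etc/profile changes from section 2.2 are per-node, so after distributing the files, reload the environment on each slave; a quick check on slave1 might look like:

# On slave1: pick up the new environment and confirm Flume resolves
source /etc/profile
flume-ng version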

3. Multiple Flume conf Deployments

3.1 Requirement: Filter the Data in the Output

Still using netcat as the source and logger as the sink, suppose we now only care about letters and want to filter out purely numeric input.

Re-edit example.conf and add a regex interceptor.

# Add a regex_filter interceptor to the source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents = true

Save and exit, then start Flume again.

./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=INFO,console

Run telnet localhost 44444 and enter the lines 12, 11, bad, hello java.

You can see that the purely numeric lines have been filtered out of the received data.
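
To reproduce the test non-interactively, you can pipe the same four lines into the source (again assuming nc is installed); only the non-numeric lines should appear in the logger output:

# "12" and "11" match ^[0-9]*$ and are dropped by the interceptor;
# "bad" and "hello java" pass through to the logger sink
printf '12\n11\nbad\nhello java\n' | nc localhost 44444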

3.2 Requirement: netcat as the Source, Sink Writing to HDFS

Edit the file: vi example.conf

# Name the components on this agent; the agent is named a1,
# with source alias r1, sink alias k1, and channel alias c1.
# (Flume's properties parser does not strip trailing comments,
# so comments must sit on their own lines, not after a value.)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Regex matching rule defined on the source
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents = true

# Describe the sink
# a1.sinks.k1.type = logger
# The sink is now hdfs
a1.sinks.k1.type = hdfs
# HDFS path where the data is stored
a1.sinks.k1.hdfs.path = hdfs:/flume
# Prefix for the final file names
a1.sinks.k1.hdfs.filePrefix = events
# Round down the event timestamp used in the path
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
# Unit for the rounding, here minutes
a1.sinks.k1.hdfs.roundUnit = minute
# Roll a new file every 60 seconds (note: rollInterval, not roundInterval)
a1.sinks.k1.hdfs.rollInterval = 60
# Write plain text rather than a SequenceFile
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory (channel configuration)
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# Wire the source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
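
Note that round, roundValue, and roundUnit only affect timestamp escape sequences (such as %H%M) inside hdfs.path; with the literal path above they have no visible effect. A sketch of a time-partitioned variant is shown below (this directory layout is an assumption for illustration, not part of the original setup):

# Sketch: bucket output into 10-minute directories so that
# round/roundValue/roundUnit actually take effect on the path
a1.sinks.k1.hdfs.path = hdfs:/flume/%Y-%m-%d/%H%M
# Let the sink use local time instead of a timestamp event header
a1.sinks.k1.hdfs.useLocalTimeStamp = true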

Start Flume. With this example.conf, the data received each minute is rolled into a new file under the flume directory on HDFS, with the filename prefix events.

./bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=INFO,console

Within the first minute the data sits in a temporary .tmp file; after a minute it is rolled into a finalized file on HDFS.
Check whether Flume has saved files to HDFS:

# If a flume directory already existed on HDFS beforehand, delete it first
hadoop fs -ls /flume
hadoop fs -cat /flume/events.1638113932881

As you can see, the sink has indeed written the data to HDFS.
But there is a problem: if very little data arrives within a minute, rolling one file per minute wastes storage on many small files. We should add roll limits to keep Flume from producing small files.

# 1. Roll the file once it reaches this size in bytes (200 MB;
#    arithmetic such as 200*1024*1024 is not evaluated in a
#    properties file, so write the literal value)
a1.sinks.k1.hdfs.rollSize = 209715200
# 2. Roll the file once it has stored this many events
a1.sinks.k1.hdfs.rollCount = 10000
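
For reference, the HDFS sink rolls a file as soon as the first of rollInterval, rollSize, or rollCount is reached, and setting a trigger to 0 disables it. So to roll purely by size and event count, you could additionally set (a sketch, adjust to taste):

# Disable time-based rolling; files now roll only on size or event count
a1.sinks.k1.hdfs.rollInterval = 0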

3.3 HTTP as the Source, Sink Writing to logger

Create http.conf in /usr/flume/flume-1.7.0/conf: vi http.conf

# HTTP.conf
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind = master
a1.sources.r1.port = 50020
#a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

With the configuration complete, start Flume from the /usr/flume/flume-1.7.0 directory.

./bin/flume-ng agent --conf conf --conf-file ./conf/http.conf --name a1 -Dflume.root.logger=INFO,console

Send some data:

curl -X POST -d '[{"headers" : {"timestamp" : "434324343","host" : "random_host.example.com"},"body" : "random_body"},{"headers" : {"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "badou,badou"}]' master:50020

This command can also be executed from other nodes; HTTP allows different nodes to send messages to the source.
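
For instance, a minimal POST with a single event, run from slave1 (the body text here is arbitrary):

curl -X POST -d '[{"headers" : {}, "body" : "hello from slave1"}]' http://master:50020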

3.4 Chaining Multiple Nodes

On slave1, edit the file pull.conf:
vi pull.conf

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.channels = c1
a2.sources.r1.bind = slave1
a2.sources.r1.port = 44444

# Describe the sink
a2.sinks.k1.type = logger
a2.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a2.channels.c1.type = memory
a2.channels.c1.keep-alive = 10
a2.channels.c1.capacity = 100000
a2.channels.c1.transactionCapacity = 100000

On the master node, edit push.conf: vi push.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.keep-alive = 10
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000

# Describe the sink: an avro sink pointing at the avro source on slave1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = slave1
a1.sinks.k1.port = 44444

# 1. Start the downstream agent on slave1 first, so its avro source is listening:
./bin/flume-ng agent -c conf -f conf/pull.conf -n a2 -Dflume.root.logger=INFO,console

# 2. Then start the upstream agent on master:
./bin/flume-ng agent -c conf -f conf/push.conf -n a1 -Dflume.root.logger=INFO,console

On master, run telnet localhost 44444.

You can see that the data entered on master is displayed on slave1.
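
As a further check, you can bypass the master agent entirely and push a file straight to the avro source on slave1 using the avro client bundled with Flume (testfile.txt is a hypothetical local file):

# Send each line of testfile.txt as one event to the avro source on slave1
./bin/flume-ng avro-client -H slave1 -p 44444 -F testfile.txt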
