1**、Flume概述**
Hadoop的构建宗旨是处理大型数据集,通常,我们都是假设这些数据已经存储在HDFS中或者能够随时批量复制到HDFS。然而,有许多系统并不符合此假设,它们生产出的是我们想要通过Hadoop来汇总、存储和分析的数据流。与这类系统打交道,Apache Flume就再合适不过了。
Flume作为cloudera公司开发的实时日志采集系统,受到了业界的广泛认可和应用。设计Flume的目的是向Hadoop批量导入基于事件的海量数据,一个典型的例子是利用Flume从一组Web服务器中收集日志文件,然后把这些文件中的日志事件转移到一个新的HDFS汇总文件中,以做进一步处理,其终点(用Flume的术语叫做Sink)通常为HDFS。不过,Flume具有足够的灵活性,也可以将数据存储到其他系统中,如HBase或Solr等。
要想使用Flume,就需要运行Flume代理(Agent),Flume代理是由持续运行的Source(数据来源)、Sink(数据目标)以及Channel(用于连接Source和Sink)构成的Java进程。Flume的Source产生事件,并将其传送给Channel,Channel存储这些事件直至转发给Sink。可以把Source-Channel-Sink的组合当作Flume构件。
Flume由一组以分布式拓扑结构相互连接的代理构成。系统边缘的代理(如与Web服务器共存于同一台机器上的代理)负责采集数据,并把数据转发给负责汇总的代理,然后再将这些数据存储到最终目的地。代理通过配置来运行一组特定的Source和Sink,因此,使用Flume所要做的主要工作就是通过配置文件使得各个组件融合到一起。
2**、Flume的特点**
Flume是一个分布式的、可靠的以及高可用的海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于采集数据,同时,Flume还提供对数据进行简单处理,并最终存储到各种数据接受方(比如文本、HDFS、HBase等)的能力。
Flume的数据流由事件(Event)贯穿始终,事件是Flume的基本数据单位,它携带日志数据(字节数组形式)并且携带有头信息。这些Event由Agent外部的Source生成,当Source捕获事件后会进行特定的格式化,然后Source会把事件推入(单个或多个)Channel中。可以将Channel看作是一个缓冲区,它将保存事件直到Sink处理完该事件。Sink负责持久化日志或者把事件推向另一个Source。
A**、Flume的可靠性**
Flume使用两个独立的事务分别负责从Source到Channel以及从Channel到Sink的事件传递。一旦事务中的所有事件全部传递到Channel且提交成功,那么Source就将该文件标记为完成。事务以类似的方式应用于从Channel到Sink的事件传递过程,如果由于某种原因使得事件无法记录,那么事务将会回滚,而所有的事件仍然保留在Channel中,等待重新传递。
B**、Flume的可恢复性**
Flume的可恢复性依靠Channel来实现,建议使用File Channel,这种类型的Channel具有持久性:只要事件被写入Channel,即使代理重新启动,事件也不会丢失。(Flume还提供有Memory Channel,由于它的事件缓存在存储器中,因此它不具有持久存储能力,采用Memory Channel时,如果代理重新启动,事件就会丢失。在某些应用场景中这种情况是可以接受的,具体取决于实际需求,与File Channel相比,Memory Channel的优势在于具有较高的吞吐量)
C**、Flume的批量处理**
为了提供效率,Flume在有可能的情况下尽量以事务为单位来批量处理事件,而不是逐个事件地进行处理。批量处理方式尤其有利于提高File Channel的性能,因为每个事务只需要写一次本地磁盘和调用一次fsync。而批量的大小取决于组件的类型,并且在大多数情况下是可以配置的。例如,Spooling Directory Source以100行文本作为一个批次来读取(可通过BatchSize属性进行设置)。同样地,在通过Avro RPC发送之前,Avro Sink试图从Channel中批量读取100个事件,当然,如果可用的事件不足100个也不会引起阻塞。
3**、Flume的体系结构**
Flume运行的核心是代理(Agent),Flume以Agent为最小的独立运行的基本单位,一个Agent就是一个独立的进程,它是一个完整的数据采集工具,含有三个核心组件,分别是Source、Channel以及Sink。通过这些组件,Event可以从一个地方流向另一个地方,Flume的基本结构如下图所示。
A、Source
Source是数据的采集端,负责将数据捕获后进行特殊的格式化,并将数据封装到事件(Event)里,然后推入Channel中。Flume提供了很多内置的Source,如Spooling Directory Source、Kafka Source、JMS Source、Exec Source、Avro Source、Log4j Source以及Syslog Source等,可以让应用程序同已有的Source直接打交道,如果内置的Source无法满足应用需求,Flume还支持自定义Source。
Flume主要支持的Source类型如下图所示:
B**、Channel**
Channel是连接Source和Sink的组件,可以将它看作是一个数据的缓冲区(数据队列),它可以将事件暂存到内存中,也可以持久化到本地磁盘上,直到Sink处理完该事件,最常用的两个Channel大概就是Memory Channel和File Channel。
Flume主要支持的Channel类型如下图所示:
C**、Sink**
Sink从Channel中取出事件,然后将数据转发到其它地方,可以是文件系统、数据库、HDFS等,也可以是其他Agent的Source。在日志数据较少时,可以将数据存储在文件系统中,并且设置一定的时间间隔保存数据。
Flume主要支持的Sink类型如下图所示:
4**、Flume的拦截器和数据流**
A**、Flume的拦截器**
当需要对数据进行过滤时,除了可以在Source、Channel和Sink组件端进行代码修改外,Flume还为使用者提供了拦截器,拦截器也是采用chain的形式运用于Agent中。拦截器的使用位置处于Source和Channel之间,当为Source指定拦截器后,在拦截器中会得到Event,根据需求就可以对Event进行保留还是抛弃的处理,抛弃的数据不会进入到Channel中,该过程如下图所示:
B**、Flume的数据流**
Flume的核心是把数据从数据源采集过来,再送到目的地,为了保证传送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,再删除缓存的数据。Flume传输数据的基本单位是Event,如果是文本文件,通常是一行记录,这也是事务的基本单位,Event从Source,流向Channel,再到Sink,本身为一个byte数组,并可携带header信息。Event代表着一个数据流的最小完整单元,从外部数据源来,再向外部的目的地去。
值得注意的是,Flume提供了大量内置的Source、Channel和Sink类型,不同类型的Source,Channel和Sink可以自由组合,组合方式基于用户自定义的配置文件,非常灵活。比如,Channel可以把事件暂存在内存里,也可以持久化到本地磁盘中,Sink可以把日志写入到HDFS,HBase,甚至是另外一个Source等等。Flume还支持用户建立多级流,也就是说,多个Agent可以协同工作,并且支持Fan-in(扇入)、Fan-out(扇出)、Contextual Routing(上下文路由)以及Backup Routes(备用路由),这也正是Flume的强大之处,如下图所示:
5**、Flume的安装和使用**
A**、Flume的安装**
Flume的安装非常简单,将Flume安装包上传到/root/tools目录下,然后运行命令tar -zxvf apache-flume-1.7.0-bin.tar.gz–C /root/training/,将Flume解压到/root/training目录下即可。
B**、Flume的使用**
使用Flume的关键是配置自己的Agent,这需要我们自定义一个conf配置文件。下面将列举几个实例,以供参考学习。准备工作,在Flume的安装目录/root/training/apache-flume-1.7.0-bin下创建目录MyAgent,用于存放自定义配置文件。
实例一:按照相应策略采集指定目录下新产生的日志文件中的log
进入到/root/training/apache-flume-1.7.0-bin/MyAgent目录下,运行命令vi a1.conf,创建文件,并在文件中输入以下内容:
/#运行Flume使用的命令:bin/flume-ng agent -na1 -f MyAgent/a1.conf -c conf -Dflume.root.logger=INFO,console
/#定义Agent的组件, Source、Channel、Sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
/#具体定义source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/temp/logs
/#具体定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
/#定义拦截器,为消息添加时间戳
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =org.apache.flume.interceptor.TimestampInterceptor$Builder
/#具体定义sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.12.221:9000/flume/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.fileType = DataStream
/#不按照条数生成文件
a1.sinks.k1.hdfs.rollCount = 0
/#HDFS上的文件达到128M时生成一个文件
a1.sinks.k1.hdfs.rollSize = 134217728
/#HDFS上的临时文件达到60秒时生成最终文件
a1.sinks.k1.hdfs.rollInterval = 60
/#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在目录/root/training/apache-flume-1.7.0-bin下,运行命令bin/flume-ng agent-n a1 -f MyAgent/a1.conf -c conf -Dflume.root.logger=INFO,console,从log中可以看到自定义的r1(Source组件)、c1(Channel组件)、k1(Sink组件)已经启动,如下图所示:
然后,将/root/temp目录下的file1.txt文件(该文件中的内容为MapReduce is simple),拷贝到被监听目录/root/temp/logs下,输出的log信息如下图所示:
拷贝完文件之后,迅速(保证在启动上面那条长长的命令后的60秒钟之内)去查看HDFS对应目录,如下图所示,注意文件events-.1531664661571.tmp带有后缀tmp,表明是临时文件;等到超过60秒钟之后再去查看,发现该tmp后缀消失了,这个现象的产生,是由配置文件中定义的文件生成策略所决定的,要么生成的文件大小超过128M,要么时间超过60秒钟,才会生成最终的文件。
60秒钟之后,再查看HDFS对应目录,如下图所示:
实例二:监听端口8888,采集网络请求的日志信息
进入到/root/training/apache-flume-1.7.0-bin/MyAgent目录下,运行命令vi a2.conf,创建文件,并在文件中输入以下内容:
/#运行Flume使用的命令:bin/flume-ng agent -na2 -f MyAgent/a2.conf -c conf -Dflume.root.logger=INFO,console
/#定义Agent的组件, Source、Channel、Sink的名称
a2.sources = r2
a2.channels = c2
a2.sinks = k2
/#具体定义source
a2.sources.r2.type = netcat
a2.sources.r2.bind = localhost
a2.sources.r2.port = 8888
/#具体定义channel
a2.channels.c2.type = memory
a2.channels.c2.capacity = 1000
a2.channels.c2.transactionCapacity = 100
/#具体定义sink
a2.sinks.k2.type = logger
/#组装source、channel、sink
a2.sources.r2.channels = c2
a2.sinks.k2.channel = c2
实例三:监听在命令行上命令执行的结果
进入到/root/training/apache-flume-1.7.0-bin/MyAgent目录下,运行命令vi a3.conf创建文件,并在文件中输入以下内容:
/#运行Flume使用的命令:bin/flume-ng agent -n a3 -f MyAgent/a3.conf -c conf-Dflume.root.logger=INFO,console
/#定义Agent的组件, Source、Channel、Sink的名称
a3.sources= r3
a3.channels= c3
a3.sinks= k3
/#具体定义source
a3.sources.r3.type= exec
a3.sources.r3.command= tail -f /root/temp/aaa.log
/#具体定义channel
a3.channels.c3.type= memory
a3.channels.c3.capacity= 1000
a3.channels.c3.transactionCapacity= 100
/#具体定义sink
a3.sinks.k3.type= logger
实例四:采集指定目录下新产生的日志文件中的log(实例一的简化版本)
进入到/root/training/apache-flume-1.7.0-bin/MyAgent目录下,运行命令vi a4.conf,创建文件,并在文件中输入以下内容:
/#bin/flume-ng agent -n a4 -f MyAgent/a4.conf -c conf-Dflume.root.logger=INFO,console
/#定义Agent的组件, Source、Channel、Sink的名称
a4.sources = r4
a4.channels = c4
a4.sinks = k4
/#具体定义source
a4.sources.r4.type = spooldir
a4.sources.r4.spoolDir = /root/temp/logs1
/#具体定义channel
a4.channels.c4.type = memory
a4.channels.c4.capacity = 1000
a4.channels.c4.transactionCapacity = 100
/#具体定义sink
a4.sinks.k4.type = logger
/#组装source、channel、sink
a4.sources.r4.channels = c4
a4.sinks.k4.channel = c4
至此,Flume介绍完毕,欢迎关注下期文章!
参考文献:
——《Hadoop权威指南》
——《CSDN博客》
——《百度百科》
转自https://mp.weixin.qq.com/s/hebIdWH-ZwfzjDO88v2sYg
内容来源于网络,如有侵权,请联系作者删除!