这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有 Hadoop 、 Storm ,以及后来的 Spark ,他们都有着各自专注的应用场景。 Spark 掀开了 内存计算 的先河,也以内存为赌注,赢得了内存计算的飞速发展。Spark 的火热或多或少的掩盖了其他分布式计算的系统身影。就像 Flink,也就在这个时候默默的发展着。
在国外一些社区,有很多人将大数据的计算引擎分成了 4 代 ,当然也有很多人不会认同。我们先姑且这么认为和讨论
第一代(Hadoop MapReduce)
批处理
Mapper、Reducer
hadoop的mapReduce将计算分为两个阶段,分别为Map和Reduce。对于上层应用来说,就不得不想法设法去拆分算法,甚至不得不在上层应用实现多个Job的串联,以完成一个完整的算法,例如迭代计算
第二代(DAG框架Tez + MapReduce)
第三代(Spark)
第四代(Flink)
概述:
官方介绍:
Stateful Computations over Data Streams
,即数据流上的有状态的计算
Data Streams
Flink认为有界数据集是无界数据流的一种特例,所以说有界数据集也是一种数据流,事件流也是一种数据流。Everything is streams
,即Flink可以用来处理任何的数据,可以支持批处理、流处理、AI、MachineLearning等
Stateful Computations
即有状态计算。有状态计算是最近几年来越来越被用户需求的一个功能。比如说一个网站一天内访问UV数,那么这个NV数便是状态。Flink提供了内置的对状态的一致性处理,即如果任务发生了Failover,其状态不会丢失、不会被多算少算,同时提供了非常高的性能
**无界流:**意思很明显,只有开始没有结束。必须连续的处理无界流数据,也即是在事件注入之后立即要对其进行处理。不能等待数据到达了再去全部处理,因为数据是无界的并且永远不会结束数据注入。处理无界流数据往往要求事件注入的时候有一定的顺序性,例如可以以事件产生的顺序注入,这样会使得处理结果完整
有界流:也即是有明确的开始和结束的定义。有界流可以等待数据全部注入完成了在开始处理。注入的顺序不是必须的了,因为对于一个静态的数据集,我们是可以对其进行排序的。有界流的处理也可以称为批处理
其他特点:
基于Apache Flink在阿里巴巴搭建的平台于2016年正式上线,并从阿里巴巴的 搜索 和 推荐 这两大场景开始实现。目前阿里巴巴所有的业务,包括阿里巴巴所有子公司都采用了基于Flink搭建的实时计算平台。同时Flink计算平台运行在开源的Hadoop集群之上。
采用Hadoop的 YARN 做为资源管理调度,以 HDFS 作为数据存储。
因此,Flink可以和开源大数据软件Hadoop无缝对接。
Flink在阿里巴巴的大规模应用,表现如何?
Flink分支 Blink
阿里自15年起开始调研开源流计算引擎,最终决定基于Flink打造新一代计算引擎
阿里贡献了数百个commiter,并对Flink进行高度定制,并取名为 Blink
阿里是 Flink SQL 的最大贡献者,一半以上的功能都是阿里的工程师开发的
Flink支持多种安装模式
环境准备:
安装包flink-1.6.1-bin-hadoop28-scala_2.11.tgz
注:需要根据自己的hadoop对应版本进行下载
安装步骤:
tar -zxvf flink-1.6.1-bin-hadoop28-scala_2.11.tgz -C /opt/module
cd /opt/module/flink-1.6.1
bin/start-cluster.sh
此时使用jps可以查看到下面两个进程
http://bigdata111:8081
注:slot 在flink里面可以认为是资源组,Flink是通过将任务分成子任务并且将这些子任务分配到slot来并行执行程序。
bin/flink-1.6.1 run /opt/module/flink-1.6.1/examples/batch/WordCount.jar --input /opt/module/zookeeper-3.4.10/zookeeper.out --output /opt/file/word.txt
此时控制台输出如下内容
Starting execution of program
Program execution finished
Job with JobID bb53dc79d510202e8abd95094791a32b has finished.
Job Runtime: 9539 ms
Standalone集群架构
部署情况:
安装步骤:
修改配置文件
`vim ./conf/flink-conf.yaml
## jobManager 的IP地址
jobmanager.rpc.address: node01
## JobManager 的端口号
jobmanager.rpc.port: 6123
## JobManager JVM heap 内存大小
jobmanager.heap.size: 1024
## TaskManager JVM heap 内存大小
taskmanager.heap.size: 1024
## 每个 TaskManager 提供的任务 slots 数量大小
taskmanager.numberOfTaskSlots: 2
#是否进行预分配内存,默认不进行预分配,这样在我们不使用flink集群时候不会占用集群资源
taskmanager.memory.preallocate: false
## 程序默认并行计算的个数
parallelism.default: 1
#JobManager的Web界面的端口(默认:8081)
jobmanager.web.port: 8081
#配置每个taskmanager生成的临时文件目录(选配)
taskmanager.tmp.dirs: /export/servers/flink-1.6.1/tmp
```
注:根据使用的版本不同,里面的内容可能略有不同
**slot和parallelism总结**
- taskmanager.numberOfTaskSlots:2
- 每一个taskmanager中的分配2个TaskSlot,3个taskmanager一共有6个TaskSlot
- parallelism.default:1 运行程序默认的并行度为1,6个TaskSlot只用了1个,有5个空闲
1. slot 是静态的概念,是指 taskmanager 具有的并发执行能力
2. parallelism 是动态的概念,是指程序运行时实际使用的并发能力
2. 修改slaves文件
`vim conf/slaves
bigdata111
bigdata222
bigdata333
注:此处填入主机名,如果没有设置主机名映射,需要先进行设置
3. 修改/etc/profile系统环境变量配置文件,添加HADOOP_CONF_DIR目录**(节点同步进行配置)**
添加如下内容
export HADOOP_CONF_DIR=/opt/module/hadoop-2.8.4/etc/hadoop
执行命令重新加载环境变量
source /etc/profile
4. 分发flink
scp -r /opt/module/flink-1.6.1 root@bigdata222:/opt/module
scp -r /opt/module/flink-1.6.1 root@bigdata333:/opt/module
5. 启动Flink集群
bin/start-cluster.sh
友情提示:如果之前开启,请先关闭之后在启动
- 启动/停止flink集群
- 启动:./bin/start-cluster.sh
- 停止:./bin/stop-cluster.sh
- 启动/停止jobmanager 如果集群中的jobmanager进程挂了,执行下面命令启动
- bin/jobmanager.sh start
- bin/jobmanager.sh stop
- 启动/停止taskmanager 添加新的taskmanager节点或者重启taskmanager节点
- bin/taskmanager.sh start
- bin/taskmanager.sh stop
6. 启动HDFS集群
start-dfs.sh
7. 在HDFS中创建测试目录,并上传测试文件
hdfs dfs -mkdir -p /flinkTest/input
hdfs dfs -put /opt/file/wordcount.txt /flinkTest/input
8. 测试
bin/flink-1.6.1 run /opt/module/flink-1.6.1/examples/batch/WordCount.jar --input hdfs://bigdata111:9000/flinkTest/input/wordcount.txt --output hdfs://bigdata111:9000/flinkTest/output/word.txt
9. 浏览webUI页面
http://bigdata111:8081
### 2.3 Standalone高可用HA模式
从上述架构图中,可发现JobManager存在 单点故障 ,一旦JobManager出现意外,整个集群无法工作。所以,为了确保集群的高可用,需要搭建Flink的HA。
**HA架构图**
![](http://img.saoniuhuo.com/images/1601452516295.png)
**部署情况:**
- 服务器:bigdata111(Master+Slave)
- 服务器:bigdata222(Master+Slave)
- 服务器:bigdata333(Slave)
**安装步骤:**
1. 修改配置文件`flink-conf.yaml`,添加如下配置
#开启HA,使用文件系统作为快照存储
state.backend: filesystem
#启用检查点,可以将快照保存到HDFS
state.backend.fs.checkpointdir: hdfs://bigdata111:9000/flink-checkpoints
#使用zookeeper搭建高可用
high-availability: zookeeper
high-availability.storageDir: hdfs://bigdata111:9000/flink/ha/
high-availability.zookeeper.quorum: bigdata111:2181,bigdata222:2181,bigdata333:2181
注:同样因为版本不同,配置文件内容也不同。
2. 分发
scp -r /opt/module/flink-1.6.1/conf/flink-conf.yaml root@bigdata222:/opt/module/flink-1.6.1/conf/
scp -r /opt/module/flink-1.6.1/conf/flink-conf.yaml root@bigdata333:/opt/module/flink-1.6.1/conf/
3. 修改节点2里的`flink-conf.yaml`配置文件
jobmanager.rpc.address: bigdata222
注:修改为节点所在主机名
4. 修改节点1的`conf/masters`配置文件,添加如下内容
bigdata111:8081
bigdata222:8081
5. 分发masters文件
scp -r /opt/module/flink-1.6.1/conf/masters root@bigdata222:/opt/module/flink-1.6.1/conf/
scp -r /opt/module/flink-1.6.1/conf/masters root@bigdata333:/opt/module/flink-1.6.1/conf/
6. 启动zookeeper集群
7. 启动HDFS集群
8. 启动flink集群
9. 分别查看两个节点的WEBUI
使用`kill -9 进程号` 干掉一个节点,然后查看另外一个节点的WEBUI
### 2.4 Yarn集群搭建
在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行; flink on yarn 的前提是:hdfs、yarn均启动
**部署情况:**
- JobManager: bigdata111
- WorkManager: bigdata111,bigdata222,bigdata333
**安装步骤:**
1. 修改Hadoop的`yarn-site.xm`
添加该配置表示内存超过分配值,是否将任务杀掉。默认为true。
运行Flink程序,很容易超过分配的内存
```xml
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
yarn-site.xm
至其他节点 scp -r /opt/module/hadoop-2.8.4/etc/hadoop/yarn-site.xml root@bigdata222:/opt/module/hadoop-2.8.4/etc/hadoop/
scp -r /opt/module/hadoop-2.8.4/etc/hadoop/yarn-site.xml root@bigdata333:/opt/module/hadoop-2.8.4/etc/hadoop/
start-dfs.sh
start-yarn.sh
Flink运行在YARN上,可以使用 yarn-session 来快速提交作业到YARN集群。我们先来看下Flink On Yarn
模式,Flink是如何和Yarn进行交互的。
yarn-session提供两种模式: 会话模式 和 分离模式
使用步骤:
bin/yarn-session.sh -n 2 -tm 800 -s 1 -d
yarn-session.sh脚本可以携带的参数:
Required
-n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量)
Optional
-D <arg> 动态属性
-d,--detached 独立运行 (以分离模式运行作业)
-id,--applicationId <arg> YARN集群上的任务id,附着到一个后台运行的yarn
session中
-j,--jar <arg> Path to Flink jar file
-jm,--jobManagerMemory <arg> JobManager的内存 [in MB]
-m,--jobmanager <host:port> 指定需要连接的jobmanager(主节点)地址
使用这个参数可以指定一个不同于配置文件中的jobmanager
-n,--container <arg> 分配多少个yarn容器 (=taskmanager的数量)
-nm,--name <arg> 在YARN上为一个自定义的应用设置一个名字
-q,--query 显示yarn中可用的资源 (内存, cpu核数)
-qu,--queue <arg> 指定YARN队列
-s,--slots <arg> 每个TaskManager使用的slots数量
-st,--streaming 在流模式下启动Flink
-tm,--taskManagerMemory <arg> 每个TaskManager的内存 [in MB]
-z,--zookeeperNamespace <arg> 针对HA模式在zookeeper上创建NameSpace
bin/flink-1.6.1 run examples/batch/WordCount.jar
yarn application -kill application_id
杀掉任务 yarn application -kill application_1554377097889_0002
使用步骤:
bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
Flink之所以能这么流行,离不开它最重要的四个基石: Checkpoint、State、Time、Windo
首先是Checkpoint
机制,这是Flink的一个重要特性,Flink基于Chandy-Lamport
算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。Chandy-Lamport算法是上在1985年的时候就已经被提出来,但并没有被很广泛的应用,而flink则把这个算法发扬光大了。Spark最近在实现Continue streaming,Continue streaming的目的是为了降低它处理的延时,其也需要提供这种一致性的语义,最终采用Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定
提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松,更容易地去管理状态,还提供了一套非常简单明了的State API,包括里面的有有ValueState、ListState、MapState,近期添加了了BroadcastState,使用State API能够自动享受到这种一致性的语义
除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,或者说基于系统时间的处理,能够容忍数据的迟到,容忍乱序的数据
另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如如 滑动窗口 、 滚动窗口 、 会话窗口 以及非常灵活的 自定义的窗口 。
Flink是一个分层架构的系统,每一层所包含的组件都提供了特定的抽象,用来服务于上层组件。Flink分层的组件栈如下图所示:
从上至下:
部署层
Flink支持本地运行、能在独立集群或者在被YARN管理的集群上运行,也能部署在云上。
运行时
Runtime层提供了支持Flink计算的全部核心实现,为上层API层提供基础服务
API
DataStream、DataSet、Table、SQLAPI
扩展库
Flink还包括用于复杂事件处理,机器学习,图形处理和Apache Storm兼容性的专用代码库
Flink提供了不同的抽象级别以开发流式或批处理应用
Flink程序的基本构建块是流和转换(请注意。Flink的DataSet API中使用的DataSet也是内部流)。从概念上讲,流是(可能永无止境的)数据记录流,而转换是将一个或者多个流作为一个或多个流的操作。输入,并产生一个或者多个输出流
Flink应用程序结构就是如上图所示:
Flink程序在执行的时候,会被映射成一个Streaming Dataflow
,一个Streaming Dataflow是一组Stream
和Transformation Operator
组成的。再启动时从一个或多个Source Operator
开始,结束语一个或者多个Sink Operator
Flink程序本质上是 并行的和分布式的,在执行过程中,一个流(Stream)包含一个或多个流分区,而每一个operator包含一个或多个operator子任务。操作子任务间彼此独立,在不同的线程中执行,甚至是在不同的机器或者不同的容器上。operator子任务的数量是这一特定operator的 并行度。相同程序中的不同operator有不同级别的并行度
一个Stream可以被分成多个Stream的分区,也就是 Stream Partition
。一个Operator也可以被分为 多个Operator Subtask
。如上图中,Source被分成Source1和Source2,它们分别为Source的Operator Subtask。每Operator Subtask都是在 不同的线程 当中独立执行的。一个Operator的并行度,就等于Operator Subtask的个数。上图Source的并行度为2。而一个Stream的并行度就等于它生成的Operator的并行度。
数据在两个operator之间传递的时候有两种模式:
Flink的所有操作都称之为Operator(转换),客户端在提交任务的时候会对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,合并后的Operator称为Operator chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行
Actor System
、Scheduler
、CheckPoint
三个重要的组件任务槽
每个TaskManager是一个JVM的进程,可以在不同的线程中执行一个或者多个子任务
为了控制一个worker能接受到多少个task。worker通过task slot来进行控制(一个worker至少有一个task slot)
每个task slot 表示TaskManager拥有资源的固定大小的子集
Flink将进程的内存进行了划分到多个slot中
图中有两个TaskManager,每个TaskManager有三个slot,每个slot占有1/3的内存
内存被划分到不同的slot之后可以获得如下好处
槽共享(Slot Sharing)
默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自同一作业。结果是一个槽可以保存作业的整个管道。允许插槽共享有两个好处:
在大数据处理领域,批处理任务与流处理任务一般被认为是两种 不同 的任务
一个大数据框架一般会被设计为只能处理其中一种任务
在执行引擎这一层,流处理系统与批处理系统最大不同在于 节点间的数据传输方式 :
这两种数据传输模式是两个极端,对应的是流处理系统对 低延迟 的要求和批处理系统对 高吞吐量 的要求
Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输
阿里在Flink的应用主要包含四个模块:实时监控、实时报表、流数据分析和实时仓库
从很多公司的应用案例发现,其实Flink主要用在如下三大场景:
场景一:Event-driven Applications【事件驱动】
事件驱动型应用 是一类具有状态的应用,它从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。
事件驱动型应用是在计算存储分离的 传统应用 基础上进化而来。
在传统架构中,应用需要 读写远程事务型数据库 。
相反,事件驱动型应用是基于 状态化流处理 来完成。在该设计中, 数据和计算不会分离 ,应用只需访问本地(内存或磁盘)即可获取数据。系统 容错性 的实现依赖于定期向远程持久化存储写入checkpoint 。下图描述了传统应用和事件驱动型应用架构的区别。
典型的事件驱动类应用:
场景二:Data Analytics Applications【数据分析】
数据分析任务需要从原始数据中提取有价值的信息和指标。
如下图所示,Apache Flink 同时支持流式及批量分析应用
Data Analytics Applications包含Batch analytics(批处理分析)和Streaming analytics(流处理分析)。
Batch analytics可以理解为 周期性查询 :比如Flink应用凌晨从Recorded Events中读取昨天的数据,然后做周期查询运算,最后将数据写入Database或者HDFS,或者直接将数据生成报表供公司上层领导决策使用。
Streaming analytics可以理解为 连续性查询 :比如实时展示双十一天猫销售GMV,用户下单数据需要实时写入消息队列,Flink 应用源源不断读取数据做实时计算,然后不断的将数据更新至Database或者K-VStore,最后做大屏实时展示。
典型的数据分析应用实例
场景三:Data Pipeline Applications【数据管道】
什么是数据管道?
提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。
ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库
数据管道和 ETL 作业的用途相似,都可以转换、丰富数据,并将其从某个存储系统移动到另一个。
但数据管道是以 持续流模式 运行,而非周期性触发。因此它支持从一个不断生成数据的源头读取记录,并将它们以低延迟移动到终点。例如:数据管道可以用来监控文件系统目录中的新文件,并将其数据写入事件日志;另一个应用可能会将事件流物化到数据库或增量构建和优化查询索引。
和周期性 ETL 作业相比, 持续数据管道 可以明显降低将数据移动到目的端的延迟。此外,由于它能够持续消费和发送数据,因此用途更广,支持用例更多。
下图描述了周期性 ETL 作业和持续数据管道的
Periodic ETL:比如每天凌晨周期性的启动一个Flink ETL Job,读取传统数据库中的数据,然后做ETL,最后写入数据库和文件系统。
Data Pipeline:比如启动一个Flink 实时应用,数据源(比如数据库、Kafka)中的数据不断的通过Flink Data Pipeline流入或者追加到数据仓库(数据库或者文件系统),或者Kafka消息队列。
典型的数据管道应用实例
思考题:
假设你是一个电商公司,经常搞运营活动,但收效甚微,经过细致排查,发现原来是羊毛党在薅平台的羊毛,把补给用户的补贴都薅走了,钱花了不少,效果却没达到。我们应该怎么办呢?
你可以做一个实时的异常检测系统,监控用户的高危行为,及时发现高危行为并采取措施,降低损失。
系统流程:
内容来源于网络,如有侵权,请联系作者删除!