流式实时计算引擎

x33g5p2x  于2020-09-30 发布在 Storm  
字(9.2k)|赞(0)|评价(0)|浏览(2624)

流式数据在实际应用中非常常见,典型的流式数据包括点击日志、监控指标数据、搜索日志等。流式数据往往伴随实时计算需求,即对流式数据进行实时分析,以便尽可能快速地获取有价值的信息。在大数据领域,将针对流式数据进行实时分析的计算引擎称为流式实时计算引擎。这类引擎最大的特点是延迟低,即从数据产生到最终处理完成,整个过程用时极短,往往是毫秒级或秒级处理延迟。与批处理计算引擎类似,流式实时计算引擎也需具有良好的容错性、扩展性和编程简易性等特点。目前常用的流式实时计算引擎分为两类:面向行(row-based)和面向微批处理(micro-batch)。其中,面向行的流式实时计算引擎的代表是Apache Storm,其典型特点是延迟低,但吞吐率也低;而面向微批处理的流式实时计算引擎的代表是Spark Streaming,其典型特点是延迟高,但吞吐率也高。

1 概述

11 产生背景

一个流式计算过程可用图13-1所示概括,一条消息(msg1)到达后,依次经若干用户实现逻辑处理后,将最终结果写入外部系统。每条消息经用户逻辑处理后,会衍生出新的消息(比如msg1衍生出msg2或msg3)。而对于流式计算而言,应保证消息的可靠性:每条消息进入系统后,可以依次完整经历用户定义的逻辑,最终产生期望的结果,而不应因任意故障导致消息处理中断后致使消息处理不完整。

图13-1 流式计算过程

传统的流式计算平台是通过“消息队列+工作进程”组合方式构建的,具体如图13-2所示。流式数据到达系统后:

  1. 按照某种预定义的分区策略,被放入若干消息队列中;
  2. 由嵌入用户应用逻辑的工作进程从合适的消息队列中读取消息(这里的“消息”指一条流式数据),经处理后,衍生出的消息被重新放入消息队列;
  3. 再由另外一类工作进程处理这些新产生的消息;
  4. 重复以上过程,直到任意进入系统的消息(或者衍生出的消息),被所有工作进程处理一遍;

图13-2 传统流式计算平台

这类系统能够解决流式数据处理的问题,但是存在以下几个缺点:

  • 扩展性差:消息分区以及工作进程分布通过人工完成的,当机器规模增加时,整个系统扩展起来非常烦琐。
  • 容错性差:当工作进程因硬件故障或软件bug而失败时,需要人工干预,重启对应的工作进程。当一个消息队列崩溃时,可能面临数据丢失的危险。
  • 无法保证数据被处理完:流式处理应用场景通常是对数据处理完整性有一定的要求,即每条数据应至少被处理一次(at least once)或仅且仅被处理一次(exactly once)。

为了克服传统消息队列系统的不足,新型流式计算引擎诞生了。这类计算引擎为用户提供了简易的编程接口,用户可通过实现这些编程接口即可完成分布式流式应用程序的开发,而其他比较复杂的工作,如节点间的通信、节点失效、数据分片、系统扩展等,全部由运行时环境完成,用户无需关心这些细节。

1.2 常见开源实现

当前比较主流的流式数据线(Data Pipeline)共分为四步:

(1)数据采集:该阶段主要负责从不同的数据源上实时采集数据,典型的数据源包括移动客户端,网站后端等,通常根据后端数据缓存模块不同,选用不同的实现方案,可选的包括Flume以及自定义Kafka Producer。

(2)数据缓冲:为了平衡数据采集和数据处理速率的不对等,通常数据采集阶段和处理阶段之间加入一个数据缓冲阶段,通过由消息队列担任该角色,比如:Kafka

(3)实时分析:流式地从数据缓冲区获取数据,并快速完成数据处理,将结果写到后端的存储系统中。根据系统对延迟和吞吐率的要求不同,可选用不同的流式计算引擎,比如Storm或SparkStreaming。

(4)结果存储:将计算产生的结果存储到外存储系统中,根据应用场景不同,可选择不同的存储系统,比如大量可实时查询的系统,可存储到HBase中,小量但需可高并发查询的系统,可存入Redis中。

根据流式计算引擎的数据组织特点,可将其分为两类:基于行(row based)和基于微批处理(micro-batch based)。基于行的流式实时处理系统以行为单位处理数据,其主要优点是单条数据的处理延迟低,但系统吞吐率一般也较低,其典型代表是Apache Storm;基于微批处理的流式实时处理系统则将流式处理转化为批处理,即以批为单位组织数据,它通常以时间为单位将流式数据切割成连续的批数据,并通过批处理的方式处理每批数据,这类系统的优点是吞吐率高,而缺点也很明显:单条数据处理延迟较高,其典型代表是Spark Streaming。

2 Storm基础与实战

本节将介绍Storm的基本概念、软件架构、程序设计方法以及内部原理。

2.1 Storm概念与架构

本小节将介绍Storm基本概念,包括:Tuple、Stream、Topology、Bolt和Spout。

1. 概念

Storm中核心概念如下:

  • Tuple:由一组可序列化的元素构成,每个元素可以是任意类型,包括Java原生类型、String、byte[]、自定义类型(必须是可序列化的)等;
  • Stream:无限的Tuple序列对象形成一个Stream,每个Stream由一个唯一ID、一个对Tuple中元素命名的Schema以及无限Tuple构成;
  • Topology:Storm中的用户应用程序被称为“Topology”,这类似于MapReduce中的Job。它的英文本意是网络拓扑,是由一些列Spout和Blot构成的DAG(有向无环图),其中每个点表示一个Spout或Blot,每个边表示Tuple流动方向。
  • Spout:Stream的数据源,它通常从外部系统(比如:消息队列)中读取数据,并发射到Topology中。Spout可将数据(Tuple)发射到一个或多个Stream中。
  • Blot:消息处理逻辑,可以对接收到的消息做任意处理逻辑,包括:过滤、聚集、与外部数据库通信、消息转换等。Blot可进一步将产生的数据(Tuple)发射到一个或多个Stream中。

在一个Topology中,每个Spout或Blot通常由多个Task组成,每个Spout和Blot的Task相互独立,可以并行执行。如图13-4所示,可类比MapReduce中的job:一个MapReduce Job可看作一个两阶段的DAG,其中Map阶段可分解成多个Map Task, Reduce阶段可分解成多个Reduce Task,相比之下,Storm Topology是一个更加通用的DAG,可以有多个Spout和Blot阶段,每个阶段可进一步分解成多个Task。

图13-4 Storm的Topology构成

  • Streaming Grouping:Stream Grouping决定了Topology中Tuple在不同Task之间的传递方式。Storm主要提供了多种Stream Grouping实现,常用的有:

    (1)Shuffle Grouping:随机化的轮询方式,即Task产生的Tuple将采用轮询方式发送给下一类组件的Task;

    (2)LoadOrShuffle Grouping:经优化的Shuffle Grouping实现,它使得同一Worker内部的Task优先将Tuple传递给同Worker的其他Task。

    (3)Fields Grouping:某个字段值相同的Tuple将被发送给同一个Task,类似于MapReduce或Spark中的Shuffle实现。

2. Storm基本架构

一个Storm集群由三类组件构成:Nimbus、Supervistor和ZooKeeper。,如图13-5所示,它们的功能如下:

图13-5 Storm架构

  • Nimbus:集群管理者和调度组件,通常只有一个,负责代码分发、任务调度、故障监控以及容错 (将失败的任务调度到其他机器上)等,Nimbus是无状态的,可通过“kill -9”杀掉它而不影响正常应用程序的运行。
  • Supervistor:计算组件,通常有多个,负责执行实际的计算任务,根据Nimbus指令启动或者停止worker进程。与Nimbus类似,Supervisor也是无状态。
    • Worker:实际的计算进程,每个Supervisor可启动多个Worker进程(需静态为每个Worker分配一个固定端口号),但每个Worker只属于特定的某个Topology。
    • Executor:每个Worker内部可以启动多个Executor,以运行实际的用户逻辑代码(Task),每个Executor可以运行同类组件(同一个Topology内的Spout或Bolt)中一个或多个Task。
    • Task:用户逻辑代码,由Executor线程根据上下文调用对应的Task计算逻辑。
  • Zookeeper:Nimbus与Supervisor之间的协调组件,存储状态信息和运行时统计信息,包括:
    • Supervisor的注册与发现,监控失败的Supervisor。
    • Worker通过Zookeeper向Nimbus发送包含Executor运行状态的心跳信息。
    • Supervisor通过Zookeeper向Nimbus发送包含自己最新状态的心跳信息。

3. Topology并发度

一个Storm的Topology的并发度与Worker、Executor和Task三种实体的数目相关,用户可根据需要为Topology定制每种实体的数目。需要注意的是,这些实体的并发度也被称为“parallelism hint”,它们的数值只是初始值,而后续可根据需求进一步进行调整。三种实体的并发度设置方式具体如下:

  • Worker进程的并发度,即在Storm集群中为Topology启动多个Worker,通常有两种设置方法:
    • 在配置文件中通过配置项TOPOLOGY_WORKERS设置。
    • 在代码中,通过函数Config#setNumWorkers设置。
  • Executor线程的并发度,即每类组件(Spout或Bolt)启动Executor数目,可通过函数TopologyBuilder#setSpout()和TopologyBuilder#setBolt()分别设置Spout和Bolt类型的Executor的数目。
  • Task数目:每类组件启动Task数目,可通过函数ComponentConfigurationDeclarer#setNumTasks()设置。

以下代码创建了一个Storm Topology,包含KafkaSpout、SplitBolt以及MergeBolt三类组件,并为每类组件设置了Executor和Task数目,该代码对应的Topology运行时环境如图13-6所示。

Config conf = new Config();
conf.setNumWorkers(2);
topologyBuilder.setSqout("kafka-spout",new KafkaSpout(), 2);
topologyBuilder.setBolt("split-bolt", new SplitBolt(), 2)// 为SplitBolt启动两个Executor线程
 	.setNumTasks(4) // 设置Task数目为4
  .shuffleGrouping("kafka-spout");
topologyBuilder.setBolt("merge-bolt", new MergeBolt(), 6)// 为SplitBolt启动6个Executor线程
  .shuffleGrouping("split-bolt");
StormSubmitter.submitToplogy("mytopologgy", conf, topologgyBuilder.createTopology());

图13-6 一个Storm Topology的运行时环境

一旦Topology运行起来后,用户可通过Web UI或Shell命令动态修改Topology的并发度,比如以下Shell命令将以上Topology的Worker数目增大为4, kafka-spout Executor数目增大为4,merge-bolt Executor数目增大为8:

storm rebalance mytopology -n 4 -e kafka-spout=4 -e  merge-bolt=8

2.2 Storm程序设计

本节将介绍简化版的Storm程序设计实例:网站指标实时分析系统。在该系统中,用户行为数据(日志)被源源不断地发送到Kafka集群中,之后经Storm Topology处理后,写入HBase中,以供可视化模块展示实时统计系统,包括网站的PⅤ(Page Ⅴiew)和UⅤ(Unique visitor)等信息。

在该系统中,我们采用JSON格式保存用户访问日志,每条日志包含三个字段,分别是客户端地址(ip)、访问时间(timestamp),访问的链接(url),以下是几条日志数据的示例:

{"ip":"10.10.10.1", "timestamp":"20170822132730", "url":"http://dongxicheng.org/mapreduce-nextgen/voidbox-docker-on-hadoop-hulu"}
{"ip":"112.156.10.1", "timestamp":"20170822132730", "url":"http://dongxicheng.org/framework-on-yarn/hadoop-spark-common-parameters/"}
{"ip":"150.110.103.5", "timestamp":"20170822132732","url":"http://dongxicheng.org/mapreduce-nextgen/yarn-mesos-borg/"}

为了实现网站指标的实时统计,我们可以设计如图13-7所示的Storm Topology。

图13-7 网站指标实时分析系统的Storm Topology

该Topology包含1个Spout和3个Bolt,它们的作用如下:

  • KafkaSpout:从指定的Kafka Topic中读取数据,并以Tuple形式传递给后面的Spout。
  • ParseBolt:解析JSON数据,并提出ip、timestamp和url三个字段。
  • ComputeBolt:以固定时间窗口为单位计算各个URL访问的PⅤ和UⅤ。
  • HBaseBolt:将计算得到的PⅤ和UⅤ值存入HBase中。

创建Topology主要代码如下:

TopologyBuilder builder = new TopologyBuilder();
// 定义时间窗口大小
BaseWindowedBolt.Duration duration = BaseWindowedBolt.Duration.minutes(10);
// 构建Topology,依次设置KafkaSpout、ParseBolt、ComputeBolt、HBaseBolt
builder.setSpout("kafka-bolt", new KafkaBolt(spoutCOnf), 1)
builder.setBolt("parse-bolt", new ParseBolt(), 4).setShuffleGrouping("kafka-spout");
builder.setBolt("calculate-bolt", new CalculateBolt().withWindow(duration,duration), 1).setShuffleGrouping("parse-blot");
builder.setBolt("hbase-bolt", new HbaseBolt(), 1).setShuffleGrouping("calculate-bolt");
StormSubmitter.submitTopology("StatisticsTopology", conf, builder-createTopogy());

ParseBolt代码如下:

public class ParseBolt extends BaseBasicBolt {
  @Override
 	public void prepare(Map map, TopologyContext topologyContext) {
  	}

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    byte[] buffer = (byte[]) tuple.getValueByField("bytes");
    String strs = new String(buffer);
    // 从每个Json对象中解析出ip、url和timestamp三个字段
    JSONObject json = JSON.parseObject(strs);
    String ip = (String) json.get("ip");
    String url = (String) json.get("url");
    String timestamp = (String) json.get("timestamp");
    collector.emit(new Values(url, ip, timestamp));
   }


  @Override
  public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    outputFieldsDeclarer.declare(new Fields("url", "ip", "timestamp"));
  }
}

2.3 Storm内部原理

Storm Topology是由一个Spout和多个Blot,每个Spout或Blot可通过任务并行化的方式运行在集群中。为了保证Topology可靠地运行在集群中,Storm提供了一整套分布式运行时环境,该环境由Nimbus、Supervisor和Zookeeper等组件构成。

1. Topology生命周期

如图13-8所示,可类比MapReduce Job学习Storm Topology:MapReduce Job分为Map和Reduce两个阶段,这两个阶段均通过任务并行化的方式运行,其中Map阶段启动多个MapTask, Reduce阶段启动多个Reduce Task, Reduce Task依赖于Map Task输出结果,但同类Task之间是彼此独立的。对于Storm Topology而言,它通常有一个Spout和多个Blot阶段构成,这些阶段存在数据依赖关系,进而形成一个DAG,也是通过任务并行化的方式运行,各个阶段均可以启动多个独立的Task并执行。

图13-8 MapReduce与Storm对比

如图13-9所示,Storm Topology从提交到运行,依次经历以下几个步骤:

  • Storm首先将Topology JAR包上传到Nimbus所在节点上,之后通过RPC函数将Topology提交Nimbus。
  • Nimbus收到用户的Topology后,根据其配置信息初始化Spout Task和Bolt Task,之后各节点资源(slot)使用情况将任务调度到各个节点上,并将分配结果写入Zookeeper对应目录下,需要注意的是,Numbus不会直接与Supervisor交互,而是通过ZooKeeper协调完成的。
  • Supervisor周期性与ZooKeeper通信,获取Nimbus分配的任务,之后启动Worker进程,并进一步将任务运行在Executor线程中。
  • Worker周期性将运行状态写到ZooKeeper上,Supervisor周期性将Executor的运行状态写到ZooKeeper上,而Nimbus则通过ZooKeeper监控各个组件健康状态,一旦发现某个组件出现故障则将其转移到其他节点上。

2. Storm运行时环境

一个Storm集群是由Nimbus、Supervisor和ZooKeeper三类组件构成,其中Nimbus负责调度和容错,Supervisor负责启动实际的计算任务,而Zookeeper是Nimbus和Supervisor之间的调度者。

(1) Nimbus

Nimbus扮演master角色,每个Storm集群只有一个,基本职责包括:

  • 处理来自客户端的请求,包括提交Topology,杀死Topology,调整Topology并发度等;
  • 任务调度,Nimbus内部的任务调度器是插拔式的,用户可根据自己需要修改调度器,默认情况下,Nimbus调度器采用的调度策略如下:
    • 在Slot充足的情况下,能够保证所有Topology的Task被均匀地分配到整个集群的所有节点上;
    • 在Slot不足的情况下,会把Topology的所有Task分配到仅有的slot上去,这时候并不是用户期望的理想状态,所以在Nimbus发现有多余slot的时候,它会重新将Topology的Task分配到空余的slot上去以达到理想状态。
    • 在没有slot的时候,它什么也不做,直到集群中出现可用slot。
  • 组件容错:当Worker、Executor或Task出现故障时,Nimbus调度器会重启它们,或直接转移到新的节点上重新运行。

(2) Zookeeper

Zookeeper是Nimbus和Supervisor之间的协调者,它的引入使得Nimbus和Supervisor之间解耦,由于Storm集群中所有状态均可靠地保存在ZooKeeper上,这使得Nimbus自身变成无状态,以上设计使得Storm集群的容错性和鲁棒性很好。

在Storm中,Zookeeper数据组织方式如图13-10所示

相关文章