流式处理Storm
Storm是一个免费开源、分布式、高容错的实时计算系统。Storm令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。
Storm简介
Storm进程常驻内存,数据不经过磁盘,在内存中处理。
流式处理
1、流式处理(异步)
客户端提交数据进行结算,并不会等待数据计算结果。
2、逐条处理
例:ETL
3、统计分析
例:计算PV、UV、访问热点 以及 某些数据的聚合、加和、平均等
- 客户端提交数据之后,计算完成结果存储到Redis、HBase、MySQL或者其他MQ当中,
- 客户端并不关心最终结果是多少。
实时请求
1、实时请求应答服务(同步)
客户端提交数据请求之后,立刻取得计算结果并返回给客户端
2、Drpc
3、实时请求处理
例:图片特征提取
高可靠性
1、异常处理
2、消息可靠性保障机制。
可维护性
StormUI 图形化监控接口
Storm与MR、Spark对比
Storm对比Mapreduce
Storm:进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。
MapReduce:为TB、PB级别数据设计的批处理计算框架。
Storm对比Spark Streaming
Storm:纯流式处理
- 专门为流式处理设计
- 数据传输模式更为简单,很多地方也更为高效
- 并不是不能做批处理,它也可以来做微批处理,来提高吞吐
Spark Streaming:微批处理
- 将RDD做的很小来用小的批处理来接近流式处理
- 基于内存和DAG
Storm架构设计
Nimbus
- 资源调度
- 任务分配
- 接收jar包
Supervisor
- 接收nimbus分配的任务。
- 启动、停止自己管理的worker进程(当前supervisor上worker数量由配置文件设定)
Worker
- 运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集)。
- worker任务类型,即spout任务、bolt任务两种。
- 启动executor(executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务,
一个worker可以有多个executor,每个executor又可以执行多个task
)。
Zookeeper
- Storm重点依赖的外部资源。
- Nimbus和Supervisor甚至实际运行的Worker都是把心跳保存在Zookeeper上的。
- Nimbus也是根据Zookeerper上的心跳和任务运行状况,进行调度和任务分配的。
Storm 架构设计与Hadoop架构对比
Storm任务提交流程
提交流程图
下图是Storm的数据交互图。可以看出两个模块Nimbus和Supervisor之间没有直接交互。状态都是保存在Zookeeper上。Worker之间通过ZeroMQ传送数据。
提交流程叙述
Client:
1、client提交topology 到Nimbus;
Nimbus:
2、提交的jar包会被上传到nimbus服务器的nimbus/inbox目录下;
3、submitTopology方法对这个topology进行处理:
它首先要对storm本身,以及topology进行一些校验;
它要检查storm的状态是否是active的
它要检查是否已经有同名的topology在storm里面运行了
因为我们会在代码里面给spout、bolt指定id,storm会检查是否有两个spout和bolt使用了相同的id。
任何一个id都不能以 “_” 开头,这种命名方式是系统保留的。
4、建立topology的本地目录:
nimbus/stormdist/topology-uuid,该目录包含三个文件:
stormjar.jar – 包含这个topology所有代码的jar包(从nimbus/index里面挪过来的)
stormcode.ser – 这个topology对象的序列化
stormconf.ser – 运行这个topology的配置
5、nimbus分配任务,根据topology定义中给定的参数,给spout/bolt设定task数据,分配对应的task-id,最后把分配好task的信息写入到zookeeper的/task目录;
6、nimbus在zookeeper上创建taskbeats目录,要求每个task定时发送心跳信息;
7、将分配好的任务,写入zookeeper,任务提交完毕;
8、将topology的信息写入到zookeeper/storms目录;
Supervisor:
1、定期扫描zookeeper上的storms目录,查看是否有新的任务,有就下载下来;
2、删除本地不再运行的topology的代码;
3、根据nimbus指定的任务信息启动worker进行工作;
Worker:
1、查看需要执行哪些任务;
2、根据taskid分辨出spout、bolt;
3、计算出所代表的的spout/bolt会给哪些task发送消息;
4、根据ip和端口号创建响应的网络连接用来发送消息。
Storm计算模型
Topology - DAG有向无环图的实现
- Storm提交运行的程序称为Topology。
- 由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构。
- 生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止(区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止)
Spout – 数据源
-
拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中,一个Spout可以发送多个数据流(Stream)。
-
可先通过OutputFieldsDeclarer中的
declare方法
声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法
指定数据流Id(streamId)参数将数据发送出去。 -
Spout中最核心的方法是nextTuple
,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit
方法将数据生成元组(Tuple)发送给之后的Bolt计算。
Bolt – 数据流处理组件
- 拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往
需要多个Bolt分多个步骤完成。 - 一个Bolt可以发送多个数据流(Stream)。
- 可先通过OutputFieldsDeclarer中的
declare方法
声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法
指定数据流Id(streamId)参数将数据发送出去。 Bolt中最核心的方法是execute方法
,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑。
Tuple – 元组
- Stream中最小数据组成单元。
Stream – 数据流
- 从Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做
Stream。 - Stream声明时需给其指定一个Id(默认为Default)
- 实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId
demo API
Storm 数据累加
MyTopology类(main方法)
public class MyTopology { |
MySpout类:发送数据
public class MySpout extends BaseRichSpout { |
MyBolt类:处理数据
public class MyBolt extends BaseRichBolt{ |
运行代码,查看打印信息:
从打印信息看出,累加操作完成。
Storm WordCount
WordCountToplogy类(main方法)
public class WordCountToplogy { |
WordCountSpout类:发送数据
public class WordCountSpout extends BaseRichSpout { |
SplitBolt类:切分单词
public class SplitBolt extends BaseRichBolt { |
CountBolt类:统计单词个数
public class CountBolt extends BaseRichBolt { |
运行,查看打印结果:
由打印结果可以看出,统计单词完成。
Storm Grouping
Shuffle Grouping
随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。
Fields Grouping
按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一
个task, 而不同的"user-id"则可能会被分配到不同的task。
All Grouping
广播发送,对于每一个tuple,所有的bolts都会收到。
Global Grouping
全局分组,把tuple分配给task id最低的task 。
None Grouping
不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。
有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未
来Storm如果可能的话会这样设计)。
Direct Grouping
指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的
哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必
须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id
(OutputCollector.emit方法也会返回task的id)。
Local or shuffle grouping
本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发
送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致。
customGrouping
自定义,相当于mapreduce那里自己去实现一个partition一样。
并发机制
Topology参数设置
Worker – 进程
-
一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)。
-
这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中
多台服务器上的进程所组成。
Executor – 线程
- Executor是由Worker进程中生成的一个线程。
- 每个Worker进程中会运行拓扑当中的一个或多个Executor线程。
- 一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些
Task任务都是对应着同一个组件(Spout、Bolt)。
Task
- 实际执行数据处理的最小单元。
- 每个task即为一个Spout或者一个Bolt。
- Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整。
- 默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务。
设置Worker进程数
Config.setNumWorkers(int workers) |
设置Executor线程数
TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) |
其中, parallelism_hint即为executor线程数。
设置Task数量
ComponentConfigurationDeclarer.setNumTasks(Number val) |
例:
Config conf = new Config() ; |
负载均衡
Rebalance – 再平衡
动态调整Topology拓扑的Worker进程数量、以及Executor线程数量。
支持两种调整方式
- 通过Storm UI
- 通过Storm CLI
通过Storm CLI动态调整
例:storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
将 mytopology 拓扑 worker 进程数量调整为5个
“ blue-spout ” 所使用的线程数量调整为3个
“ yellow-bolt ”所使用的线程数量调整为10个
通信机制
Worker进程间的数据通信
ZMQ
ZeroMQ 开源的消息传递框架,并不是一个MessageQueue。
Netty
Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)
Worker内部的数据通信
Disruptor
- 实现了“队列”的功能。
- 可以理解为一种事件监听或者消息处理机制,即在队列当中一边由生产者放入消息数据,另一边消费者
并行取出消息数据处理。
Worker内部的消息传递机制:
容错机制
集群节点宕机
Nimbus服务器
单点故障。
非Nimbus服务器
故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行。
进程挂掉
Worker
挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上。
Supervisor
- supervisor挂掉,不影响正在运行的worker,但也不会在这台机器上面启动新的worker。
- supervion和worker都挂了,这个worker会转移到其它正常运行的supervisor节点上面。
Nimbus
nimbus挂掉,不影响正在运行的任务,但也不会接受新的Topology。
消息的完整性
- 从Spout中发出的Tuple,以及基于他所产生Tuple(例如上个例子当中Spout发出的句子,
以及句子当中单词的tuple等)。 - 由这些消息就构成了一棵tuple树。
- 当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“
完整处理”,即消息的完整性。
Acker – 消息完整性的实现机制
-
Storm的拓扑当中特殊的一些任务。
-
负责跟踪每个Spout发出的Tuple的DAG(有向无环图)。
Storm 完全分布式部署
部署ZooKeeper
- 版本3.4.5+ (高版本Zookeeper实现了对于自身持久化数据的定期删除功能)
- (autopurge.purgeInterval; autopurge.snapRetainCount)
上传、解压安装包
在storm目录中创建logs目录
- $ mkdir logs
修改配置文件
- storm.yaml
配置文件内容:
storm.zookeeper.servers: |
由于每个节点都需要配置,将当前节点storm远程拷贝到其他的节点上:
scp -r apache-storm-0.10.0 sean02:`pwd` |
启动Zookeeper集群
- zkServer.sh start
在sean01上启动Nimbus
将nimbus的进程日志标准、错误输出重定向追加加到/logs/nimbus.out文件中,后面的&符代表linux后台运行 |
查看linux进程:
注:core 进程为Storm Ui的进程。
在sean02、sean03上启动Supervisor
(按照配置每个Supervisor上启动4个slots)
./bin/storm supervisor >> ./logs/supervisor.out 2>&1 & |
启动Storm UI
./storm ui >> ./logs/ui.out 2>&1 & #Web Ui的进程 |