Storm是一个免费开源、分布式、高容错的实时计算系统。Storm令持续不断的流计算变得容易,弥补了Hadoop批处理所不能满足的实时要求。

官网 http://storm.apache.org

Storm简介

Storm进程常驻内存,数据不经过磁盘,在内存中处理。

流式处理

1、流式处理(异步)

客户端提交数据进行结算,并不会等待数据计算结果。

2、逐条处理

例:ETL

3、统计分析

例:计算PV、UV、访问热点 以及 某些数据的聚合、加和、平均等

  • 客户端提交数据之后,计算完成结果存储到Redis、HBase、MySQL或者其他MQ当中,
  • 客户端并不关心最终结果是多少。

实时请求

1、实时请求应答服务(同步)

客户端提交数据请求之后,立刻取得计算结果并返回给客户端

2、Drpc

3、实时请求处理

例:图片特征提取

高可靠性

1、异常处理

2、消息可靠性保障机制。

可维护性

StormUI 图形化监控接口

Storm与MR、Spark对比

Storm对比Mapreduce

Storm:进程、线程常驻内存运行,数据不进入磁盘,数据通过网络传递。

MapReduce:为TB、PB级别数据设计的批处理计算框架。

Storm对比Spark Streaming

Storm:纯流式处理

  • 专门为流式处理设计
  • 数据传输模式更为简单,很多地方也更为高效
  • 并不是不能做批处理,它也可以来做微批处理,来提高吞吐

Spark Streaming:微批处理

  • 将RDD做的很小来用小的批处理来接近流式处理
  • 基于内存和DAG

Storm架构设计

Nimbus

  • 资源调度
  • 任务分配
  • 接收jar包

Supervisor

  • 接收nimbus分配的任务。
  • 启动、停止自己管理的worker进程(当前supervisor上worker数量由配置文件设定)

Worker

  • 运行具体处理运算组件的进程(每个Worker对应执行一个Topology的子集)。
  • worker任务类型,即spout任务、bolt任务两种。
  • 启动executor(executor即worker JVM进程中的一个java线程,一般默认每个executor负责执行一个task任务,一个worker可以有多个executor,每个executor又可以执行多个task)。

Zookeeper

  • Storm重点依赖的外部资源。
  • Nimbus和Supervisor甚至实际运行的Worker都是把心跳保存在Zookeeper上的。
  • Nimbus也是根据Zookeerper上的心跳和任务运行状况,进行调度和任务分配的。

Storm 架构设计与Hadoop架构对比

Storm任务提交流程

提交流程图

下图是Storm的数据交互图。可以看出两个模块Nimbus和Supervisor之间没有直接交互。状态都是保存在Zookeeper上。Worker之间通过ZeroMQ传送数据。

提交流程叙述

Client:

1、client提交topology 到Nimbus;

Nimbus:

2、提交的jar包会被上传到nimbus服务器的nimbus/inbox目录下;

3、submitTopology方法对这个topology进行处理:

​ 它首先要对storm本身,以及topology进行一些校验;
​ 它要检查storm的状态是否是active的
​ 它要检查是否已经有同名的topology在storm里面运行了
​ 因为我们会在代码里面给spout、bolt指定id,storm会检查是否有两个spout和bolt使用了相同的id。
​ 任何一个id都不能以 “_” 开头,这种命名方式是系统保留的。

4、建立topology的本地目录:

​ nimbus/stormdist/topology-uuid,该目录包含三个文件:
​ stormjar.jar – 包含这个topology所有代码的jar包(从nimbus/index里面挪过来的)
​ stormcode.ser – 这个topology对象的序列化
​ stormconf.ser – 运行这个topology的配置

5、nimbus分配任务,根据topology定义中给定的参数,给spout/bolt设定task数据,分配对应的task-id,最后把分配好task的信息写入到zookeeper的/task目录;

6、nimbus在zookeeper上创建taskbeats目录,要求每个task定时发送心跳信息;

7、将分配好的任务,写入zookeeper,任务提交完毕;

8、将topology的信息写入到zookeeper/storms目录;

Supervisor:

1、定期扫描zookeeper上的storms目录,查看是否有新的任务,有就下载下来;

2、删除本地不再运行的topology的代码;

3、根据nimbus指定的任务信息启动worker进行工作;

Worker:

1、查看需要执行哪些任务;

2、根据taskid分辨出spout、bolt;

3、计算出所代表的的spout/bolt会给哪些task发送消息;

4、根据ip和端口号创建响应的网络连接用来发送消息。

Storm计算模型

Topology - DAG有向无环图的实现

  • Storm提交运行的程序称为Topology。
  • 由一系列通过数据流相互关联的Spout、Bolt所组成的拓扑结构。
  • 生命周期:此拓扑只要启动就会一直在集群中运行,直到手动将其kill,否则不会终止(区别于MapReduce当中的Job,MR当中的Job在计算执行完成就会终止)

Spout – 数据源

  • 拓扑中数据流的来源。一般会从指定外部的数据源读取元组(Tuple)发送到拓扑(Topology)中,一个Spout可以发送多个数据流(Stream)。

  • 可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去。

  • Spout中最核心的方法是nextTuple,该方法会被Storm线程不断调用、主动从数据源拉取数据,再通过emit
    方法将数据生成元组(Tuple)发送给之后的Bolt计算。

Bolt – 数据流处理组件

  • 拓扑中数据处理均有Bolt完成。对于简单的任务或者数据流转换,单个Bolt可以简单实现;更加复杂场景往往
    需要多个Bolt分多个步骤完成。
  • 一个Bolt可以发送多个数据流(Stream)。
  • 可先通过OutputFieldsDeclarer中的declare方法声明定义的不同数据流,发送数据时通过SpoutOutputCollector中的emit方法指定数据流Id(streamId)参数将数据发送出去。
  • Bolt中最核心的方法是execute方法,该方法负责接收到一个元组(Tuple)数据、真正实现核心的业务逻辑。

Tuple – 元组

  • Stream中最小数据组成单元。

Stream – 数据流

  • 从Spout中源源不断传递数据给Bolt、以及上一个Bolt传递数据给下一个Bolt,所形成的这些数据通道即叫做
    Stream。
  • Stream声明时需给其指定一个Id(默认为Default)
  • 实际开发场景中,多使用单一数据流,此时不需要单独指定StreamId

demo API

Storm 数据累加

MyTopology类(main方法)

public class MyTopology {
public static void main(String[] args){
// 数据累加... spout bolt
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("myspout", new MySpout());
topologyBuilder.setBolt("mybolt", new MyBolt()).shuffleGrouping("myspout");

// 定义配置
Config config = new Config();
StormTopology topology = topologyBuilder.createTopology();

//本地提交
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("sum", config, topology)
}
}

MySpout类:发送数据

public class MySpout extends BaseRichSpout {
SpoutOutputCollector collector;
int i = 0;

/**
* 初始化方法。框架在执行任务的时候会先执行此方法
* @param conf 可以得到spout的配置
* @param context 上下文环境
* @param collector 往下游发送数据
*/
@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}

/**
* 此方法是spout的核心方法。
* 框架会一直(无限)调用此方法,每当调用时,我们应该往下游发送数据
*/
@Override
public void nextTuple() {
i++;
collector.emit(new Values(i)); //底层不定参

try {
//发送一条数据睡眠一下,降低发送频率
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("spout 发送。。。" + i);
}

/**
* 当需要往下游发送数据时,就要声明字段个数和名称
* @param declarer
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number")); //与Values方法传入的参数保持一致
}
}

MyBolt类:处理数据

public class MyBolt extends BaseRichBolt{
int sum;

/**
* bolt初始化方法
* @param stormConf
* @param context
* @param collector
*/
@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
}

/**
* bolt中最核心的方法。框架会一只调用此方法,每次调用就传一个数据进来
* @param input
*/
@Override
public void execute(Tuple input) {
Integer integer = input.getInteger(0);
// input.getIntegerByField("number");
sum += integer;

System.out.println("execute : " + integer + " sum : " + sum);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
//这里不需要发送数据
}
}

运行代码,查看打印信息:

从打印信息看出,累加操作完成。

Storm WordCount

WordCountToplogy类(main方法)

public class WordCountToplogy {
/**
* 一对一 线程与task
* @param args
*/
public static void main(String[] args) {
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 设置并行度为3即task
topologyBuilder.setSpout("wcspout",new WordCountSpout(),3);
// 设置并行度为5即task
topologyBuilder.setBolt("splitbolt",new SplitBolt(),5)
.shuffleGrouping("wcspout"); // 随机分组
// 按字段分组,每个单词累加个数
topologyBuilder.setBolt("countbolt",new CountBolt(),6)
.fieldsGrouping("splitbolt",new Fields("word"));

Config config = new Config();
// config.setNumWorkers(3); //设置worker进程个数

if (args.length > 0) {
try {
// 集群运行方式
StormSubmitter.submitTopology(args[0], config,
topologyBuilder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
} else {
// 本地运行方式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", config,
topologyBuilder.createTopology());
}
}
}

WordCountSpout类:发送数据

public class WordCountSpout extends BaseRichSpout {
SpoutOutputCollector collector;

String[] lines = new String[]{
"i like learning",
"i miss you ",
"good good study day day up"
};

@Override
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
this.collector = collector;
}

@Override
public void nextTuple() {
Random random = new Random();
int index = random.nextInt(lines.length);
String line = lines[index];
System.out.println("spout : " + line);

collector.emit(new Values(line));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}

SplitBolt类:切分单词

public class SplitBolt extends BaseRichBolt {
OutputCollector collector;

@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

@Override
public void execute(Tuple input) {
String line = input.getString(0);
String[] words = line.split(" ");

for(String word : words){
collector.emit(new Values(word));
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

}

CountBolt类:统计单词个数

public class CountBolt extends BaseRichBolt {
Map<String,Integer> resultMap = new HashMap<>();

@Override
public void prepare(Map stormConf, TopologyContext context,
OutputCollector collector) {
}

@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");

//判断是否已包含
if(resultMap.containsKey(word)){
Integer integer = resultMap.get(word);
integer++;
resultMap.put(word,integer);
}else{
resultMap.put(word,1); //初始值为1
}
System.out.println("word :" + word + " : " + resultMap.get(word));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}

运行,查看打印结果:

由打印结果可以看出,统计单词完成。

Storm Grouping

Shuffle Grouping

随机分组,随机派发stream里面的tuple,保证每个bolt task接收到的tuple数目大致相同。

Fields Grouping

按字段分组,比如,按"user-id"这个字段来分组,那么具有同样"user-id"的 tuple 会被分到相同的Bolt里的一
个task, 而不同的"user-id"则可能会被分配到不同的task。

All Grouping

广播发送,对于每一个tuple,所有的bolts都会收到。

Global Grouping

全局分组,把tuple分配给task id最低的task 。

None Grouping

不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是一样的效果。
有一点不同的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(未
来Storm如果可能的话会这样设计)。

Direct Grouping

指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的
哪个task处理这个消息。只有被声明为 Direct Stream 的消息流可以声明这种分组方法。而且这种消息tuple必
须使用 emitDirect 方法来发射。消息处理者可以通过 TopologyContext 来获取处理它的消息的task的id
(OutputCollector.emit方法也会返回task的id)。

Local or shuffle grouping

本地或随机分组。如果目标bolt有一个或者多个task与源bolt的task在同一个工作进程中,tuple将会被随机发
送给这些同进程中的tasks。否则,和普通的Shuffle Grouping行为一致。

customGrouping

自定义,相当于mapreduce那里自己去实现一个partition一样。

并发机制

Topology参数设置

Worker – 进程

  • 一个Topology拓扑会包含一个或多个Worker(每个Worker进程只能从属于一个特定的Topology)。

  • 这些Worker进程会并行跑在集群中不同的服务器上,即一个Topology拓扑其实是由并行运行在Storm集群中
    多台服务器上的进程所组成。

Executor – 线程

  • Executor是由Worker进程中生成的一个线程。
  • 每个Worker进程中会运行拓扑当中的一个或多个Executor线程。
  • 一个Executor线程中可以执行一个或多个Task任务(默认每个Executor只执行一个Task任务),但是这些
    Task任务都是对应着同一个组件(Spout、Bolt)。

Task

  • 实际执行数据处理的最小单元。
  • 每个task即为一个Spout或者一个Bolt。
  • Task数量在整个Topology生命周期中保持不变,Executor数量可以变化或手动调整。
  • 默认情况下,Task数量和Executor是相同的,即每个Executor线程中默认运行一个Task任务。

设置Worker进程数

Config.setNumWorkers(int workers)

设置Executor线程数

TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint)
TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint)

其中, parallelism_hint即为executor线程数。

设置Task数量

ComponentConfigurationDeclarer.setNumTasks(Number val)

例:

Config conf = new Config() ;
conf.setNumWorkers(2);
TopologyBuilder topologyBuilder = new TopologyBuilder();
topologyBuilder.setSpout("spout", new MySpout(), 1);
topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4)
.shuffleGrouping("blue-spout);

负载均衡

Rebalance – 再平衡

动态调整Topology拓扑的Worker进程数量、以及Executor线程数量。

支持两种调整方式

  • 通过Storm UI
  • 通过Storm CLI

通过Storm CLI动态调整

例:storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10
将 mytopology 拓扑 worker 进程数量调整为5个
“ blue-spout ” 所使用的线程数量调整为3个
“ yellow-bolt ”所使用的线程数量调整为10个

通信机制

Worker进程间的数据通信

ZMQ
ZeroMQ 开源的消息传递框架,并不是一个MessageQueue。

Netty
Netty是基于NIO的网络框架,更加高效。(之所以Storm 0.9版本之后使用Netty,是因为ZMQ的license和Storm的license不兼容。)

Worker内部的数据通信

Disruptor

  • 实现了“队列”的功能。
  • 可以理解为一种事件监听或者消息处理机制,即在队列当中一边由生产者放入消息数据,另一边消费者
    并行取出消息数据处理。

Worker内部的消息传递机制:

容错机制

集群节点宕机

Nimbus服务器

单点故障。

非Nimbus服务器

故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行。

进程挂掉

Worker

挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向Nimbus发送心跳,Nimbus会将该Worker重新分配到其他服务器上。

Supervisor

  • supervisor挂掉,不影响正在运行的worker,但也不会在这台机器上面启动新的worker。
  • supervion和worker都挂了,这个worker会转移到其它正常运行的supervisor节点上面。

Nimbus

nimbus挂掉,不影响正在运行的任务,但也不会接受新的Topology。

消息的完整性

  • 从Spout中发出的Tuple,以及基于他所产生Tuple(例如上个例子当中Spout发出的句子,
    以及句子当中单词的tuple等)。
  • 由这些消息就构成了一棵tuple树。
  • 当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“
    完整处理”,即消息的完整性。

Acker – 消息完整性的实现机制

  • Storm的拓扑当中特殊的一些任务。

  • 负责跟踪每个Spout发出的Tuple的DAG(有向无环图)。

Storm 完全分布式部署

部署ZooKeeper

  • 版本3.4.5+ (高版本Zookeeper实现了对于自身持久化数据的定期删除功能)
  • (autopurge.purgeInterval; autopurge.snapRetainCount)

上传、解压安装包

在storm目录中创建logs目录

  • $ mkdir logs

修改配置文件

  • storm.yaml

配置文件内容:

storm.zookeeper.servers:
- "sean01"
- "sean02"
- "sean03"
nimbus.host: “sean01"

由于每个节点都需要配置,将当前节点storm远程拷贝到其他的节点上:

scp -r apache-storm-0.10.0 sean02:`pwd`
scp -r apache-storm-0.10.0 sean03:`pwd`

启动Zookeeper集群

在sean01上启动Nimbus

#将nimbus的进程日志标准、错误输出重定向追加加到/logs/nimbus.out文件中,后面的&符代表linux后台运行
./bin/storm nimbus >> ./logs/nimbus.out 2>&1 &

#将ui的进程日志标准、错误输出重定向追加加到/logs/ui.out文件中,后台运行
./bin/storm ui >> ./logs/ui.out 2>&1 &

查看linux进程:

注:core 进程为Storm Ui的进程。

在sean02、sean03上启动Supervisor

(按照配置每个Supervisor上启动4个slots)

./bin/storm supervisor >> ./logs/supervisor.out 2>&1 &

启动Storm UI

./storm ui >> ./logs/ui.out 2>&1 &	#Web Ui的进程

通过http://sean01:8080/访问