Spark计算框架(六)
SparkStreaming 是流式处理框架,是 Spark API 的扩展,支持可扩展、高吞吐、容错的实时数据流处理。
实时数据的来源可以是:Kafka,Flume,Twitter,ZeroMQ 或者 TCP sockets,并且可以使用高级功能的复杂算子来处理流数据。例如:map,reduce,join,window 。最终,处理后的数据可以存放在文件系统,数据库等,方便实时展现。
SparkStreaming 与 Storm 的区别
1、Storm 是纯实时的流式处理框架,SparkStreaming 是准实时的处理框架(微批处理)。因为微批处理,SparkStreaming 的吞吐量比 Storm 要高。
2、Storm 的事务机制比 SparkStreaming 的要完善。
3、Storm 支持动态资源调度。(spark1.2 开始和之后也支持)
4、SparkStreaming 擅长复杂的业务处理,Storm 不擅长复杂的业务处理,擅长简单的汇总型计算。
SparkStreaming 初始
1、SparkStreaming 初始理解
注意:
-
receiver task 是 7*24 小时一直在执行,一直接受数据,将一段时间内接收来的数据保存到 batch 中。假设 batchInterval 为 5s,那么会将接收来的数据每隔 5 秒封装到一个 batch 中,batch 没有分布式计算特性,这一个 batch 的数据又被封装到一个 RDD 中,RDD 最终封装到一个 DStream 中。
例如:假设 batchInterval 为 5 秒,每隔 5 秒通过 SparkStreamin 将得到一个 DStream,在第 6 秒的时候计算这 5 秒的数据,假设执行任务的时间是 3 秒,那么第 6~9 秒一边在接收数据,一边在计算任务,9~10 秒只是在接收数据。然后在第 11 秒的时候重复上面的操作。
-
如果 job 执行的时间大于 batchInterval 会有什么样的问题?
如果接受过来的数据设置的级别是仅内存,接收来的数据会越堆积越多,最后可能会导致 OOM(如果设置 StorageLevel 包含 disk, 则内存存放不下的数据会溢写至 disk, 加大延迟 )。
2、SparkStreaming 代码
注意事项:
- 启动 socket server 服务器:nc –lk 9999
- eceiver 模式下接受数据,local 的模拟线程必须大于等于 2,一个线程用来 receiver 用来接受数据,另一个线程用来执行 job。
- Durations 时间设置就是我们能接收的延迟度。这个需要根据集群的资源情况以及任务的执行情况来调节。
- 创建 JavaStreamingContext 有两种方式(SparkConf,SparkContext)
- 所有的代码逻辑完成后要有一个 output operation 类算子。
- JavaStreamingContext.start() Streaming 框架启动后不能再次添加业务逻辑。
- JavaStreamingContext.stop() 无参的 stop 方法将 SparkContext一同关闭,stop(false),不会关闭 SparkContext。
- JavaStreamingContext.stop()停止之后不能再调用 start。
public class SparkStreamingTest { |
SparkStreaming 算子操作
关于几种算子的详细API,请见Github:SparkStreamingAPI
1、foreachRDD、print
output operator 类算子,必须对抽取出来的 RDD 执行 action 类算子,代码才能执行。
2、transform
- transformation 类算子
- 可以通过 transform 算子,对Dstream做RDD到RDD的任意操作。
3、updateStateByKey
-
transformation 算子
-
updateStateByKey 作用:
1)为 SparkStreaming 中每一个 Key 维护一份 state 状态,state 类型可以是任意类型的,可以是一个自定义的对象,更新函数也可以是自定义的。
2)通过更新函数对该 key 的状态不断更新,对于每个新的 batch 而言,SparkStreaming 会在使用 updateStateByKey 的时候为已经存在的 key 进行 state 的状态更新。
-
使用到 updateStateByKey 要开启 checkpoint 机制和功能。
-
多久会将内存中的数据写入到磁盘一份?
1)如果 batchInterval 设置的时间小于10秒,那么10秒写入磁盘一份。
2)如果 batchInterval 设置的时间大于 10 秒,那么就会 batchInterval 时间间隔写入磁盘一份。
4、窗口操作
- 窗口操作理解图:
假设每隔 5s 1 个 batch,上图中窗口长度为 15s,窗口滑动间隔 10s。
-
窗口长度和滑动间隔必须是 batchInterval 的整数倍。如果不是整数倍会检测报错。
-
优化后的 window 窗口操作示意图:
- 优化后的 window 操作要保存状态所以要设置 checkpoint 路径,没有优化的 window 操作可以不设置 checkpoint 路径。
Driver HA(Standalone或Mesos)
因为 SparkStreaming 是 7*24 小时运行,Driver 只是一个简单的进程,有可能挂掉,所以实现 Driver 的 HA 就有必要(如果使用的 Client 模式就无法实现 Driver HA ,这里针对的是 cluster 模式)。
Yarn 平台 cluster 模式提交任务,AM(AplicationMaster)相当于 Driver,如果挂掉会自动启动 AM。这里所说的 DriverHA 针对的是 Spark standalone 和 Mesos 资源调度的情况下。
实现 Driver 的高可用有两个步骤:
第一:提交任务层面,在提交任务的时候加上选项 --supervise,当 Driver挂掉的时候会自动重启 Driver。
第二:代码层面,使用 JavaStreamingContext.getOrCreate(checkpoint 路径,JavaStreamingContextFactory)
Driver 中元数据包括:
1、创建应用程序的配置信息。
2、DStream 的操作逻辑。
3、job 中没有完成的批次数据,也就是 job 的执行进度。
SparkStreaming+Kafka
Receiver 模式
receiver模式原理图
receiver模式理解
在 SparkSteaming 程序运行起来后,Executor 中会有 receiver tasks 接收 kafka 推送过来的数据。数据会被持久化,默认级别为 MEMORY_AND_DISK_SER_2,这个级别也可以修改。receiver task 对接收过来的数据进行存储和备份,这个过程会有节点之间的传输。备份完成后去 Zookeeper 中更新消费偏移量,然后向 Driver 中的 receiver tracker 汇报数据的位置。最后 Driver 根据数据本地化将 task 分发到不同节点上执行。
receiver模式中存在的问题:
当 Driver 进程挂掉后,Driver 下的 Executor 都会被杀掉,当更新完 zookeeper 消费偏移量的时候,Driver 如果挂掉了,就会存在找不到数据的问题,相当于丢失数据。
如何解决这个问题?
开启WAL(write ahead log)预写日志机制,在接受过来数据备份到其他节点的时候,同时备份到 HDFS 上一份(我们需要将接收来的数据的持久化级别降级到 MEMORY_AND_DISK),这样就能保证数据的安全性。
不过,因为写 HDFS 比较消耗性能,要在备份完数据之后才能进行更新 zookeeper 以及汇报位置等,这样会增加 job 的执行时间,这样对于任务的执行提高了延迟度。
Receiver可能会造成重复消费
合理假设一个场景,假如当前zookeeper中记录的偏移量是50,本次接收的数据为51~100,当数据备份之后,同时也放到 HDFS 了,此时准备去zookeeper中更新偏移量时,服务器挂掉了,这时zookeeper中的偏移量没有更新还是50。重启之后会去HDFS中检查数据,发现51~100的数据未计算(一般计算的话需要更新完偏移量才计算),这时开始计算这部分数据。紧接着Kafka就会那这zookeeper中的50继续往下读,这样一来就造成了重复消费。这就是Receiver模式只能保证至少消费一次(at-least),但不能保证有且只会消费一次(exactly-once)。
receiver 的并行度设置
receiver 的并行度是由 spark.streaming.blockInterval
来决定的,默认为200ms。
假设 batchInterval 为 5s,那么每隔 blockInterval 就会产生一个 block,这里就对应每批次产生 RDD 的 partition,这样 5 秒产生的这个 Dstream 中的这个 RDD 的 partition 为 25 个,并行度就是25。
如果想提高并行度可以减少 blockInterval 的数值,但是最好不要低于 50ms。
receiver 模式代码
先准备一个MyProducer类用于产生数据:
public class MyProducer extends Thread { |
执行以下代码:
public class SparkStreamingOnKafkaReceiver { |
注意:
Receiver模式:只能保证至少被消费一次(at-least),但是不能保证有且只会消费一次(exactly-once)。
direct模式:可以保证exactly-once,能保证任务失败重读数据,但是不能保证任务中的输出数据有且只有一次。
Direct 模式
direct 模式理解
SparkStreaming+kafka 的 Driect 模式就是将 kafka 看成存数据的一方,不是被动接收数据,而是主动去取数据。
消费者偏移量也不是用 zookeeper 来管理,而是 SparkStreaming 内部对消费者偏移量自动来维护
。默认消费偏移量是在内存中,当然如果设置了checkpoint 目录,那么消费偏移量也会保存在 checkpoint 中
。当然也可以实现用 zookeeper 来管理。
direct 模式并行度设置
Direct 模式的并行度是由读取的 kafka 中 topic 的 partition 数决定的
。
direct 模式代码
public class SparkStreamingOnKafkaDirected { |
Web 监控页面
由于在本地运行,输入:localhost:4040,即可查看 SparkStreaming 在kafka集群中的运行状态
相关配置
预写日志:
用于优化 receiver 模式中,Driver进程挂掉,找不到数据的问题。
spark.streaming.receiver.writeAheadLog.enable #默认false 没有开启 |
blockInterval:
spark.streaming.blockInterval #默认200ms |
反压机制:
用于解决由于job执行时间大于batchInterval,接收数据内存级别为仅内存时。引起的数据堆积问题
sparkStreaming在1.5版本之后引入反压机制(back-pressure),通过动态控制数据接收速率来适配集群数据处理能力!
spark.streaming.backpressure.enabled 设置为true #默认false |
数据接收速率:
sparkStreaming在1.5版本之前通过控制接收数据的速率来解决数据堆积问题。
设置静态配置参数:
Receiver模式 |
如何优雅的关闭SparkStreaming
spark.streaming.stopGracefullyOnShutdown 设置为true #默认false |