Spark计算框架(一)
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。
Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架。
Spark介绍
Spark 基于 MapReduce 算法实现的分布式计算,拥有Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job中间输出和结果可以保存在内存中
,从而不再需要读写 HDFS,因此 Spark能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。Spark 是 Scala 编写,方便快速编程。
spark相比MapReduce重要的点:
- spark的中间结果依然保存在内存中。
- Apache Spark使用最先进的DAG(有向无环图)调度程序。
这两个特点使得 spark 非常适用于迭代计算,在迭代计算中比MR快100倍。
Spark的四大特性
1、高效性
在迭代计算中运行速度提高100倍。
Apache Spark使用最先进的DAG调度程序,查询优化程序和物理执行引擎,实现批量和流式数据的高性能。
2、易用性
Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。
3、通用性
Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。
4、兼容性
Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。
Spark运行模式
Local(本地)
多用于本地测试,如在 eclipse,idea 中写程序测试等。
standalone(spark自带)
Spark自己可以给自己分配资源(master,worker)。
Yarn
Spark可以运行在yarn上面。
注意:要基于 Yarn 来进行资源调度,必须实现AppalicationMaster 接口,Spark 实现了这个接口,所以可以基于 Yarn。
Mesos
Spark可以运行在Mesos里面(Mesos 类似于yarn的一个资源调度框架)。
Spark的组成
Spark组成(BDAS):全称伯克利数据分析栈,通过大规模集成算法、机器、人之间展现大数据应用的一个平台。也是处理大数据、云计算、通信的技术解决方案。
它的主要组件有:
SparkCore:将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。
SparkSQL:Spark Sql 是Spark来操作结构化数据的程序包,可以让我使用SQL语句的方式来查询数据,Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。
SparkStreaming: 是Spark提供的实时数据进行流式计算的组件。
MLlib:提供常用机器学习算法的实现库。
GraphX:提供一个分布式图计算框架,能高效进行图计算。
BlinkDB:用于在海量数据上进行交互式SQL的近似查询引擎。
Tachyon:以内存为中心高容错的的分布式文件系统。
应用场景
-
Yahoo将Spark用在Audience Expansion中的应用,进行点击预测和即席查询等
-
淘宝技术团队使用了Spark来解决多次迭代的机器学习算法、高计算复杂度的算法等。应用于内容推荐、社区发现等
-
腾讯大数据精准推荐借助Spark快速迭代的优势,实现了在“数据实时采集、算法实时训练、系统实时预测”全流程实时并行高维算法,最终成功应用于广点通pCTR投放系统上。
-
优酷土豆将Spark应用于视频推荐(图计算)、广告业务,主要实现机器学习、图计算等迭代计算。
SparkCore
RDD
1、概念
RDD(Resilient Distributed Dateset),弹性分布式数据集,是Spark中最基本的数据抽象。
2、RDD 的五大特性
- RDD 是由一系列的 partition 组成的。
- 函数是作用在每一个 partition(split)上的。
- RDD 之间有一系列的依赖关系。
- 分区器是作用在 K,V 格式的 RDD 上。
- RDD 提供一系列最佳的计算位置。
3、RDD 理解图
注意:
1、textFile 方法底层封装的是读取 MR 读取文件的方式,读取文件之前先 split,默认 split 大小是一个 block 大小。
2、RDD 实际上不存储数据,这里方便理解,暂时理解为存储数据。
3、什么是 K,V 格式的 RDD?
- 如果 RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们
就叫做 K,V 格式的 RDD。
4、哪里体现 RDD 的弹性(容错)?
- partition 数量,大小没有限制,体现了 RDD 的弹性。
- RDD 之间依赖关系,可以基于上一个 RDD 重新计算出 RDD,体现了容错。
5、哪里体现 RDD 的分布式?
- RDD 是由 Partition 组成,partition 是分布在不同节点上的。
6、RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中 “计算移动数据不移动” 的理念。
Spark任务执行原理
以上图中有四个机器节点,Driver 和 Worker 是启动在节点上的进程,运行在 JVM 中的进程。
- Driver 与集群节点之间有频繁的通信。
- Driver 负责任务(tasks)的分发和结果的回收。任务的调度。如果 task 的计算结果非常大就不要回收了。会造成 oom。
- Worker 是 Standalone 资源调度框架里面资源管理的从节点。也是 JVM 进程。
- Master 是 Standalone 资源调度框架里面资源管理的主节点。也是 JVM 进程。
Spark代码流程
1、创建 SparkConf 对象
- 可以设置 Application name。
- 可以设置运行模式及资源需求。
2、创建 SparkContext 对象
3、基于 Spark 的上下文创建一个 RDD,对 RDD 进行处理。
4、应用程序中要有 Action 类算子来触发 Transformation 类算子执行。
5、关闭 Spark 上下文对象 SparkContext。
Transformations 转换算子
1、概念
Transformations 类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey 等。Transformations 算子是延迟执行,也叫懒加载执行。
2、Transformation 类算子
- filter
过滤符合条件的记录数,true 保留,false 过滤掉。 - map
将一个 RDD 中的每个数据项,通过 map 中的函数映射变为一个新的元素。
特点:输入一条,输出一条数据。 - flatMap
先 map 后 flat。与 map 类似,每个输入项可以映射为 0 到多个输出项。 - sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。 - reduceByKey
将相同的 Key 根据相应的逻辑进行处理。 - sortByKey/sortBy
作用在 K,V 格式的 RDD 上,对 key 进行升序或者降序排序。
Action 触发算子
1、概念
Action 类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count 等。Transformations 类算子是延迟执行,Action 类算子是触发执行。一个 application 应用程序中有几个 Action 类算子执行,就有几个 job 运行。
2、Action 类算子
- count
返回数据集中的元素数。会在结果计算完成后回收到 Driver 端。 - take(n)
返回一个包含数据集前 n 个元素的集合。 - first
first=take(1),返回数据集中的第一个元素。 - foreach
循环遍历数据集中的每个元素,运行相应的逻辑。 - collect
将计算结果转成集合回收到 Driver 端。
Demo01(WordCount)
Scala实现
object WordCount { |
Java实现
public class WordCount { |
控制算子
控制算子有三种,cache、persist、checkpoint,以上算子都可以将 RDD 持久化,持久化的单位是 partition。cache 和 persist 都是懒执行的,必须有一个 action 类算子触发执行。checkpoint 算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系。
1、cache
默认将 RDD 的数据持久化到内存中。cache 是懒执行。
注意:cache() = persist()=persist(StorageLevel.Memory_Only)
-
测试 cache 文件:
随机生成一个数据量比较大的测试文件:Test.txt
SparkConf conf = new SparkConf(); |
2、persist
可以指定持久化的级别。最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。”_2”表示有副本数。
持久化级别如下:
cache 和 和 persist 的注意事项:
- cache 和 persist 都是懒执行,必须有一个 action 类算子触发执行。
- cache 和 persist 算子的返回值可以赋值给一个变量,在其他 job 中直接使用这个变量就是使用持久化的数据了。持久化的单位是 partition。
- cache 和 persist 算子后不能立即紧跟 action 算子。
错误:rdd.cache().count() 返回的不是持久化的 RDD,而是一个数值了。
3、checkpoint
checkpoint 将 RDD 持久化到磁盘,还可以切断 RDD 之间的依赖关系。
- checkpoint 的执行原理:
-
当 RDD 的 job 执行完毕后,会从 finalRDD 从后往前回溯。
-
当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的 RDD 做一个标记。
-
Spark 框架会重新启动一个新的 job,从头开始计算到这个 RDD 的数据,将数据持久化到 HDFS 上。
-
优化:对 RDD 执行 checkpoint 之前,最好对这个 RDD 先执行 cache,这样新启动的 job 只需要将内存中的数据拷贝到 HDFS 上就可以,省去了重新计算这一步。
-
使用样例:
SparkConf conf = new SparkConf(); |
Spark集群搭建
Standalone
1、下载安装包,解压
2、改名
3、进入安装包的conf目录下,修改slaves.template文件,添加从节点。保存。
4、修改 spark-env.sh
JAVA_HOME:配置 java_home 路径 |
5、同步到其他节点上
6、启动集群
进入 sbin 目录下,执行当前目录下的./start-all.sh
查看各节点进程:
主节点
从节点
7、启动Spark WEB 界面
使用提交任务的节点。
注意:
8080 是 Spark WEBUI 界面的端口,7077 是 Spark 任务提交的端口。
Yarn
1、1,2,3,4,5,7 步同 standalone。
2、在客户端中配置:
HADOOP_CONF_DIR=/usr/soft/hadoop-2.6.5 |
测试
进到spark安装包的bin目录下,里面又一个脚本:spark-submit
然后执行以下命令提交测试任务。
Standalone 提交命令:
./spark-submit --master spark://sean01:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100 |
从日志中可以查看到结果:
通过访问WEB端也能看到 >> sean01:8080
YARN 提交命令:
./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10 |
提交之前要先关闭Standalone模式
进入 sbin 目录下,执行./stop-all.sh |
然后启动Hadoop集群
先启动Zookeeper:zkServer.sh start |
最后提交spark任务: