Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。

Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架。

Spark介绍

Spark 基于 MapReduce 算法实现的分布式计算,拥有Hadoop MapReduce 所具有的优点;但不同于 MapReduce 的是 Job中间输出和结果可以保存在内存中,从而不再需要读写 HDFS,因此 Spark能更好地适用于数据挖掘与机器学习等需要迭代的 MapReduce 的算法。Spark 是 Scala 编写,方便快速编程。

spark相比MapReduce重要的点:

  • spark的中间结果依然保存在内存中。
  • Apache Spark使用最先进的DAG(有向无环图)调度程序。

这两个特点使得 spark 非常适用于迭代计算,在迭代计算中比MR快100倍。

Spark的四大特性

1、高效性

在迭代计算中运行速度提高100倍。

Apache Spark使用最先进的DAG调度程序,查询优化程序和物理执行引擎,实现批量和流式数据的高性能。

2、易用性

Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

3、通用性

Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

4、兼容性

Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。

Spark运行模式

Local(本地)

多用于本地测试,如在 eclipse,idea 中写程序测试等。

standalone(spark自带)

Spark自己可以给自己分配资源(master,worker)。

Yarn

Spark可以运行在yarn上面。

注意:要基于 Yarn 来进行资源调度,必须实现AppalicationMaster 接口,Spark 实现了这个接口,所以可以基于 Yarn。

Mesos

Spark可以运行在Mesos里面(Mesos 类似于yarn的一个资源调度框架)。

Spark的组成

Spark组成(BDAS):全称伯克利数据分析栈,通过大规模集成算法、机器、人之间展现大数据应用的一个平台。也是处理大数据、云计算、通信的技术解决方案。

它的主要组件有:

SparkCore:将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。

SparkSQL:Spark Sql 是Spark来操作结构化数据的程序包,可以让我使用SQL语句的方式来查询数据,Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。

SparkStreaming: 是Spark提供的实时数据进行流式计算的组件。

MLlib:提供常用机器学习算法的实现库。

GraphX:提供一个分布式图计算框架,能高效进行图计算。

BlinkDB:用于在海量数据上进行交互式SQL的近似查询引擎。

Tachyon:以内存为中心高容错的的分布式文件系统。

应用场景

  • Yahoo将Spark用在Audience Expansion中的应用,进行点击预测和即席查询等

  • 淘宝技术团队使用了Spark来解决多次迭代的机器学习算法、高计算复杂度的算法等。应用于内容推荐、社区发现等

  • 腾讯大数据精准推荐借助Spark快速迭代的优势,实现了在“数据实时采集、算法实时训练、系统实时预测”全流程实时并行高维算法,最终成功应用于广点通pCTR投放系统上。

  • 优酷土豆将Spark应用于视频推荐(图计算)、广告业务,主要实现机器学习、图计算等迭代计算。

SparkCore

RDD

1、概念

RDD(Resilient Distributed Dateset),弹性分布式数据集,是Spark中最基本的数据抽象。

2、RDD 的五大特性

  1. RDD 是由一系列的 partition 组成的。
  2. 函数是作用在每一个 partition(split)上的。
  3. RDD 之间有一系列的依赖关系。
  4. 分区器是作用在 K,V 格式的 RDD 上。
  5. RDD 提供一系列最佳的计算位置。

3、RDD 理解图

注意:

1、textFile 方法底层封装的是读取 MR 读取文件的方式,读取文件之前先 split,默认 split 大小是一个 block 大小。

2、RDD 实际上不存储数据,这里方便理解,暂时理解为存储数据。

3、什么是 K,V 格式的 RDD?

  • 如果 RDD 里面存储的数据都是二元组对象,那么这个 RDD 我们
    就叫做 K,V 格式的 RDD。

4、哪里体现 RDD 的弹性(容错)?

  • partition 数量,大小没有限制,体现了 RDD 的弹性。
  • RDD 之间依赖关系,可以基于上一个 RDD 重新计算出 RDD,体现了容错。

5、哪里体现 RDD 的分布式?

  • RDD 是由 Partition 组成,partition 是分布在不同节点上的。

6、RDD 提供计算最佳位置,体现了数据本地化。体现了大数据中 “计算移动数据不移动” 的理念。

Spark任务执行原理

以上图中有四个机器节点,Driver 和 Worker 是启动在节点上的进程,运行在 JVM 中的进程。

  • Driver 与集群节点之间有频繁的通信。
  • Driver 负责任务(tasks)的分发和结果的回收。任务的调度。如果 task 的计算结果非常大就不要回收了。会造成 oom。
  • Worker 是 Standalone 资源调度框架里面资源管理的从节点。也是 JVM 进程。
  • Master 是 Standalone 资源调度框架里面资源管理的主节点。也是 JVM 进程。

Spark代码流程

1、创建 SparkConf 对象

  • 可以设置 Application name。
  • 可以设置运行模式及资源需求。

2、创建 SparkContext 对象

3、基于 Spark 的上下文创建一个 RDD,对 RDD 进行处理。

4、应用程序中要有 Action 类算子来触发 Transformation 类算子执行。

5、关闭 Spark 上下文对象 SparkContext。

Transformations 转换算子

1、概念

Transformations 类算子是一类算子(函数)叫做转换算子,如map,flatMap,reduceByKey 等。Transformations 算子是延迟执行,也叫懒加载执行。

2、Transformation 类算子

  • filter
    过滤符合条件的记录数,true 保留,false 过滤掉。
  • map
    将一个 RDD 中的每个数据项,通过 map 中的函数映射变为一个新的元素。
    特点:输入一条,输出一条数据。
  • flatMap
    先 map 后 flat。与 map 类似,每个输入项可以映射为 0 到多个输出项。
  • sample
    随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样。
  • reduceByKey
    将相同的 Key 根据相应的逻辑进行处理。
  • sortByKey/sortBy
    作用在 K,V 格式的 RDD 上,对 key 进行升序或者降序排序。

Action 触发算子

1、概念

Action 类算子也是一类算子(函数)叫做行动算子,如foreach,collect,count 等。Transformations 类算子是延迟执行,Action 类算子是触发执行。一个 application 应用程序中有几个 Action 类算子执行,就有几个 job 运行。

2、Action 类算子

  • count
    返回数据集中的元素数。会在结果计算完成后回收到 Driver 端。
  • take(n)
    返回一个包含数据集前 n 个元素的集合。
  • first
    first=take(1),返回数据集中的第一个元素。
  • foreach
    循环遍历数据集中的每个元素,运行相应的逻辑。
  • collect
    将计算结果转成集合回收到 Driver 端。

Demo01(WordCount)

Scala实现

object WordCount {

def main(args: Array[String]): Unit = {
val conf = new SparkConf()
/**
* 几种运行方式:
* 1.本地运行
* 2.yarn
* 3.standalone
* 4.mesos
*/
conf.setMaster("local").setAppName("wc")

val context = new SparkContext(conf)

val lineRDD = context.textFile("./wc.txt")

val wordRDD = lineRDD.flatMap(x => {x.split(" ")})

val KVRDD = wordRDD.map(x => {
println("=================")
(x,1)
})

val resultRDD = KVRDD.reduceByKey((x,y) => {x+y})

val sortRDD = resultRDD.sortBy(_._2,false)

sortRDD.foreach(println)
}

}

Java实现

public class WordCount {
public static void main(String[] args){
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("wc");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> javaRDD = context.textFile("./wc.txt");

// long count = javaRDD.count();
// List<String> collect = javaRDD.collect();
// List<String> take = javaRDD.take(5);
// String first = javaRDD.first();

JavaRDD<String> wordRDD = javaRDD
.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String line) throws Exception {
String[] split = line.split(" ");
List<String> list = Arrays.asList(split);
return list;
}
});

JavaPairRDD<String, Integer> pairRDD = wordRDD
.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2(word, 1);
}
});

JavaPairRDD<String, Integer> resultRDD = pairRDD
.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});

JavaPairRDD<Integer, String> reverseRDD = resultRDD
.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2) throws Exception {
return new Tuple2<>(tuple2._2, tuple2._1);
}
});

JavaPairRDD<Integer, String> sortByKey = reverseRDD.sortByKey(false);

JavaPairRDD<String, Integer> result = sortByKey
.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception {
return new Tuple2<>(tuple2._2, tuple2._1);
}
});

result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple2) throws Exception {
System.out.println(tuple2);
}
});

}
}

控制算子

控制算子有三种,cache、persist、checkpoint,以上算子都可以将 RDD 持久化,持久化的单位是 partition。cache 和 persist 都是懒执行的,必须有一个 action 类算子触发执行。checkpoint 算子不仅能将 RDD 持久化到磁盘,还能切断 RDD 之间的依赖关系。

1、cache

默认将 RDD 的数据持久化到内存中。cache 是懒执行。

注意:cache() = persist()=persist(StorageLevel.Memory_Only)

  • 测试 cache 文件:

    随机生成一个数据量比较大的测试文件:Test.txt

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("CacheTest");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile("./Test.txt");

lines = lines.cache();
long startTime = System.currentTimeMillis();
long count = lines.count();
long endTime = System.currentTimeMillis();
System.out.println("共"+count+ "条数据,"+"初始化时间+cache时间+计算时间="+(endTime-startTime));

long countStartTime = System.currentTimeMillis();
long countrResult = lines.count();
long countEndTime = System.currentTimeMillis();
System.out.println("共"+countrResult+ "条数据,"+"计算时间="+ (countEndTime-countStartTime));

jsc.stop();

2、persist

可以指定持久化的级别。最常用的是 MEMORY_ONLY 和 MEMORY_AND_DISK。”_2”表示有副本数。

持久化级别如下:

cache 和 和 persist 的注意事项:

  1. cache 和 persist 都是懒执行,必须有一个 action 类算子触发执行。
  2. cache 和 persist 算子的返回值可以赋值给一个变量,在其他 job 中直接使用这个变量就是使用持久化的数据了。持久化的单位是 partition。
  3. cache 和 persist 算子后不能立即紧跟 action 算子。

错误:rdd.cache().count() 返回的不是持久化的 RDD,而是一个数值了。

3、checkpoint

checkpoint 将 RDD 持久化到磁盘,还可以切断 RDD 之间的依赖关系。

  • checkpoint 的执行原理:
  1. 当 RDD 的 job 执行完毕后,会从 finalRDD 从后往前回溯。

  2. 当回溯到某一个 RDD 调用了 checkpoint 方法,会对当前的 RDD 做一个标记。

  3. Spark 框架会重新启动一个新的 job,从头开始计算到这个 RDD 的数据,将数据持久化到 HDFS 上。

  • 优化:对 RDD 执行 checkpoint 之前,最好对这个 RDD 先执行 cache,这样新启动的 job 只需要将内存中的数据拷贝到 HDFS 上就可以,省去了重新计算这一步。

  • 使用样例:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("checkpoint");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.setCheckpointDir("./checkpoint");
JavaRDD<Integer> parallelize = sc.parallelize(Arrays.asList(1,2,3));
parallelize.checkpoint();
parallelize.count();
sc.stop();

Spark集群搭建

Standalone

1、下载安装包,解压

2、改名

3、进入安装包的conf目录下,修改slaves.template文件,添加从节点。保存。

4、修改 spark-env.sh

JAVA_HOME:配置 java_home 路径
SPARK_MASTER_IP:master 的 ip
SPARK_MASTER_PORT:提交任务的端口,默认是 7077
SPARK_WORKER_CORES:每个 worker 从节点能够支配的 core 的个数
SPARK_WORKER_MEMORY:每个 worker 从节点能够支配的内存数

5、同步到其他节点上

6、启动集群

进入 sbin 目录下,执行当前目录下的./start-all.sh

查看各节点进程:

主节点

从节点

7、启动Spark WEB 界面

使用提交任务的节点。

注意:

8080 是 Spark WEBUI 界面的端口,7077 是 Spark 任务提交的端口。

Yarn

1、1,2,3,4,5,7 步同 standalone。

2、在客户端中配置:

HADOOP_CONF_DIR=/usr/soft/hadoop-2.6.5

测试

进到spark安装包的bin目录下,里面又一个脚本:spark-submit

然后执行以下命令提交测试任务。

Standalone 提交命令:

./spark-submit --master spark://sean01:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

从日志中可以查看到结果:

通过访问WEB端也能看到 >> sean01:8080

YARN 提交命令:

./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10

提交之前要先关闭Standalone模式

进入 sbin 目录下,执行./stop-all.sh

然后启动Hadoop集群

先启动Zookeeper:zkServer.sh start
再启动Hadoop:start-all.sh

最后提交spark任务: