关于Spark的任务提交方式,总的分为 Client 提交和 Cluster 提交两种。这里以 Standalone 和 Yarn 为例详细阐述他们在 Spark 中提交任务的流程。

Spark任务提交方式

注意:使用Standalone-Cluster方式提交任务时,必须保证所有的节点上(放在Spark/lib/ 下)都有这个jar包,示例中使用的jar包系统默认都有。但使用Yarn方式时只需提交的节点存在jar包即可,因为在Yarn中会共享这个jar包!!!

Standalone方式

1、Standalone-Client 提交

  • 提交命令

    ./spark-submit --master spark://sean01:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

    或者

    ./spark-submit --master spark://sean01:7077 --deploy-mode client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
    ## 默认就是 --deploy-mode client
  • 执行原理图解

  • 执行流程
  1. client 模式提交任务后,并会在客户端启动 Driver 进程。
  2. Driver 会向 Master 申请用于启动 Application 的资源。
  3. 资源申请成功,Driver 端将 task 发送到 worker 端的 executor 执行。
  4. worker 将 task 执行结果返回到 Driver 端。
  • 总结:

    client 模式适用于测试调试程序。Driver 进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在 Driver 端可以看到 task 执行的情况。生产环境下不能使用 client 模式,是因为:假设要提交 100 个 application 到集群运行,Driver 每次都会在 client 端启动,那么就会导致客户端 100 次网卡流量暴增的问题。

2、Standalone-Cluster 提交

  • 提交命令

    ./spark-submit --master spark://sean01:7077 --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

    注意: Standalone-cluster 提交方式,应用程序使用的所有 jar 包和文件,必须保证所有的 worker 节点都要有,因为此种方式,spark 不会自动上传包。
    解决方式:

    1. 将所有的依赖包和文件打到同一个包中,然后放在 hdfs 上。

    2. 将所有的依赖包和文件各放一份在 worker 节点上。

  • 执行原理图解

  • 执行流程

    1. cluster 模式提交应用程序后,会向 Master 请求启动 Driver.
    2. Master 接受请求,随机在集群一台节点启动 Driver 进程。
    3. Driver 启动后为当前的应用程序申请资源。
    4. Driver 端发送 task 到 worker 节点的 executor 上执行。
    5. worker 将执行情况和执行结果返回给 Driver 端。
  • 总结:

    Driver 进程是在集群某一台 Worker 上启动的,在客户端是无法查看 task 的执行情况的。假设要提交 100 个 application 到集群运行,每次 Driver 会随机在集群中某一台 Worker 上启动,那么这 100 次网卡流量暴增的问题就散布在集群上。

总结 Standalone 两种方式提交任务,Driver 与集群的通信包括:

  1. Driver 负责应用程序资源的申请
  2. 任务的分发。
  3. 结果的回收。
  4. 监控 task 执行情况。

Yarn方式

1、yarn-client 提交

  • 提交命令

    ./spark-submit --master yarn --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

    或者

    ./spark-submit --master yarn-client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

    或者

    ./spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
  • 执行原理图解

  • 执行流程

    1. 客户端提交一个 Application,并启动一个 Driver 进程。
    2. 应用程序启动后会向 RS(ResourceManager)发送请求,启动AM(ApplicationMaster)的资源。
    3. RS 收到请求,随机选择一台 NM(NodeManager)启动 AM。这里的 NM 相当于 Standalone 中的 Worker 节点。
    4. AM启动后,会向RS请求一批container资源,用于启动Executor.
    5. RS 会找到一批 NM 返回给 AM,用于启动 Executor。
    6. AM 会向 NM 发送命令启动 Executor。
    7. Executor 启动后,会反向注册给 Driver,Driver 发送 task 到 Executor,并回收执行情况和结果。
  • 总结

    Yarn-client 模式同样是适用于测试,因为 Driver 运行在本地,Driver会与 yarn 集群中的 Executor 进行大量的通信,会造成客户机网卡流量的大量增加.

ApplicationMaster 的作用:

  1. 为当前的 Application 申请资源
  2. 给 NodeManager 发送消息启动 Executor。

注意:ApplicationMaster 有 launchExecutor 和申请资源的功能,并没有作业调度的功能。

2、yarn-cluster 提交

  • 提交任务

    ./spark-submit --master yarn --deploy-mode cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

    或者

    ./spark-submit --master yarn-cluster --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
  • 执行原理图

  • 执行流程

    1. 客户机提交 Application 应用程序,发送请求到 RS(ResourceManager),请求启动 AM(ApplicationMaster),Driver在AM中启动运行。
    2. RS 收到请求后随机在一台 NM(NodeManager)上启动 AM(相当于 Driver 端)。
    3. AM 启动,AM 发送请求到 RS,请求一批 container 用于启动 Excutor。
    4. RS 返回一批 NM 节点给 AM。
    5. AM 连接到 NM,发送请求到 NM 启动 Excutor。
    6. Excutor 反向注册到 AM 所在的节点的 Driver。Driver 发送 task 到 Excutor并回收结果。
  • 总结

    Yarn-Cluster 主要用于生产环境中,因为 Driver 运行在 Yarn 集群中某一台 nodeManager 中,每次提交任务的 Driver 所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过 yarn 查看日志。

ApplicationMaster 的作用:

  1. 为当前的 Application 申请资源
  2. 给 NodeManger 发送消息启动 Excutor。
  3. 任务调度。

补充部分算子

transformation

join、leftOuterJoin、rightOuterJoin、fullOuterJoin

作用在 K,V 格式的 RDD 上。根据 K 进行连接,对(K,V)join(K,W)返回(K,(V,W))

join 后的分区数与父 RDD 分区数多的那一个相同。

union

合并两个数据集。两个数据集的类型要一致。

返回新的 RDD 的分区数是合并 RDD 分区数的总和。

intersection

取两个数据集的交集

subtract

取两个数据集的差集

mapPartition

与 map 类似,遍历的单位是每个 partition 上的数据。

distinct(map+reduceByKey+map)

对相同的值进行去重

cogroup

当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable < V >,Iterable < W >))

action

foreachPartition

遍历的数据是每个 partition 的数据。

术语解释

窄依赖和宽依赖

RDD 之间有一系列的依赖关系,依赖关系又分为窄依赖和宽依赖。

窄依赖

父 RDD 和子 RDD partition 之间的关系是一对一或多对一。不会有 shuffle 的产生。

宽依赖

父RDD与子RDD partition之间的关系是一对多。会有shuffle的产生。

宽窄依赖图理解

Stage

Spark 任务会根据 RDD 之间的依赖关系,形成一个 DAG 有向无环图,DAG 会提交给 DAGScheduler,DAGScheduler 会把 DAG 划分成相互依赖的多个 stage,划分 stage 的依据就是 RDD 之间的宽窄依赖。遇到宽依赖就划分 stage,每个 stage 包含一个或多个 task 任务。然后将这些 task 以 taskSet 的形式提交给 TaskScheduler 运行。

stage 是由一组并行的 task 组成。

stage 切割规则

从后往前追溯,遇到宽依赖就切割 stage。

计算模式

pipeline 管道计算模式,pipeline 只是一种计算思想,模式。

1、数据一直在管道里面什么时候数据会落地?

  • 对 RDD 进行持久化(cache,persist,checkPoint)。

  • shuffle write 的时候。

2、Stage 的 的 task 并行度是由 stage 的最后一个 RDD 的分区数来决定的。

3、如何改变 RDD 的分区数?

例如:textFile(path, numPartitons),reduceByKey(XXX, 3),GroupByKey(4)

测试验证 pipeline 计算模式

val conf = new SparkConf()
conf.setMaster("local").setAppName("pipeline");
val sc = new SparkContext(conf)
val rdd = sc.parallelize(Array(1,2,3,4))
val rdd1 = rdd.map { x => {
println("map--------"+x)
x
}}
val rdd2 = rdd1.filter { x => {
println("fliter********"+x)
true
} }
rdd2.collect()
sc.stop()

Spark资源划分和任务调度

Spark 资源划分和任务调度的流程:

1、启动集群后,Worker 节点会向 Master 节点汇报资源情况,Master 掌握了集群资源情况。

2、当 Spark 提交一个 Application 后,根据 RDD 之间的依赖关系将 Application 形成一个 DAG 有向无环图

3、任务提交后,Spark 会在Driver 端创建两个对象:DAGScheduler 和 TaskScheduler,DAGScheduler是任务调度的高层调度器,是一个对象。DAGScheduler 的主要作用就是将DAG 根据 RDD 之间的宽窄依赖关系划分为一个个的 Stage,然后将这些 Stage 以 TaskSet 的形式提交给 TaskScheduler(TaskScheduler 是任务调度的低层调度器,这里 TaskSet 其实就是一个集合,里面封装的就是一个个的 task 任务,也就是 stage 中的并行度 task 任务),TaskScheduler 会遍历 TaskSet 集合,拿到每个 task 后会将 task 发送到计算节点 Executor 中去执行(其实就是发送到 Executor 中的线程池 ThreadPool 去执行)。

4、task 在 Executor 线程池中的运行情况会向 TaskScheduler 反馈,当 task 执行失败时,则由 TaskScheduler 负责重试,将 task 重新发送给 Executor 去执行,默认重试 3 次

5、如果重试 3 次依然失败,那么这个 task 所在的 stage 就失败了。stage 失败了则由 DAGScheduler 来负责重试,重新发送 TaskSet 到TaskSchdeuler,默认重试 4 次。如果重试 4 次以后依然失败,那么这个 job 就失败了。job 失败了,Application 就失败了。

6、TaskScheduler 不仅能重试失败的 task,还会重试 straggling(落后,缓慢)task(也就是执行速度比其他 task 慢太多的 task)。如果有运行缓慢的 task那么 TaskScheduler 会启动一个新的 task 来与这个运行缓慢的 task 执行相同的处理逻辑。两个 task 哪个先执行完,就以哪个 task 的执行结果为准。这就是 Spark 的推测执行机制。在 Spark 中推测执行默认是关闭的。推测执行可以通过 spark.speculation 属性来配置。

注意:

  • 对于 ETL 类型要入数据库的业务要关闭推测执行机制,这样就不会有重复的数据入库。
  • 如果遇到数据倾斜的情况,开启推测执行则有可能导致一直会有 task 重新启动处理相同的逻辑,任务可能一直处于处理不完的状态。

图解 Spark 资源调度和任务调度的流程

粗粒度资源申请和细粒度资源申请

  • 粗粒度资源申请(Spark)

    在 Application 执行之前,将所有的资源申请完毕,当资源申请成功后,才会进行任务的调度,当所有的 task 执行完成后,才会释放这部分资源。

    优点:每个 task 不需要在执行前自己去申请资源,直接使用就可以了。这样 task 启动就快了,task 执行快了,stage 执行就快了,job 就快了,application 执行就快了。

    缺点:直到最后一个 task 执行完成才会释放资源,集群的资源无法充分利用。

  • 细粒度资源申请

    Application 执行之前不需要先去申请资源,而是直接执行,让 job中的每个 task 在执行前自己去申请资源,task 执行完成就释放资源。

    优点:集群的资源可以充分利用。

    缺点:task 自己去申请资源,task 启动变慢,Application 的运行就相应的变慢了。

如何设置粗细粒度?

通过设置 JVM 插槽个数来调节 task 使用的资源。

set mapred.job.reuse.jvm.num.tasks=n;
(n为 task 插槽个数)