关于 Spark 算子的应用案例有很多,这里介绍一些一些不常见但是很有用的算子,以及几个小案例的源码介绍。

补充算子

transformations

  • mapPartitionWithIndex

    类似于 mapPartitions,除此之外还会携带分区的索引值。

  • repartition

    增加或减少分区。会产生shuffle。

  • coalesce

    coalesce 常用来减少分区,第二个参数是减少分区的过程中是否产生 shuffle。

    true 为产生 shuffle,false 不产生 shuffle。默认是 false。

    如果 coalesce 设置的分区数比原来的 RDD 的分区数还多的话,第二个参数设置为 false 不会起作用,如果设置成 true,效果和 repartition 一样。即 repartition(numPartitions) = coalesce(numPartitions,true)

  • groupBykey

    作用在 K,V 格式的 RDD 上。根据 Key 进行分组。作用在(K,V),返回(K,Iterable )。

  • zip

    将两个 RDD 中的元素(KV 格式/非 KV 格式)变成一个 KV 格式的 RDD。、

    两个 RDD 的个数必须相同。

  • zipWithIndex

    该函数将 RDD 中的元素和这个元素在 RDD 中的索引号(从 0 开始)组合成(K,V)对。

Action

  • countBykey

    作用到 K,V 格式的 RDD 上,根据 Key 计数相同 Key 的数据集元素。

  • countByValue

    根据数据集每个元素相同的内容来计数。返回相同内容的元素对应的条数。

  • reduce

    根据聚合逻辑聚合数据集中的每个元素。

案例分析

这里举例 PV&PU、二次排序、分组取topN。更多详细源码及测试文件请见 个人github

PV&PU

PV 是网站分析的一个术语,用以衡量网站用户访问的网页的数量。对于广告主,PV 值可预期它可以带来多少广告收入。一般来说,PV 与来访者的数量成正比,但是 PV 并不直接决定页面的真实来访者数量,如同一个来访
者通过不断的刷新页面,也可以制造出非常高的 PV。

1、什么是 PV 值

PV (page view )即页面浏览量或点击量,是衡量一个网站或网页用户访问量。具体的说,PV 值就是所有访问者在 24 小时(0 点到 24 点)内看了某个网站多少个页面或某个网页多少次。PV 是指页面刷新的次数,每一次页面刷新,就算做一次 PV 流量。

2、什么是 UV 值

UV (unique visitor )即独立访客数,指访问某个站点或点击某个网页的不同 IP 地址的人数。在同一天内,UV 只记录第一次进入网站的具有独立 IP 的访问者,在同一天内再次访问该网站则不计数。UV 提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。

3、代码

// PV,页面浏览量
public class PV {
public static void main(String[] args) {

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("pv");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> lineRDD = context.textFile("./data/pvuvdata");

lineRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s.split("\t")[5],1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {
@Override
public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2)
throws Exception {
return new Tuple2<>(tuple2._2,tuple2._1);
}
}).sortByKey(false).mapToPair(
new PairFunction<Tuple2<Integer,String>, String, Integer>() {
@Override
public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2)
throws Exception {
return new Tuple2<>(tuple2._2,tuple2._1);
}
}).foreach(new VoidFunction<Tuple2<String, Integer>>() {
@Override
public void call(Tuple2<String, Integer> tuple2) throws Exception {
System.out.println(tuple2);
}
});

context.stop();
}
}

// UV,访客数
public class UV {
public static void main(String[] args) {

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("pv");
JavaSparkContext context = new JavaSparkContext(conf);
JavaRDD<String> lineRDD = context.textFile("./data/pvuvdata");

JavaPairRDD<String, String> urlipRDD = lineRDD.mapToPair(
new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) throws Exception {
return new Tuple2<>(s.split("\t")[5], s.split("\t")[0]);
}
});

JavaPairRDD<String, Iterable<String>> groupByKeyRDD = urlipRDD.groupByKey();

groupByKeyRDD.foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {
@Override
public void call(Tuple2<String, Iterable<String>> tuple2)
throws Exception {
HashSet<Object> set = new HashSet<>();
Iterator<String> iterator = tuple2._2.iterator();
while(iterator.hasNext()){
set.add(iterator.next());
}
System.out.println("url : " + tuple2._1 + " value: " + set.size());
}
});

context.stop();
}
}

二次排序


public class SecondSortApp {
public static void main(String[] args) {

SparkConf sparkConf = new SparkConf();
sparkConf.setMaster("local").setAppName("SecondarySortTest");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> secondRDD = sc.textFile("data/secondSort.txt");

JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(
new PairFunction<String, SecondSortKey, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<SecondSortKey, String> call(String line) throws Exception {
String[] splited = line.split(" ");
int first = Integer.valueOf(splited[0]);
int second = Integer.valueOf(splited[1]);

// 这里自己写一个排序的类:SecondSorkey,使key实现Comparable接口
SecondSortKey secondSortKey = new SecondSortKey(first, second);
return new Tuple2<SecondSortKey, String>(secondSortKey, line);
}
});

pairSecondRDD.sortByKey(false).foreach(
new VoidFunction<Tuple2<SecondSortKey, String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<SecondSortKey, String> tuple)
throws Exception {
System.out.println(tuple + "------" + tuple._2);
}
});
}

}

// 类SecondSorkey
public class SecondSortKey implements Serializable , Comparable<SecondSortKey>{

private static final long serialVersionUID = 1L;
private int first;
private int second;
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}

public SecondSortKey(int first, int second) {
super();
this.first = first;
this.second = second;
}

@Override
public int compareTo(SecondSortKey o1) {
if (getFirst() - o1.getFirst() == 0) {
return o1.getSecond() - getSecond();
} else {
return o1.getFirst() - getFirst();
}
}
}

分组取topN

public class TopN {
public static void main(String[] args) {

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("TopOps");
JavaSparkContext sc = new JavaSparkContext(conf);
//hdfs://seanxia/wc.txt
JavaRDD<String> linesRDD = sc.textFile("./data/scores.txt",5);

JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;

@Override
public Tuple2<String, Integer> call(String str) throws Exception {
String[] splited = str.split(" ");
String clazzName = splited[0];
Integer score = Integer.valueOf(splited[1]);
return new Tuple2<String, Integer>(clazzName, score);
}
});

JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();

groupByKeyRDD.foreach(
new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> tuple)
throws Exception {
String clazzName = tuple._1;
Iterator<Integer> iterator = tuple._2.iterator();
System.out.println(tuple);

// 优化:定义一个长度为3的数组,每次只取前3个最大的,无需进行全排序
Integer[] top3 = new Integer[3];
while (iterator.hasNext()) {
Integer score = iterator.next();
for (int i = 0; i < top3.length; i++) {
if (top3[i] == null) {
top3[i] = score;
break;
} else if (score > top3[i]) {
for (int j = 2; j > i; j--) {
top3[j] = top3[j - 1];
}
top3[i] = score;
break;
}
}
}
System.out.println("class Name:" + clazzName);
for (Integer sscore : top3) {
System.out.println(sscore);
}
}
});
}
}

Spark-Submit 提交参数

Options:

  • –master

    MASTER_URL, 可 以 是 spark://host:port,mesos://host:port,yarn,yarn-cluster,yarn-client,local

  • –deploy-mode

    DEPLOY_MODE, Driver 程序运行的地方,client 或者 cluster,默认是client。

  • –class

    CLASS_NAME,主类名称,含包名

  • –jars

    逗号分隔的本地 JARS,Driver 和 executor 依赖的第三方 jar 包

  • –files

    用逗号隔开的文件列表,会放置在每个 executor 工作目录中

  • –conf

    spark的配置属性

  • –driver-memory

    Driver 程序使用内存大小(例如:1000M,5G),默认 1024M

  • –executor-memory

    每个 executor 内存大小(如:1000M,2G),默认 1G

Spark standalone with cluster deploy mode only:

  • –driver-cores

    Driver 程序的使用 core 个数(默认为 1),仅限于 Spark standalone 模式

Spark standalone or Mesos with cluster deploy mode only:

  • –supervise

    失败后是否重启 Driver,仅限于 Spark alone 或者 Mesos 模式

Spark standalone and Mesos only:

  • –total-executor-cores

    executor 使用的总核数,仅限于 SparkStandalone、Spark on Mesos 模式

Spark standalone and YARN only:

  • –executor-cores

    每个 executor 使用的 core 数,Spark on Yarn 默认为 1,standalone 默认为 worker 上所有可用的 core。

YARN-only:

  • –driver-cores

    driver 使用的 core,仅在 cluster 模式下,默认为 1。

  • –queue

    QUEUE_NAME 指定资源队列的名称,默认:default。

  • –num-executors

    一共启动的 executor 数量,默认是 2 个。

SparkShell

概念:

SparkShell 是 Spark 自带的一个快速原型开发工具,也可以说是 Spark 的 scala REPL(Read-Eval-Print-Loop),即交互式 shell。支持使用 scala 语言来进行 Spark 的交互式编程。

使用:

1、启动 Standalone 集群(sbin目录下):./start-all.sh

2、在客户端上启动 spark-shell(bin目录下):

./spark-shell --master spark://sean01:7077

3、启动 HDFS,创建目录 spark/test,上传文件 wc.txt:

#启动 hdfs 集群:
start-all.sh
#创建目录:
hdfs dfs -mkdir -p /spark/test
#上传 wc.txt
hdfs dfs -put /root/test/wc.txt /spark/test

在 root/test 目录下随机准备一份数据 wc.txt

4、测试运行 WordCount

sc.textFile("hdfs://Xss/spark/test/wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)

5、通过 Web 端查看

SparkUI

SparkUI 界面介绍

可以指定提交 Application 的名称。

添加参数: --name 自定义名称

./spark-shell --master spark://sean01:7077 --name myapp

注意:无论名称还是配置都是以代码写定的为准。

优先级为:代码 > shell > conf

1、配置临时 historyServer

临时配置,对本次提交的应用程序起作用。

./spark-shell --master spark://sean01:7077
--name WC
--conf spark.eventLog.enabled=true
--conf spark.eventLog.dir=hdfs://Xss/spark/test

作用:停止程序,在 Web Ui 中 Completed Applications 对应的 ApplicationID 中能查看 history。

2、spark-default.conf 配置文件中配置 HistoryServer,对所有提交的 Application 都起作用

  • 在客户端节点,进入 …/spark-1.6.0/conf/spark-defaults.conf 最后加入(3台都配):
# 开启记录事件日志的功能
spark.eventLog.enabled true
# 设置事件日志存储的目录
spark.eventLog.dir hdfs://Xss/spark/test
# 设置 HistoryServer 加载事件日志的位置
spark.history.fs.logDirectory hdfs://Xss/spark/test
# 日志优化选项, 压缩日志
spark.eventLog.compress true

  • 重启 spark 集群
#sbin目录下:
./stop-all.sh
./start-all.sh
  • 查看 Web 端

  • 启动 HistoryServer:
#sbin目录下:
./start-history-server.sh
  • 访问 HistoryServer:

    sean01:18080,之前所有提交的应用程序运行状况都会被记录。包括之前的都有。

Master HA

1、Master 的高可用原理

Standalone 集群只有一个 Master,如果 Master 挂了就无法提交应用程序,需要给 Master 进行高可用配置,Master 的高可用可以使用 fileSystem(文件系统)和 zookeeper(分布式协调服务)。

  • fileSystem

    fileSystem 只有存储功能,可以存储 Master 的元数据信息,用 fileSystem 搭建的 Master 高可用,在 Master 失败时,需要我们手动启动另外的备用 Master,这种方式不推荐使用

  • zookeeper

    zookeeper 有选举和存储功能,可以存储 Master 的元素据信息,使用 zookeeper 搭建的 Master 高可用,当 Master 挂掉时,备用的 Master会自动切换,推荐使用这种方式搭建 Master 的 HA。

2、Master 高可用搭建

  • 在 Spark Master 节点上配置主 Master,配置 spark-env.sh
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=sean01:2181,sean02:2181,sean03:2181
-Dspark.deploy.zookeeper.dir=/sparkmaster0821"
  • 发送到其他 worker 节点上

  • 找一台节点(非主 Master 节点)配置备用 Master,修改 spark-env.sh 配置节点上的 MasterIP

  • 启动集群之前启动 zookeeper 集群

  • 启动 spark Standalone 集群,在备用节点 sean02 启动备用 Master

#sbin目录下
./start-master.sh
  • 打开主 Master 和备用 Master WebUI 页面,观察状态。

3、注意点

  • 主备切换过程中不能提交 Application。
  • 主备切换过程中不影响已经在集群中运行的 Application。因为 Spark 是粗粒度资源调度。

4、测试验证

提交 SparkPi 程序,kill 主 Master 观察现象

#bin目录下
./spark-submit
--master spark://sean01:7077,sean02:7077
--class org.apache.spark.examples.SparkPi
../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100

kill 掉主节点 sean01,查看 Web 端备用节点 sean02 是否进行了切换

成功切换成主节点!