关于 Spark 算子的应用案例有很多,这里介绍一些一些不常见但是很有用的算子,以及几个小案例的源码介绍。
补充算子
-
mapPartitionWithIndex
类似于 mapPartitions,除此之外还会携带分区的索引值。
-
repartition
增加或减少分区。会产生shuffle。
-
coalesce
coalesce 常用来减少分区,第二个参数是减少分区的过程中是否产生 shuffle。
true 为产生 shuffle,false 不产生 shuffle。默认是 false。
如果 coalesce 设置的分区数比原来的 RDD 的分区数还多的话,第二个参数设置为 false 不会起作用,如果设置成 true,效果和 repartition 一样。即 repartition(numPartitions) = coalesce(numPartitions,true)
-
groupBykey
作用在 K,V 格式的 RDD 上。根据 Key 进行分组。作用在(K,V),返回(K,Iterable )。
-
zip
将两个 RDD 中的元素(KV 格式/非 KV 格式)变成一个 KV 格式的 RDD。、
两个 RDD 的个数必须相同。
-
zipWithIndex
该函数将 RDD 中的元素和这个元素在 RDD 中的索引号(从 0 开始)组合成(K,V)对。
Action
案例分析
这里举例 PV&PU、二次排序、分组取topN。更多详细源码及测试文件请见 个人github
PV&PU
PV 是网站分析的一个术语,用以衡量网站用户访问的网页的数量。对于广告主,PV 值可预期它可以带来多少广告收入。一般来说,PV 与来访者的数量成正比,但是 PV 并不直接决定页面的真实来访者数量,如同一个来访
者通过不断的刷新页面,也可以制造出非常高的 PV。
1、什么是 PV 值
PV (page view )即页面浏览量或点击量,是衡量一个网站或网页用户访问量。具体的说,PV 值就是所有访问者在 24 小时(0 点到 24 点)内看了某个网站多少个页面或某个网页多少次。PV 是指页面刷新的次数,每一次页面刷新,就算做一次 PV 流量。
2、什么是 UV 值
UV (unique visitor )即独立访客数,指访问某个站点或点击某个网页的不同 IP 地址的人数。在同一天内,UV 只记录第一次进入网站的具有独立 IP 的访问者,在同一天内再次访问该网站则不计数。UV 提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。
3、代码
public class PV { public static void main(String[] args) { SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("pv"); JavaSparkContext context = new JavaSparkContext(conf); JavaRDD<String> lineRDD = context.textFile("./data/pvuvdata");
lineRDD.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<>(s.split("\t")[5],1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() { @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2) throws Exception { return new Tuple2<>(tuple2._2,tuple2._1); } }).sortByKey(false).mapToPair( new PairFunction<Tuple2<Integer,String>, String, Integer>() { @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2) throws Exception { return new Tuple2<>(tuple2._2,tuple2._1); } }).foreach(new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> tuple2) throws Exception { System.out.println(tuple2); } }); context.stop(); } }
public class UV { public static void main(String[] args) {
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("pv"); JavaSparkContext context = new JavaSparkContext(conf); JavaRDD<String> lineRDD = context.textFile("./data/pvuvdata");
JavaPairRDD<String, String> urlipRDD = lineRDD.mapToPair( new PairFunction<String, String, String>() { @Override public Tuple2<String, String> call(String s) throws Exception { return new Tuple2<>(s.split("\t")[5], s.split("\t")[0]); } }); JavaPairRDD<String, Iterable<String>> groupByKeyRDD = urlipRDD.groupByKey();
groupByKeyRDD.foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() { @Override public void call(Tuple2<String, Iterable<String>> tuple2) throws Exception { HashSet<Object> set = new HashSet<>(); Iterator<String> iterator = tuple2._2.iterator(); while(iterator.hasNext()){ set.add(iterator.next()); } System.out.println("url : " + tuple2._1 + " value: " + set.size()); } });
context.stop(); } }
|
二次排序
public class SecondSortApp { public static void main(String[] args) {
SparkConf sparkConf = new SparkConf(); sparkConf.setMaster("local").setAppName("SecondarySortTest"); final JavaSparkContext sc = new JavaSparkContext(sparkConf); JavaRDD<String> secondRDD = sc.textFile("data/secondSort.txt");
JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair( new PairFunction<String, SecondSortKey, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<SecondSortKey, String> call(String line) throws Exception { String[] splited = line.split(" "); int first = Integer.valueOf(splited[0]); int second = Integer.valueOf(splited[1]); SecondSortKey secondSortKey = new SecondSortKey(first, second); return new Tuple2<SecondSortKey, String>(secondSortKey, line); } });
pairSecondRDD.sortByKey(false).foreach( new VoidFunction<Tuple2<SecondSortKey, String>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<SecondSortKey, String> tuple) throws Exception { System.out.println(tuple + "------" + tuple._2); } }); }
}
public class SecondSortKey implements Serializable , Comparable<SecondSortKey>{ private static final long serialVersionUID = 1L; private int first; private int second; public int getFirst() { return first; } public void setFirst(int first) { this.first = first; } public int getSecond() { return second; } public void setSecond(int second) { this.second = second; }
public SecondSortKey(int first, int second) { super(); this.first = first; this.second = second; }
@Override public int compareTo(SecondSortKey o1) { if (getFirst() - o1.getFirst() == 0) { return o1.getSecond() - getSecond(); } else { return o1.getFirst() - getFirst(); } } }
|
分组取topN
public class TopN { public static void main(String[] args) {
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("TopOps"); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDD<String> linesRDD = sc.textFile("./data/scores.txt",5);
JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String str) throws Exception { String[] splited = str.split(" "); String clazzName = splited[0]; Integer score = Integer.valueOf(splited[1]); return new Tuple2<String, Integer>(clazzName, score); } }); JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();
groupByKeyRDD.foreach( new VoidFunction<Tuple2<String, Iterable<Integer>>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Iterable<Integer>> tuple) throws Exception { String clazzName = tuple._1; Iterator<Integer> iterator = tuple._2.iterator(); System.out.println(tuple); Integer[] top3 = new Integer[3]; while (iterator.hasNext()) { Integer score = iterator.next(); for (int i = 0; i < top3.length; i++) { if (top3[i] == null) { top3[i] = score; break; } else if (score > top3[i]) { for (int j = 2; j > i; j--) { top3[j] = top3[j - 1]; } top3[i] = score; break; } } } System.out.println("class Name:" + clazzName); for (Integer sscore : top3) { System.out.println(sscore); } } }); } }
|
Spark-Submit 提交参数
Options:
-
–master
MASTER_URL, 可 以 是 spark://host:port,mesos://host:port,yarn,yarn-cluster,yarn-client,local
-
–deploy-mode
DEPLOY_MODE, Driver 程序运行的地方,client 或者 cluster,默认是client。
-
–class
CLASS_NAME,主类名称,含包名
-
–jars
逗号分隔的本地 JARS,Driver 和 executor 依赖的第三方 jar 包
-
–files
用逗号隔开的文件列表,会放置在每个 executor 工作目录中
-
–conf
spark的配置属性
-
–driver-memory
Driver 程序使用内存大小(例如:1000M,5G),默认 1024M
-
–executor-memory
每个 executor 内存大小(如:1000M,2G),默认 1G
Spark standalone with cluster deploy mode only:
Spark standalone or Mesos with cluster deploy mode only:
Spark standalone and Mesos only:
Spark standalone and YARN only:
YARN-only:
-
–driver-cores
driver 使用的 core,仅在 cluster 模式下,默认为 1。
-
–queue
QUEUE_NAME 指定资源队列的名称,默认:default。
-
–num-executors
一共启动的 executor 数量,默认是 2 个。
SparkShell
概念:
SparkShell 是 Spark 自带的一个快速原型开发工具,也可以说是 Spark 的 scala REPL(Read-Eval-Print-Loop),即交互式 shell。支持使用 scala 语言来进行 Spark 的交互式编程。
使用:
1、启动 Standalone 集群(sbin目录下):./start-all.sh
2、在客户端上启动 spark-shell(bin目录下):
./spark-shell --master spark://sean01:7077
|
3、启动 HDFS,创建目录 spark/test,上传文件 wc.txt:
#启动 hdfs 集群: start-all.sh #创建目录: hdfs dfs -mkdir -p /spark/test #上传 wc.txt hdfs dfs -put /root/test/wc.txt /spark/test
|
在 root/test 目录下随机准备一份数据 wc.txt
4、测试运行 WordCount
sc.textFile("hdfs://Xss/spark/test/wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)
|
5、通过 Web 端查看
SparkUI
SparkUI 界面介绍
可以指定提交 Application 的名称。
添加参数: --name 自定义名称
./spark-shell --master spark://sean01:7077 --name myapp
|
注意:
无论名称还是配置都是以代码写定的为准。
优先级为:代码 > shell > conf
1、配置临时 historyServer
临时配置,对本次提交的应用程序起作用。
./spark-shell --master spark://sean01:7077 --name WC --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://Xss/spark/test
|
作用:
停止程序,在 Web Ui 中 Completed Applications 对应的 ApplicationID 中能查看 history。
2、spark-default.conf 配置文件中配置 HistoryServer,对所有提交的 Application 都起作用
- 在客户端节点,进入 …/spark-1.6.0/conf/spark-defaults.conf 最后加入(3台都配):
spark.eventLog.enabled true
spark.eventLog.dir hdfs://Xss/spark/test
spark.history.fs.logDirectory hdfs://Xss/spark/test
spark.eventLog.compress true
|
#sbin目录下: ./stop-all.sh ./start-all.sh
|
#sbin目录下: ./start-history-server.sh
|
Master HA
1、Master 的高可用原理
Standalone 集群只有一个 Master,如果 Master 挂了就无法提交应用程序,需要给 Master 进行高可用配置,Master 的高可用可以使用 fileSystem(文件系统)和 zookeeper(分布式协调服务)。
-
fileSystem
fileSystem 只有存储功能,可以存储 Master 的元数据信息,用 fileSystem 搭建的 Master 高可用,在 Master 失败时,需要我们手动启动另外的备用 Master,这种方式不推荐使用
。
-
zookeeper
zookeeper 有选举和存储功能,可以存储 Master 的元素据信息,使用 zookeeper 搭建的 Master 高可用,当 Master 挂掉时,备用的 Master会自动切换,推荐使用这种方式搭建 Master 的 HA。
2、Master 高可用搭建
export SPARK_DAEMON_JAVA_OPTS=" -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=sean01:2181,sean02:2181,sean03:2181 -Dspark.deploy.zookeeper.dir=/sparkmaster0821"
|
#sbin目录下 ./start-master.sh
|
- 打开主 Master 和备用 Master WebUI 页面,观察状态。
3、注意点
- 主备切换过程中不能提交 Application。
- 主备切换过程中不影响已经在集群中运行的 Application。因为 Spark 是粗粒度资源调度。
4、测试验证
提交 SparkPi 程序,kill 主 Master 观察现象
#bin目录下 ./spark-submit --master spark://sean01:7077,sean02:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
|
kill 掉主节点 sean01,查看 Web 端备用节点 sean02 是否进行了切换
成功切换成主节点!