关于 Spark 算子的应用案例有很多,这里介绍一些一些不常见但是很有用的算子,以及几个小案例的源码介绍。
补充算子
- 
mapPartitionWithIndex
类似于 mapPartitions,除此之外还会携带分区的索引值。
 
- 
repartition
增加或减少分区。会产生shuffle。
 
- 
coalesce
coalesce 常用来减少分区,第二个参数是减少分区的过程中是否产生 shuffle。
true 为产生 shuffle,false 不产生 shuffle。默认是 false。
如果 coalesce 设置的分区数比原来的 RDD 的分区数还多的话,第二个参数设置为 false 不会起作用,如果设置成 true,效果和 repartition 一样。即 repartition(numPartitions) = coalesce(numPartitions,true)
 
- 
groupBykey
作用在 K,V 格式的 RDD 上。根据 Key 进行分组。作用在(K,V),返回(K,Iterable )。
 
- 
zip
将两个 RDD 中的元素(KV 格式/非 KV 格式)变成一个 KV 格式的 RDD。、
两个 RDD 的个数必须相同。
 
- 
zipWithIndex
该函数将 RDD 中的元素和这个元素在 RDD 中的索引号(从 0 开始)组合成(K,V)对。
 
Action
案例分析
这里举例 PV&PU、二次排序、分组取topN。更多详细源码及测试文件请见 个人github
PV&PU
PV 是网站分析的一个术语,用以衡量网站用户访问的网页的数量。对于广告主,PV 值可预期它可以带来多少广告收入。一般来说,PV 与来访者的数量成正比,但是 PV 并不直接决定页面的真实来访者数量,如同一个来访
者通过不断的刷新页面,也可以制造出非常高的 PV。
1、什么是 PV 值
PV (page view )即页面浏览量或点击量,是衡量一个网站或网页用户访问量。具体的说,PV 值就是所有访问者在 24 小时(0 点到 24 点)内看了某个网站多少个页面或某个网页多少次。PV 是指页面刷新的次数,每一次页面刷新,就算做一次 PV 流量。
2、什么是 UV 值
UV (unique visitor )即独立访客数,指访问某个站点或点击某个网页的不同 IP 地址的人数。在同一天内,UV 只记录第一次进入网站的具有独立 IP 的访问者,在同一天内再次访问该网站则不计数。UV 提供了一定时间内不同观众数量的统计指标,而没有反应出网站的全面活动。
3、代码
 public class PV {     public static void main(String[] args) {                  SparkConf conf = new SparkConf();         conf.setMaster("local").setAppName("pv");         JavaSparkContext context = new JavaSparkContext(conf);         JavaRDD<String> lineRDD = context.textFile("./data/pvuvdata");
          lineRDD.mapToPair(new PairFunction<String, String, Integer>() {             @Override             public Tuple2<String, Integer> call(String s) throws Exception {                 return new Tuple2<>(s.split("\t")[5],1);             }         }).reduceByKey(new Function2<Integer, Integer, Integer>() {             @Override             public Integer call(Integer v1, Integer v2) throws Exception {                 return v1 + v2;             }         }).mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() {             @Override             public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple2)                  throws Exception {                 return new Tuple2<>(tuple2._2,tuple2._1);             }         }).sortByKey(false).mapToPair(             new PairFunction<Tuple2<Integer,String>, String, Integer>() {             @Override             public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple2)                  throws Exception {                 return new Tuple2<>(tuple2._2,tuple2._1);             }         }).foreach(new VoidFunction<Tuple2<String, Integer>>() {             @Override             public void call(Tuple2<String, Integer> tuple2) throws Exception {                 System.out.println(tuple2);             }         });                  context.stop();     } }
 
  public class UV {     public static void main(String[] args) {
          SparkConf conf = new SparkConf();         conf.setMaster("local").setAppName("pv");         JavaSparkContext context = new JavaSparkContext(conf);         JavaRDD<String> lineRDD = context.textFile("./data/pvuvdata");
          JavaPairRDD<String, String> urlipRDD = lineRDD.mapToPair(                 new PairFunction<String, String, String>() {             @Override             public Tuple2<String, String> call(String s) throws Exception {                 return new Tuple2<>(s.split("\t")[5], s.split("\t")[0]);             }         });                  JavaPairRDD<String, Iterable<String>> groupByKeyRDD = urlipRDD.groupByKey();
          groupByKeyRDD.foreach(new VoidFunction<Tuple2<String, Iterable<String>>>() {             @Override             public void call(Tuple2<String, Iterable<String>> tuple2)                  throws Exception {                 HashSet<Object> set = new HashSet<>();                 Iterator<String> iterator = tuple2._2.iterator();                 while(iterator.hasNext()){                     set.add(iterator.next());                 }                 System.out.println("url : " + tuple2._1  + " value: " + set.size());             }         });
          context.stop();     } }
 
  | 
 
二次排序
 public class SecondSortApp {     public static void main(String[] args) {
          SparkConf sparkConf = new SparkConf();         sparkConf.setMaster("local").setAppName("SecondarySortTest");         final JavaSparkContext sc = new JavaSparkContext(sparkConf);         JavaRDD<String> secondRDD = sc.textFile("data/secondSort.txt");
          JavaPairRDD<SecondSortKey, String> pairSecondRDD = secondRDD.mapToPair(                 new PairFunction<String, SecondSortKey, String>() {             private static final long serialVersionUID = 1L;             @Override             public Tuple2<SecondSortKey, String> call(String line) throws Exception {                 String[] splited = line.split(" ");                 int first = Integer.valueOf(splited[0]);                 int second = Integer.valueOf(splited[1]);                                                   SecondSortKey secondSortKey = new SecondSortKey(first, second);                 return new Tuple2<SecondSortKey, String>(secondSortKey, line);             }         });
          pairSecondRDD.sortByKey(false).foreach(                 new VoidFunction<Tuple2<SecondSortKey, String>>() {                     private static final long serialVersionUID = 1L;                     @Override                     public void call(Tuple2<SecondSortKey, String> tuple)                          throws Exception {                         System.out.println(tuple + "------" + tuple._2);                     }                 });     }
  }
 
  public class SecondSortKey  implements Serializable , Comparable<SecondSortKey>{          private static final long serialVersionUID = 1L;     private int first;     private int second;     public int getFirst() {         return first;     }     public void setFirst(int first) {         this.first = first;     }     public int getSecond() {         return second;     }     public void setSecond(int second) {         this.second = second;     }
      public SecondSortKey(int first, int second) {         super();         this.first = first;         this.second = second;     }
      @Override     public int compareTo(SecondSortKey o1) {         if (getFirst() - o1.getFirst() == 0) {             return o1.getSecond() - getSecond();         } else {             return  o1.getFirst() - getFirst();     	}     } }
 
  | 
 
分组取topN
public class TopN {     public static void main(String[] args) {
          SparkConf conf = new SparkConf();         conf.setMaster("local").setAppName("TopOps");         JavaSparkContext sc = new JavaSparkContext(conf);                  JavaRDD<String> linesRDD = sc.textFile("./data/scores.txt",5);
          JavaPairRDD<String, Integer> pairRDD = linesRDD.mapToPair(                 new PairFunction<String, String, Integer>() {                     private static final long serialVersionUID = 1L;                                          @Override                     public Tuple2<String, Integer> call(String str) throws Exception {                         String[] splited = str.split(" ");                         String clazzName = splited[0];                         Integer score = Integer.valueOf(splited[1]);                         return new Tuple2<String, Integer>(clazzName, score);                     }                 });                  JavaPairRDD<String, Iterable<Integer>> groupByKeyRDD = pairRDD.groupByKey();
          groupByKeyRDD.foreach(                 new VoidFunction<Tuple2<String, Iterable<Integer>>>() {                     private static final long serialVersionUID = 1L;                     @Override                     public void call(Tuple2<String, Iterable<Integer>> tuple)                          throws Exception {                         String clazzName = tuple._1;                         Iterator<Integer> iterator = tuple._2.iterator();                         System.out.println(tuple);                                                                           Integer[] top3 = new Integer[3];                         while (iterator.hasNext()) {                             Integer score = iterator.next();                             for (int i = 0; i < top3.length; i++) {                                 if (top3[i] == null) {                                     top3[i] = score;                                     break;                                 } else if (score > top3[i]) {                                     for (int j = 2; j > i; j--) {                                         top3[j] = top3[j - 1];                                     }                                     top3[i] = score;                                     break;                                 }                             }                         }                         System.out.println("class Name:" + clazzName);                         for (Integer sscore : top3) {                             System.out.println(sscore);                         }                     }                 });     } }
  | 
 
Spark-Submit 提交参数
Options:
- 
–master
MASTER_URL, 可 以 是 spark://host:port,mesos://host:port,yarn,yarn-cluster,yarn-client,local
 
- 
–deploy-mode
DEPLOY_MODE, Driver 程序运行的地方,client 或者 cluster,默认是client。
 
- 
–class
CLASS_NAME,主类名称,含包名
 
- 
–jars
逗号分隔的本地 JARS,Driver 和 executor 依赖的第三方 jar 包
 
- 
–files
用逗号隔开的文件列表,会放置在每个 executor 工作目录中
 
- 
–conf
spark的配置属性
 
- 
–driver-memory
Driver 程序使用内存大小(例如:1000M,5G),默认 1024M
 
- 
–executor-memory
每个 executor 内存大小(如:1000M,2G),默认 1G
 
Spark standalone with cluster deploy mode only:
Spark standalone or Mesos with cluster deploy mode only:
Spark standalone and Mesos only:
Spark standalone and YARN only:
YARN-only:
- 
–driver-cores
driver 使用的 core,仅在 cluster 模式下,默认为 1。
 
- 
–queue
QUEUE_NAME 指定资源队列的名称,默认:default。
 
- 
–num-executors
一共启动的 executor 数量,默认是 2 个。
 
SparkShell
概念:
SparkShell 是 Spark 自带的一个快速原型开发工具,也可以说是 Spark 的 scala REPL(Read-Eval-Print-Loop),即交互式 shell。支持使用 scala 语言来进行 Spark 的交互式编程。
使用:
1、启动 Standalone 集群(sbin目录下):./start-all.sh
2、在客户端上启动 spark-shell(bin目录下):
./spark-shell --master spark://sean01:7077
   | 
 
3、启动 HDFS,创建目录 spark/test,上传文件 wc.txt:
#启动 hdfs 集群: start-all.sh #创建目录: hdfs dfs -mkdir -p /spark/test #上传 wc.txt hdfs dfs -put /root/test/wc.txt /spark/test
   | 
 
在 root/test 目录下随机准备一份数据 wc.txt

4、测试运行 WordCount
sc.textFile("hdfs://Xss/spark/test/wc.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).foreach(println)
  | 
 

5、通过 Web 端查看

SparkUI
SparkUI 界面介绍
可以指定提交 Application 的名称。
添加参数: --name 自定义名称
./spark-shell --master spark://sean01:7077 --name myapp
   | 
 
注意:无论名称还是配置都是以代码写定的为准。
优先级为:代码 > shell > conf
1、配置临时 historyServer
临时配置,对本次提交的应用程序起作用。
./spark-shell --master spark://sean01:7077 --name WC --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://Xss/spark/test
   | 
 
作用:停止程序,在 Web Ui 中 Completed Applications 对应的 ApplicationID 中能查看 history。
2、spark-default.conf 配置文件中配置 HistoryServer,对所有提交的 Application 都起作用
- 在客户端节点,进入 …/spark-1.6.0/conf/spark-defaults.conf 最后加入(3台都配):
 
 spark.eventLog.enabled true
  spark.eventLog.dir hdfs://Xss/spark/test
  spark.history.fs.logDirectory hdfs://Xss/spark/test
  spark.eventLog.compress true
 
  | 
 

#sbin目录下: ./stop-all.sh ./start-all.sh
   | 
 

#sbin目录下: ./start-history-server.sh
   | 
 

Master HA
1、Master 的高可用原理
Standalone 集群只有一个 Master,如果 Master 挂了就无法提交应用程序,需要给 Master 进行高可用配置,Master 的高可用可以使用 fileSystem(文件系统)和 zookeeper(分布式协调服务)。
- 
fileSystem
fileSystem 只有存储功能,可以存储 Master 的元数据信息,用 fileSystem 搭建的 Master 高可用,在 Master 失败时,需要我们手动启动另外的备用 Master,这种方式不推荐使用。
 
- 
zookeeper
zookeeper 有选举和存储功能,可以存储 Master 的元素据信息,使用 zookeeper 搭建的 Master 高可用,当 Master 挂掉时,备用的 Master会自动切换,推荐使用这种方式搭建 Master 的 HA。
 

2、Master 高可用搭建
export SPARK_DAEMON_JAVA_OPTS=" -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=sean01:2181,sean02:2181,sean03:2181 -Dspark.deploy.zookeeper.dir=/sparkmaster0821"
   | 
 
#sbin目录下 ./start-master.sh
   | 
 
- 打开主 Master 和备用 Master WebUI 页面,观察状态。
 


3、注意点
- 主备切换过程中不能提交 Application。
 
- 主备切换过程中不影响已经在集群中运行的 Application。因为 Spark 是粗粒度资源调度。
 
4、测试验证
提交 SparkPi 程序,kill 主 Master 观察现象
#bin目录下 ./spark-submit --master spark://sean01:7077,sean02:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 100
   | 
 
kill 掉主节点 sean01,查看 Web 端备用节点 sean02 是否进行了切换

成功切换成主节点!