Spark SQL 是 Spark 处理数据的一个模块,跟基本的 Spark RDD 的API不同,Spark SQL中提供的接口将会提供给Spark 更多关于结构化数据和计算的信息。其本质是,Spark SQL使用这些额外的信息去执行额外的优化。

Shark

Shark 是基于 Spark 计算框架之上且兼容 Hive 语法的 SQL 执行引擎,由于底层的计算采用了 Spark,性能比 MapReduce 的 Hive 普遍快 2 倍以上,当数据全部 load 在内存的话,将快 10 倍以上,因此 Shark 可以作为交互式查询应用服务来使用。

除了基于 Spark 的特性外,Shark 是完全兼容 Hive 的语法,表结构以及UDF函数等,已有的 HiveSql 可以直接进行迁移至 Shark 上 Shark 底层依赖于 Hive 的解析器,查询优化器,但正是由于 SHark 的整体设计架构对 Hive 的依赖性太强,难以支持其长远发展,比如不能和 Spark 的其他组件进行很好的集成,无法满足 Spark 的一栈式解决大数据处理的需求。

SparkSQL

1、SparkSQL 介绍

Hive 是 Shark 的前身,Shark 是 SparkSQL 的前身,SparkSQL 产生的根本原因是其完全脱离了 Hive 的限制

  • SparkSQL支持查询原生的RDD。 RDD是Spark平台的核心概念,是 Spark 能够高效的处理大数据的各种场景的基础。

  • 能够在 Scala 中写 SQL 语句。支持简单的 SQL 语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用。

2、Spark on Hive 和 Hive on Spark

  • Spark on Hive: Hive 只作为储存角色,Spark 负责 sql 解析优化,执行。
  • Hive on Spark:Hive 即作为存储又负责 sql 的解析优化,Spark 负责执行。

3、DataFrame

DataFrame 也是一个分布式数据容器。与 RDD 类似,然而 DataFrame 更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息(元数据信息),即 schema。

同时,与 Hive 类似,DataFrame 也支持嵌套数据类型(struct、array 和 map)。从 API 易用性的角度上 看, DataFrame API 提供的是一套高层的关系操作,比函数式的 RDD API 要更加友好,门槛更低。

DataFrame 的底层封装的是 RDD,只不过 RDD 的泛型是 Row 类型。

4、SparkSQL 的数据源

SparkSQL 的数据源可以是 JSON 类型的字符串,JDBC,Parquent,Hive,HDFS 等。

5、SparkSQL 底层架构

首先拿到 sql 后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过 SparkPlanner 的策略转化成一批物理计划,随后经过消费模型转换成一个个的 Spark 任务执行。

6、谓词下推(predicate Pushdown)

我们知道,可以通过封装SparkSql的Data Source API完成各类数据源的查询,那么如果底层数据源无法高效完成数据的过滤,就会执行直接的全局扫描,把每条相关的数据都交给SparkSql的Filter操作符完成过滤,虽然SparkSql使用的Code Generation技术极大的提高了数据过滤的效率,但是这个过程无法避免大量数据的磁盘读取,甚至在某些情况下会涉及网络IO(例如数据非本地化时);如果底层数据源在进行扫描时能非常快速的完成数据的过滤,那么就会把过滤交给底层数据源来完成,这就是SparkSql中的谓词下推。

创建 DataFrame 的几种方式

1. 读取 Json 格式的文件创建 DataFrame

注意:

  • Json 文件中的 json 数据不能嵌套 json 格式数据。
  • DataFrame 是一个一个 Row 类型的 RDD,df.rdd()/df.javaRdd()。
  • 可以两种方式读取 json 格式的文件。
  • df.show()默认显示前 20 行数据。
  • DataFrame 原生 API 可以操作 DataFrame(不方便)。
  • 注册成临时表时,表中的列默认按 ASCII 顺序显示列。

Java:

public class CreateDFFromJosonFile {
public static void main(String[] args){
// 配置上下文环境
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonFile");
SparkContext sc = new SparkContext(conf);

// 创建sqlContext
SQLContext sqlContext = new SQLContext(sc);

/**
* DataFrame的底层是一个一个的RDD RDD的泛型是Row类型。
* 以下两种方式都可以读取json格式的文件
*/
DataFrame df = sqlContext.read().format("json").load("./data/json");
// DataFrame df = sqlContext.read().json("./data/json");

/**
* 显示 DataFrame中的内容,默认显示前20行。如果现实多行要指定多少行show(行数)
* 注意:当有多个列时,显示的列先后顺序是按列的ascii码先后显示。
*/
// df.show();

// dataFrame转换成RDD,两种方式
RDD<Row> rdd = df.rdd();
// JavaRDD<Row> rowJavaRDD = df.javaRDD();

// 树形形式显示scahema信息
// df.printSchema();

/**
* dataFram自带的API 操作DataFrame
*/
// select name ,age from table where age>18
df.select(df.col("name"),df.col("age")).where(df.col("age").gt(18)).show();

//select count(*) from table group by age
df.groupBy(df.col("age")).count().show();

/**
* 将DataFrame注册成临时的一张表,
* 这张表临时注册到内存中,是逻辑上的表,不会雾化到磁盘
*/
df.registerTempTable("jtable");
DataFrame result = sqlContext.sql("select age, count(1) from jtable group by age");
result.show();

sc.stop();
}
}

Scala:

object CreateDFFromJsonFile {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonFile")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json("./data/json")
// val df = sqlContext.read.format("json").load("./data/json")

df.show()
df.printSchema()

// select * from table
df.select(df.col("name")).show()

// select name from table where age>19
df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show()

// 注册临时表
df.registerTempTable("jtable")

val result = sqlContext.sql("select * from jtable")
result.show()

sc.stop()
}
}

2、通过 json 格式的 RDD 创建 DataFrame

Java:

public class CreateDFFromJsonRDD {
public static void main(String[] args){
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("jsonRDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

JavaRDD<String> nameRDD = sc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"age\":\"18\"}",
"{\"name\":\"lisi\",\"age\":\"19\"}",
"{\"name\":\"wangwu\",\"age\":\"20\"}"
));

JavaRDD<String> scoreRDD = sc.parallelize(Arrays.asList(
"{\"name\":\"zhangsan\",\"score\":\"100\"}",
"{\"name\":\"lisi\",\"score\":\"200\"}",
"{\"name\":\"wangwu\",\"score\":\"300\"}"
));

DataFrame namedf = sqlContext.read().json(nameRDD);
DataFrame scoredf = sqlContext.read().json(scoreRDD);

namedf.registerTempTable("t1");
scoredf.registerTempTable("t2");

DataFrame result = sqlContext.sql("select t1.name, t1.age, t2.score from t1, t2 where t1.name=t2.name ");
result.show();

sc.stop();
}
}

Scala:

object CreateDFFromJsonRDD {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("jsonRDD")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val nameRDD = sc.makeRDD(Array(
"{\"name\":\"zhangsan\",\"age\":18}",
"{\"name\":\"lisi\",\"age\":19}",
"{\"name\":\"wangwu\",\"age\":20}"
))

val scoreRDD = sc.makeRDD(Array(
"{\"name\":\"zhangsan\",\"score\":100}",
"{\"name\":\"lisi\",\"score\":200}",
"{\"name\":\"wangwu\",\"score\":300}"
))

val nameDF = sqlContext.read.json(nameRDD)
val scoreDF = sqlContext.read.json(scoreRDD)

// 创建临时表t1、t2
nameDF.registerTempTable("t1")
scoreDF.registerTempTable("t2")

val result = sqlContext.sql("select t1.name, t1.age, t2.score from t1, t2 where t1.name = t2.name")
result.show()

sc.stop()
}
}

3、非 json 格式的 RDD 创建 DataFrame

(1)通过反射的方式将非 json 格式的 RDD 转换成 DataFrame(不建议使用)

  • 自定义类要可序列化
  • 自定义类的访问级别是 Public
  • RDD 转成 DataFrame 后会根据映射将字段按 Assci 码排序
  • 将 DataFrame 转换成 RDD 时获取字段两种方式,一种是df.getInt(0)下标获取(不推荐使用),另一种是 df.getAs(“列名”)获取(推荐使用)

Java:

public class CreateDFFromRDDWithReflect {
public static void main(String[] args){
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("RDD");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("./data/person.txt");

JavaRDD<Person> personRDD = lineRDD.map(new Function<String, Person>() {
@Override
public Person call(String s) throws Exception {
Person p = new Person();
p.setId(s.split(",")[0]);
p.setName(s.split(",")[1]);
p.setAge(Integer.valueOf(s.split(",")[2]));
return p;
}
});

/**
* 传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
* 在底层通过反射的方式获得Person的所有field,结合RDD本身,就生成了DataFrame
*/
DataFrame df = sqlContext.createDataFrame(personRDD, Person.class);
df.show();

df.registerTempTable("person");
sqlContext.sql("select name from person where id = 2").show();

/**
* 将DataFrame转成JavaRDD
* 注意:
* 1.可以使用row.getInt(0),row.getString(1)...
* 通过下标获取返回Row类型的数据,但是要注意列顺序问题---不常用
* 2.可以使用row.getAs("列名")来获取对应的列值
*/
JavaRDD<Row> javaRDD = df.javaRDD();

JavaRDD<Person> map = javaRDD.map(new Function<Row, Person>() {
@Override
public Person call(Row row) throws Exception {
Person p = new Person();

//p.setId(row.getString(1));
//p.setName(row.getString(2));
//p.setAge(row.getInt(0));

p.setId((String) row.getAs("id"));
p.setName((String) row.getAs("name"));
p.setAge((Integer) row.getAs("age"));

return p;
}
});

map.foreach(new VoidFunction<Person>() {
@Override
public void call(Person person) throws Exception {
System.out.println(person);
}
});

sc.stop();
}
}

Scala:

object CreateDFFromRDDWithReflect {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("reflectRDD")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./data/person.txt")

// 将RDD隐式转换成DataFrame
import sqlContext.implicits._
val personRDD = lineRDD.map(x => {
val person = Person(
x.split(",")(0),
x.split(",")(1),
Integer.valueOf(x.split(",")(2)
))
person
})

val df = personRDD.toDF()
df.show()

/**
* 将DataFrame转换成PersonRDD
*/
val rdd = df.rdd
val result = rdd.map(x => {
Person(x.getAs("id"), x.getAs("name"), x.getAs("age"))
})
result.foreach(println)

sc.stop()
}

}

(2)动态创建 Schema 将非 json 格式的 RDD 转换成 DataFrame

Java:

public class CreateDFFromRDDWithStruct {
public static void main(String[] args){
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("rddStruct");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> lineRDD = sc.textFile("./data/person.txt");

/**
* 转换成Row类型的RDD
*/
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {

@Override
public Row call(String s) throws Exception {
return RowFactory.create(
s.split(",")[0],
s.split(",")[1],
Integer.valueOf(s.split(",")[0])
);
}
});

/**
* 动态构建DataFrame中的元数据,
* 一般来说这里的字段可以来源自字符串,也可以来源于外部数据库
*/
List<StructField> structFields = Arrays.asList(
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("aname", DataTypes.StringType, true),
DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
StructType schema = DataTypes.createStructType(structFields);

DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.printSchema();
df.show();

sc.stop();
}
}

Scala:

object CreateDFFromRDDWithStruct {
def main(args: Array[String]): Unit = {

val conf = new SparkConf()
conf.setMaster("local").setAppName("rddStruct")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./data/person.txt")

val rowRDD = lineRDD.map(x => {
val split = x.split(",")
RowFactory.create(split(0),split(1),Integer.valueOf(split(2)))
})

val schema = StructType(List(
StructField("id", StringType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)
))

val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()
df.printSchema()

sc.stop()
}
}

4、读取 parquet 文件创建 DataFrame

注意:

  • 可以将 DataFrame 存储成 parquet 文件。保存成 parquet 文件的方式有两种
df.write().mode(SaveMode.Overwrite)format("parquet").save("./sparksql/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./sparksql/parquet");
  • SaveMode 指定文件保存时的模式。
    • Overwrite:覆盖
    • Append:追加
    • ErrorIfExists:如果存在就报错
    • Ignore:如果存在就忽略

Java:

public class CreateDFFromParquet {
public static void main(String[] args){

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("parquet");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> jsonRDD = sc.textFile("./data/json");
DataFrame df = sqlContext.read().json(jsonRDD);

/**
* 将DataFrame保存成parquet文件,SaveMode指定存储文件时的保存模式
* 保存成parquet文件有以下两种方式:
*/
df.write().mode(SaveMode.Overwrite).format("parquet").save("./data/parquet");
df.write().mode(SaveMode.Overwrite).parquet("./data/parquet");

df.show();

/**
* 加载parquet文件成DataFrame
* 加载parquet文件有以下两种方式:
*/
DataFrame parquet = sqlContext.read().format("parquet").load("./data/parquet");
// DataFrame parquet = sqlContext.read().parquet("./data/parquet");

parquet.show();

sc.stop();
}
}

Scala:

object CreateDFFromParquet {
def main(args: Array[String]): Unit = {

val conf = new SparkConf()
conf.setMaster("local").setAppName("parquet")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val jsonRDD = sc.textFile("./data/json")

val df = sqlContext.read.json(jsonRDD)
df.show()

/**
* 将DF保存为parquet文件
*/
df.write.mode(SaveMode.Overwrite).format("parquet").save("./data/parquet")
// df.write.mode(SaveMode.Overwrite).parquet("./data/parquet")

/**
* 读取parquet文件
*/
val result = sqlContext.read.parquet("./data/parquet")
// val result = sqlContext.read.format("parquet").load("./data/parquet")

result.show()

sc.stop()
}

}

5、读取 JDBC 中的数据创建 DataFrame(MySql 为例)

Java:

public class CreateDFFromMysql {
public static void main(String[] args){

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("mysql");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

/**
* 第一种方式读取MySql数据库表,加载为DataFrame
*/

HashMap<String, String> options = new HashMap<>();
options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "123456");
options.put("dbtable", "person");

DataFrame person = sqlContext.read().format("jdbc").options(options).load();
// DataFrame person = sqlContext.read().jdbc();
person.show();

// 创建临时表t1
person.registerTempTable("t1");

/**
* 第二种方式读取MySql数据表加载为DataFrame
*/
DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "123456");
reader.option("dbtable", "score");

DataFrame score = reader.load();
score.show();

// 创建临时表t2
score.registerTempTable("t2");

DataFrame result = sqlContext.sql("select t1.id, t1.name, t2.score from t1, t2 where person.name = score.name");

result.show();

/**
* 将DataFrame结果保存到Mysql中
*/
Properties properties = new Properties();
properties.setProperty("user", "root");
properties.setProperty("password", "123456");

result.write().mode(SaveMode.Overwrite).
jdbc("jdbc:mysql://127.0.0.1:3306/spark", "result", properties);

sc.stop();
}
}

Scala:

object CreateDFFromMysql {
def main(args: Array[String]): Unit = {

val conf = new SparkConf()
conf.setMaster("local").setAppName("mysql")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

/**
* 第一种方式读取Mysql数据库表创建DF
*/
val options = new HashMap[String,String]();
options.put("url", "jdbc:mysql://127.0.0.1:3306/spark")
options.put("driver","com.mysql.jdbc.Driver")
options.put("user","root")
options.put("password", "123456")
options.put("dbtable","person")

val person = sqlContext.read.format("jdbc").options(options).load()
person.show()

// 创建临时表t1
person.registerTempTable("t1")

val reader = sqlContext.read.format("jdbc")
reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark")
reader.option("driver","com.mysql.jdbc.Driver")
reader.option("user","root")
reader.option("password","123456")
reader.option("dbtable", "score")

val score = reader.load()
score.show()

// 创建临时表t2
score.registerTempTable("t2")

val result = sqlContext.sql("select t1.id, t1.name, t2.score from t1, t2 where t1.name = t2.name")
result.show()

sc.stop()
}
}

6、读取 Hive 中的数据加载成 DataFrame

  • HiveContext 是 SQLContext 的子类,连接 Hive 建议使用 HiveContext。
  • 由于本地没有 Hive 环境,要提交到集群运行,提交命令:
./spark-submit
--master spark://sean01:7077,sean02:7077
--executor-cores 1
--executor-memory 1G
--total-executor-cores 1
--class com.seanxia.spark.java.sql.dataframe.CreateDFFromHiveCluster
/root/test/HiveTest.jar

Java:

public class CreateDFFromHiveCluster {
public static void main(String[] args){

SparkConf conf = new SparkConf().setAppName("hive");
JavaSparkContext sc = new JavaSparkContext(conf);
// HiveContext是SQLContext的子类
HiveContext hiveContext = new HiveContext(sc);

// 使用spark实例库
hiveContext.sql("USE spark");
// 表student_infos如果存在就删除
hiveContext.sql("DROP TABLE IF EXISTS student_infos");

// 在hive中创建student_infos表
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos " +
"(name STRING,age INT) row format delimited fields terminated by '\t' ");
// 加载数据
hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");
//第二种读取Hive表加载DF方式
// hiveContext.table("student_infos");

hiveContext.sql("DROP TABLE IF EXISTS student_scores");
// 在hive中创建student_scores表
hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores " +
"(name STRING, score INT) row format delimited fields terminated by '\t'");
hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores' INTO TABLE student_scores");

/**
* 查询表生成DataFrame
*/
DataFrame goodStudentsDF = hiveContext.sql("SELECT si.name, si.age, ss.score "
+ "FROM student_infos si "
+ "JOIN student_scores ss "
+ "ON si.name=ss.name "
+ "WHERE ss.score>=80");

// 注册临时表
goodStudentsDF.registerTempTable("goodStudent");
DataFrame result = hiveContext.sql("select * from goodStudent");
result.show();

/**
* 将结果保存到hive表,good_student_infos
*/
hiveContext.sql("DROP TABLE IF EXISTS good_student_infos");
goodStudentsDF.write().mode(SaveMode.Overwrite)
.saveAsTable("good_student_infos");

DataFrame table = hiveContext.table("good_student_infos");
Row[] goodStudentRows = table.collect();
for (Row goodStudentRow : goodStudentRows) {
System.out.println(goodStudentRow);
}
sc.stop();
}
}

Scala:

object CreateDFFromHiveCluster {
def main(args: Array[String]): Unit = {

val conf = new SparkConf().setAppName("hive")
val sc = new SparkContext(conf)

// HiveContext是SQLContext的子类
val hiveContext = new HiveContext(sc)

// 使用spark实例库
hiveContext.sql("use spark")
// 表student_infos如果存在就删除
hiveContext.sql("drop table if exists student_infos")

// 在hive中创建student_infos表
hiveContext.sql("create table if not exists student_infos (name string,age int) " +
"row format delimited fields terminated by '\t'")
// 加载数据
hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
//第二种读取Hive表加载DF方式
// hiveContext.table("student_infos")

hiveContext.sql("drop table if exists student_scores")
hiveContext.sql("create table if not exists student_scores (name string,score int) " +
"row format delimited fields terminated by '\t'")
hiveContext.sql("load data local inpath '/root/test/student_scores' into tablestudent_scores")

/**
* 查询表生成DataFrame
*/
val df = hiveContext.sql("select si.name,si.age,ss.score from student_infos si," +
"student_scores ss where si.name = ss.name")

hiveContext.sql("drop table if exists good_student_infos")

/**
* 将结果写入到hive表中
*/
df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")

sc.stop()
}
}

序列化问题

Java中不能被序列化的几种情况:

1、反序列化时serializable 版本号不一致时会导致不能反序列化。

2、子类中实现了serializable接口,父类中没有实现,父类中的变量不能被序列化,序列化后父类中的变量会得到null。

注意:父类实现serializable接口,子类没有实现serializable接口时,子类可以正常序列化

3、被关键字transient修饰的变量不能被序列化。

4、静态变量不能被序列化,属于类,不属于方法和对象,所以不能被序列化。

存储 DataFrame

1、将 DataFrame 存储为 parquet 文件。

2、将 DataFrame 存储到 JDBC 数据库。

3、将 DataFrame 存储到 Hive 表。

自定义函数 UDF 和 UDAF

1、UDF:用户自定义函数

Java:

public class UDF {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("UDF");
JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhangsan","lisi","wangwu"));

JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;

@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});

/**
* 动态创建Schema方式加载DF
*/
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));
StructType schema = DataTypes.createStructType(fields);

DataFrame df = sqlContext.createDataFrame(rowRDD,schema);

df.registerTempTable("user");

/**
* 根据UDF函数参数的个数来决定是实现哪一个UDF UDF1,UDF2。。。。UDF1xxx
*/
sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
private static final long serialVersionUID = 1L;

@Override
public Integer call(String t1, Integer t2) throws Exception {
return t1.length() + t2;
}
} , DataTypes.IntegerType );

sqlContext.sql("select name ,StrLen(name,100) as length from user").show();

sc.stop();
}
}

Scala:

object UDF {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local").setAppName("udf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc);

val rdd = sc.makeRDD(Array("zhansan","lisi","wangwu"))
val rowRDD = rdd.map { x => {
RowFactory.create(x)
} }

val schema = DataTypes.createStructType(Array(StructField("name",StringType,true)))

val df = sqlContext.createDataFrame(rowRDD, schema)

df.registerTempTable("user")
//sqlContext.udf.register("StrLen",(s : String)=>{s.length()})
//sqlContext.sql("select name ,StrLen(name) as length from user").show

sqlContext.udf.register("StrLen",(s : String,i:Int)=>{s.length()+i})
sqlContext.sql("select name ,StrLen(name,10) as length from user").show
sc.stop()
}
}

2、UDAF:用户自定义聚合函数

实现 UDAF 函数如果要自定义类要实现 UserDefinedAggregateFunction 类。

Java:

public class UDAF {
public static void main(String[] args) {

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("UDAF");
conf.set("spark.sql.shuffle.partitions", "1");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

JavaRDD<String> parallelize = sc.parallelize(
Arrays.asList("zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi",
"zhangsan", "lisi", "wangwu", "zhangsan", "zhangsan", "lisi"),2);

JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {
private static final long serialVersionUID = 1L;

@Override
public Row call(String s) throws Exception {
return RowFactory.create(s);
}
});

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");

/**
* 注册一个UDAF函数,实现统计相同值得个数
* 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的
*/
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
private static final long serialVersionUID = 1L;

/**
* 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
*/
@Override
public void initialize(MutableAggregationBuffer buffer) {
buffer.update(0, 0);
System.out.println("init ....." + buffer.get(0));
}

/**
* 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑
* buffer.getInt(0)获取的是上一次聚合后的值
* 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合
* 大聚和发生在reduce端.
* 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
*/
@Override
public void update(MutableAggregationBuffer buffer, Row arg1) {
System.out.println(buffer.getClass() + "-----------------------");

buffer.update(0, buffer.getInt(0) + 1);

System.out.println("update.....buffer" + buffer.toString() + " | row" + arg1.toString() );
}

/**
* 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
* 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
* buffer1.getInt(0) : 大聚合的时候 上一次聚合后的值
* buffer2.getInt(0) : 这次计算传入进来的update的结果
* 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
*/
public void merge(MutableAggregationBuffer buffer1, Row arg1) {
buffer1.update(0, buffer1.getInt(0) + arg1.getInt(0));
System.out.println("merge.....buffer : " + buffer1.toString() + "| row" + arg1.toString() );
}

/**
* 在进行聚合操作的时候所要处理的数据的结果的类型
*/
@Override
public StructType bufferSchema() {
return DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("bffer", DataTypes.IntegerType, true)));
}

/**
* 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果
*/
@Override
public Object evaluate(Row row) {
return row.getInt(0);
}

/**
* 指定UDAF函数计算后返回的结果类型
*/
@Override
public DataType dataType() {
return DataTypes.IntegerType;
}

/**
* 指定输入字段的字段及类型
*/
@Override
public StructType inputSchema() {
return DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("name", DataTypes.StringType, true)));
}

/**
* 确保一致性 一般用true,用以标记针对给定的一组输入,UDAF是否总是生成相同的结果。
*/
@Override
public boolean deterministic() {
return true;
}

});

sqlContext.sql("select name ,StringCount(name) as number from user group by name").show();

sc.stop();
}
}

Scala:

class MyUDAF extends UserDefinedAggregateFunction {
// 聚合操作时,所处理的数据的类型
def bufferSchema: StructType = {
DataTypes.createStructType(Array(DataTypes.createStructField("aaa",
IntegerType, true)))
}
// 最终函数返回值的类型
def dataType: DataType = {
DataTypes.IntegerType
}
def deterministic: Boolean = {
true
}
// 最后返回一个最终的聚合值 要和dataType的类型一一对应
def evaluate(buffer: Row): Any = {
buffer.getAs[Int](0)
}
// 为每个分组的数据执行初始化值
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0
}
//输入数据的类型
def inputSchema: StructType = {
DataTypes.createStructType(Array(DataTypes.createStructField("input",
StringType, true)))
}
// 最后merger的时候,在各个节点上的聚合值,要进行merge,也就是合并
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getAs[Int](0)+buffer2.getAs[Int](0)
}
// 每个组,有新的值进来的时候,进行分组对应的聚合值的计算
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
buffer(0) = buffer.getAs[Int](0)+1
}
}
object UDAF {
def main(args: Array[String]): Unit = {

val conf = new SparkConf()
conf.setMaster("local").setAppName("udaf")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val rdd = sc.makeRDD(Array("zhangsan","lisi","wangwu","zhangsan","lisi"))
val rowRDD = rdd.map { x => {RowFactory.create(x)} }
val schema = DataTypes.createStructType(Array(
DataTypes.createStructField("name", StringType, true)))
val df = sqlContext.createDataFrame(rowRDD, schema)

df.show()

df.registerTempTable("user")
/**
* 注册一个udaf函数
*/
sqlContext.udf.register("StringCount", new MyUDAF())
sqlContext.sql("select name ,StringCount(name) from user group byname").show()
sc.stop()
}
}

开窗函数

注意:

row_number() 开窗函数是按照某个字段分组,然后取另一字段的前几个的值,相当于分组取 topN。

如果 SQL 语句里面使用到了开窗函数,那么这个 SQL 语句必须使用 HiveContext 来执行,HiveContext 默认情况下在本地无法创建。

开窗函数格式:

row_number() over (partition by XXX order by XXX);

开窗函数语义说明:

1、首先在 select 查询时,使用 row_number() 函数,其次,row_number() 函数后面先跟上 over 关键字;

2、然后括号中,是 partition by,也就是说根据哪个字段进行分组;

3、其次是可以用 order by 进行组内排序;

4、这样 row_number() 就可以给每个组内的行,打上一个组内行号。

开窗函数的作用,其实就是,给每个分组的数据,按照排序顺序,打上分组内的行号。

OOM异常解决:

使用开窗函数由于会消耗大量内存空间,容易报OOM异常,出现此异常时,只需调整当前程序配置。步骤和配置如下:

只需在Configration中的VM options填入如下配置即可。

-Xms800m -Xmx800m  -XX:PermSize=64M -XX:MaxNewSize=256m -XX:MaxPermSize=128m

代码展示:

Java:

public class RowNumberWindowFun {
public static void main(String[] args) {

SparkConf conf = new SparkConf();
conf.setAppName("windowfun").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
conf.set("spark.sql.shuffle.partitions", "1");
HiveContext hiveContext = new HiveContext(sc);

hiveContext.sql("use spark");
hiveContext.sql("drop table if exists sales");
hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
+ "row format delimited fields terminated by '\t'");
hiveContext.sql("load data local inpath './data/sales.txt' into table sales");
/**
* 开窗函数格式:
* 【 row_number() over (partition by XXX order by XXX) as rank】
* 注意:rank 从1开始
*/
/**
* 以类别分组,按每种类别金额降序排序,显示 【日期,种类,金额】 结果,如:
*
* 1 A 100
* 2 B 200
* 3 A 300
* 4 B 400
* 5 A 500
* 6 B 600
* 排序后:
* 5 A 500 --rank 1
* 3 A 300 --rank 2
* 1 A 100 --rank 3
* 6 B 600 --rank 1
* 4 B 400 --rank 2
* 2 B 200 --rank 3
*/
DataFrame result = hiveContext.sql("select riqi,leibie,jine,rank " +
"from (select riqi,leibie,jine, " +
"row_number() over (partition by leibie order by jine desc) rank " +
"where t.rank<=3");

result.show(100);
/**
* 将结果保存到hive表sales_result
*/
result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result");
sc.stop();
}
}

Scala:

object RowNumberWindowFun {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("windowfun")
val sc = new SparkContext(conf)
val hiveContext = new HiveContext(sc)
hiveContext.sql("use spark");
hiveContext.sql("drop table if exists sales");
hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) "
+ "row format delimited fields terminated by '\t'");
hiveContext.sql("load data local inpath '/root/test/sales' into table sales");
/**
* 开窗函数格式:
* 【 rou_number() over (partitin by XXX order by XXX) 】
*/
val result = hiveContext.sql("select riqi,leibie,jine "
+ "from (select riqi,leibie,jine,"
+ "row_number() over (partition by leibie order by jine desc) rank "
+ "from sales) t "
+ "where t.rank<=3");
result.show();
sc.stop()
}
}