        /**
         * Operate on the DataFrame with its built-in API.
         */
        // select name, age from table where age > 18
        df.select(df.col("name"), df.col("age")).where(df.col("age").gt(18)).show();
        // select count(*) from table group by age
        df.groupBy(df.col("age")).count().show();
        /**
         * Register the DataFrame as a temporary table. The table lives only
         * in memory; it is a logical table and is never materialized to disk.
         */
        df.registerTempTable("jtable");
        DataFrame result = sqlContext.sql("select age, count(1) from jtable group by age");
        result.show();
        sc.stop();
    }
}
Scala:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object CreateDFFromJsonFile {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("jsonFile")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val df = sqlContext.read.json("./data/json")
    // val df = sqlContext.read.format("json").load("./data/json")
    df.show()
    df.printSchema()
    // select name from table
    df.select(df.col("name")).show()
    // select name, age from table where age > 19
    df.select(df.col("name"), df.col("age")).where(df.col("age").gt(19)).show()
    // Register a temporary table
    df.registerTempTable("jtable")
    val result = sqlContext.sql("select * from jtable")
    result.show()
    sc.stop()
  }
}
        DataFrame result = sqlContext.sql("select t1.name, t1.age, t2.score from t1, t2 where t1.name = t2.name");
        result.show();
        sc.stop();
    }
}
Scala:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object CreateDFFromJsonRDD {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("jsonRDD")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val nameRDD = sc.makeRDD(Array(
      "{\"name\":\"zhangsan\",\"age\":18}",
      "{\"name\":\"lisi\",\"age\":19}",
      "{\"name\":\"wangwu\",\"age\":20}"
    ))
    val scoreRDD = sc.makeRDD(Array(
      "{\"name\":\"zhangsan\",\"score\":100}",
      "{\"name\":\"lisi\",\"score\":200}",
      "{\"name\":\"wangwu\",\"score\":300}"
    ))
    val nameDF = sqlContext.read.json(nameRDD)
    val scoreDF = sqlContext.read.json(scoreRDD)
    // Register the temporary tables t1 and t2
    nameDF.registerTempTable("t1")
    scoreDF.registerTempTable("t2")
    val result = sqlContext.sql("select t1.name, t1.age, t2.score from t1, t2 where t1.name = t2.name")
    result.show()
    sc.stop()
  }
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Person is not shown in the original excerpt; its fields are inferred from the
// map below. It must be defined outside main so reflection can read its fields.
case class Person(id: String, name: String, age: Integer)

object CreateDFFromRDDWithReflect {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("reflectRDD")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lineRDD = sc.textFile("./data/person.txt")
    // Implicitly convert the RDD to a DataFrame
    import sqlContext.implicits._
    val personRDD = lineRDD.map(x => {
      val person = Person(x.split(",")(0), x.split(",")(1), Integer.valueOf(x.split(",")(2)))
      person
    })
    val df = personRDD.toDF()
    df.show()
    /**
     * Convert the DataFrame back into an RDD of Person.
     */
    val rdd = df.rdd
    val result = rdd.map(x => {
      Person(x.getAs("id"), x.getAs("name"), x.getAs("age"))
    })
    result.foreach(println)
    sc.stop()
  }
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{RowFactory, SQLContext}

object CreateDFFromRDDWithStruct {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("rddStruct")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lineRDD = sc.textFile("./data/person.txt")
    val rowRDD = lineRDD.map(x => {
      val split = x.split(",")
      RowFactory.create(split(0), split(1), Integer.valueOf(split(2)))
    })
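The excerpt breaks off before the schema is applied. A minimal completion, assuming the same DataTypes/StructField pattern the UDF example below uses and the three person.txt columns id, name, age seen in the reflection example:

    // Sketch: build the schema explicitly and create the DataFrame from the Row RDD.
    import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructField}
    val schema = DataTypes.createStructType(Array(
      StructField("id", StringType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.show()
    sc.stop()
  }
}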
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object CreateDFFromParquet {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("parquet")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val jsonRDD = sc.textFile("./data/json")
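The parquet example is truncated here. A minimal sketch of the usual round trip; the ./data/parquet output path and the overwrite mode are assumptions, not from the excerpt:

    // Sketch: build a DataFrame from the JSON lines, write it as parquet, read it back.
    import org.apache.spark.sql.SaveMode
    val df = sqlContext.read.json(jsonRDD)
    df.write.mode(SaveMode.Overwrite).parquet("./data/parquet") // assumed output path
    val parquetDF = sqlContext.read.parquet("./data/parquet")
    parquetDF.show()
    sc.stop()
  }
}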
        // Create the student_infos table in Hive
        hiveContext.sql("CREATE TABLE IF NOT EXISTS student_infos " +
            "(name STRING, age INT) row format delimited fields terminated by '\t'");
        // Load the data
        hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos");
        // Alternative way to load a Hive table as a DataFrame:
        // hiveContext.table("student_infos");
hiveContext.sql("DROP TABLE IF EXISTS student_scores"); // 在hive中创建student_scores表 hiveContext.sql("CREATE TABLE IF NOT EXISTS student_scores " + "(name STRING, score INT) row format delimited fields terminated by '\t'"); hiveContext.sql("LOAD DATA LOCAL INPATH '/root/test/student_scores' INTO TABLE student_scores");
    // Create the student_infos table in Hive
    hiveContext.sql("create table if not exists student_infos (name string, age int) " +
      "row format delimited fields terminated by '\t'")
    // Load the data
    hiveContext.sql("load data local inpath '/root/test/student_infos' into table student_infos")
    // Alternative way to load a Hive table as a DataFrame:
    // hiveContext.table("student_infos")
hiveContext.sql("drop table if exists student_scores") hiveContext.sql("create table if not exists student_scores (name string,score int) " + "row format delimited fields terminated by '\t'") hiveContext.sql("load data local inpath '/root/test/student_scores' into tablestudent_scores")
    /**
     * Query the two tables to produce a DataFrame.
     */
    val df = hiveContext.sql("select si.name, si.age, ss.score from student_infos si, " +
      "student_scores ss where si.name = ss.name")
    hiveContext.sql("drop table if exists good_student_infos")
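The excerpt drops good_student_infos but stops before writing anything back. Presumably the result is saved the same way the window-function example below saves sales_result; a minimal sketch:

    // Sketch: persist the joined result as the Hive table good_student_infos.
    import org.apache.spark.sql.SaveMode
    df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
    sc.stop()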
sqlContext.sql("select name ,StrLen(name,100) as length from user").show();
sc.stop(); } }
Scala:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{RowFactory, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StringType, StructField}

object UDF {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("udf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.makeRDD(Array("zhansan", "lisi", "wangwu"))
    val rowRDD = rdd.map { x => RowFactory.create(x) }
    val schema = DataTypes.createStructType(Array(StructField("name", StringType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
    df.registerTempTable("user")
    // One-argument UDF: returns the length of the name.
    // sqlContext.udf.register("StrLen", (s: String) => s.length())
    // sqlContext.sql("select name, StrLen(name) as length from user").show
    // Two-argument UDF: returns the length of the name plus a constant.
    sqlContext.udf.register("StrLen", (s: String, i: Int) => s.length() + i)
    sqlContext.sql("select name, StrLen(name, 10) as length from user").show
    sc.stop()
  }
}
    val conf = new SparkConf()
    conf.setMaster("local").setAppName("udaf")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val rdd = sc.makeRDD(Array("zhangsan", "lisi", "wangwu", "zhangsan", "lisi"))
    val rowRDD = rdd.map { x => RowFactory.create(x) }
    val schema = DataTypes.createStructType(Array(
      DataTypes.createStructField("name", StringType, true)))
    val df = sqlContext.createDataFrame(rowRDD, schema)
df.show()
df.registerTempTable("user") /** * 注册一个udaf函数 */ sqlContext.udf.register("StringCount", newMyUDAF()) sqlContext.sql("select name ,StringCount(name) from user group byname").show() sc.stop() } }
hiveContext.sql("use spark"); hiveContext.sql("drop table if exists sales"); hiveContext.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '\t'"); hiveContext.sql("load data local inpath './data/sales.txt' into table sales"); /** * 开窗函数格式: * 【 row_number() over (partition by XXX order by XXX) as rank】 * 注意:rank 从1开始 */ /** * 以类别分组,按每种类别金额降序排序,显示 【日期,种类,金额】 结果,如: * * 1 A 100 * 2 B 200 * 3 A 300 * 4 B 400 * 5 A 500 * 6 B 600 * 排序后: * 5 A 500 --rank 1 * 3 A 300 --rank 2 * 1 A 100 --rank 3 * 6 B 600 --rank 1 * 4 B 400 --rank 2 * 2 B 200 --rank 3 */ DataFrameresult= hiveContext.sql("select riqi,leibie,jine,rank " + "from (select riqi,leibie,jine, " + "row_number() over (partition by leibie order by jine desc) rank " + "where t.rank<=3"); result.show(100); /** * 将结果保存到hive表sales_result */ result.write().mode(SaveMode.Overwrite).saveAsTable("sales_result"); sc.stop(); } }
Scala:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object RowNumberWindowFun {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("windowfun")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)
    hiveContext.sql("use spark")
    hiveContext.sql("drop table if exists sales")
    hiveContext.sql("create table if not exists sales (riqi string, leibie string, jine int) " +
      "row format delimited fields terminated by '\t'")
    hiveContext.sql("load data local inpath '/root/test/sales' into table sales")
    /**
     * Window-function syntax:
     * 【 row_number() over (partition by XXX order by XXX) 】
     */
    val result = hiveContext.sql("select riqi, leibie, jine " +
      "from (select riqi, leibie, jine, " +
      "row_number() over (partition by leibie order by jine desc) rank " +
      "from sales) t " +
      "where t.rank <= 3")
    result.show()
    sc.stop()
  }
}