怎么做pp網(wǎng)站近期熱點(diǎn)新聞事件50個(gè)
在上一篇文章中已經(jīng)大致說明了DataFrame APi,下面我們具體介紹DataFrame DSL的使用。DataFrame DSL是一種命令式編寫Spark SQL的方式,使用的是一種類sql的風(fēng)格語法。
文章鏈接:
一、單詞統(tǒng)計(jì)案例引入
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心數(shù)據(jù)類型不太一樣** 1、讀取數(shù)據(jù)構(gòu)建一個(gè)DataFrame,相當(dāng)于一張表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定讀取數(shù)據(jù)的格式.schema("line STRING") //指定列的名和列的類型,多個(gè)列之間使用,分割.option("sep", "\n") //指定分割符,csv格式讀取默認(rèn)是英文逗號.load("spark/data/words.txt") // 指定要讀取數(shù)據(jù)的位置,可以使用相對路徑/*** DSL: 類SQL語法 api 介于代碼和純sql之間的一種api** spark在DSL語法api中,將純sql中的函數(shù)都使用了隱式轉(zhuǎn)換變成一個(gè)scala中的函數(shù)* 如果想要在DSL語法中使用這些函數(shù),需要導(dǎo)入隱式轉(zhuǎn)換**///導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)import org.apache.spark.sql.functions._//導(dǎo)入另一個(gè)隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進(jìn)行處理import sparkSession.implicits._// linesDF.select(explode(split($"line","\\|")) as "word")
// .groupBy($"word")
// .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存數(shù)據(jù)*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}
注意:show()可以指定兩個(gè)參數(shù),第一個(gè)參數(shù)為展現(xiàn)的條數(shù),不指定默認(rèn)展示前20條數(shù)據(jù),第二個(gè)參數(shù)默認(rèn)為false,代表的是如果數(shù)據(jù)過長展示就會(huì)不完全,可以指定為true,使得數(shù)據(jù)展示完整,比如 : show(200,truncate = false)
二、數(shù)據(jù)源獲取
查看官方文檔:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多種數(shù)據(jù)源的獲取。
?1、csv-->json
val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多種類型數(shù)據(jù)源讀取演示").config("spark.sql.shuffer.partitions", 1) //指定分區(qū)數(shù)為1,默認(rèn)分區(qū)數(shù)是200個(gè).getOrCreate()//導(dǎo)入spark sql中所有的隱式轉(zhuǎn)換函數(shù)import org.apache.spark.sql.functions._//導(dǎo)入sparkSession下的所有隱式轉(zhuǎn)換函數(shù),后面可以直接使用$函數(shù)引用字段import sparkSession.implicits._/*** 讀csv格式的文件-->寫到j(luò)son格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")
2、json-->parquet
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分區(qū)數(shù)為1,默認(rèn)分區(qū)數(shù)是200個(gè).getOrCreate()//導(dǎo)入spark sql中所有的隱式轉(zhuǎn)換函數(shù)//導(dǎo)入sparkSession下的所有隱式轉(zhuǎn)換函數(shù),后面可以直接使用$函數(shù)引用字段/*** 讀取json數(shù)據(jù)格式,因?yàn)閖son數(shù)據(jù)有鍵值對,會(huì)自動(dòng)的將健作為列名,值作為列值,不需要手動(dòng)的設(shè)置表結(jié)構(gòu)*///1500100967,能映秋,21,女,文科五班//方式1:// val studentsJsonDF: DataFrame = sparkSession.read// .format("json")// .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:實(shí)際上也是調(diào)用方式1,只是更簡潔了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")
3、parquet-->csv
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分區(qū)數(shù)為1,默認(rèn)分區(qū)數(shù)是200個(gè).getOrCreate()//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)import org.apache.spark.sql.functions._//導(dǎo)入另一個(gè)隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進(jìn)行處理import sparkSession.implicits._/*** parquet:壓縮的比例由信息熵決定,通俗的說就是數(shù)據(jù)的重復(fù)程度決定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")
4、數(shù)據(jù)庫
下面我們以mysql為例:
val sparkSession: SparkSession = SparkSession.builder().master("local").appName("連接數(shù)據(jù)庫").config("spark.sql.shuffle.partitions",1) //默認(rèn)分區(qū)的數(shù)量是200個(gè).getOrCreate()/*** 讀取數(shù)據(jù)庫中的數(shù)據(jù),mysql* 如果鏈接失敗,可以將參數(shù)補(bǔ)全:jdbc:mysql://192.168.19.100:3306?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false*/val jdDF: DataFrame = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://192.168.19.100:3306?useSSL=false").option("dbtable", "bigdata29.emp").option("user", "root").option("password", "123456").load()jdDF.show(10,truncate = false)
三、DataFrame DSL API的使用
1、select
import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函數(shù)演示").getOrCreate()//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)import org.apache.spark.sql.functions._//導(dǎo)入另一個(gè)隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進(jìn)行處理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函數(shù)*///方式1:只能查詢原有字段,不能對字段做出處理,比如加減、起別名之類studentsDF.select("id", "name", "age")//方式2:彌補(bǔ)了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隱式轉(zhuǎn)換函數(shù)中的$將字段變?yōu)橐粋€(gè)對象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用對象對字段進(jìn)行處理
// stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show() //不可使用未變?yōu)閷ο蟮淖侄蝧tuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age") // +是函數(shù),可以等價(jià)于該語句//3.2可以在select中使用sql函數(shù)studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}
2、where
/*** where函數(shù):過濾數(shù)據(jù)*///方式1:直接將sql中的where語句以字符串形式傳參studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列對象形式過濾/*** 注意在此種方式下:等于和不等于符號與我們平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()
3、groupBy和agg
/*** groupby:分組函數(shù) agg:聚合函數(shù)* 注意:* 1、groupby與agg函數(shù)通常都是一起使用* 2、分組聚合之后的結(jié)果DataFrame中只會(huì)包含分組字段與聚合字段* 3、分組聚合之后select中無法出現(xiàn)不是分組的字段*///需求:根據(jù)班級分組,求每個(gè)班級的人數(shù)和平均年齡studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()
4、join
/*** 5、join:表關(guān)聯(lián)*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//關(guān)聯(lián)場景1:所關(guān)聯(lián)的字段名字一樣studentsDF.join(subjectDF1,"id")//關(guān)聯(lián)場景2:所關(guān)聯(lián)的字段名字不一樣studentsDF.join(subjectDF2,$"id"===$"sid","inner")
// studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面兩種關(guān)聯(lián)場景默認(rèn)inner連接方式(內(nèi)連接),可以指定參數(shù)選擇連接方式,比如左連接、右連接、全連接之類* * @param joinType Type of join to perform. Default `inner`. Must be one of:* * `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* * `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/
5、開窗
/*** 開窗函數(shù)* 1、ROW_NUMBER():為分區(qū)中的每一行分配一個(gè)唯一的序號。序號是根據(jù)ORDER BY子句定義的順序分配的* 2、RANK()和DENSE_RANK():為分區(qū)中的每一行分配一個(gè)排名。RANK()在遇到相同值時(shí)會(huì)產(chǎn)生間隙,而DENSE_RANK()則不會(huì)。**///需求:統(tǒng)計(jì)每個(gè)班級總分前三的學(xué)生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函數(shù),會(huì)新增一列,但是要預(yù)先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()
注意:
? ? ? DSL API 不直接對應(yīng) SQL 的關(guān)鍵字執(zhí)行順序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照構(gòu)建邏輯查詢的方式來組織代碼,使其與 SQL 查詢的邏輯結(jié)構(gòu)相似。
在構(gòu)建 Spark DataFrame 轉(zhuǎn)換和操作時(shí),常用流程介紹:
- 選擇數(shù)據(jù)源:使用?
spark.read
?或從其他 DataFrame 派生。 - 轉(zhuǎn)換:使用各種轉(zhuǎn)換函數(shù)(如?
select
、filter
、map
、flatMap
、join
?等)來修改 DataFrame。 - 聚合:使用?
groupBy
?和聚合函數(shù)(如?sum
、avg
、count
?等)對數(shù)據(jù)進(jìn)行分組和匯總。 - 排序:使用?
orderBy
?或?sort
?對數(shù)據(jù)進(jìn)行排序。 - 輸出:使用?
show
、collect
、write
?等函數(shù)將結(jié)果輸出到控制臺(tái)、收集到驅(qū)動(dòng)程序或?qū)懭胪獠看鎯?chǔ)。
四、RDD與DataFrame的轉(zhuǎn)換
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd與Df之間的轉(zhuǎn)換").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因?yàn)樵赗dd中不會(huì)存儲(chǔ)文件的結(jié)構(gòu)(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}