中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當(dāng)前位置: 首頁 > news >正文

怎么做pp網(wǎng)站近期熱點(diǎn)新聞事件50個(gè)

怎么做pp網(wǎng)站,近期熱點(diǎn)新聞事件50個(gè),怎么做類似淘寶網(wǎng)站,網(wǎng)頁設(shè)計(jì)規(guī)范要求在上一篇文章中已經(jīng)大致說明了DataFrame APi,下面我們具體介紹DataFrame DSL的使用。DataFrame DSL是一種命令式編寫Spark SQL的方式,使用的是一種類sql的風(fēng)格語法。 文章鏈接: 一、單詞統(tǒng)計(jì)案例引入 import org.apache.spark.sql.{DataFrame, SaveMod…

在上一篇文章中已經(jīng)大致說明了DataFrame APi,下面我們具體介紹DataFrame DSL的使用。DataFrame DSL是一種命令式編寫Spark SQL的方式,使用的是一種類sql的風(fēng)格語法。

文章鏈接:

一、單詞統(tǒng)計(jì)案例引入

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]): Unit = {/*** 在新版本的spark中,如果想要編寫spark sql的話,需要使用新的spark入口類:SparkSession*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/*** spark sql和spark core的核心數(shù)據(jù)類型不太一樣** 1、讀取數(shù)據(jù)構(gòu)建一個(gè)DataFrame,相當(dāng)于一張表*/val linesDF: DataFrame = sparkSession.read.format("csv") //指定讀取數(shù)據(jù)的格式.schema("line STRING") //指定列的名和列的類型,多個(gè)列之間使用,分割.option("sep", "\n") //指定分割符,csv格式讀取默認(rèn)是英文逗號.load("spark/data/words.txt") // 指定要讀取數(shù)據(jù)的位置,可以使用相對路徑/*** DSL: 類SQL語法 api  介于代碼和純sql之間的一種api** spark在DSL語法api中,將純sql中的函數(shù)都使用了隱式轉(zhuǎn)換變成一個(gè)scala中的函數(shù)* 如果想要在DSL語法中使用這些函數(shù),需要導(dǎo)入隱式轉(zhuǎn)換**///導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)import org.apache.spark.sql.functions._//導(dǎo)入另一個(gè)隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進(jìn)行處理import sparkSession.implicits._//    linesDF.select(explode(split($"line","\\|")) as "word")
//      .groupBy($"word")
//      .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line", "\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/*** 保存數(shù)據(jù)*/resultDF.repartition(1).write.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}

注意:show()可以指定兩個(gè)參數(shù),第一個(gè)參數(shù)為展現(xiàn)的條數(shù),不指定默認(rèn)展示前20條數(shù)據(jù),第二個(gè)參數(shù)默認(rèn)為false,代表的是如果數(shù)據(jù)過長展示就會(huì)不完全,可以指定為true,使得數(shù)據(jù)展示完整,比如 : show(200,truncate = false)

二、數(shù)據(jù)源獲取

查看官方文檔:Data Sources - Spark 3.5.1 Documentation,看到DataFrame支持多種數(shù)據(jù)源的獲取。

?1、csv-->json

    val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("多種類型數(shù)據(jù)源讀取演示").config("spark.sql.shuffer.partitions", 1) //指定分區(qū)數(shù)為1,默認(rèn)分區(qū)數(shù)是200個(gè).getOrCreate()//導(dǎo)入spark sql中所有的隱式轉(zhuǎn)換函數(shù)import org.apache.spark.sql.functions._//導(dǎo)入sparkSession下的所有隱式轉(zhuǎn)換函數(shù),后面可以直接使用$函數(shù)引用字段import sparkSession.implicits._/*** 讀csv格式的文件-->寫到j(luò)son格式文件中*///1500100967,能映秋,21,女,文科五班val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age Int,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")studentsDF.write.format("json").mode(SaveMode.Overwrite).save("spark/data/students_out_json.json")

2、json-->parquet

val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分區(qū)數(shù)為1,默認(rèn)分區(qū)數(shù)是200個(gè).getOrCreate()//導(dǎo)入spark sql中所有的隱式轉(zhuǎn)換函數(shù)//導(dǎo)入sparkSession下的所有隱式轉(zhuǎn)換函數(shù),后面可以直接使用$函數(shù)引用字段/*** 讀取json數(shù)據(jù)格式,因?yàn)閖son數(shù)據(jù)有鍵值對,會(huì)自動(dòng)的將健作為列名,值作為列值,不需要手動(dòng)的設(shè)置表結(jié)構(gòu)*///1500100967,能映秋,21,女,文科五班//方式1://    val studentsJsonDF: DataFrame = sparkSession.read//      .format("json")//      .load("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")//方式2:實(shí)際上也是調(diào)用方式1,只是更簡潔了// def json(paths: String*): DataFrame = format("json").load(paths : _*)val studebtsReadDF: DataFrame = sparkSession.read.json("spark/data/students_out_json.json/part-00000-3f086bb2-23d9-4904-9814-3a34b21020ab-c000.json")studebtsReadDF.write.format("parquet").mode(SaveMode.Overwrite).save("spark/data/students_parquet")

3、parquet-->csv

    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("").config("spark.sql.shuffer.partitions", 1) //指定分區(qū)數(shù)為1,默認(rèn)分區(qū)數(shù)是200個(gè).getOrCreate()//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)import org.apache.spark.sql.functions._//導(dǎo)入另一個(gè)隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進(jìn)行處理import sparkSession.implicits._/*** parquet:壓縮的比例由信息熵決定,通俗的說就是數(shù)據(jù)的重復(fù)程度決定*/val studebtsReadDF: DataFrame = sparkSession.read.format("parquet").load("spark/data/students_parquet/part-00000-8b815a03-97f7-4d71-8b71-4e7e30f60995-c000.snappy.parquet")studebtsReadDF.write.format("csv").mode(SaveMode.Overwrite).save("spark/data/students_csv")

4、數(shù)據(jù)庫

下面我們以mysql為例:

    val sparkSession: SparkSession = SparkSession.builder().master("local").appName("連接數(shù)據(jù)庫").config("spark.sql.shuffle.partitions",1) //默認(rèn)分區(qū)的數(shù)量是200個(gè).getOrCreate()/*** 讀取數(shù)據(jù)庫中的數(shù)據(jù),mysql* 如果鏈接失敗,可以將參數(shù)補(bǔ)全:jdbc:mysql://192.168.19.100:3306?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf8&useSSL=false*/val jdDF: DataFrame = sparkSession.read.format("jdbc").option("url", "jdbc:mysql://192.168.19.100:3306?useSSL=false").option("dbtable", "bigdata29.emp").option("user", "root").option("password", "123456").load()jdDF.show(10,truncate = false)

三、DataFrame DSL API的使用

1、select


import org.apache.spark.sql.{DataFrame, SparkSession}object Demo1Select {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().master("local").appName("select函數(shù)演示").getOrCreate()//導(dǎo)入Spark sql中所有的sql隱式轉(zhuǎn)換函數(shù)import org.apache.spark.sql.functions._//導(dǎo)入另一個(gè)隱式轉(zhuǎn)換,后面可以直接使用$函數(shù)引用字段進(jìn)行處理import sparkSession.implicits._val studentsDF: DataFrame = sparkSession.read.format("csv").schema("id String,name String,age String,gender String,clazz String").option("sep", ",").load("spark/data/student.csv")/*** select函數(shù)*///方式1:只能查詢原有字段,不能對字段做出處理,比如加減、起別名之類studentsDF.select("id", "name", "age")//方式2:彌補(bǔ)了方式1的不足studentsDF.selectExpr("id","name","age+1 as new_age")//方式3:使用隱式轉(zhuǎn)換函數(shù)中的$將字段變?yōu)橐粋€(gè)對象val stuDF: DataFrame = studentsDF.select($"id", $"name", $"age")//3.1使用對象對字段進(jìn)行處理
//    stuDF.select($"id", $"name", $"age",$"age".+(1) as "new_age").show()       //不可使用未變?yōu)閷ο蟮淖侄蝧tuDF.select($"id", $"name", $"age",$"age" + 1 as "new_age")                 // +是函數(shù),可以等價(jià)于該語句//3.2可以在select中使用sql函數(shù)studentsDF.select($"id", $"name", $"age", substring($"id", 0, 2))}
}

2、where

    /*** where函數(shù):過濾數(shù)據(jù)*///方式1:直接將sql中的where語句以字符串形式傳參studentsDF.where("clazz='文科一班' and gender='男'")//方式2:使用$列對象形式過濾/*** 注意在此種方式下:等于和不等于符號與我們平常使用的有所不同* 等于:===* 不等于:=!=*/studentsDF.where($"clazz" === "文科一班" and $"gender"=!="男").show()

3、groupBy和agg

    /*** groupby:分組函數(shù)     agg:聚合函數(shù)* 注意:* 1、groupby與agg函數(shù)通常都是一起使用* 2、分組聚合之后的結(jié)果DataFrame中只會(huì)包含分組字段與聚合字段* 3、分組聚合之后select中無法出現(xiàn)不是分組的字段*///需求:根據(jù)班級分組,求每個(gè)班級的人數(shù)和平均年齡studentsDF.groupBy($"clazz").agg(count($"clazz") as "clazz_number",avg($"age") as "avg_age").show()

4、join

/*** 5、join:表關(guān)聯(lián)*/val subjectDF1: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("id String,subject_id String,score Int").load("spark/data/score.csv")val subjectDF2: DataFrame = sparkSession.read.format("csv").option("sep", ",").schema("sid String,subject_id String,score Int").load("spark/data/score.csv")//關(guān)聯(lián)場景1:所關(guān)聯(lián)的字段名字一樣studentsDF.join(subjectDF1,"id")//關(guān)聯(lián)場景2:所關(guān)聯(lián)的字段名字不一樣studentsDF.join(subjectDF2,$"id"===$"sid","inner")
//    studentsDF.join(subjectDF2,$"id"===$"sid","left").show()/*** 上面兩種關(guān)聯(lián)場景默認(rèn)inner連接方式(內(nèi)連接),可以指定參數(shù)選擇連接方式,比如左連接、右連接、全連接之類* * @param joinType Type of join to perform. Default `inner`. Must be one of:* *                 `inner`, `cross`, `outer`, `full`, `fullouter`,`full_outer`, `left`,* *                 `leftouter`, `left_outer`, `right`, `rightouter`, `right_outer`.*/

5、開窗

    /*** 開窗函數(shù)* 1、ROW_NUMBER():為分區(qū)中的每一行分配一個(gè)唯一的序號。序號是根據(jù)ORDER BY子句定義的順序分配的* 2、RANK()和DENSE_RANK():為分區(qū)中的每一行分配一個(gè)排名。RANK()在遇到相同值時(shí)會(huì)產(chǎn)生間隙,而DENSE_RANK()則不會(huì)。**///需求:統(tǒng)計(jì)每個(gè)班級總分前三的學(xué)生val stu_scoreDF: DataFrame = studentsDF.join(subjectDF2, $"id" === $"sid")//方式1:在select中使用row_number() over Window.partitionBy().orderBy()stu_scoreDF.groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").select($"clazz", $"id", $"sum_score", row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc) as "score_rank").where($"score_rank" <= 3)//方式2:使用withcolumn()函數(shù),會(huì)新增一列,但是要預(yù)先指定列名stu_scoreDF.repartition(1).groupBy($"clazz", $"id").agg(sum($"score") as "sum_score").withColumn("score_rank",row_number() over Window.partitionBy($"clazz").orderBy($"sum_score".desc)).where($"score_rank" <= 3).show()

注意:

? ? ? DSL API 不直接對應(yīng) SQL 的關(guān)鍵字執(zhí)行順序(如 SELECT、FROM、WHERE、GROUP BY 等),但可以按照構(gòu)建邏輯查詢的方式來組織代碼,使其與 SQL 查詢的邏輯結(jié)構(gòu)相似。

在構(gòu)建 Spark DataFrame 轉(zhuǎn)換和操作時(shí),常用流程介紹:

  1. 選擇數(shù)據(jù)源:使用?spark.read?或從其他 DataFrame 派生。
  2. 轉(zhuǎn)換:使用各種轉(zhuǎn)換函數(shù)(如?select、filter、map、flatMap、join?等)來修改 DataFrame。
  3. 聚合:使用?groupBy?和聚合函數(shù)(如?sum、avg、count?等)對數(shù)據(jù)進(jìn)行分組和匯總。
  4. 排序:使用?orderBy?或?sort?對數(shù)據(jù)進(jìn)行排序。
  5. 輸出:使用?showcollect、write?等函數(shù)將結(jié)果輸出到控制臺(tái)、收集到驅(qū)動(dòng)程序或?qū)懭胪獠看鎯?chǔ)。

四、RDD與DataFrame的轉(zhuǎn)換

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}object RddToDf {def main(args: Array[String]): Unit = {val sparkSession: SparkSession = SparkSession.builder().appName("Rdd與Df之間的轉(zhuǎn)換").master("local").config("spark.sql.shuffle.partitions", 1).getOrCreate()import org.apache.spark.sql.functions._import sparkSession.implicits._val sparkContext: SparkContext = sparkSession.sparkContextval idNameRdd: RDD[(String, String)] = sparkContext.textFile("spark/data/student.csv").map(_.split(",")).map {case Array(id: String, name: String, _, _, _) => (id, name)}/*** Rdd-->DF* 因?yàn)樵赗dd中不會(huì)存儲(chǔ)文件的結(jié)構(gòu)(schema)信息,所以要指定字段*/val idNameDF: DataFrame = idNameRdd.toDF("id", "name")idNameDF.createOrReplaceTempView("idNameTb")sparkSession.sql("select id,name from idNameTb").show()/*** DF-->Rdd*/val idNameRdd2: RDD[Row] = idNameDF.rddidNameRdd2.foreach(println)}
}
http://m.risenshineclean.com/news/7258.html

相關(guān)文章:

  • 做網(wǎng)站打印費(fèi)復(fù)印費(fèi)清單sem廣告投放是做什么的
  • 成都網(wǎng)站建設(shè)哪家公司好廣州番禺發(fā)布
  • 做電商網(wǎng)站的公司百度游戲中心官網(wǎng)
  • 自制軟件生成器泉州seo報(bào)價(jià)
  • 怎么開始做網(wǎng)站推廣方案100個(gè)
  • 給個(gè)網(wǎng)址2022年能直接看的seo人才招聘
  • 自己個(gè)人網(wǎng)站后臺(tái)怎么做網(wǎng)絡(luò)營銷策略方案
  • 為啥做網(wǎng)站圖片識別
  • 做百度推廣網(wǎng)站排名愛站網(wǎng)是什么
  • 運(yùn)城市住房和城鄉(xiāng)建設(shè)局網(wǎng)站免費(fèi)可用的網(wǎng)站源碼
  • 免費(fèi)網(wǎng)站制作軟件有哪些網(wǎng)絡(luò)營銷產(chǎn)品
  • 網(wǎng)站建設(shè)明細(xì)報(bào)價(jià)表 服務(wù)器互聯(lián)網(wǎng)推廣有哪些方式
  • 個(gè)人網(wǎng)站備案通過做淘客百度seo公司報(bào)價(jià)
  • 貴州省銅仁市城鄉(xiāng)建設(shè)局網(wǎng)站下載谷歌瀏覽器并安裝
  • javaweb做視頻網(wǎng)站難嗎廣西南寧做網(wǎng)站的公司
  • 汽車網(wǎng)站建設(shè)目的全能優(yōu)化大師
  • 2345網(wǎng)址導(dǎo)航主頁長沙關(guān)鍵詞優(yōu)化新行情報(bào)價(jià)
  • 備案的域名拿來做別的網(wǎng)站廣東seo網(wǎng)絡(luò)培訓(xùn)
  • 自己網(wǎng)站做問卷調(diào)查網(wǎng)站外鏈有多重要
  • 網(wǎng)站的備案許可號不存在東莞網(wǎng)絡(luò)推廣平臺(tái)
  • 柳市做網(wǎng)站接推廣怎么收費(fèi)
  • 商務(wù)型網(wǎng)站seo專員招聘
  • 商城的網(wǎng)站建設(shè)公關(guān)服務(wù)
  • 購物網(wǎng)站模塊例子洗發(fā)水營銷推廣軟文800字
  • 遵義網(wǎng)站設(shè)計(jì)aso關(guān)鍵詞搜索優(yōu)化
  • 網(wǎng)站建設(shè)需求有哪些武漢剛剛發(fā)生的新聞
  • 網(wǎng)站建設(shè)相關(guān)資訊怎樣搭建一個(gè)網(wǎng)站
  • 網(wǎng)站開發(fā)語言哪一種好些網(wǎng)絡(luò)運(yùn)營好學(xué)嗎
  • 網(wǎng)站上的充值鏈接怎么做的整站seo怎么做
  • 重慶新聞?lì)l道晉城seo