福州市建設(shè)廳網(wǎng)站谷歌搜索指數(shù)查詢
內(nèi)容目錄
- 創(chuàng)建SparkSession對象
- 從CSV文件中讀取
- 從JSON文件中讀取
- 從Parquet文件中讀取
- 從數(shù)據(jù)列表中創(chuàng)建DataFrame
- 從字典列表中創(chuàng)建DataFrame
- 選擇一列
- 選擇多列
- 過濾年齡大于30的數(shù)據(jù)
- 過濾名字為Alice的數(shù)據(jù)
- 可以使用and、or、not等操作符進行組合查詢
- 按照年齡分組并計算平均年齡和最大年齡
- 將age列從整型改為浮點型
Spark的DataFrame是一種類似于表格的數(shù)據(jù)結(jié)構(gòu),可以進行各種數(shù)據(jù)處理操作。使用DataFrame可以很方便地處理結(jié)構(gòu)化數(shù)據(jù)(如CSV、JSON、Parquet等格式的數(shù)據(jù))。
DataFrame是Spark SQL中的概念,因此需要首先引入Spark SQL庫:
from pyspark.sql import SparkSession
創(chuàng)建SparkSession對象
spark = SparkSession.builder.appName(“example”).getOrCreate()
創(chuàng)建DataFrame
從文件中讀取
可以使用SparkSession的read方法從不同類型的文件中讀取數(shù)據(jù),例如:
從CSV文件中讀取
df = spark.read.csv(“file.csv”, header=True, inferSchema=True)
從JSON文件中讀取
df = spark.read.json(“file.json”)
從Parquet文件中讀取
df = spark.read.parquet(“file.parquet”)
直接創(chuàng)建
也可以使用SparkSession的createDataFrame方法直接創(chuàng)建DataFrame,例如:
從數(shù)據(jù)列表中創(chuàng)建DataFrame
data = [(“Alice”, 25), (“Bob”, 30), (“Charlie”, 35)]
df = spark.createDataFrame(data, [“name”, “age”])
從字典列表中創(chuàng)建DataFrame
data = [{“name”: “Alice”, “age”: 25}, {“name”: “Bob”, “age”: 30}, {“name”: “Charlie”, “age”: 35}]
df = spark.createDataFrame(data)
DataFrame的基本操作
顯示DataFrame
使用show方法可以將DataFrame中的數(shù)據(jù)顯示出來,例如:
df.show()
查看DataFrame的結(jié)構(gòu)
使用printSchema方法可以查看DataFrame的結(jié)構(gòu),例如:
df.printSchema()
選擇列
使用select方法可以選擇一列或多列,例如:
選擇一列
df.select(“name”).show()
選擇多列
df.select(“name”, “age”).show()
過濾數(shù)據(jù)
使用filter方法可以根據(jù)條件過濾數(shù)據(jù),例如:
過濾年齡大于30的數(shù)據(jù)
df.filter(df[“age”] > 30).show()
過濾名字為Alice的數(shù)據(jù)
df.filter(df[“name”] == “Alice”).show()
可以使用and、or、not等操作符進行組合查詢
df.filter((df[“age”] > 30) & (df[“name”] != “Alice”)).show()
分組聚合
使用groupBy方法可以對數(shù)據(jù)進行分組聚合操作,例如:
按照年齡分組并計算平均年齡和最大年齡
df.groupBy(“age”).agg({“age”: “avg”, “age”: “max”}).show()
DataFrame的類型轉(zhuǎn)換
更改列名
使用withColumnRenamed方法可以更改列名,例如:
df = df.withColumnRenamed(“name”, “person_name”)
df.show()
更改列類型
使用withColumn方法可以更改列的數(shù)據(jù)類型,例如:
from pyspark.sql.functions import col
將age列從整型改為浮點型
df = df.withColumn(“age”, col(“age”).cast(“float”))
df.printSchema()
DataFrame的持久化
DataFrame的持久化可以將數(shù)據(jù)緩存在內(nèi)存或磁盤中,避免重復(fù)讀取數(shù)據(jù),提高性能。
使用cache方法可以將DataFrame緩存在內(nèi)存中,例如:
df.cache()
使用persist方法可以將DataFrame緩存在磁盤中,例如:
df.persist()