wordpress懸浮小工具的插件seo文章排名優(yōu)化
文章目錄
- Pyspark catalog用法
- catalog 介紹
- cache 緩存表
- uncache 清除緩存表
- cleanCache 清理所有緩存表
- createExternalTable 創(chuàng)建外部表
- currentDatabase 返回當(dāng)前默認(rèn)庫
- tableExists 檢查數(shù)據(jù)表是否存在,包含臨時(shí)視圖
- databaseExists 檢查數(shù)據(jù)庫是否存在
- dropGlobalTempView 刪除全局臨時(shí)視圖
- dropTempView 刪除臨時(shí)視圖
- functionExists 檢查函數(shù)是否存在
- getDatabase 獲取具有指定名稱的數(shù)據(jù)庫
- getFunction 獲取方法
- getTable 獲取數(shù)據(jù)表
- isCached 檢查是否緩存成功
- listCatalogs 列出可用的catalogs
- listColumns 返回?cái)?shù)據(jù)表的列信息
- listDatabases 獲取數(shù)據(jù)庫列表
- listTables 獲取數(shù)據(jù)表,包含臨時(shí)視圖
- setCurrentDatabase 設(shè)置當(dāng)前數(shù)據(jù)庫
- refreshTable 刷新緩存
- refreshByPath 刷新路徑
- recoverPartitions 恢復(fù)分區(qū)
Pyspark catalog用法
catalog 介紹
Catalog
是Spark中用于管理元數(shù)據(jù)信息的接口,這些元數(shù)據(jù)可能包括庫、內(nèi)部或外部表、函數(shù)、表列及臨時(shí)視圖等。
總的來說,PySpark Catalogs是PySpark框架中用于管理和查詢?cè)獢?shù)據(jù)的重要組件,它使得Python用戶能夠更有效地利用PySpark進(jìn)行大數(shù)據(jù)處理和分析。
spark = SparkSession.builder.appName('LDSX_TEST') \.config('hive.metastore.uris', 'thrift://hadoop01:9083') \.config('spark.master',"local[2]" ) \.enableHiveSupport().getOrCreate()
cache 緩存表
可以設(shè)置緩存等級(jí),默認(rèn)緩存等級(jí)為MEMORY_AND_DISK,是數(shù)據(jù)表級(jí)別的緩存,跟緩存dataframe存在區(qū)別,
設(shè)置不存在的表報(bào)錯(cuò)
# 緩存數(shù)據(jù)表
spark.catalog.cacheTable('ldsx_test.ldsx_table_one')
#檢查是否緩存成功
ldsx = spark.catalog.isCached('ldsx_test.ldsx_table_one')
>True
uncache 清除緩存表
當(dāng)表不存在數(shù)據(jù)庫會(huì)報(bào)錯(cuò)
spark.catalog.uncacheTable("ldsx_test.ldsx_table_one")
cleanCache 清理所有緩存表
spark.catalog.clearCache()
createExternalTable 創(chuàng)建外部表
# spark.catalog.createExternalTable(# tableName='ldsx_test_table',# path = './ldsx_one.csv',# database='ldsx_test',## )
currentDatabase 返回當(dāng)前默認(rèn)庫
返回當(dāng)前默認(rèn)所在數(shù)據(jù)庫spark.catalog.setCurrentDatabase 設(shè)置所在數(shù)據(jù)庫
data = spark.catalog.currentDatabase()
tableExists 檢查數(shù)據(jù)表是否存在,包含臨時(shí)視圖
data = spark.catalog.tableExists('ldsx_test.ldsx_table_one')
>True
databaseExists 檢查數(shù)據(jù)庫是否存在
data = spark.catalog.databaseExists('ldsx_test')
dropGlobalTempView 刪除全局臨時(shí)視圖
全局臨時(shí)表查找時(shí)候需要指向global_temp
要?jiǎng)h除的表不存在報(bào)錯(cuò)
#創(chuàng)建全局臨時(shí)表
spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
#注意查詢時(shí)候需要指向 global_temp
spark.sql('select * from global_temp.my_table').show()
#刪除全局臨時(shí)
ldsx= spark.catalog.dropGlobalTempView("my_table")
dropTempView 刪除臨時(shí)視圖
要?jiǎng)h除的表不存在報(bào)錯(cuò)
#創(chuàng)建臨時(shí)表
spark.createDataFrame([(1, 1)]).createTempView("my_table")
spark.sql('select * from my_table').show()
#刪除臨時(shí)表
ldsx = spark.catalog.dropTempView("my_table")
functionExists 檢查函數(shù)是否存在
spark.catalog.functionExists("count")
>True
getDatabase 獲取具有指定名稱的數(shù)據(jù)庫
data = spark.catalog.getDatabase("ldsx_test")
print(data)
>>Database(name='ldsx_test', catalog='spark_catalog', description='', locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data/ldsx_test.db')
getFunction 獲取方法
獲取不到方法報(bào)錯(cuò)
spark.sql("CREATE FUNCTION my_func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
data = spark.catalog.getFunction("my_func1")
print(data)
>>Function(name='my_func1', catalog='spark_catalog', namespace=['default'], description='N/A.', className='test.org.apache.spark.sql.MyDoubleAvg', isTemporary=False)
getTable 獲取數(shù)據(jù)表
獲取不到表報(bào)錯(cuò)
data = spark.catalog.getTable("ldsx_table_one")
print(data)
>>Table(name='ldsx_table_one', catalog='spark_catalog', namespace=['ldsx_test'], description=None, tableType='MANAGED', isTemporary=False)
isCached 檢查是否緩存成功
# 緩存數(shù)據(jù)表
spark.catalog.cacheTable('ldsx_test.ldsx_table_one')
data = spark.catalog.isCached('ldsx_test.ldsx_table_one')
>True
listCatalogs 列出可用的catalogs
catalogs = spark.catalog.listCatalogs()
print(catalogs)
listColumns 返回?cái)?shù)據(jù)表的列信息
# 參數(shù):數(shù)據(jù)表,數(shù)據(jù)庫
catalogs = spark.catalog.listColumns('ldsx_table_one','ldsx_test')
print(catalogs)
>> [Column(name='age', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),Column(name='name', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),Column(name='fraction', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),Column(name='class', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False),Column(name='gender', description='??', dataType='string', nullable=True, isPartition=False, isBucket=False)]
listDatabases 獲取數(shù)據(jù)庫列表
data1 = spark.catalog.listDatabases()
print(data1)
>>[Database(name='default', catalog='spark_catalog', description='Default Hive database',locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data'),
Database(name='ldsx_test', catalog='spark_catalog', description='',locationUri='hdfs://master:7171/home/ldsx/opt/hadoopData/hive_data/ldsx_test.db')]
listTables 獲取數(shù)據(jù)表,包含臨時(shí)視圖
# 展示數(shù)據(jù)庫中數(shù)據(jù)表以及臨時(shí)視圖
spark.catalog.setCurrentDatabase('ldsx_test')
spark.createDataFrame([(1,1)]).createTempView('TEST')
data = spark.catalog.listTables()
print(data)
>>[Table(name='ldsx_table_one', catalog='spark_catalog', namespace=['ldsx_test'], description=None,tableType='MANAGED', isTemporary=False),Table(name='TEST', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]
setCurrentDatabase 設(shè)置當(dāng)前數(shù)據(jù)庫
spark.catalog.setCurrentDatabase('ldsx_test')
data = spark.catalog.currentDatabase()
print(data)
>> ldsx_test
refreshTable 刷新緩存
看官網(wǎng)案例是,刷新已經(jīng)緩存的表
當(dāng)一個(gè)表執(zhí)行了cacheTable后,元數(shù)據(jù)有變動(dòng)使用refreshTable進(jìn)行元數(shù)據(jù)刷新
refreshByPath 刷新路徑
# 假設(shè)有一個(gè) Hive 表,其數(shù)據(jù)存儲(chǔ)在 HDFS 上的某個(gè)路徑
path = "/user/hive/warehouse/mydb.db/mytable"
# 刷新該路徑下的表或分區(qū)信息
spark.catalog.refreshByPath(path)
df = spark.sql("SELECT * FROM mydb.mytable")
df.show()
recoverPartitions 恢復(fù)分區(qū)
recoverPartitions嘗試恢復(fù) Hive 表中丟失的分區(qū)信息,實(shí)際使用后更新