網(wǎng)站開(kāi)發(fā)月薪多少錢(qián)小程序推廣運(yùn)營(yíng)的公司
前言
spark操作iceberg之前先要配置spark catalogs,詳情參考Iceberg與Spark整合環(huán)境配置。
有些操作需要在spark3中開(kāi)啟iceberg sql擴(kuò)展。
Iceberg使用Apache Spark的DataSourceV2 API來(lái)實(shí)現(xiàn)數(shù)據(jù)源和catalog。Spark DSv2是一個(gè)不斷發(fā)展的API,在Spark版本中具有不同級(jí)別的支持:
Spark 3支持SQL INSERT INTO、MERGE INTO和INSERT OVERWRITE,以及新的DataFrameWriterV2 API來(lái)進(jìn)行iceberg表的寫(xiě)操作,接下來(lái)我們進(jìn)行詳細(xì)講解。
INSERT INTO
insert into是往iceberg表中插入新數(shù)據(jù),主要有兩種語(yǔ)法:
INSERT INTO prod.db.table VALUES (1, 'a'), (2, 'b')
INSERT INTO prod.db.table SELECT ...
這兩種語(yǔ)法和其它組件如hive等沒(méi)有太多區(qū)別,比較容易掌握。
MERGE INTO
Iceberg "merge into"語(yǔ)法可以對(duì)表數(shù)據(jù)進(jìn)行行級(jí)更新或刪除,在Spark3.x版本之后支持,其原理是重寫(xiě)包含需要?jiǎng)h除和更新行數(shù)據(jù)所在的data files。"merge into"可以使用一個(gè)查詢(xún)結(jié)果數(shù)據(jù)來(lái)更新目標(biāo)表的數(shù)據(jù),其語(yǔ)法通過(guò)類(lèi)似join關(guān)聯(lián)方式,根據(jù)指定的匹配條件對(duì)匹配的行數(shù)據(jù)進(jìn)行相應(yīng)操作。
- 語(yǔ)法
MERGE INTO tbl t -- 目標(biāo)表
USING (SELECT ...) s -- 數(shù)據(jù)源表,也就是用數(shù)據(jù)源表查出的數(shù)據(jù)來(lái)更新或刪除目標(biāo)表
ON t.id = s.id -- 關(guān)聯(lián)條件,類(lèi)似join的on條件
WHEN MATCHED AND ... THEN DELETE -- 刪除直接用delete命令
WHEN MATCHED AND ... THEN UPDATE SET ... --更新用upate set
WHEN MATCHED AND ... AND ... THEN UPDATE SET ... --多條件更新
WHEN NOT MATCHED ADN ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...) --匹配不上向目標(biāo)表插入數(shù)據(jù)
- 示例
- 創(chuàng)建兩張表a和b
create table hadoop_prod.default.a (id int,name string,age int) using iceberg;
create table hadoop_prod.default.b (id int,name string,age int,tp string) using iceberg
- 插入數(shù)據(jù)
insert into hadoop_prod.default.a values (1,"zs",18),(2,"ls",19),(3,"ww",20)
insert into hadoop_prod.default.b values (1,"zs",30,"delete"),(2,"李四",31,"update"),(4,"王五",32,"add")
- 使用MERGE INTO 語(yǔ)法向目標(biāo)表更新、刪除、新增數(shù)據(jù)
這里我們計(jì)劃將b表與a表匹配id,如果b表中tp字段是"delete"那么a表中對(duì)應(yīng)的id數(shù)據(jù)刪除,如果b表中tp字段是"update",那么a表中對(duì)應(yīng)的id數(shù)據(jù)其他字段進(jìn)行更新,如果a表與b表id匹配不上,那么將b表中的數(shù)據(jù)插入到a表中,具體操作如下:
merge into hadoop_prod.default.a t1 -- 目標(biāo)表a
using (select id,name ,age,tp from hadoop_prod.default.b) t2 -- 數(shù)據(jù)源表b
on t1.id = t2.id -- 關(guān)聯(lián)條件為id
when matched and t2.tp = 'delete' then delete -- 如果數(shù)據(jù)源表中tp字段為delete,則對(duì)目標(biāo)表關(guān)聯(lián)d對(duì)應(yīng)的數(shù)據(jù)進(jìn)行刪除操作
when matched and t2.tp = 'update' then update set t1.name = t2.name,t1.age = t2.age -- 如果數(shù)據(jù)源表tp字段為update,則對(duì)目標(biāo)表關(guān)聯(lián)id對(duì)應(yīng)數(shù)據(jù)用數(shù)據(jù)源表中name和age更新目標(biāo)表對(duì)應(yīng)字段
when not matched then insert (id,name,age) values (t2.id,t2.name,t2.age) -- 如果id關(guān)聯(lián)不上,則直接把數(shù)據(jù)源表對(duì)應(yīng)id這條數(shù)據(jù)插入到目標(biāo)表中
注意:我們很多數(shù)據(jù)庫(kù)都沒(méi)有類(lèi)似merge into的操作,為了便于初學(xué)者理解,每一行操作都有詳細(xì)的注釋。
- 結(jié)果
id=1,可以匹配上,但數(shù)據(jù)源表tp為delete,因此會(huì)把目標(biāo)表id=1對(duì)應(yīng)的行刪除;
id=2,可以匹配上,但數(shù)據(jù)源表tp為update,因此會(huì)把目標(biāo)表id=2對(duì)應(yīng)的name和age用數(shù)據(jù)源表name和age進(jìn)行更新;
id=3,沒(méi)有匹配上,需要把數(shù)據(jù)源表對(duì)應(yīng)的這條數(shù)據(jù)插入到目標(biāo)表,但是由于數(shù)據(jù)源中沒(méi)有id=3的數(shù)據(jù),因此沒(méi)有插入數(shù)據(jù),此時(shí)保留數(shù)據(jù)源表中id=3對(duì)應(yīng)的數(shù)據(jù);
id=4,沒(méi)有匹配上,需要把數(shù)據(jù)源表對(duì)應(yīng)的這條數(shù)據(jù)插入到目標(biāo)表;
注意
:更新數(shù)據(jù)時(shí),在查詢(xún)的數(shù)據(jù)中只能有一條匹配的數(shù)據(jù)更新到目標(biāo)表,否則將報(bào)錯(cuò)。
INSERT OVERWRITE
"insert overwrite"可以覆蓋Iceberg表中的數(shù)據(jù),這種操作會(huì)將表中全部數(shù)據(jù)替換掉,建議如果有部分?jǐn)?shù)據(jù)替換操作可以使用"merge into"操作。
對(duì)于Iceberg分區(qū)表使用"insert overwrite"操作時(shí),有兩種情況,第一種是“動(dòng)態(tài)覆蓋”,第二種是“靜態(tài)覆蓋”。
-
動(dòng)態(tài)分區(qū)覆蓋
動(dòng)態(tài)覆蓋會(huì)全量將原有數(shù)據(jù)覆蓋,并將新插入的數(shù)據(jù)根據(jù)Iceberg表分區(qū)規(guī)則自動(dòng)分區(qū),類(lèi)似Hive中的動(dòng)態(tài)分區(qū)。 -
靜態(tài)分區(qū)覆蓋
靜態(tài)覆蓋需要在向Iceberg中插入數(shù)據(jù)時(shí)需要手動(dòng)指定分區(qū),如果當(dāng)前Iceberg表存在這個(gè)分區(qū),那么只有這個(gè)分區(qū)的數(shù)據(jù)會(huì)被覆蓋,其他分區(qū)數(shù)據(jù)不受影響,如果Iceberg表不存在這個(gè)分區(qū),那么相當(dāng)于給Iceberg表增加了個(gè)一個(gè)分區(qū)。 -
示例
- 創(chuàng)建三張表并插入數(shù)據(jù)
創(chuàng)建test1分區(qū)表、test2普通表、test3普通表三張表,并插入數(shù)據(jù),每張表字段相同,但是插入數(shù)據(jù)不同。
-- test1為分區(qū)表
create table hadoop_prod.default.test1 (id int,name string,loc string)
using iceberg
partitioned by (loc);-- 插入數(shù)據(jù)
insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai");
-- test2為普通無(wú)分區(qū)表
create table hadoop_prod.default.test2 (id int,name string,loc string)
using iceberg;
-- 插入數(shù)據(jù)
insert into hadoop_prod.default.test2 values (10,"x1","shandong"),(11,"x2","hunan");
-- test3為普通無(wú)分區(qū)表
create table hadoop_prod.default.test3 (id int,name string,loc string)
using iceberg;
-- 插入數(shù)據(jù)
insert into hadoop_prod.default.test3 values (3,"ww","beijing"),(4,"ml","shanghai"),(5,"tq","guangzhou");
- 使用insert overwrite 讀取test3表中的數(shù)據(jù)覆蓋到test2表中
-- 使用insert overwrite 讀取test3 表中的數(shù)據(jù)覆蓋到test2 普通表中
insert overwrite hadoop_prod.default.test2 select id,name,loc from hadoop_prod.default.test3;
-- 查詢(xún)test2表數(shù)據(jù)
select * from hadoop_prod.default.test2;
此時(shí)test2表中的結(jié)果如下:
說(shuō)明此時(shí)insert overwrite操作是把test2表的數(shù)據(jù)全部刪除,然后把test3表的所有數(shù)據(jù)插入到test2表。
- 使用insert overwrite 讀取test3表數(shù)據(jù),動(dòng)態(tài)分區(qū)方式覆蓋到表test1
-- 使用insert overwrite 讀取test3表數(shù)據(jù) 動(dòng)態(tài)分區(qū)方式覆蓋到表 test1
insert overwrite hadoop_prod.default.test1 select id,name,loc from hadoop_prod.default.test3;
-- 查詢(xún) test1 表數(shù)據(jù)
select * from hadoop_prod.default.test1;
此時(shí)test1表中的數(shù)據(jù)如下:
說(shuō)明此時(shí)insert overwrite操作是把test1表的數(shù)據(jù)全部刪除,然后把test3表的所有數(shù)據(jù)插入到test1表,并且分區(qū)字段loc按照動(dòng)態(tài)分區(qū)的方式進(jìn)行分區(qū)。
- 靜態(tài)分區(qū)方式,將iceberg表test3的數(shù)據(jù)覆蓋到Iceberg表test1中
這里可以將test1表刪除,然后重新創(chuàng)建,加載數(shù)據(jù),也可以直接讀取test3中的數(shù)據(jù)靜態(tài)分區(qū)方式更新到test1。另外,使用insert overwrite 語(yǔ)法覆蓋靜態(tài)分區(qū)方式時(shí),查詢(xún)的語(yǔ)句中就不要再次寫(xiě)入分區(qū)列,否則會(huì)重復(fù)。
-- 刪除表test1,重新創(chuàng)建表test1 分區(qū)表,并插入數(shù)據(jù)
drop table hadoop_prod.default.test1;
-- 重建test1分區(qū)表
create table hadoop_prod.default.test1 (id int,name string,loc string) using iceberg partitioned by (loc);
-- 插入數(shù)據(jù)
insert into hadoop_prod.default.test1 values (1,"zs","beijing"),(2,"ls","shanghai");
-- 查詢(xún)test1表數(shù)據(jù)
select * from hadoop_prod.default.test1;
-- 注意:指定靜態(tài)分區(qū)"jiangsu",靜態(tài)分區(qū)下,就不要在查詢(xún) “l(fā)oc" 列了,否則重復(fù)
insert overwrite hadoop_prod.default.test1 partition (loc = "jiangsu") select id,name from hadoop_prod.default.test3;
-- 查詢(xún) test1 表數(shù)據(jù)
select * from hadoop_prod.default.test1;
此時(shí)test1表的數(shù)據(jù)如下:
我們可以看到test1表原來(lái)沒(méi)有jiangsu分區(qū),采用靜態(tài)分區(qū)指定jiangsu分區(qū)的時(shí)候,并不影響非jiangsu的數(shù)據(jù),只是從test3中讀取所有數(shù)據(jù),并存放到loc=jiangsu這個(gè)分區(qū)目錄下。
注意
:使用insert overwrite 讀取test3表數(shù)據(jù) 靜態(tài)分區(qū)方式覆蓋到表 test1,表中其他分區(qū)數(shù)據(jù)不受影響,只會(huì)覆蓋指定的靜態(tài)分區(qū)數(shù)據(jù)。
至此,我相信我們已經(jīng)完全掌握了merge into的用法。
DELETE FROM
Spark3.x版本之后支持"Delete from"可以根據(jù)指定的where條件來(lái)刪除表中數(shù)據(jù)。如果where條件匹配Iceberg表一個(gè)分區(qū)的數(shù)據(jù),Iceberg僅會(huì)修改元數(shù)據(jù),如果where條件匹配的表的單個(gè)行,則Iceberg會(huì)只重寫(xiě)受影響行所在的data files。
-- 創(chuàng)建表 delete_tbl ,并加載數(shù)據(jù)
create table hadoop_prod.default.delete_tbl (id int,name string,age int) using iceberg;
insert into hadoop_prod.default.delete_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23);
-- 根據(jù)條件范圍刪除表 delete_tbl 中的數(shù)據(jù)
delete from hadoop_prod.default.delete_tbl where id >3 and id <6;
-- 查詢(xún)數(shù)據(jù)
select * from hadoop_prod.default.delete_tbl;
刪除了id大于3和小于6之間的所有數(shù)據(jù):
-- 根據(jù)條件刪除表 delete_tbl 中的一條數(shù)據(jù)
delete from hadoop_prod.default.delete_tbl where id = 2;
-- 查詢(xún)數(shù)據(jù)
select * from hadoop_prod.default.delete_tbl;
刪除了id=2的數(shù)據(jù):
刪除操作和其它數(shù)據(jù)庫(kù)完全一樣,操作很簡(jiǎn)單,但是得理解底層刪除數(shù)據(jù)的原理。
UPDATE
Spark3.x+版本支持了update更新數(shù)據(jù)操作,可以根據(jù)匹配的條件進(jìn)行數(shù)據(jù)更新操作。
-- 創(chuàng)建表 update_tbl ,并加載數(shù)據(jù)
create table hadoop_prod.default.update_tbl (id int,name string,age int) using iceberg;
-- 插入數(shù)據(jù)
insert into hadoop_prod.default.update_tbl values (1,"zs",18),(2,"ls",19),(3,"ww",20),(4,"ml",21),(5,"tq",22),(6,"gb",23);
insert into hadoop_prod.default.update_tbl values (1,“zs”,18),(2,“l(fā)s”,19),(3,“ww”,20),(4,“ml”,21),(5,“tq”,22),(6,“gb”,23),操作如下:
-- 更新 delete_tbl 表
update hadoop_prod.default.update_tbl set name = 'zhangsan' ,age = 30 where id <=3;
-- 查詢(xún)數(shù)據(jù)
select * from hadoop_prod.default.update_tbl;
把id小于等于3的,name全部改成zhangshan,age全部改成30:
update操作和其它數(shù)據(jù)庫(kù)一模一樣,非常簡(jiǎn)單。
注意:UPDATE 更加專(zhuān)注于單一記錄的修改,而 MERGE INTO 則是一個(gè)更全面的操作,可以同時(shí)處理多個(gè)數(shù)據(jù)狀態(tài)的變化。因此一些復(fù)雜的操作直接用MERGE INTO,比如:
- 同步外部數(shù)據(jù)源:如果你有一個(gè)外部數(shù)據(jù)庫(kù)系統(tǒng),你可能希望定期將更改(包括插入、更新和刪除)同步到你的數(shù)據(jù)湖中的表。MERGE INTO 可以用來(lái)比較兩個(gè)表,并根據(jù)匹配條件執(zhí)行更新,對(duì)于沒(méi)有匹配記錄的新數(shù)據(jù)則執(zhí)行插入。
- 數(shù)據(jù)集成:當(dāng)需要合并多個(gè)來(lái)源的數(shù)據(jù)到一個(gè)目標(biāo)表中時(shí),MERGE INTO 可以有效地處理這種情況。它可以檢查數(shù)據(jù)是否已經(jīng)存在,并決定是更新還是添加新的記錄。
- 高效的數(shù)據(jù)處理:在處理大量數(shù)據(jù)時(shí),MERGE INTO 可以減少數(shù)據(jù)處理的時(shí)間,因?yàn)樗恍枰淮尾僮骶涂梢酝瓿筛潞筒迦搿?/li>
參考文獻(xiàn)
Spark Write
https://bbs.huaweicloud.com/blogs/364273