I. Core components of Flink
1.1 Client
1. Sends the dataflow graph (DataFlow) to the JobManager.
1.2 JobManager
1. Receives the DataFlow graph from the client, breaks it down into individual tasks, and returns status updates to the client.
2. Schedules jobs and collects heartbeats and statistics from the TaskManagers.
1.3 TaskManager
1. Places each task into a TaskSlot.
2. TaskManagers exchange data with one another as streams.
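II. Setting up a Flink cluster
2.1 Standalone cluster
2.1.1 Upload, extract, and configure environment variables
1. Extract the archive: tar -zxvf flink-1.15.2-bin-scala_2.12.tgz -C ../
2. Configure the environment variables
# Open the profile and add the two export lines
vim /etc/profile
export FLINK_HOME=/usr/local/soft/flink-1.15.2
export PATH=$PATH:$FLINK_HOME/bin
# Reload the profile
source /etc/profile
2.1.2 Edit the configuration files
1. Edit flink-conf.yaml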
jobmanager.rpc.address: master
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
taskmanager.host: localhost # must be changed individually on node1 and node2
taskmanager.numberOfTaskSlots: 4
rest.address: master
rest.bind-address: 0.0.0.0
2. Edit masters
master:8081
3. Edit workers
node1
node2
2.1.3 Sync to all nodes
1. Copy the installation to the worker nodes
scp -r flink-1.15.2 node1:`pwd`
scp -r flink-1.15.2 node2:`pwd`
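2. On each worker node, change taskmanager.host in flink-conf.yaml
# on node1
taskmanager.host: node1
# on node2
taskmanager.host: node2
2.1.4 Starting and stopping the cluster
1. Start the cluster
start-cluster.sh
2. Check that it started by opening the web UI
http://master:8081
3. Stop the cluster
stop-cluster.sh
2.1.5 Submitting a job
1. Package the code, upload the jar to the server, and submit it
1. Submit command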
flink run -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar
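com.shujia.flink.core.Demo1StreamWordCount: fully qualified name of the main class
flink-1.0.jar: name of the jar
2. Check the job in the web UI
3. Check the results
4. Cancel the job
2. Submitting a job through the web UI
1. Submit the jar
2. Set the related options
2.2 Flink on Yarn
2.2.1 Integration
1. Set HADOOP_CLASSPATH in the environment variables
vim /etc/profile
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile
2.2.2 Application Mode
1. The job is submitted to YARN, and YARN starts one JobManager and one or more TaskManagers for each Flink job.
2. The main function no longer runs locally and the DataFlow is no longer built locally, so if the code fails, the detailed error log is not visible on the local machine.
1. Submit command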
flink run-application -t yarn-application -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar
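flink run-application -t yarn-application -c: the submit command and its options
com.shujia.flink.core.Demo1StreamWordCount: fully qualified name of the main class
flink-1.0.jar: name of the jar
2. Check the web UI
Click the application's link to jump straight to the Flink web UI.
2.2.3 Per-Job Cluster Mode
1. The job is submitted to YARN, and YARN starts one JobManager and one or more TaskManagers for each Flink job.
2. The main function starts locally and builds the DataFlow locally, then submits the DataFlow to the JobManager. If the code fails, part of the error log can be seen locally.
1. Submit command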
flink run -t yarn-per-job -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar
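flink run -t yarn-per-job -c: the submit command and its options
com.shujia.flink.core.Demo1StreamWordCount: fully qualified name of the main class
flink-1.0.jar: name of the jar
2. The web UI is the same as in Application Mode
2.2.4 Session Mode
1. First start a JobManager in YARN, without starting any TaskManagers.
2. TaskManagers are requested dynamically when a job is submitted.
3. All jobs submitted in session mode share the same JobManager.
4. Similar to a standalone cluster, except that the cluster runs inside YARN and can request resources dynamically.
5. Generally used for testing.
1. Start the session cluster first
yarn-session.sh -d
After startup the output shows two things. One is the application ID, here application_1717379968853_0003.
The other is the address of the web UI, which can be copied and opened in a browser.
2. Submit a job
From the command line: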
flink run -t yarn-session -Dyarn.application.id=application_1717379968853_0003 -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar
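-Dyarn.application.id=application_1717379968853_0003: the application ID printed when the session cluster started
com.shujia.flink.core.Demo1StreamWordCount: fully qualified name of the main class
flink-1.0.jar: name of the jar
From the web UI: exactly the same as the web submission in Application Mode
III. Parallelism
3.1 Setting the parallelism
3.1.1 In code
1. If nothing is set in code, the default parallelism comes from the configuration file
2. Set it in code
env.setParallelism(2)
3.1.2 When submitting the job
1. Add the option -p followed by the parallelism
For example: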
flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717379968853_0003 -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar
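2. Or set it in the web UI
3.1.3 In the configuration file
1. This is rarely used
Change the setting (parallelism.default) in flink-conf.yaml
3.1.4 Per operator
The parallelism can also be set on an individual operator where it is used in code, but this is rarely needed
3.1.5 Priority
Code > job submission option > configuration file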
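The priority order above can be sketched in plain Java. The class and method names are hypothetical and are not part of Flink; the sketch only models "the most specific setting wins", and it assumes that a per-operator setting ranks above the environment-level one set with env.setParallelism.

```java
import java.util.OptionalInt;

// Hypothetical helper, not a Flink API: models which parallelism setting wins.
class ParallelismResolver {
    static int resolve(OptionalInt operatorLevel, OptionalInt envLevel,
                       OptionalInt submitOption, int configDefault) {
        if (operatorLevel.isPresent()) return operatorLevel.getAsInt(); // set on a single operator
        if (envLevel.isPresent()) return envLevel.getAsInt();           // env.setParallelism(n)
        if (submitOption.isPresent()) return submitOption.getAsInt();   // flink run -p n
        return configDefault;                                           // flink-conf.yaml
    }

    public static void main(String[] args) {
        // env.setParallelism(2) in code takes priority over -p 3 on the command line.
        System.out.println(resolve(OptionalInt.empty(), OptionalInt.of(2), OptionalInt.of(3), 1));
    }
}
```

In this model, a job that calls env.setParallelism(2) and is submitted with -p 3 runs with a parallelism of 2.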
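3.2 Resource sharing
1. The number of slots a Flink job needs does not depend on the number of tasks
2. Each unit of parallelism occupies one slot
3. An upstream task and its downstream task share the same slot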
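As a rough illustration of these three points, the sketch below estimates the slots a job needs. The helper is hypothetical, not a Flink API, and it assumes every operator stays in the default slot sharing group, so that one slot can host one parallel instance of each operator in the pipeline.

```java
import java.util.Arrays;

// Hypothetical helper, not a Flink API.
class SlotEstimate {
    // Under slot sharing, the job needs as many slots as its highest operator parallelism,
    // regardless of how many tasks (subtasks) exist in total.
    static int requiredSlots(int... operatorParallelism) {
        return Arrays.stream(operatorParallelism).max().orElse(0);
    }

    public static void main(String[] args) {
        // source(2) -> map(4) -> window(4) -> sink(1): 11 subtasks in total, but 4 slots.
        System.out.println(requiredSlots(2, 4, 4, 1));
    }
}
```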
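3.3 Guidelines for choosing the parallelism
1. The parallelism of a real-time job depends on the data throughput
2. For aggregations (with a shuffle), one parallel instance handles roughly 10,000 records per second
3. For non-aggregating computations, one parallel instance handles roughly 100,000 records per second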
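The rule of thumb above turns into a small calculation. This is only a sketch: the two per-instance rates are the rough figures from the list, the class name is made up for illustration, and real capacity depends on the job and the hardware.

```java
// Hypothetical helper: turns the rule of thumb above into a starting value for -p.
class ParallelismEstimate {
    static final long AGG_RECORDS_PER_SEC = 10_000;      // aggregation with a shuffle
    static final long NON_AGG_RECORDS_PER_SEC = 100_000; // no aggregation

    static int estimate(long recordsPerSecond, boolean aggregating) {
        long perInstance = aggregating ? AGG_RECORDS_PER_SEC : NON_AGG_RECORDS_PER_SEC;
        long needed = (recordsPerSecond + perInstance - 1) / perInstance; // round up
        return (int) Math.max(1, needed);
    }

    public static void main(String[] args) {
        System.out.println(estimate(45_000, true));   // aggregating job at 45,000 records/s
        System.out.println(estimate(250_000, false)); // non-aggregating job at 250,000 records/s
    }
}
```

Treat the result as a starting point to be checked against the observed throughput, not as a guarantee.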
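IV. Event time
4.1 Event time
Event time is the time at which a record was produced. The record carries a timestamp field, and that field drives the computation instead of the wall-clock time. This reflects the order in which events really happened, so the results are more meaningful.
4.1.1 Timestamps in order
1. Parse the records and work out which field is the event timestamp
2. Specify the timestamp field
forMonotonousTimestamps(): monotonically increasing, meaning the timestamps are expected only to go up
tsDS.assignTimestampsAndWatermarks(WatermarkStrategy
        // Watermark strategy: the watermark equals the timestamp of the latest record; out-of-order data may be lost
        .<Tuple2<String, Long>>forMonotonousTimestamps()
        // Specify the timestamp field
        .withTimestampAssigner((event, ts) -> event.f1));
2. The complete code is as follows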
package com.shujia.flink.core;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

public class Demo5EventTime {
    public static void main(String[] args) throws Exception {
        /*
         * Event time: the data carries a timestamp field, and that field drives the
         * computation instead of the wall-clock time. This reflects the real order
         * of events, so the results are more meaningful.
         */

        /*
        Sample input:
        java,1717395300000
        java,1717395301000
        java,1717395302000
        java,1717395303000
        java,1717395304000
        java,1717395305000
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each line into (word, timestamp)
        DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            long ts = Long.parseLong(split[1]);
            return Tuple2.of(word, ts);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        // Specify the timestamp field and the watermark strategy
        DataStream<Tuple2<String, Long>> assDS = tsDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // The watermark equals the timestamp of the latest record;
                        // out-of-order data may be lost
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        // Specify the timestamp field
                        .withTimestampAssigner((event, ts) -> event.f1));

        // Count the words every 5 seconds
        DataStream<Tuple2<String, Integer>> kvDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // TumblingEventTimeWindows: tumbling event-time window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        windowDS.sum(1).print();

        env.execute();
    }
}
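3. Analysis of the results
The code above uses a 5-second tumbling event-time window. Windows are closed on the left and open on the right, so the fifth second belongs to the next window.
A window fires when two conditions hold: the watermark is greater than or equal to the window's end time, and the window contains data.
Watermark: equal to the timestamp of the latest record.
For example, with boundaries 0-5-10-15-20, 0-5 is one window and 5-10 is the next, and a window is computed only if it contains data. If a record arrives whose timestamp does not belong to a window that is still open, it is not processed.
In the test run the event times were entered out of order, and the fourth result was lost.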
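The window boundaries and the firing condition described above can be worked through in plain Java. This is a simplified model of the rules as stated in the text, not Flink's implementation: it assumes non-negative timestamps and no window offset, and the class name is made up for illustration.

```java
// Simplified model of tumbling event-time windows; not Flink code.
class TumblingWindowMath {
    // Start of the window [start, start + size) that contains ts.
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    static long windowEnd(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs;
    }

    // The text's rule: a window fires once the latest timestamp reaches its end time.
    static boolean fires(long latestTs, long windowEnd) {
        return latestTs >= windowEnd;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long ts = 1717395303000L; // one of the sample records
        System.out.println(windowStart(ts, size) + " .. " + windowEnd(ts, size));
        // The record at ...304000 does not close the window; the one at ...305000 does.
        System.out.println(fires(1717395304000L, windowEnd(ts, size)));
        System.out.println(fires(1717395305000L, windowEnd(ts, size)));
    }
}
```

Because the window is open on the right, the record stamped 1717395305000 both closes the first window and becomes the first record of the next one.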
4.1.2 Timestamps out of order
1. Hold the watermark back with forBoundedOutOfOrderness, passing in how long to hold it back
tsDS.assignTimestampsAndWatermarks(WatermarkStrategy
        // How far the watermark is held back (the maximum out-of-orderness of the data)
        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // Specify the timestamp field
        .withTimestampAssigner((event, ts) -> event.f1));
2. The complete code
package com.shujia.flink.core;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

public class Demo5EventTime {
    public static void main(String[] args) throws Exception {
        /*
         * Event time: the data carries a timestamp field, and that field drives the
         * computation instead of the wall-clock time. This reflects the real order
         * of events, so the results are more meaningful.
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
        Sample input:
        java,1717395301000
        java,1717395302000
        java,1717395303000
        java,1717395304000
        java,1717395305000
        java,1717395307000
        java,1717395308000
        java,1717395311000
        java,1717395313000
        java,1717395315000
         */
        env.setParallelism(1);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each line into (word, timestamp)
        DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            long ts = Long.parseLong(split[1]);
            return Tuple2.of(word, ts);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        // Specify the timestamp field and the watermark strategy
        DataStream<Tuple2<String, Long>> assDS = tsDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Alternative: the watermark equals the timestamp of the latest record;
                        // out-of-order data may be lost
                        // .<Tuple2<String, Long>>forMonotonousTimestamps()
                        // How far the watermark is held back (the maximum out-of-orderness of the data)
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // Specify the timestamp field
                        .withTimestampAssigner((event, ts) -> event.f1));

        // Count the words every 5 seconds
        DataStream<Tuple2<String, Integer>> kvDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // TumblingEventTimeWindows: tumbling event-time window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        windowDS.sum(1).print();

        env.execute();
    }
}
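3. Analysis of the results
In my run the watermark was held back by 5 seconds, so the 0-5 window fired only once a record with an event time of 10 seconds or later arrived. Of the records entered in the first 10 seconds, four fall into the 0-5 window, so the output was 4. A record at 14 seconds produced no output because 14 - 5 = 9, so the watermark had not yet reached the end of the next window at 10. After I entered a record at 16 seconds (16 - 5 = 11), the result appeared.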
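To make the arithmetic concrete, here is a plain-Java simulation of the behavior described above, run on the sample timestamps from the code comment. It is a simplified model under stated assumptions, not Flink's implementation: the watermark is taken to be the largest timestamp seen minus the delay, a window fires once the watermark reaches its end, records for windows that have already fired are dropped, and only a single key is tracked. The class name is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified single-key model of tumbling windows with a held-back watermark; not Flink code.
class BoundedOutOfOrdernessSketch {
    // Returns "windowStart=count" for each window, in the order the windows fire.
    static List<String> run(long[] timestamps, long sizeMs, long delayMs) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> record count
        List<String> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            long start = ts - (ts % sizeMs);
            if (start + sizeMs <= watermark) {
                continue; // the window already fired, so this late record is dropped
            }
            open.merge(start, 1, Integer::sum);
            maxTs = Math.max(maxTs, ts);
            watermark = maxTs - delayMs;
            // Fire every open window whose end the watermark has reached.
            while (!open.isEmpty() && open.firstKey() + sizeMs <= watermark) {
                Map.Entry<Long, Integer> e = open.pollFirstEntry();
                fired.add(e.getKey() + "=" + e.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long[] sample = {
            1717395301000L, 1717395302000L, 1717395303000L, 1717395304000L, 1717395305000L,
            1717395307000L, 1717395308000L, 1717395311000L, 1717395313000L, 1717395315000L
        };
        System.out.println(run(sample, 5_000L, 5_000L));
    }
}
```

In this model the first window fires with a count of 4 when the record stamped 1717395311000 arrives, and the second fires with a count of 3 when 1717395315000 arrives. The window starting at 1717395310000 stays open because the watermark has not yet reached its end.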
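4.1.3 Watermark alignment
1. When there are several upstream tasks, a downstream task takes the minimum of the upstream tasks' watermarks. With little data the watermarks are hard to align, and the windows do not fire. It is therefore worth setting the parallelism explicitly so that the number of tasks is fixed in advance.
2. If the parallelism is not set, many events may have to be entered before a computation fires.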
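The minimum rule can be illustrated with a few lines of plain Java. The class is hypothetical and only models the rule stated above; it is not how Flink tracks input channels internally.

```java
import java.util.Arrays;

// Hypothetical model of watermark alignment; not Flink code.
class WatermarkAlignment {
    // A downstream task's watermark is the minimum over its upstream tasks.
    static long downstreamWatermark(long... upstreamWatermarks) {
        return Arrays.stream(upstreamWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        // One upstream task has reached 12 s, the other has only seen data up to 3 s.
        long wm = downstreamWatermark(12_000L, 3_000L);
        System.out.println(wm);
        // A window ending at 5 s cannot fire yet, although one input is far past it.
        System.out.println(wm >= 5_000L);
    }
}
```

With a parallelism of 1 there is a single upstream watermark, which is why the demos call env.setParallelism(1).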
4.2 Processing time
1. Processing time: the wall-clock time
2. The code below sets up a tumbling processing-time window that processes the data once every 5 seconds of wall-clock time
package com.shujia.flink.core;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo4ProcTime {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> wordsDS = env.socketTextStream("master", 8888);

        // Convert each word into a (word, 1) pair
        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        // Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // Define the window
        // TumblingProcessingTimeWindows: tumbling processing-time window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // Count the words
        DataStream<Tuple2<String, Integer>> countDS = windowDS.sum(1);

        countDS.print();

        env.execute();
    }
}
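V. Windows
5.1 Time windows
1. There are four kinds of time window:
SlidingEventTimeWindows: sliding event-time window
SlidingProcessingTimeWindows: sliding processing-time window
TumblingEventTimeWindows: tumbling event-time window
TumblingProcessingTimeWindows: tumbling processing-time window
2. A sliding window takes two durations: the window size and the slide interval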
package com.shujia.flink.window;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

public class Demo1TimeWindow {
    public static void main(String[] args) throws Exception {
        /*
         * Event time: the data carries a timestamp field, and that field drives the
         * computation instead of the wall-clock time. This reflects the real order
         * of events, so the results are more meaningful.
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
        Sample input:
        java,1717395301000
        java,1717395302000
        java,1717395303000
        java,1717395304000
        java,1717395305000
        java,1717395307000
        java,1717395308000
        java,1717395311000
        java,1717395313000
        java,1717395315000
         */

        /*
         * Watermark alignment:
         * when there are several upstream tasks, a downstream task takes the minimum of the
         * upstream watermarks. With little data the watermarks are hard to align, and the
         * windows do not fire.
         */
        env.setParallelism(1);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each line into (word, timestamp)
        DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            long ts = Long.parseLong(split[1]);
            return Tuple2.of(word, ts);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        // Specify the timestamp field and the watermark strategy
        DataStream<Tuple2<String, Long>> assDS = tsDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Alternative: the watermark equals the timestamp of the latest record;
                        // out-of-order data may be lost
                        // .<Tuple2<String, Long>>forMonotonousTimestamps()
                        // How far the watermark is held back (the maximum out-of-orderness of the data)
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // Specify the timestamp field
                        .withTimestampAssigner((event, ts) -> event.f1));

        // Map each record to (word, 1) for counting
        DataStream<Tuple2<String, Integer>> kvDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        /*
         * SlidingEventTimeWindows: sliding event-time window
         * SlidingProcessingTimeWindows: sliding processing-time window
         * TumblingEventTimeWindows: tumbling event-time window
         * TumblingProcessingTimeWindows: tumbling processing-time window
         * A sliding window takes two durations: the first is the window size,
         * the second is the slide interval.
         * For example, (15, 5) computes over the last 15 seconds of data every 5 seconds.
         */
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)));

        windowDS.sum(1).print();

        env.execute();
    }
}
This code uses a sliding event-time window: every 5 seconds it computes over the data from the last 15 seconds.
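With a size of 15 seconds and a slide of 5 seconds, each record belongs to three overlapping windows. The plain-Java sketch below lists them for one timestamp. It is an illustrative model that assumes non-negative timestamps and no window offset, and the class name is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of sliding-window assignment; not Flink code.
class SlidingWindowMath {
    // Start times of every window [start, start + size) that contains ts, newest first.
    static List<Long> windowStarts(long ts, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs); // the most recent window that has begun
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // One of the sample records, with size = 15 s and slide = 5 s.
        for (long start : windowStarts(1717395307000L, 15_000L, 5_000L)) {
            System.out.println("[" + start + ", " + (start + 15_000L) + ")");
        }
    }
}
```

This overlap is why the same record is counted in several consecutive outputs of a sliding window.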
5.2 Count windows
1. Tumbling count window: for each key, compute once every N records
package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class Demo2CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> kvDS = linesDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        /*
         * Count windows
         * countWindow(10): tumbling count window, computed once every 10 records per key
         * countWindow(10, 2): sliding count window, computed over the last 10 records every 2 records
         */
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindowDS = keyByDS
                .countWindow(10);

        countWindowDS.sum(1).print();

        env.execute();
    }
}
2. Sliding count window: every M records, compute over the most recent N records
package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class Demo2CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> mapDS = linesDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyBy = mapDS.keyBy(kv -> kv.f0);

        // Sliding count window: every 2 records, compute over the last 10 records
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyBy
                .countWindow(10, 2);

        countWindow.sum(1).print();

        env.execute();
    }
}
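The difference between the two count windows is easiest to see by tracing one key. The plain-Java sketch below is a simplified model, not Flink code: it follows a single key only (in Flink each key has its own count window) and uses made-up class and method names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Simplified single-key model of count windows; not Flink code.
class CountWindowSketch {
    // Tumbling: emit the sum of every full batch of `size` values, then start over.
    static List<Integer> tumbling(int[] values, int size) {
        List<Integer> out = new ArrayList<>();
        int sum = 0;
        int n = 0;
        for (int v : values) {
            sum += v;
            n++;
            if (n == size) {
                out.add(sum);
                sum = 0;
                n = 0;
            }
        }
        return out;
    }

    // Sliding: every `slide` values, emit the sum of the most recent `size` values
    // (fewer than `size` while the window is still filling up).
    static List<Integer> sliding(int[] values, int size, int slide) {
        List<Integer> out = new ArrayList<>();
        Deque<Integer> recent = new ArrayDeque<>();
        int seen = 0;
        for (int v : values) {
            recent.addLast(v);
            if (recent.size() > size) {
                recent.removeFirst();
            }
            seen++;
            if (seen % slide == 0) {
                out.add(recent.stream().mapToInt(Integer::intValue).sum());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] ones = new int[12]; // the same word entered 12 times, each mapped to 1
        Arrays.fill(ones, 1);
        System.out.println(tumbling(ones, 10));   // one result after the 10th record
        System.out.println(sliding(ones, 10, 2)); // a result after every 2nd record
    }
}
```

In this model, entering the same word 12 times gives a single tumbling result of 10, while the sliding variant reports 2, 4, 6, 8, 10 and then stays at 10.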
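5.3 Session windows
1. Processing-time session window, ProcessingTimeSessionWindows: for a given key, the computation starts once no new record has arrived for 10 seconds. For example, if I enter a seven times and then wait 10 seconds, the output is (a,7). If I then enter a six times plus one aa, the output is (aa,1) and (a,6).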
package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo3SessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> mapDS = linesDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyBy = mapDS.keyBy(kv -> kv.f0);

        // Processing-time session window with a 10-second gap
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyBy
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

        window.sum(1).print();

        env.execute();
    }
}
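2. Event-time session window, EventTimeSessionWindows: based on the timestamps in the data, for a given key the computation starts once there is a 10-second gap with no further record
This one is not commonly used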
package com.shujia.flink.window;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

public class Demo4EventTimeSessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
        Sample input:
        java,1685433130000
        java,1685433131000
        java,1685433132000
        java,1685433134000
        java,1685433135000
        java,1685433137000
        java,1685433139000
        java,1685433149000
        java,1685433155000
        java,1685433170000
         */
        env.setParallelism(1);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each line into (word, timestamp)
        DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            long ts = Long.parseLong(split[1]);
            return Tuple2.of(word, ts);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        // Specify the timestamp field and the watermark strategy
        DataStream<Tuple2<String, Long>> assDS = tsDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // How far the watermark is held back (the maximum out-of-orderness of the data)
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // Specify the timestamp field
                        .withTimestampAssigner((event, ts) -> event.f1));

        // Map each record to (word, 1) so the words can be counted per session
        DataStream<Tuple2<String, Integer>> kvDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // EventTimeSessionWindows: event-time session window with a 10-second gap
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)));

        windowDS.sum(1).print();

        env.execute();
    }
}
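How the sample timestamps in the code comment fall into sessions can be sketched in plain Java. This is a simplified model with a made-up class name, not Flink's window-merging logic. It assumes a single key and starts a new session only when the distance to the previous record is strictly greater than the gap; how Flink treats a record that arrives exactly one gap later is an assumption here, not something the text confirms.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified single-key model of session windows; not Flink code.
class SessionWindowSketch {
    // Returns the number of records in each session, in time order.
    static List<Integer> sessionSizes(long[] timestamps, long gapMs) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        for (int i = 0; i < sorted.length; i++) {
            // Assumption: only a distance strictly greater than the gap closes a session.
            if (i > 0 && sorted[i] - sorted[i - 1] > gapMs) {
                sizes.add(count);
                count = 0;
            }
            count++;
        }
        if (count > 0) {
            sizes.add(count);
        }
        return sizes;
    }

    public static void main(String[] args) {
        long[] sample = {
            1685433130000L, 1685433131000L, 1685433132000L, 1685433134000L, 1685433135000L,
            1685433137000L, 1685433139000L, 1685433149000L, 1685433155000L, 1685433170000L
        };
        System.out.println(sessionSizes(sample, 10_000L));
    }
}
```

Under that assumption the first nine records form one session and the last record, 15 seconds later, starts a second one. In the real job the second session is emitted only after the watermark passes its end, which needs further input.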
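5.4 Combining process with windows
1. After a window has been applied to a DataStream, the process operator can be called on the result. It takes an object that implements the abstract method process of ProcessWindowFunction. The abstract class has four type parameters (IN, OUT, KEY, W): the input type, the output type, the key type, and the window type. The window type matches the kind of window in use: TimeWindow for the time and session windows above, GlobalWindow for count windows.
2. In the process method, the first parameter is the key, the second is the Flink context object, the third is an iterable of the key-value elements in the window, and the fourth is the collector used to send results downstream.
The code is as follows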
package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Demo5WindowProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each comma-separated line into (class, age): field 4 is the class, field 2 the age
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = linesDS.map(line -> {
            String[] lines = line.split(",");
            String clazz = lines[4];
            int age = Integer.parseInt(lines[2]);
            return Tuple2.of(clazz, age);
        }, Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyBy = kvDS.keyBy(kv -> kv.f0);

        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyBy
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // Compute the average age per class in each window
        DataStream<Tuple2<String, Double>> process = window
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Double>, String, TimeWindow>() {
                    @Override
                    public void process(String clazz,
                                        ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Double>, String, TimeWindow>.Context context,
                                        Iterable<Tuple2<String, Integer>> elements,
                                        Collector<Tuple2<String, Double>> out) throws Exception {
                        double sum_age = 0;
                        int num = 0;
                        for (Tuple2<String, Integer> element : elements) {
                            sum_age += element.f1;
                            num++;
                        }
                        double avg_age = sum_age / num;
                        // Send the result downstream
                        out.collect(Tuple2.of(clazz, avg_age));
                    }
                });

        process.print();

        env.execute();
    }
}