1 Core components of Flink

1.1 Client

1. Sends the dataflow graph (DataFlow) to the JobManager.

1.2 JobManager

1. Receives the dataflow graph from the client, splits it into individual tasks, and returns status updates to the client.

2. Schedules the job and collects heartbeats and statistics from the TaskManagers.

1.3 TaskManager

1. Runs each task in a task slot.

2. TaskManagers exchange data with one another as streams.

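2 Setting up a Flink cluster

2.1 Standalone cluster

2.1.1 Upload, extract, and set environment variables

1. Extract the archive:

tar -zxvf flink-1.15.2-bin-scala_2.12.tgz -C ../

2. Set the environment variables:

# Open the profile for editing
vim /etc/profile

# Add these two lines
export FLINK_HOME=/usr/local/soft/flink-1.15.2
export PATH=$PATH:$FLINK_HOME/bin

# Reload the profile
source /etc/profile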

2.1.2 Edit the configuration files

1. Edit flink-conf.yaml:

jobmanager.rpc.address: master
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
taskmanager.host: localhost # must be changed separately on node1 and node2
taskmanager.numberOfTaskSlots: 4
rest.address: master
rest.bind-address: 0.0.0.0

2. Edit masters:

master:8081

3. Edit workers:

node1
node2

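2.1.3 Copy to all nodes

1. Copy the installation directory to each worker:

scp -r flink-1.15.2 node1:`pwd`
scp -r flink-1.15.2 node2:`pwd`

2. On each worker node, set taskmanager.host in flink-conf.yaml to that node's own hostname:

# on node1
taskmanager.host: node1

# on node2
taskmanager.host: node2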

2.1.4 Start and stop the cluster

1. Start the cluster:

start-cluster.sh

2. Check that it started by opening the web UI:

http://master:8081

3. Stop the cluster:

stop-cluster.sh

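2.1.5 Submit a job

1. Package the code, upload the jar to the server, and submit it from the command line

1. Submit command:

flink run -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar

com.shujia.flink.core.Demo1StreamWordCount: the main class

flink-1.0.jar: the jar file

2. Check the job in the web UI

3. Check the results

4. Cancel the job

2. Submit a job through the web UI

1. Upload the jar and submit it

2. Fill in the related settings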

2.2 Flink on YARN

2.2.1 Integration

1. Set HADOOP_CLASSPATH in the environment variables:

# Open the profile for editing
vim /etc/profile

# Add this line
export HADOOP_CLASSPATH=`hadoop classpath`

# Reload the profile
source /etc/profile

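2.2.2 Application Mode

(1) The job is submitted to YARN, and YARN starts one JobManager and one or more TaskManagers for each Flink job.

(2) The main method does not run locally and the dataflow graph is not built locally, so if the code fails, the detailed error log is not visible on the local machine.

1. Submit command:

flink run-application -t yarn-application -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar

flink run-application -t yarn-application -c: the submit command; -t selects the deployment target and -c names the main class

com.shujia.flink.core.Demo1StreamWordCount: the main class

flink-1.0.jar: the jar file

2. Check the web UI

In the YARN web UI, click through from the application entry to jump straight to the Flink web UI.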

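2.2.3 Per-Job Cluster Mode

(1) The job is submitted to YARN, and YARN starts one JobManager and one or more TaskManagers for each Flink job.

(2) The main method runs locally and builds the dataflow graph locally, then submits the graph to the JobManager. If the code fails, part of the error log is visible on the local machine.

1. Submit command:

flink run -t yarn-per-job -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar

flink run -t yarn-per-job -c: the submit command; -t selects the deployment target and -c names the main class

com.shujia.flink.core.Demo1StreamWordCount: the main class

flink-1.0.jar: the jar file

2. The web UI is the same as in Application Mode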

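2.2.4 Session Mode

(1) First start a JobManager in YARN, without starting any TaskManagers.

(2) TaskManagers are requested dynamically when a job is submitted.

(3) All jobs submitted in session mode share the same JobManager.

(4) This is similar to a standalone cluster, except that the cluster runs inside YARN and can request resources dynamically.

(5) Generally used for testing.

1. Start the session cluster first:

yarn-session.sh -d

After startup the output contains two things. One is the YARN application ID, here application_1717379968853_0003.

The other is the web UI address, which you can copy and open in a browser.

2. Submit a job

Submitting from the command line:

flink run -t yarn-session -Dyarn.application.id=application_1717379968853_0003 -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar

-Dyarn.application.id=application_1717379968853_0003: the application ID printed when the session cluster started

com.shujia.flink.core.Demo1StreamWordCount: the main class

flink-1.0.jar: the jar file

Submitting through the web UI: the same as the web UI submission described in 2.1.5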

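3 Parallelism

3.1 Setting the parallelism

3.1.1 In code

1. If the code does not set it, the default parallelism comes from the configuration file.

2. Setting it in code:

env.setParallelism(2)

3.1.2 At submit time

1. Add the option -p followed by the parallelism.

For example:

flink run -t yarn-session -p 3 -Dyarn.application.id=application_1717379968853_0003 -c com.shujia.flink.core.Demo1StreamWordCount flink-1.0.jar

2. Or set it in the web UI.

3.1.3 In the configuration file

1. This is rarely used.

Change the setting (the parallelism.default key) in flink-conf.yaml.

3.1.4 Per operator

Each operator call in the code can be followed by its own parallelism setting, but this is rarely used.

3.1.5 Priority

Code (per-operator setting first, then env.setParallelism) > submit-time setting > configuration file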
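The priority order above can be sketched as a small lookup. This is only an illustration in plain Java, not Flink code: the class `ParallelismResolver` and its method are hypothetical names, and each "not set" level is modelled as an empty `OptionalInt`.

```java
import java.util.OptionalInt;

/** Hypothetical helper: picks the effective parallelism of one operator. */
final class ParallelismResolver {

    /**
     * Returns the first value that is present, checked from highest to lowest
     * priority: operator-level setting, env.setParallelism, the -p option,
     * and finally the configuration-file default.
     */
    static int resolve(OptionalInt operatorLevel, OptionalInt envLevel,
                       OptionalInt submitLevel, int configDefault) {
        if (operatorLevel.isPresent()) return operatorLevel.getAsInt();
        if (envLevel.isPresent()) return envLevel.getAsInt();
        if (submitLevel.isPresent()) return submitLevel.getAsInt();
        return configDefault;
    }

    public static void main(String[] args) {
        // env.setParallelism(2) in code, -p 3 at submit time, config default 1
        int p = resolve(OptionalInt.empty(), OptionalInt.of(2), OptionalInt.of(3), 1);
        System.out.println(p); // prints 2
    }
}
```

With nothing set in code, the same call falls through to the `-p` value, and with no `-p` either it falls back to the configuration file.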

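3.2 Shared resources

1. The number of slots a Flink job needs does not depend on the number of tasks.

2. Each unit of parallelism corresponds to one slot.

3. Upstream and downstream tasks share the same slot, so a job needs as many slots as its highest parallelism.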
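As a rough sketch of the three points above, assuming every operator stays in the default slot sharing group: the slots a job needs equal its highest operator parallelism, however many operators it has. `SlotEstimator` and its methods are hypothetical names used only for illustration.

```java
import java.util.List;

/** Hypothetical helper: estimates slot and TaskManager demand for one job. */
final class SlotEstimator {

    /** Slots needed = highest operator parallelism (default slot sharing assumed). */
    static int requiredSlots(List<Integer> operatorParallelisms) {
        int max = 0;
        for (int p : operatorParallelisms) {
            max = Math.max(max, p);
        }
        return max;
    }

    /** TaskManagers needed for that many slots, rounding up. */
    static int requiredTaskManagers(int slots, int slotsPerTaskManager) {
        return (slots + slotsPerTaskManager - 1) / slotsPerTaskManager;
    }

    public static void main(String[] args) {
        // source(2) -> map(4) -> window(4) -> sink(1): four tasks, but only 4 slots
        int slots = requiredSlots(List.of(2, 4, 4, 1));
        System.out.println(slots + " slots, "
                + requiredTaskManagers(slots, 4) + " TaskManager(s)");
    }
}
```

The `4` passed as `slotsPerTaskManager` matches `taskmanager.numberOfTaskSlots: 4` from the configuration in 2.1.2.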

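3.3 Guidelines for choosing the parallelism

1. The parallelism of a real-time job depends on the data throughput.

2. For aggregations (code with a shuffle), one parallel instance handles roughly 10,000 records per second.

3. For non-aggregating code, one parallel instance handles roughly 100,000 records per second.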
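The rule of thumb above turns into a one-line calculation. The sketch below simply divides the expected throughput by the per-instance figures quoted in the text and rounds up; the figures are the text's rough estimates, not measured limits, and `ParallelismSizing` is a hypothetical name.

```java
/** Hypothetical helper: suggests a starting parallelism from expected throughput. */
final class ParallelismSizing {

    // Rough per-instance capacities quoted in the text (records per second).
    static final long AGG_RECORDS_PER_SEC = 10_000L;
    static final long NON_AGG_RECORDS_PER_SEC = 100_000L;

    /** Rounds up throughput / capacity, never returning less than 1. */
    static int suggest(long recordsPerSecond, boolean hasAggregation) {
        long perInstance = hasAggregation ? AGG_RECORDS_PER_SEC : NON_AGG_RECORDS_PER_SEC;
        long p = (recordsPerSecond + perInstance - 1) / perInstance;
        return (int) Math.max(1L, p);
    }

    public static void main(String[] args) {
        System.out.println(suggest(35_000, true));  // prints 4
        System.out.println(suggest(35_000, false)); // prints 1
    }
}
```

Treat the result as a starting point to be checked against the actual job under load.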

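4 Event time

4.1 Event time

Event time is the time at which a record was produced. The record carries a time field, and that field, rather than the wall clock, drives the computation. This reflects the order in which the events actually happened, so the results are more meaningful.

4.1.1 Event times in order

1. Parse the data and work out which field holds the event time.

2. Specify the time field.

forMonotonousTimestamps(): monotonically increasing; the event times may only go up.

tsDS.assignTimestampsAndWatermarks(WatermarkStrategy
        // Watermark strategy: the watermark equals the largest timestamp seen so far; out-of-order records may be dropped
        .<Tuple2<String, Long>>forMonotonousTimestamps()
        // Specify the time field
        .withTimestampAssigner((event, ts) -> event.f1));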

2. The complete code:

package com.shujia.flink.core;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo5EventTime {
    public static void main(String[] args) throws Exception {
        /*
         * Event time: the record carries a time field, and that field, rather than
         * the wall clock, drives the computation. This reflects the order in which
         * the events actually happened, so the results are more meaningful.
         */

        /*
        Sample input:
        java,1717395300000
        java,1717395301000
        java,1717395302000
        java,1717395303000
        java,1717395304000
        java,1717395305000
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each line into (word, timestamp)
        DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            long ts = Long.parseLong(split[1]);
            return Tuple2.of(word, ts);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        /*
         * Specify the time field and the watermark strategy
         */
        DataStream<Tuple2<String, Long>> assDS = tsDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // The watermark equals the largest timestamp seen so far;
                        // out-of-order records may be dropped
                        .<Tuple2<String, Long>>forMonotonousTimestamps()
                        // Specify the time field
                        .withTimestampAssigner((event, ts) -> event.f1));

        /*
         * Count the words every 5 seconds
         */
        DataStream<Tuple2<String, Integer>> kvDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // TumblingEventTimeWindows: tumbling event-time window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        windowDS.sum(1).print();

        env.execute();
    }
}

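3. Result analysis

The code above uses a 5-second tumbling event-time window. Windows are closed on the left and open on the right, so the fifth second itself is not included.

A window fires when both conditions hold: the watermark is greater than or equal to the window's end time, and the window contains data.

Watermark: equals the largest timestamp seen so far.

For example, with boundaries at 0, 5, 10, 15, 20: 0-5 is one window and 5-10 is the next. A window is computed only if it contains data. A record whose timestamp belongs to a window that has already fired (a late record) is not processed.

In the test run the input event times were out of order, and the fourth input was dropped from the output.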
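The window boundaries and the firing rule described above can be checked with a few lines of arithmetic. This is a plain-Java sketch of the simplified model in the text (watermark = largest timestamp seen, fire when watermark >= window end), not Flink's internal code; `TumblingWindowMath` is a hypothetical name and no window offset is assumed.

```java
/** Hypothetical helper: tumbling-window arithmetic for the model described in the text. */
final class TumblingWindowMath {

    /** Start (inclusive) of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** End (exclusive) of the tumbling window that contains the timestamp. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    /** In the text's model a window may fire once the watermark reaches its end. */
    static boolean canFire(long watermarkMs, long windowEndMs) {
        return watermarkMs >= windowEndMs;
    }

    public static void main(String[] args) {
        long size = 5_000L;
        long ts = 1717395303000L; // one of the sample records above
        System.out.println("[" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        // The record at ...304000 does not close this window; the one at ...305000 does.
        System.out.println(canFire(1717395304000L, windowEnd(ts, size)));
        System.out.println(canFire(1717395305000L, windowEnd(ts, size)));
    }
}
```

This is why the sample input needs the record at 1717395305000 before the first count is printed: it belongs to the next window, but it moves the watermark to the end of the first one.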

4.1.2 Event times out of order

1. Delay the watermark by passing the delay (the maximum out-of-orderness) to forBoundedOutOfOrderness:

tsDS.assignTimestampsAndWatermarks(WatermarkStrategy
        // Watermark delay (the maximum out-of-orderness of the data)
        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // Specify the time field
        .withTimestampAssigner((event, ts) -> event.f1));

2. The complete code:

package com.shujia.flink.core;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

public class Demo5EventTime {
    public static void main(String[] args) throws Exception {
        /*
         * Event time: the record carries a time field, and that field, rather than
         * the wall clock, drives the computation. This reflects the order in which
         * the events actually happened, so the results are more meaningful.
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
        Sample input:
        java,1717395301000
        java,1717395302000
        java,1717395303000
        java,1717395304000
        java,1717395305000
        java,1717395307000
        java,1717395308000
        java,1717395311000
        java,1717395313000
        java,1717395315000
         */

        env.setParallelism(1);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each line into (word, timestamp)
        DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            long ts = Long.parseLong(split[1]);
            return Tuple2.of(word, ts);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        /*
         * Specify the time field and the watermark strategy
         */
        DataStream<Tuple2<String, Long>> assDS = tsDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Monotonous strategy (watermark = largest timestamp seen),
                        // which may drop out-of-order records:
                        // .<Tuple2<String, Long>>forMonotonousTimestamps()
                        // Watermark delay (the maximum out-of-orderness of the data)
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // Specify the time field
                        .withTimestampAssigner((event, ts) -> event.f1));

        /*
         * Count the words every 5 seconds
         */
        DataStream<Tuple2<String, Integer>> kvDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // TumblingEventTimeWindows: tumbling event-time window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(TumblingEventTimeWindows.of(Time.seconds(5)));

        windowDS.sum(1).print();

        env.execute();
    }
}

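3. Result analysis

In my test run the code delays the watermark by 5 seconds, so the 0-5 window fires only when a record with an event time of 10 seconds arrives. By then four records had fallen into the 0-5 window, so the output was 4. Why was there no output for 14000? Because 14 - 5 = 9, so the watermark had not yet reached the end of the next window (10). Once I entered a record at 16 seconds, the next result appeared.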
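The run described above can be replayed without a cluster. The sketch below is a plain-Java simulation under the text's simplified model (watermark = largest timestamp seen minus the delay, updated on every record, window fires when watermark >= window end); it is not how Flink is implemented internally, and `BoundedOutOfOrdernessDemo` is a hypothetical name. Timestamps are relative milliseconds to keep the numbers readable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation of a tumbling event-time count with a delayed watermark. */
final class BoundedOutOfOrdernessDemo {

    /** Returns "windowStart=count" for every window that fired, in firing order. */
    static List<String> run(long[] timestampsMs, long sizeMs, long delayMs) {
        TreeMap<Long, Integer> open = new TreeMap<>(); // window start -> record count
        List<String> fired = new ArrayList<>();
        long maxTs = Long.MIN_VALUE;

        for (long ts : timestampsMs) {
            long start = ts - Math.floorMod(ts, sizeMs);
            maxTs = Math.max(maxTs, ts);
            long watermark = maxTs - delayMs;

            // Late record: its window has already fired, so it is dropped.
            if (start + sizeMs <= watermark) {
                continue;
            }
            open.merge(start, 1, Integer::sum);

            // Fire every open window whose end the watermark has reached.
            while (!open.isEmpty() && open.firstKey() + sizeMs <= watermark) {
                Map.Entry<Long, Integer> w = open.pollFirstEntry();
                fired.add(w.getKey() + "=" + w.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Four records in 0-5 s, then 7 s, 10 s, 14 s and 16 s; 5 s window, 5 s delay.
        long[] input = {1_000, 2_000, 3_000, 4_000, 7_000, 10_000, 14_000, 16_000};
        System.out.println(run(input, 5_000, 5_000)); // prints [0=4, 5000=1]
    }
}
```

The first window fires at the 10-second record (10 - 5 = 5), nothing fires at 14 seconds (14 - 5 = 9), and the 5-10 window fires at 16 seconds (16 - 5 = 11).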

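4.1.3 Watermark alignment

1. When a downstream task has several upstream tasks, it takes the minimum of their watermarks. With a small volume of data the watermarks are hard to align, and the window does not fire. So set the parallelism explicitly, fixing the number of tasks in advance.

2. If the parallelism is not set, it may take many input events before the computation fires.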
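The minimum rule is easy to see in isolation. In the sketch below, which is plain Java rather than Flink code, an upstream subtask that has not yet received any data is modelled as holding a watermark of `Long.MIN_VALUE`; `WatermarkAlignment` is a hypothetical name.

```java
/** Hypothetical helper: the watermark a downstream task sees from several upstreams. */
final class WatermarkAlignment {

    /** The downstream watermark is the minimum of all upstream watermarks. */
    static long downstreamWatermark(long[] upstreamWatermarks) {
        if (upstreamWatermarks.length == 0) {
            throw new IllegalArgumentException("at least one upstream watermark is required");
        }
        long min = Long.MAX_VALUE;
        for (long w : upstreamWatermarks) {
            min = Math.min(min, w);
        }
        return min;
    }

    public static void main(String[] args) {
        // Two upstream subtasks, but only the first has received any records.
        long[] oneIdle = {10_000L, Long.MIN_VALUE};
        System.out.println(downstreamWatermark(oneIdle) >= 5_000L); // prints false

        // Both subtasks have advanced: the slower one decides.
        long[] bothActive = {10_000L, 6_000L};
        System.out.println(downstreamWatermark(bothActive)); // prints 6000
    }
}
```

With hand-typed socket input and a parallelism above 1, some subtasks stay idle for a long time, which is why the examples here call `env.setParallelism(1)`.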

4.2 Processing time

1. Processing time: the actual wall-clock time.

2. The code below defines a tumbling processing-time window: the data is processed once every 5 seconds of wall-clock time.

package com.shujia.flink.core;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo4ProcTime {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> wordsDS = env.socketTextStream("master", 8888);

        // Convert each word to (word, 1)
        DataStream<Tuple2<String, Integer>> kvDS = wordsDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        // Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        // Define the window
        // TumblingProcessingTimeWindows: tumbling processing-time window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // Count the words
        DataStream<Tuple2<String, Integer>> countDS = windowDS.sum(1);

        countDS.print();

        env.execute();
    }
}

5 Windows

5.1 Time window

1. There are four kinds of time window:

SlidingEventTimeWindows: sliding event-time window

SlidingProcessingTimeWindows: sliding processing-time window

TumblingEventTimeWindows: tumbling event-time window

TumblingProcessingTimeWindows: tumbling processing-time window

2. A sliding window takes two durations: the window size and the slide interval.

package com.shujia.flink.window;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

public class Demo1TimeWindow {
    public static void main(String[] args) throws Exception {
        /*
         * Event time: the record carries a time field, and that field, rather than
         * the wall clock, drives the computation. This reflects the order in which
         * the events actually happened, so the results are more meaningful.
         */
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
        Sample input:
        java,1717395301000
        java,1717395302000
        java,1717395303000
        java,1717395304000
        java,1717395305000
        java,1717395307000
        java,1717395308000
        java,1717395311000
        java,1717395313000
        java,1717395315000
         */

        /*
         * Watermark alignment:
         * when a downstream task has several upstream tasks, it takes the minimum of
         * their watermarks. With little data the watermarks are hard to align and the
         * window does not fire.
         */
        env.setParallelism(1);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each line into (word, timestamp)
        DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            long ts = Long.parseLong(split[1]);
            return Tuple2.of(word, ts);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        /*
         * Specify the time field and the watermark strategy
         */
        DataStream<Tuple2<String, Long>> assDS = tsDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Monotonous strategy (watermark = largest timestamp seen),
                        // which may drop out-of-order records:
                        // .<Tuple2<String, Long>>forMonotonousTimestamps()
                        // Watermark delay (the maximum out-of-orderness of the data)
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // Specify the time field
                        .withTimestampAssigner((event, ts) -> event.f1));

        /*
         * Every 5 seconds, count the words seen in the last 15 seconds
         */
        DataStream<Tuple2<String, Integer>> kvDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        /*
         * SlidingEventTimeWindows: sliding event-time window
         * SlidingProcessingTimeWindows: sliding processing-time window
         * TumblingEventTimeWindows: tumbling event-time window
         * TumblingProcessingTimeWindows: tumbling processing-time window
         * A sliding time window takes two durations: the first is the window size,
         * the second is the slide interval. For example, (15, 5) computes over the
         * most recent 15 seconds of data every 5 seconds.
         */
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(SlidingEventTimeWindows.of(Time.seconds(15), Time.seconds(5)));

        windowDS.sum(1).print();

        env.execute();
    }
}

This code uses a sliding event-time window, set to compute over the most recent 15 seconds of data every 5 seconds.
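With a 15-second size and a 5-second slide, every record lands in three overlapping windows. The sketch below lists them using plain arithmetic; it is an illustration, not the Flink assigner itself, `SlidingWindowMath` is a hypothetical name, and no window offset is assumed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: which sliding windows a timestamp falls into. */
final class SlidingWindowMath {

    /**
     * Returns the start of every window [start, start + size) that contains
     * the timestamp, latest window first.
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // A record at 7 s belongs to [5, 20), [0, 15) and [-5, 10) (in seconds).
        System.out.println(windowStarts(7_000, 15_000, 5_000)); // prints [5000, 0, -5000]
    }
}
```

Because each record is counted in size / slide windows, the same word shows up in three successive outputs of the demo above.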

5.2 Count window

1. Tumbling count window: for each key, computes once every N records.

package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class Demo2CountWindow {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> kvDS = linesDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        /*
         * Count windows
         * countWindow(10): tumbling count window, computes once every 10 records per key
         * countWindow(10, 2): sliding count window, computes over the last 10 records every 2 records
         */
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindowDS = keyByDS
                .countWindow(10);

        countWindowDS.sum(1).print();

        env.execute();
    }
}

2. Sliding count window: every M records, computes over the most recent N records.

package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class Demo2CountWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> mapDS = linesDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyBy = mapDS.keyBy(kv -> kv.f0);

        // Sliding count window: every 2 records, compute over the last 10 records of the key
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow = keyBy
                .countWindow(10, 2);

        countWindow.sum(1).print();

        env.execute();
    }
}
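To see what the two count-window variants emit for a single key, the behaviour can be imitated with a bounded buffer. The sketch below is a plain-Java simulation and not Flink code: it keeps at most `size` recent values and emits their sum every `slide` records; `CountWindowDemo` is a hypothetical name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical simulation of count windows for the records of one key. */
final class CountWindowDemo {

    /** Sliding count window: every `slide` records, sum the last `size` values. */
    static List<Integer> slidingSums(int[] values, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<Integer> out = new ArrayList<>();
        int sinceLastFire = 0;
        for (int v : values) {
            buffer.addLast(v);
            if (buffer.size() > size) {
                buffer.removeFirst(); // keep only the most recent `size` records
            }
            if (++sinceLastFire == slide) {
                sinceLastFire = 0;
                int sum = 0;
                for (int x : buffer) {
                    sum += x;
                }
                out.add(sum);
            }
        }
        return out;
    }

    /** Tumbling count window: the special case where the slide equals the size. */
    static List<Integer> tumblingSums(int[] values, int size) {
        return slidingSums(values, size, size);
    }

    public static void main(String[] args) {
        int[] ones = new int[12];
        Arrays.fill(ones, 1); // the same word entered 12 times
        System.out.println(slidingSums(ones, 10, 2)); // prints [2, 4, 6, 8, 10, 10]
        System.out.println(tumblingSums(ones, 10));   // prints [10]
    }
}
```

In the sliding case the first few results cover fewer than 10 records, because the window emits as soon as 2 records have arrived.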

5.3 Session window

1. Processing-time session window (ProcessingTimeSessionWindows): for a given key, the computation starts once no further data has arrived for 10 seconds. For example, if I enter a seven times and then wait 10 seconds, the output is (a,7). If I then enter a six times plus one aa, the output is (aa,1) and (a,6).

package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo3SessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> mapDS = linesDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyBy = mapDS.keyBy(kv -> kv.f0);

        // Session window: closes after 10 seconds (processing time) without data for the key
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyBy
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)));

        window.sum(1).print();

        env.execute();
    }
}

2. Event-time session window (EventTimeSessionWindows): based on the time carried in the data, for a given key, the computation starts once no further data has arrived within 10 seconds.

This one is not commonly used.

package com.shujia.flink.window;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

import java.time.Duration;

public class Demo4EventTimeSessionWindow {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
        Sample input:
        java,1685433130000
        java,1685433131000
        java,1685433132000
        java,1685433134000
        java,1685433135000
        java,1685433137000
        java,1685433139000
        java,1685433149000
        java,1685433155000
        java,1685433170000
         */

        env.setParallelism(1);

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each line into (word, timestamp)
        DataStream<Tuple2<String, Long>> tsDS = linesDS.map(line -> {
            String[] split = line.split(",");
            String word = split[0];
            long ts = Long.parseLong(split[1]);
            return Tuple2.of(word, ts);
        }, Types.TUPLE(Types.STRING, Types.LONG));

        /*
         * Specify the time field and the watermark strategy
         */
        DataStream<Tuple2<String, Long>> assDS = tsDS
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // Watermark delay (the maximum out-of-orderness of the data)
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // Specify the time field
                        .withTimestampAssigner((event, ts) -> event.f1));

        /*
         * Count the words in each session
         */
        DataStream<Tuple2<String, Integer>> kvDS = assDS
                .map(kv -> Tuple2.of(kv.f0, 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        /*
         * EventTimeSessionWindows: event-time session window
         */
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)));

        windowDS.sum(1).print();

        env.execute();
    }
}
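How a gap splits one key's records into sessions can be sketched offline. The code below is plain Java over already-sorted event times, not Flink's window merging. It assumes that a new session starts only when the distance to the previous record is strictly greater than the gap; how records exactly one gap apart are treated is an assumption here and worth checking against a real run. `SessionSplitter` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits one key's sorted event times into sessions. */
final class SessionSplitter {

    /**
     * Returns the number of records in each session. A new session starts when
     * the distance to the previous record exceeds gapMs (boundary handling is
     * an assumption of this sketch).
     */
    static List<Integer> sessionSizes(long[] sortedTimestampsMs, long gapMs) {
        List<Integer> sizes = new ArrayList<>();
        int count = 0;
        for (int i = 0; i < sortedTimestampsMs.length; i++) {
            boolean gapExceeded =
                    i > 0 && sortedTimestampsMs[i] - sortedTimestampsMs[i - 1] > gapMs;
            if (gapExceeded) {
                sizes.add(count); // close the previous session
                count = 0;
            }
            count++;
        }
        if (count > 0) {
            sizes.add(count);
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Three records close together, a 17 s pause, then two more; 10 s gap.
        long[] ts = {0, 1_000, 3_000, 20_000, 21_000};
        System.out.println(sessionSizes(ts, 10_000)); // prints [3, 2]
    }
}
```

In the event-time demo above the session result is additionally held back until the watermark, which lags 5 seconds behind the largest timestamp, passes the end of the session.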

5.4 Combining process with windows

1. After a window has been defined on the stream, call the process operator and pass it an object that implements the abstract method process of ProcessWindowFunction. The abstract class takes four type parameters (IN, OUT, KEY, W): the input type, the output type, the key type, and the window type. The window type matches the kind of window in use (TimeWindow for time and session windows, GlobalWindow for count windows).

2. In the process method, the first parameter is the key, the second is the Context object (window metadata and state access), the third is an Iterable over all elements of the window for that key, and the fourth is the Collector used to send results downstream.

The code is as follows:

package com.shujia.flink.window;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class Demo5WindowProcess {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        // Parse each CSV line into (class, age)
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = linesDS.map(line -> {
            String[] lines = line.split(",");
            String clazz = lines[4];
            int age = Integer.parseInt(lines[2]);
            return Tuple2.of(clazz, age);
        }, Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyBy = kvDS.keyBy(kv -> kv.f0);

        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> window = keyBy
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)));

        // Compute the average age per class for each window
        DataStream<Tuple2<String, Double>> process = window.process(
                new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Double>, String, TimeWindow>() {
                    @Override
                    public void process(String clazz,
                                        Context context,
                                        Iterable<Tuple2<String, Integer>> elements,
                                        Collector<Tuple2<String, Double>> out) throws Exception {
                        double sumAge = 0;
                        int num = 0;
                        for (Tuple2<String, Integer> element : elements) {
                            sumAge += element.f1;
                            num++;
                        }
                        double avgAge = sumAge / num;
                        out.collect(Tuple2.of(clazz, avgAge));
                    }
                });

        process.print();

        env.execute();
    }
}
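The logic inside process can be tried out without Flink by treating one window's contents as an ordinary list. The sketch below is plain Java: `WindowAverage` is a hypothetical name, and the CSV layout (id, name, age, gender, class) is an assumption, since the code above only shows that column 2 is the age and column 4 is the class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for keyBy + window + process over one window's records. */
final class WindowAverage {

    /** Same arithmetic as the process body: average all buffered elements of one key. */
    static double average(Iterable<Integer> elements) {
        double sum = 0;
        int num = 0;
        for (int e : elements) {
            sum += e;
            num++;
        }
        return sum / num;
    }

    /** Groups lines by class (column 4) and averages the age (column 2). */
    static Map<String, Double> averageAgeByClass(List<String> lines) {
        Map<String, List<Integer>> agesByClass = new TreeMap<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            agesByClass.computeIfAbsent(fields[4], k -> new ArrayList<>())
                    .add(Integer.parseInt(fields[2]));
        }
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : agesByClass.entrySet()) {
            result.put(e.getKey(), average(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample rows in the assumed layout: id,name,age,gender,class
        List<String> window = List.of("1,aa,22,f,c1", "2,bb,24,m,c1", "3,cc,21,f,c2");
        System.out.println(averageAgeByClass(window)); // prints {c1=23.0, c2=21.0}
    }
}
```

A full ProcessWindowFunction is needed for an average because, unlike sum, the result depends on both the total and the number of elements in the window.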
