
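11. Watermarks

11.1 Watermark concept

In typical real-time stream processing, event time stays roughly in sync with processing time, possibly lagging slightly behind.

In Flink, the marker that measures the progress of event time is the watermark (WaterMark). A watermark can be seen as a special record: a marker inserted into the data stream. Its main content is a timestamp that indicates the current event time. The watermark's timestamp is usually derived from the timestamps of the records themselves.

Watermark properties:

  • A watermark is a marker inserted into the data stream
  • Its main content is a timestamp that represents the progress of event time
  • Watermarks are generated from the timestamps of the records
  • Watermark timestamps increase monotonically
  • With a configured delay, watermarks can handle out-of-order data correctly
  • A watermark WaterMark(t) declares that event time in the current stream has reached t. All records up to t are assumed to have arrived, and no further records with a timestamp less than or equal to t are expected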

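Example with a watermark delay of 2 s:
[Figure: watermark progression with a 2 s delay]
**Note:** Flink windows are not prepared statically. They are created dynamically: a window is created only when the first record that falls into its range arrives. Once the watermark reaches the window's end time (more precisely end - 1 ms, the last timestamp the window contains), the window fires and is closed. Firing the computation and closing the window are two separate actions.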
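To make the dynamic window creation concrete, the plain-Java sketch below computes which tumbling window a timestamp falls into. It has no Flink dependency, and `TumblingWindowSketch` and its methods are hypothetical names. The arithmetic follows the same idea as Flink's internal window-start calculation: the window is derived from the record's own timestamp, so a window only comes into existence once a record lands in it. Treat it as an illustration, not as Flink's implementation.

```java
// Hypothetical helper, not a Flink class: tumbling-window arithmetic in plain Java.
final class TumblingWindowSketch {

    /** Start (ms) of the tumbling window of size windowSize that contains timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        // floorMod keeps the remainder non-negative, so negative timestamps work too
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    /** Windows are half-open [start, start + size), so the last timestamp they hold is end - 1. */
    static long windowMaxTimestamp(long start, long windowSize) {
        return start + windowSize - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 s windows, as in the demos
        for (long ts : new long[] {1_000L, 9_999L, 10_000L, 12_000L}) {
            long start = windowStart(ts, 0L, size);
            System.out.println("ts=" + ts + " -> window [" + start + ", " + (start + size)
                    + "), fires when watermark >= " + windowMaxTimestamp(start, size));
        }
    }
}
```

A record at 12 000 ms therefore creates (or joins) the window [10000, 20000), not [0, 10000).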

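11.2 Generating watermarks
11.2.1 Principles

If latency matters most, use a small watermark delay or no watermarks at all. Using processing-time semantics directly gives the lowest latency, but late data may be missed.

To make sure all data has arrived, use a larger watermark delay. This costs performance, because results are computed later.

11.2.2 Built-in watermarks

1. Built-in watermarks for ordered streams

Call WatermarkStrategy.forMonotonousTimestamps() directly: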

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                // Each input line has the form "id,ts,vc"
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: ascending timestamps, no waiting time
                        WatermarkStrategy
                                .<WaterSensor>forMonotonousTimestamps()
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        // The returned timestamp must be in milliseconds
                                        System.out.println("data=" + element + ", recordTS=" + recordTimestamp);
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long count = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains "
                                + count + " records: " + elements);
                    }
                });

        process.print();
        env.execute();
    }
}

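2. Built-in watermarks for out-of-order streams

Set the waiting time (out-of-orderness) to 2 s. The [0, 10) window then fires and closes when a record with event time 12 s arrives, because the watermark becomes 12 s - 2 s - 1 ms = 9999 ms, which is the window's last timestamp.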

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 */
public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: out-of-order stream with a 2 s waiting time
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        // The returned timestamp must be in milliseconds
                                        System.out.println("data=" + element + ", recordTS=" + recordTimestamp);
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long count = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains "
                                + count + " records: " + elements);
                    }
                });

        process.print();
        env.execute();
    }
}

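Result:
[Figure: console output of WatermarkOutOfOrdernessDemo]
As the output shows, the [0, 10) window closes when the record WaterSensor{id='s1', ts=12, vc=12} arrives. It is printed with recordTS=-9223372036854775808 (Long.MIN_VALUE) because records read from the socket carry no timestamp yet. The record itself does not belong to [0, 10); it is assigned to the [10, 20) window.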

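11.2.3 How the built-in watermarks are generated
  • Both are generated periodically, every 200 ms by default (configurable with env.getConfig().setAutoWatermarkInterval(...))
  • Ordered stream: watermark = largest event time seen so far - 1 ms
  • Out-of-order stream: watermark = largest event time seen so far - delay - 1 ms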
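The generation rule above can be simulated in a few lines. This is a sketch with hypothetical names. It assumes the formulas listed above and that an event-time window [start, end) fires once the watermark reaches end - 1 ms. For simplicity it reads the watermark after every record, whereas Flink emits it periodically.

```java
// Hypothetical simulation of a bounded-out-of-orderness watermark generator (not Flink's class).
final class BoundedOutOfOrdernessSketch {
    private final long delayMs;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long delayMs) {
        this.delayMs = delayMs;
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow
        this.maxTimestamp = Long.MIN_VALUE + delayMs + 1;
    }

    /** Per record: only remember the largest event time seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** watermark = max event time - delay - 1 ms (delay 0 gives the monotonous strategy). */
    long currentWatermark() {
        return maxTimestamp - delayMs - 1;
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(2_000L); // 2 s delay
        for (long tsSeconds : new long[] {1, 5, 11, 8, 12}) {
            gen.onEvent(tsSeconds * 1000L);
            long wm = gen.currentWatermark();
            System.out.println("ts=" + tsSeconds + "s watermark=" + wm
                    + " [0,10) fires: " + windowFires(10_000L, wm));
        }
    }
}
```

With a 2 s delay, the record at 11 s only moves the watermark to 8999 ms. The out-of-order record at 8 s leaves it unchanged, and the record at 12 s pushes it to 9999 ms, which is when [0, 10) fires.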
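11.3 Watermark propagation
11.3.1 Watermark propagation with multiple parallel instances

A task that receives watermarks from several upstream subtasks uses the smallest one as its own watermark. Taking a larger one could fire windows too early and lose data.
[Figure: watermark propagation between parallel subtasks]
Demo: watermark propagation with parallelism 2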

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 */
public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Demonstrate watermark propagation with parallelism 2
        env.setParallelism(2);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: out-of-order stream with a 2 s waiting time
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        // The returned timestamp must be in milliseconds
                                        System.out.println("data=" + element + ", recordTS=" + recordTimestamp);
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long count = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains "
                                + count + " records: " + elements);
                    }
                });

        process.print();
        env.execute();
    }
}

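Result:
[Figure: console output with parallelism 2]
With parallelism 2, the window operator additionally has to combine the watermarks of two upstream subtasks, and records are distributed round-robin between them.

When WaterSensor{id='s1', ts=12, vc=12} arrives, one subtask's watermark is 5 s - 2 s - 1 ms = 2999 ms and the other's is 12 s - 2 s - 1 ms = 9999 ms. Because the minimum wins, the window operator's watermark stays at 2999 ms instead of moving to 9999 ms.

When WaterSensor{id='s1', ts=13, vc=13} arrives at the other subtask, that subtask's watermark becomes 10999 ms. The minimum then rises to 9999 ms, and the [0, 10) window fires and closes.

Conclusion: with multiple parallel instances, a window can only close once the lagging subtask's watermark has also advanced. In this demo, that happens when the next record arrives.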
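The minimum rule can be modelled as below. `MinWatermarkSketch` is a hypothetical illustration, not Flink's internal class. It assumes each input channel reports its own watermark and that the task forwards the minimum, never moving backwards. The `main` method replays the example above.

```java
import java.util.Arrays;

// Hypothetical model of how a task combines the watermarks of its input channels (not Flink code).
final class MinWatermarkSketch {
    private final long[] channelWatermarks;
    private long combined = Long.MIN_VALUE;

    MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE); // nothing received yet
    }

    /** Records a watermark from one input channel and returns the task's watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        combined = Math.max(combined, min); // the task's watermark never moves backwards
        return combined;
    }

    public static void main(String[] args) {
        MinWatermarkSketch windowTask = new MinWatermarkSketch(2);
        // Replays the example above with a 2 s delay (watermark = ts - 2 s - 1 ms)
        System.out.println(windowTask.onWatermark(0, 5_000L - 2_000L - 1));  // ts=5 at subtask 0
        System.out.println(windowTask.onWatermark(1, 12_000L - 2_000L - 1)); // ts=12 at subtask 1 -> 2999
        System.out.println(windowTask.onWatermark(0, 13_000L - 2_000L - 1)); // ts=13 at subtask 0 -> 9999
    }
}
```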

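11.3.2 Idle timeout for watermarks

Suppose one of several upstream parallel tasks receives no data. The downstream task uses the smallest input watermark as its event-time clock, so it cannot advance its watermark and its windows never fire. In that case, set an idle timeout with withIdleness().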

package waterMark;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author lizhe
 * @Date 2024/6/5 21:57
 */
public class WatermarkIdlenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        SingleOutputStreamOperator<Integer> streamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                // Custom partitioner: value % number of partitions.
                // If only odd numbers are entered, they all go to the same subtask.
                .partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return Integer.parseInt(key) % numPartitions;
                    }
                }, value -> value)
                .map(value -> Integer.parseInt(value))
                .assignTimestampsAndWatermarks(
                        // Watermark strategy
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                // Mark a subtask as idle after 5 s without data
                                .withIdleness(Duration.ofSeconds(5)));

        // Two groups: odd numbers and even numbers, each with 2 s event-time tumbling windows
        streamOperator
                .keyBy(value -> value % 2)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer key, Context context, Iterable<Integer> elements,
                                        Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long count = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains "
                                + count + " records: " + elements);
                    }
                })
                .print();

        env.execute();
    }
}
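Idleness can be added to the same kind of minimum-watermark model: an input channel that has been marked idle is simply left out of the minimum. This is a hedged sketch with hypothetical names, not how Flink implements it internally. The idle timeout itself (e.g. the 5 s passed to `withIdleness`) is represented only by an explicit `markIdle` call. The replay mirrors WatermarkIdlenessDemo, where only odd numbers are sent, so only one subtask ever sees data.

```java
import java.util.Arrays;

// Hypothetical model: idle input channels are ignored when taking the minimum watermark.
final class IdleAwareWatermarkSketch {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combined = Long.MIN_VALUE;

    IdleAwareWatermarkSketch(int numChannels) {
        watermarks = new long[numChannels];
        idle = new boolean[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** A channel that delivers a watermark counts as active again. */
    long onWatermark(int channel, long watermark) {
        idle[channel] = false;
        watermarks[channel] = Math.max(watermarks[channel], watermark);
        return recompute();
    }

    /** Stands in for "no data for the idle timeout" on this channel. */
    long markIdle(int channel) {
        idle[channel] = true;
        return recompute();
    }

    private long recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        // If every channel is idle, keep the last watermark; it also never moves backwards
        if (anyActive) {
            combined = Math.max(combined, min);
        }
        return combined;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch task = new IdleAwareWatermarkSketch(2);
        // Monotonous strategy: watermark = ts - 1 ms; only channel 1 receives (odd) data
        System.out.println(task.onWatermark(1, 3_000L - 1));  // Long.MIN_VALUE: channel 0 holds it back
        System.out.println(task.markIdle(0));                  // 2999 once channel 0 is idle
        System.out.println(task.onWatermark(1, 11_000L - 1)); // 10999
    }
}
```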
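11.4 Handling late data
11.4.1 Delaying watermark advancement

When generating watermarks, set an out-of-orderness tolerance. This holds back the progress of event time, so window computations run later and out-of-order records get more time to enter their windows.

forBoundedOutOfOrderness(Duration.ofSeconds(2))
11.4.2 Allowing late data in windows (allowed lateness)

Flink windows can accept late data. When a window is triggered, it first computes the current result, but it is not closed yet. After that, every late record that falls into the window triggers the computation again. The window is closed only when the watermark passes the window end time plus the allowed lateness.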

.allowedLateness(Time.seconds(2))
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Author lizhe
 * @Date 2024/6/5 21:57
 */
public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: out-of-order stream with a 2 s waiting time
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        // The returned timestamp must be in milliseconds
                                        System.out.println("data=" + element + ", recordTS=" + recordTimestamp);
                                        return element.getTs() * 1000L;
                                    }
                                }));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Close the window 2 s later (allowed lateness)
                .allowedLateness(Time.seconds(2))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long count = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains "
                                + count + " records: " + elements);
                    }
                });

        process.print();
        env.execute();
    }
}
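The rule in 11.4.2 can be condensed into a small decision function. The sketch below uses hypothetical names and only models the behavior described in this section; it is not Flink's WindowOperator code. It assumes a window fires once the watermark reaches its last timestamp (end - 1 ms) and is kept until the watermark reaches that timestamp plus the allowed lateness.

```java
// Hypothetical model of what an event-time window does with an incoming record,
// given the window's end, the allowed lateness and the current watermark (all in ms).
final class AllowedLatenessSketch {
    enum Outcome { BUFFERED_UNTIL_FIRE, ADDED_AND_REFIRED, DROPPED_OR_SIDE_OUTPUT }

    static Outcome classify(long windowEnd, long allowedLatenessMs, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;                   // last timestamp inside [start, end)
        long cleanupTime = maxTimestamp + allowedLatenessMs; // window is kept until this watermark
        if (cleanupTime <= currentWatermark) {
            return Outcome.DROPPED_OR_SIDE_OUTPUT;           // window already closed
        }
        if (maxTimestamp <= currentWatermark) {
            return Outcome.ADDED_AND_REFIRED;                // window fired before; fire again
        }
        return Outcome.BUFFERED_UNTIL_FIRE;                  // window has not fired yet
    }

    public static void main(String[] args) {
        // Window [0, 10 s) with allowedLateness(2 s), as in WatermarkAllowLatenessDemo
        for (long wm : new long[] {8_999L, 9_999L, 11_998L, 11_999L}) {
            System.out.println("watermark=" + wm + " -> " + classify(10_000L, 2_000L, wm));
        }
    }
}
```

Consider window [0, 10 s) with a 2 s out-of-orderness delay and allowedLateness(2 s). Late records are still added until the largest event time seen reaches 14 s (watermark 11999 ms). After that they are dropped, or routed to a side output (11.4.3).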
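11.4.3 Sending late data to a side output

Use .sideOutputLateData() to send late records to a side output. Records that arrive after the window has closed (including the allowed lateness) are then emitted to the side output instead of being dropped.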

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 */
public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        // Watermark strategy: out-of-order stream with a 2 s waiting time
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                // Timestamp assigner
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        // The returned timestamp must be in milliseconds
                                        System.out.println("data=" + element + ", recordTS=" + recordTimestamp);
                                        return element.getTs() * 1000L;
                                    }
                                }));

        // Tag that identifies the side output for late records
        OutputTag<WaterSensor> outputTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));

        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                // Watermarks only take effect with event-time windows (TumblingEventTimeWindows),
                // not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // Close the window 2 s later (allowed lateness)
                .allowedLateness(Time.seconds(2))
                // Late records that arrive after the window has closed go to the side output
                .sideOutputLateData(outputTag)
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
                    @Override
                    public void process(String key, Context context, Iterable<WaterSensor> elements,
                                        Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long count = elements.spliterator().estimateSize();
                        out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains "
                                + count + " records: " + elements);
                    }
                });

        process.print();
        process.getSideOutput(outputTag).print("side-output");
        env.execute();
    }
}
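11.4.4 Summary
  1. Out-of-order vs. late data:

    **Out of order:** records arrive in a different order than their timestamps, i.e. an earlier event arrives after a later one.

    **Late:** the record's timestamp < the current watermark.

  2. Handling out-of-order and late data

    • Specify an out-of-orderness wait in the watermark strategy
    • When windowing, allow the window to accept late data (allowedLateness)
    • Send records that arrive after the window has closed to a side output
  3. The watermark wait and the window's allowed lateness are neither equivalent nor interchangeable

    The watermark wait determines when a window is computed for the first time; a long wait increases result latency.

    The allowed lateness only serves to make results more accurate and should not affect computation latency.

    So one cannot stand in for the other.

  4. Rules of thumb for the watermark wait and the allowed lateness

    Keep the watermark wait small, usually a few seconds. Size the allowed lateness to cover most late data, and catch the few extreme stragglers with a side output.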
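Point 3 can be made concrete with a little arithmetic. The hypothetical helper below assumes watermark = largest event time - delay - 1 ms (11.2.3). It also assumes a window [start, end) first fires at watermark >= end - 1 and closes at watermark >= end - 1 + allowed lateness. Both configurations in `main` close window [0, 10 s) at the same point, but the larger watermark delay also postpones the first result.

```java
// Hypothetical helper: when does a window produce its first result, and when is it closed?
final class DelayVsLatenessSketch {

    /** Smallest max event time (ms) at which window [start, windowEnd) first fires. */
    static long firstResultAt(long windowEnd, long delayMs) {
        return windowEnd + delayMs;              // from maxTs - delay - 1 >= windowEnd - 1
    }

    /** Smallest max event time (ms) at which the window is closed for good. */
    static long closedAt(long windowEnd, long delayMs, long latenessMs) {
        return windowEnd + delayMs + latenessMs; // from maxTs - delay - 1 >= windowEnd - 1 + lateness
    }

    public static void main(String[] args) {
        long end = 10_000L; // window [0, 10 s)
        System.out.println("delay 2 s + lateness 2 s: first result at " + firstResultAt(end, 2_000L)
                + " ms, closed at " + closedAt(end, 2_000L, 2_000L) + " ms");
        System.out.println("delay 4 s + lateness 0 s: first result at " + firstResultAt(end, 4_000L)
                + " ms, closed at " + closedAt(end, 4_000L, 0L) + " ms");
    }
}
```

Both settings keep the window open until a record at 14 s. Only the first one delivers an initial result at 12 s, which is why a modest watermark delay combined with some allowed lateness is usually preferred.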

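12. Time-based stream joins

The connect operator covered earlier meets most needs. However, matching records from two streams within a fixed time span would require custom logic with connect. Windows express this more simply, and Flink provides a built-in API for it.

12.1 Window join
  1. Records match only if they fall into the same time window
  2. Records are matched by key (the keys given in where() and equalTo())
  3. Only matched pairs are emitted, similar to an inner join over a fixed time range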
package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Title: WindowJoinDemo
 * @Author lizhe
 * @Package Window Join
 * @Date 2024/6/8 21:11
 */
public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                return element.f1 * 1000L;
                            }
                        }));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(Tuple3.of("a", 1, 1), Tuple3.of("a", 11, 1), Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1), Tuple3.of("c", 14, 1), Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                return element.f1 * 1000L;
                            }
                        }));

        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0)
                .equalTo(r2 -> r2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called for each pair of records that match.
                     * @param first  record from ds1
                     * @param second record from ds2
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "----" + second;
                    }
                });

        join.print();
        env.execute();
    }
}
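To check what WindowJoinDemo should print, here is a plain-Java model of a tumbling-window inner join over the same keys and timestamps. It is a sketch with hypothetical names. It ignores watermarks and parallelism, and its output format and order need not match Flink's.

With 5 s windows it pairs (a,1) and (a,2) with (a,1,1), and (b,3) with (b,2,1). The c records at 4 s and 14 s fall into different windows, so they do not match.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of a tumbling-window inner join; not Flink's implementation.
final class WindowJoinSketch {
    /** Minimal event: a join key plus an event time in seconds. */
    static final class Event {
        final String key;
        final long tsSec;
        Event(String key, long tsSec) { this.key = key; this.tsSec = tsSec; }
        @Override public String toString() { return "(" + key + "," + tsSec + ")"; }
    }

    /** Pairs left/right events with equal keys that fall into the same tumbling window. */
    static List<String> join(List<Event> left, List<Event> right, long windowSizeSec) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow = Math.floorDiv(l.tsSec, windowSizeSec) == Math.floorDiv(r.tsSec, windowSizeSec);
                if (sameKey && sameWindow) {
                    out.add(l + "----" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Keys and timestamps taken from WindowJoinDemo; 5 s windows
        List<Event> ds1 = List.of(new Event("a", 1), new Event("a", 2), new Event("b", 3), new Event("c", 4));
        List<Event> ds2 = List.of(new Event("a", 1), new Event("a", 11), new Event("b", 2),
                new Event("b", 12), new Event("c", 14), new Event("d", 15));
        join(ds1, ds2, 5).forEach(System.out::println);
    }
}
```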
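12.2 Interval join

Sometimes the time span to match is not fixed. Records that should match may happen to fall on either side of a window boundary, so the match fails. A window join therefore cannot meet this requirement.

An interval join works differently. For each record of one stream, it opens a time interval around the record's timestamp, specified by a lower and an upper offset: a negative offset means earlier, a positive one means later. It then checks whether records from the other stream fall into that interval. Only event-time semantics is supported.
[Figure: interval join, lower and upper bounds around each record of one stream]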

package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Author lizhe
 * @Date 2024/6/8 21:11
 */
public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3), Tuple2.of("c", 4))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                return element.f1 * 1000L;
                            }
                        }));

        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(Tuple3.of("a", 1, 1), Tuple3.of("a", 11, 1), Tuple3.of("b", 2, 1),
                        Tuple3.of("b", 12, 1), Tuple3.of("c", 14, 1), Tuple3.of("d", 15, 1))
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                return element.f1 * 1000L;
                            }
                        }));

        KeyedStream<Tuple2<String, Integer>, String> stream1 = ds1.keyBy(value -> value.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> stream2 = ds2.keyBy(value -> value.f0);

        stream1.intervalJoin(stream2)
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called only when records from the two streams match.
                     * @param left  record from stream1
                     * @param right record from stream2
                     * @param ctx   context
                     * @param out   collector
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right,
                                               Context ctx, Collector<String> out) throws Exception {
                        // Only matched records reach this method
                        out.collect(left + "----" + right);
                    }
                })
                .print();

        env.execute();
    }
}
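The interval condition can be modelled the same way. In this hypothetical plain-Java sketch, a right record matches a left record when the keys are equal and right.ts lies in [left.ts + lower, left.ts + upper]. Both bounds are inclusive, which is Flink's default for `between`. The second call shows two records at 4 s and 6 s that straddle a 5 s window boundary: a tumbling window join would miss them, but an interval join of ±2 s matches them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of an interval join; not Flink's implementation.
final class IntervalJoinSketch {
    /** Minimal event: a join key plus an event time in seconds. */
    static final class Event {
        final String key;
        final long tsSec;
        Event(String key, long tsSec) { this.key = key; this.tsSec = tsSec; }
        @Override public String toString() { return "(" + key + "," + tsSec + ")"; }
    }

    /** r matches l if keys are equal and l.ts + lower <= r.ts <= l.ts + upper (inclusive). */
    static List<String> join(List<Event> left, List<Event> right, long lowerSec, long upperSec) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean inInterval = r.tsSec >= l.tsSec + lowerSec && r.tsSec <= l.tsSec + upperSec;
                if (l.key.equals(r.key) && inInterval) {
                    out.add(l + "----" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Keys and timestamps taken from IntervalJoinDemo, between(-2 s, 2 s)
        List<Event> ds1 = List.of(new Event("a", 1), new Event("a", 2), new Event("b", 3), new Event("c", 4));
        List<Event> ds2 = List.of(new Event("a", 1), new Event("a", 11), new Event("b", 2),
                new Event("b", 12), new Event("c", 14), new Event("d", 15));
        join(ds1, ds2, -2, 2).forEach(System.out::println);

        // 4 s and 6 s lie in different 5 s tumbling windows but still match here
        System.out.println(join(List.of(new Event("a", 4)), List.of(new Event("a", 6)), -2, 2));
    }
}
```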

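In Flink 1.17, late records of an interval join can be sent to side outputs.

If the current record's event time < the current watermark, the record is late and the main process() does not handle it.

Calling sideOutputLeftLateData() and sideOutputRightLateData() after between() sends these late records to side outputs instead.

13. Process functions

Process functions are a lower-level DataStream API. The operators built on them are collectively called process operators, and their interfaces are the process functions.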


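13.1 Basic process functions

Through its context, a process function can access each record's timestamp and the current watermark. Through the timer service (TimerService) it can also register timers. Process functions extend AbstractRichFunction, so they have all the features of rich functions, including access to state and other runtime information. They can also emit data directly to side outputs. Process functions are the most flexible processing method and can implement all kinds of custom logic.

Categories: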

  1. ProcessFunction
  2. KeyedProcessFunction
  3. ProcessWindowFunction
  4. ProcessAllWindowFunction
  5. CoProcessFunction
  6. ProcessJoinFunction
  7. BroadcastProcessFunction
  8. KeyedBroadcastProcessFunction
13.2 Keyed process function: KeyedProcessFunction

Only a KeyedStream supports setting timers through the TimerService.

13.2.1 Timers and the timer service
keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
    /**
     * Called once for each incoming record.
     */
    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
        // Event time extracted from the record; null if the record has none
        Long timestamp = ctx.timestamp();
        // Timer service
        TimerService timerService = ctx.timerService();
        // Register a timer: event time
        timerService.registerEventTimeTimer(10L);
        // Register a timer: processing time
        timerService.registerProcessingTimeTimer(10L);
        // Delete a timer: event time
        timerService.deleteEventTimeTimer(10L);
        // Delete a timer: processing time
        timerService.deleteProcessingTimeTimer(10L);
        // Current processing time, i.e. the system time
        long processingTime = timerService.currentProcessingTime();
        // Current watermark
        long watermark = timerService.currentWatermark();
    }
});

Event-time timer:

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedProcessTimerDemo
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 */
public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> operator =
                singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());

        keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
            /**
             * Called once for each incoming record.
             */
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                // Event time extracted from the record; null if the record has none
                Long timestamp = ctx.timestamp();
                TimerService timerService = ctx.timerService();
                // Register an event-time timer at 5 s (5000 ms)
                timerService.registerEventTimeTimer(5000L);
                System.out.println("current time " + timestamp + ", registered a timer at 5 s");
            }

            /**
             * Called when time reaches a registered timer.
             * @param timestamp the timer's timestamp
             * @param ctx       context
             * @param out       collector
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println("now " + timestamp + ", timer fired, key=" + ctx.getCurrentKey());
            }
        }).print();

        env.execute();
    }
}

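Output:
[Figure: console output of the event-time timer demo]
The TimerService deduplicates timers by key and timestamp. For each key and timestamp there is at most one timer, so even if the same timer is registered several times, onTimer() is called only once.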
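The deduplication rule can be pictured with a small model. This is a hypothetical sketch of the observable behavior only: timers are identified by key and timestamp, and they fire when the watermark reaches them. It does not reproduce Flink's internal timer data structures.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of per-key, per-timestamp timer deduplication (not Flink's TimerService).
final class TimerDedupSketch {
    // timestamp -> keys with a timer at that timestamp; the Set collapses duplicate registrations
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();

    void register(String key, long timestamp) {
        timers.computeIfAbsent(timestamp, t -> new LinkedHashSet<>()).add(key);
    }

    /** Advances event time to the watermark and returns "key@timestamp" for each timer that fires. */
    List<String> advanceTo(long watermark) {
        List<String> fired = new ArrayList<>();
        NavigableMap<Long, Set<String>> due = timers.headMap(watermark, true); // timestamps <= watermark
        for (Map.Entry<Long, Set<String>> e : due.entrySet()) {
            for (String key : e.getValue()) {
                fired.add(key + "@" + e.getKey());
            }
        }
        due.clear(); // the view is backed by the map, so this removes the fired timers
        return fired;
    }

    public static void main(String[] args) {
        TimerDedupSketch service = new TimerDedupSketch();
        // Like KeyedProcessTimerDemo: every s1 record registers the same 5 s timer
        service.register("s1", 5_000L);
        service.register("s1", 5_000L);
        service.register("s1", 5_000L);
        service.register("s2", 5_000L);
        System.out.println(service.advanceTo(4_999L)); // []
        System.out.println(service.advanceTo(5_999L)); // [s1@5000, s2@5000]
    }
}
```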

Processing-time timer:

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedProcessTimerDemo
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 */
public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> operator =
                singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());

        keyedStream.process(new KeyedProcessFunction<String, WaterSensor, String>() {
            /**
             * Called once for each incoming record.
             */
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                TimerService timerService = ctx.timerService();
                // Register a processing-time timer 5 s after the current system time
                long currentProcessingTime = timerService.currentProcessingTime();
                timerService.registerProcessingTimeTimer(currentProcessingTime + 5000L);
                System.out.println("current time " + currentProcessingTime
                        + ", registered a timer 5 s later, key=" + ctx.getCurrentKey());
            }

            /**
             * Called when time reaches a registered timer.
             * @param timestamp the timer's timestamp
             * @param ctx       context
             * @param out       collector
             */
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
                System.out.println("now " + timestamp + ", timer fired, key=" + ctx.getCurrentKey());
            }
        }).print();

        env.execute();
    }
}

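Summary:

  1. Event-time timers are triggered by the watermark: a timer fires once watermark >= its registered time.

    Note:

    watermark = largest event time - wait - 1 ms, and the -1 ms means the timer fires one record later than one might expect. Take a 5 s timer with a 3 s wait. A record at 8 s gives watermark = 8 s - 3 s - 1 ms = 4999 ms, which does not fire the 5 s timer. Because the demo's timestamps are whole seconds, only a record at 9 s (watermark = 9 s - 3 s - 1 ms = 5999 ms) fires it.

  2. Inside process, the current watermark is still the previous one, because the operator has not yet received the new watermark generated after this record.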
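The -1 ms effect in point 1 is plain arithmetic. The hypothetical helper below assumes watermark = largest event time - wait - 1 ms and that a timer fires at watermark >= timer timestamp. It reproduces the example of a 5 s timer with a 3 s wait.

```java
// Hypothetical arithmetic helper for event-time timers under a bounded-out-of-orderness watermark.
final class EventTimerArithmeticSketch {
    static long watermark(long maxEventTs, long delayMs) {
        return maxEventTs - delayMs - 1;
    }

    static boolean fires(long timerTs, long watermark) {
        return watermark >= timerTs;
    }

    /** Smallest max event time (ms) that lets the watermark reach timerTs. */
    static long minEventTsToFire(long timerTs, long delayMs) {
        return timerTs + delayMs + 1;
    }

    public static void main(String[] args) {
        long timer = 5_000L;
        long delay = 3_000L;
        for (long ts : new long[] {8_000L, 9_000L}) {
            long wm = watermark(ts, delay);
            System.out.println("max ts=" + ts + " -> watermark=" + wm + ", 5 s timer fires: " + fires(timer, wm));
        }
        System.out.println("minimum max ts to fire: " + minEventTsToFire(timer, delay) + " ms");
    }
}
```

With millisecond timestamps, a record at 8001 ms would already fire the timer. The demo only sends whole seconds, so the first record that fires it is the one at 9 s.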

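13.3、Application example

Find the water levels that occur most often over a period of time: every 5 s, output the two water levels (vc values) that occurred most often in the last 10 s.

A sliding window (size 10 s, slide 5 s) keyed by water level counts each level separately. The per-window counts are then ranked.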
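A sliding window counts each record more than once. The plain-Java sketch below maps one event timestamp to the sliding windows that contain it. It assumes offset 0 and millisecond timestamps, and its start-time arithmetic is modeled on the loop Flink's sliding window assigners use. SlidingWindowMath is a hypothetical helper, not a Flink class.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of sliding-window assignment (offset 0).
// SlidingWindowMath is a hypothetical helper, not a Flink class.
class SlidingWindowMath {

    // Returns {start, end} pairs (end exclusive) of every window that contains ts.
    static List<long[]> windowsFor(long ts, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window start <= ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size 10 s, slide 5 s: an event at 7 s belongs to [5 s, 15 s) and [0 s, 10 s)
        for (long[] w : windowsFor(7000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

Since size / slide = 2, every event lands in two windows. Each vc therefore gets one count per window end, and the ranking step must group results by window end.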

(Author's note: review this carefully later; it may contain problems!)

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

/**
 * @Title:
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description: top 2 most frequent water levels in the last 10 s, updated every 5 s
 */
public class TopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> operator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        // 1. Key by vc and aggregate per window (incremental count + full-window function that adds a label).
        //    After windowed aggregation the result is an ordinary stream that has lost the window
        //    information, so the window end (WindowEnd) is attached as a label.
        //    Event-time windows are used so that the watermark assigned above actually drives them.
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> aggregate = operator
                .keyBy(value -> value.getVc())
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new VcCountAgg(), new WindowResult());

        // 2. Key by the window-end label so all results of one window meet in the same place; sort and take Top N
        aggregate.keyBy(value -> value.f2)
                .process(new TopN(2))
                .print();

        env.execute();
    }

    /** Incremental aggregation: counts records per vc within one window. */
    public static class VcCountAgg implements AggregateFunction<WaterSensor, Integer, Integer> {
        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            return accumulator + 1;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            // Only used by merging (session) windows; summing keeps it correct anyway
            return a + b;
        }
    }

    /**
     * Generic parameters:
     * 1st: input type = output of the incremental function (the count)
     * 2nd: output type = Tuple3(vc, count, windowEnd), labelled with the window end time
     * 3rd: key type (vc, Integer)
     * 4th: window type
     */
    public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {
        @Override
        public void process(Integer key, Context context, Iterable<Integer> elements,
                            Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
            // With incremental aggregation there is exactly one element: the count
            Integer count = elements.iterator().next();
            long windowEnd = context.window().getEnd();
            out.collect(Tuple3.of(key, count, windowEnd));
        }
    }

    public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {
        // Results of different windows: key = windowEnd, value = list of (vc, count, windowEnd)
        private final Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;
        // Number of top entries to output
        private final int threshold;

        public TopN(int threshold) {
            this.dataListMap = new HashMap<>();
            this.threshold = threshold;
        }

        @Override
        public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {
            // Each call sees a single result; it must be kept, per window, until the window is complete
            // 1. Store it in the HashMap (a new list is created for the window's first result)
            Long windowEnd = value.f2;
            dataListMap.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(value);
            // 2. Register a timer at windowEnd + 1 ms. All results of one window are emitted together,
            //    only delivered to processElement one by one, so a 1 ms delay is enough.
            ctx.timerService().registerEventTimeTimer(windowEnd + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            // Timer fired: all results of this window have arrived; sort and take Top N
            Long windowEnd = ctx.getCurrentKey();
            // Remove the window's list so the map does not grow without bound
            List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.remove(windowEnd);
            if (dataList == null) {
                return;
            }
            // 1. Sort by count, descending
            dataList.sort((o1, o2) -> Integer.compare(o2.f1, o1.f1));
            // 2. Take the first threshold entries, or all of them if the list is shorter
            StringBuilder outStr = new StringBuilder();
            outStr.append("==========\n");
            for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
                Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
                outStr.append("Top").append(i + 1).append("\n");
                outStr.append("vc=").append(vcCount.f0).append("\n");
                outStr.append("count=").append(vcCount.f1).append("\n");
                outStr.append("window end=").append(vcCount.f2).append("\n");
            }
            out.collect(outStr.toString());
        }
    }
}
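Without the Flink machinery, the work TopN.onTimer does for one window end is a plain sort-and-take. The sketch below models that step in plain Java. It assumes the (vc, count) results of one window are already collected in a list. TopNSketch and VcCount are hypothetical names that do not appear in the example.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of the sort-and-take step in TopN.onTimer.
// TopNSketch and VcCount are hypothetical names used only for this sketch.
class TopNSketch {

    static final class VcCount {
        final int vc;      // water level value
        final int count;   // occurrences within one window
        VcCount(int vc, int count) { this.vc = vc; this.count = count; }
    }

    // Sorts a copy by count (descending) and keeps at most n entries.
    static List<VcCount> topN(List<VcCount> windowResults, int n) {
        List<VcCount> sorted = new ArrayList<>(windowResults);
        // comparingInt compares with Integer.compare, avoiding overflow from subtracting counts
        sorted.sort(Comparator.comparingInt((VcCount c) -> c.count).reversed());
        return sorted.subList(0, Math.min(n, sorted.size()));
    }

    public static void main(String[] args) {
        List<VcCount> results = List.of(new VcCount(1, 3), new VcCount(5, 7), new VcCount(9, 2));
        for (VcCount c : topN(results, 2)) {
            System.out.println("vc=" + c.vc + " count=" + c.count);
        }
    }
}
```

`Math.min(n, size)` plays the same role as in TopN. A window with fewer distinct water levels than the threshold still produces output.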
13.4、Side output streams

Raising water-level alerts with a side output stream

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description: water-level alerts through a side output
 */
public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L);

        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

        // Tag that identifies the side output; Types.STRING gives its element type explicitly
        final OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);

        SingleOutputStreamOperator<WaterSensor> process = singleOutputStreamOperator
                .keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                        // Alert through the side output when the water level exceeds 10
                        if (value.getVc() > 10) {
                            ctx.output(warnTag, "Current water level=" + value.getVc() + ", above threshold 10!");
                        }
                        // Every record still goes to the main output
                        out.collect(value);
                    }
                });

        process.print();
        process.getSideOutput(warnTag).printToErr("warn");

        env.execute();
    }
}

14、State management

14.1、State in Flink

Operators are either stateless or stateful.

A stateless operator task looks only at each individual event and computes its output directly from the current input, e.g. map, filter, flatMap.
A stateful operator task needs other data besides the current record to compute its result, and that "other data" is the state. Examples are aggregation operators and window operators.
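The difference between the two kinds of operator can be shown in plain Java, outside Flink. In this sketch doubleIt plays the role of a stateless map. RunningSum keeps a running total the way a stateful aggregation would. Both names are hypothetical, and the field here is not checkpointed, unlike Flink managed state.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java contrast between a stateless and a stateful transformation.
// Hypothetical names; not Flink code.
class StatelessVsStateful {

    // Stateless: the output depends only on the current input (like map/filter).
    static int doubleIt(int value) {
        return value * 2;
    }

    // Stateful: the output depends on earlier inputs, remembered in "sum".
    static final class RunningSum {
        private int sum = 0;
        int add(int value) {
            sum += value;
            return sum;
        }
    }

    static List<Integer> runningSums(List<Integer> inputs) {
        RunningSum op = new RunningSum();
        List<Integer> out = new ArrayList<>();
        for (int v : inputs) {
            out.add(op.add(v));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(doubleIt(3));                   // prints 6
        System.out.println(runningSums(List.of(3, 4, 5))); // prints [3, 7, 12]
    }
}
```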
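Categories of state:

  1. Managed state and raw state

    Managed state: managed uniformly by Flink; you only call the corresponding interfaces.

    Raw state: user-defined; you effectively allocate a block of memory and manage it yourself, including serialization and failure recovery.

    Flink managed state is what is normally used (the focus here).

  2. Operator state and keyed state

    State used after keyBy() is keyed state; all other state is operator state.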

14.2、Operator state

All records processed by one parallel subtask access the same state, so the state is shared within that subtask.

Operator state can be used on any operator, much like a local variable.

14.3、Keyed state

State is maintained and accessed according to the key defined on the input stream, so it is only available after keyBy.
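Conceptually, keyed state works like one slot per key. Before each record the runtime sets the current key, and every read or write of the state goes to that key's slot. The plain-Java model below illustrates only this scoping. KeyedValueStateModel is a hypothetical class and does not reflect how Flink's state backends actually store data.

```java
import java.util.HashMap;
import java.util.Map;

// Models keyed ValueState: one slot per key, scoped by the "current key".
// KeyedValueStateModel is a hypothetical illustration, not a Flink API.
class KeyedValueStateModel<K, V> {
    private final Map<K, V> slots = new HashMap<>();
    private K currentKey;

    // In Flink, the runtime sets the key before calling processElement for a record.
    void setCurrentKey(K key) { this.currentKey = key; }

    // Like ValueState.value(): null if nothing is stored yet for the current key.
    V value() { return slots.get(currentKey); }

    // Like ValueState.update(v): overwrites the slot of the current key only.
    void update(V v) { slots.put(currentKey, v); }

    public static void main(String[] args) {
        KeyedValueStateModel<String, Integer> lastVc = new KeyedValueStateModel<>();
        lastVc.setCurrentKey("s1");
        lastVc.update(5);
        lastVc.setCurrentKey("s2");
        System.out.println(lastVc.value()); // prints null: s2 has no value yet
        lastVc.setCurrentKey("s1");
        System.out.println(lastVc.value()); // prints 5: s1 still sees its own value
    }
}
```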

14.3.1、Value state

The state holds a single value.

Example: alert when two consecutive water levels of a sensor differ by more than 10

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedValueStateDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: alert when consecutive water levels differ by more than 10
 */
public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Last water level seen for the current key
                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                        lastVcState = getRuntimeContext().getState(lastVc);
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // No state yet for this key: treat the previous level as 0
                        int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
                        if (Math.abs(value.getVc() - lastVc) > 10) {
                            out.collect("sensor id=" + value.getId() + ", current water level=" + value.getVc()
                                    + ", previous water level=" + lastVc + ", difference exceeds 10");
                        }
                        lastVcState.update(value.getVc());
                    }
                }).print();

        env.execute();
    }
}
14.3.2、List state

The data to be kept is stored as a list.

Example: for each sensor, output its three highest water levels

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: output the three highest water levels of each sensor
 */
public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Up to three highest water levels of the current key
                    ListState<Integer> vcListState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcListState = getRuntimeContext().getListState(
                                new ListStateDescriptor<Integer>("vcListState", Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcListState.add(value.getVc());
                        // Copy the state into a local list and sort it in descending order
                        ArrayList<Integer> arrayList = new ArrayList<>();
                        for (Integer vc : vcListState.get()) {
                            arrayList.add(vc);
                        }
                        arrayList.sort((o1, o2) -> Integer.compare(o2, o1));
                        // State held at most 3 values before this add, so dropping index 3 keeps the top three
                        if (arrayList.size() > 3) {
                            arrayList.remove(3);
                        }
                        out.collect("sensor id=" + value.getId() + ", three highest water levels=" + arrayList);
                        // Overwrite the state with the trimmed list
                        vcListState.update(arrayList);
                    }
                }).print();

        env.execute();
    }
}
14.3.3、Map state

Key-value pairs are kept as state.

Example: for each sensor, count how many times each water level occurs

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Map;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: count how often each water level occurs for each sensor
 */
public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // vc -> number of occurrences, per sensor
                    MapState<Integer, Integer> vcCountMapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcCountMapState = getRuntimeContext().getMapState(
                                new MapStateDescriptor<Integer, Integer>("vcCountMapState", Integer.class, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        Integer vc = value.getVc();
                        if (vcCountMapState.contains(vc)) {
                            int count = vcCountMapState.get(vc);
                            vcCountMapState.put(vc, count + 1);
                        } else {
                            vcCountMapState.put(vc, 1);
                        }
                        StringBuilder outStr = new StringBuilder();
                        outStr.append("sensor id=").append(value.getId()).append("\n");
                        for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                            outStr.append(vcCount.getKey()).append("=").append(vcCount.getValue()).append("\n");
                        }
                        outStr.append("==============");
                        out.collect(outStr.toString());
                    }
                }).print();

        env.execute();
    }
}
14.3.4、Reducing state

Each added element is reduced into the stored value, and the reduced (aggregated) result is kept as the state.
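The reducing-state semantics fit in a few lines of plain Java. The first add stores the value, and each later add replaces the stored value with reduce(stored, new). ReducingValueModel is a hypothetical name, and the model covers only the in-memory behavior, with no checkpointing.

```java
import java.util.function.BinaryOperator;

// Plain-Java model of ReducingState semantics.
// ReducingValueModel is a hypothetical name, not a Flink class.
class ReducingValueModel<T> {
    private final BinaryOperator<T> reducer;
    private T current; // null until the first add

    ReducingValueModel(BinaryOperator<T> reducer) { this.reducer = reducer; }

    // First add stores the value; later adds fold it into the stored one.
    void add(T value) {
        current = (current == null) ? value : reducer.apply(current, value);
    }

    T get() { return current; }

    public static void main(String[] args) {
        ReducingValueModel<Integer> vcSum = new ReducingValueModel<>(Integer::sum);
        for (int vc : new int[] {1, 2, 4}) {
            vcSum.add(vc);
            System.out.println(vcSum.get()); // prints 1, then 3, then 7
        }
    }
}
```

The stored value and the input share one type, T. This is the restriction that aggregating state lifts.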

計算每種傳感器的水位和

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: sum of water levels for each sensor
 */
public class KeyedReduceStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // Running sum of vc for the current key
                    ReducingState<Integer> vcSum;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcSum = getRuntimeContext().getReducingState(
                                new ReducingStateDescriptor<Integer>("vcSum", new ReduceFunction<Integer>() {
                                    @Override
                                    public Integer reduce(Integer value1, Integer value2) throws Exception {
                                        return value1 + value2;
                                    }
                                }, Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // add() reduces the new value into the stored sum
                        vcSum.add(value.getVc());
                        out.collect("sensor id=" + value.getId() + ", sum of water levels=" + vcSum.get());
                    }
                }).print();

        env.execute();
    }
}
14.3.5、Aggregating state

Aggregating state is similar to reducing state, but it keeps an accumulator as its state, so the state type may differ from the input type.
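A reducing state must keep the input type (Integer + Integer -> Integer). An average, by contrast, needs a (sum, count) pair as its state and a Double as its result. The plain-Java model below mirrors the four AggregateFunction methods used in the example that follows (createAccumulator, add, getResult, merge). AvgAccumulator and AvgAggregateModel are hypothetical names standing in for Flink's Tuple2<Integer, Integer> accumulator.

```java
// Plain-Java model of an average aggregate:
// input Integer -> accumulator (sum, count) -> output Double.
// AvgAggregateModel / AvgAccumulator are hypothetical names.
class AvgAggregateModel {

    static final class AvgAccumulator {
        final int sum;
        final int count;
        AvgAccumulator(int sum, int count) { this.sum = sum; this.count = count; }
    }

    static AvgAccumulator createAccumulator() { return new AvgAccumulator(0, 0); }

    static AvgAccumulator add(int value, AvgAccumulator acc) {
        return new AvgAccumulator(acc.sum + value, acc.count + 1);
    }

    // Returns 0.0 for an empty accumulator (a choice made only for this sketch).
    static double getResult(AvgAccumulator acc) {
        return acc.count == 0 ? 0.0 : acc.sum * 1.0 / acc.count;
    }

    // Combines two partial accumulators by adding sums and counts.
    static AvgAccumulator merge(AvgAccumulator a, AvgAccumulator b) {
        return new AvgAccumulator(a.sum + b.sum, a.count + b.count);
    }

    public static void main(String[] args) {
        AvgAccumulator acc = createAccumulator();
        for (int vc : new int[] {1, 2, 4}) {
            acc = add(vc, acc);
        }
        System.out.println(getResult(acc));
    }
}
```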

計算水位平均值

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: average water level for each sensor
 */
public class KeyedAggregateStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    // IN = Integer (vc), OUT = Double (average)
                    AggregatingState<Integer, Double> vcAggregatingState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        vcAggregatingState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                                        "vcAggregatingState",
                                        new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                                            // Accumulator = (sum of vc, number of records)
                                            @Override
                                            public Tuple2<Integer, Integer> createAccumulator() {
                                                return Tuple2.of(0, 0);
                                            }

                                            @Override
                                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                            }

                                            @Override
                                            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                                return accumulator.f0 * 1D / accumulator.f1;
                                            }

                                            @Override
                                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                                                // Combine partial results by adding sums and counts
                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                            }
                                        },
                                        Types.TUPLE(Types.INT, Types.INT)));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcAggregatingState.add(value.getVc());
                        Double vcAvg = vcAggregatingState.get();
                        out.collect("sensor id=" + value.getId() + ", average water level=" + vcAvg);
                    }
                }).print();

        env.execute();
    }
}
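14.3.6、State time-to-live (TTL)

When state is created, its expiration time = current time + TTL, and this expiration time can be refreshed when the state is updated. To enable TTL, build a StateTtlConfig object and pass it to the state descriptor's enableTimeToLive method.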
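A plain-Java model of the TTL rule shows when a value disappears. It is a sketch under stated assumptions. It mimics the OnCreateAndWrite update type and NeverReturnExpired visibility used in the example below, and it takes an explicit millisecond clock instead of Flink's processing time. TtlValueModel is a hypothetical name.

```java
// Plain-Java model of a value with TTL: expireAt = time of last create/write + ttl.
// Mimics UpdateType.OnCreateAndWrite (reads do not refresh the expiration time) and
// StateVisibility.NeverReturnExpired (expired values read as null).
// TtlValueModel is hypothetical, not a Flink class; the clock is passed in explicitly.
class TtlValueModel<V> {
    private final long ttlMs;
    private V stored;
    private long expireAtMs;

    TtlValueModel(long ttlMs) { this.ttlMs = ttlMs; }

    // Create or write: stores the value and refreshes the expiration time.
    void update(V v, long nowMs) {
        stored = v;
        expireAtMs = nowMs + ttlMs;
    }

    // Read: returns null once expired; reading does not extend the lifetime.
    V value(long nowMs) {
        if (stored != null && nowMs >= expireAtMs) {
            stored = null; // expired: drop it
        }
        return stored;
    }

    public static void main(String[] args) {
        TtlValueModel<Integer> lastVc = new TtlValueModel<>(5000L);
        lastVc.update(10, 0L);
        System.out.println(lastVc.value(4000L)); // prints 10
        System.out.println(lastVc.value(6000L)); // prints null
    }
}
```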

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedStateTTLDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: value state with a 5 s TTL
 */
public class KeyedStateTTLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTime) -> element.getTs() * 1000L));

        sensorDS.keyBy(value -> value.getId())
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    ValueState<Integer> lastVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        StateTtlConfig stateTtlConfig = StateTtlConfig
                                .newBuilder(Time.seconds(5))                                           // expire after 5 s
                                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)             // creating or writing the state refreshes the expiration time
                                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return an expired value
                                .build();
                        ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                        lastVc.enableTimeToLive(stateTtlConfig);
                        lastVcState = getRuntimeContext().getState(lastVc);
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        // null if nothing was stored yet or the stored value has expired
                        Integer lastVc = lastVcState.value();
                        out.collect("key=" + value.getId() + ", state value=" + lastVc);
                        lastVcState.update(value.getVc());
                    }
                }).print();

        env.execute();
    }
}
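14.4、Operator state

Operator state comes in three kinds: list state (ListState), union list state (obtained with getUnionListState) and broadcast state (BroadcastState).

It is state defined on a parallel instance of an operator, and its scope is limited to the current operator task.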

package state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: OperatorListDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/13 21:50
 * @description: count records in a map operator
 */
public class OperatorListDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.socketTextStream("192.168.132.101", 7777)
                .map(new MyCountMapFunction())
                .print();
        env.execute();
    }

    public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
        // Local variable holding this subtask's count
        private long count = 0L;
        // Operator state used to persist the count
        private ListState<Long> state;

        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

        /**
         * Persists the local variable by copying it into operator state.
         * Called when a checkpoint is taken (requires checkpointing to be enabled).
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState");
            state.clear();
            state.add(count);
        }

        /**
         * Initializes the local variable: on recovery, restores data from state into it.
         * Called once per subtask.
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState");
            state = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state", Long.class));
            if (context.isRestored()) {
                for (Long restored : state.get()) {
                    count += restored;
                }
            }
        }
    }
}

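Difference between operator List state and UnionList state when state is redistributed (e.g. after the parallelism changes):

  • List state: the entries are distributed evenly, round-robin, among the new subtasks
  • UnionList state: the states of all original subtasks are merged into one complete list, and every new parallel subtask receives the full copy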
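A plain-Java model can contrast the two redistribution modes. It assumes each old subtask's list state is given as a list of entries. As described above, it deals List-state entries out round-robin; the exact assignment order inside Flink's repartitioner is an internal detail not modeled here. ListStateRedistribution is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of operator list state redistribution on rescaling.
// Not Flink's repartitioner: it only contrasts the two modes described above.
class ListStateRedistribution {

    // List state: old entries are pooled and dealt out round-robin,
    // so each entry ends up in exactly one new subtask.
    static List<List<Long>> splitDistribute(List<List<Long>> oldSubtasks, int newParallelism) {
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        int next = 0;
        for (List<Long> subtaskState : oldSubtasks) {
            for (Long entry : subtaskState) {
                result.get(next).add(entry);
                next = (next + 1) % newParallelism;
            }
        }
        return result;
    }

    // UnionList state: all old entries are merged and every new subtask gets the full list.
    static List<List<Long>> unionDistribute(List<List<Long>> oldSubtasks, int newParallelism) {
        List<Long> union = new ArrayList<>();
        for (List<Long> subtaskState : oldSubtasks) {
            union.addAll(subtaskState);
        }
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(union));
        }
        return result;
    }

    public static void main(String[] args) {
        // e.g. two old subtasks of a counting function, each holding its own count
        List<List<Long>> old = List.of(List.of(3L), List.of(5L));
        System.out.println(splitDistribute(old, 3));
        System.out.println(unionDistribute(old, 3));
    }
}
```

Apply this to MyCountMapFunction, whose initializeState adds up every restored entry. With union list state, each new subtask would start from the combined count 8. With list state, each old count goes to exactly one subtask.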

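Broadcast state: every parallel subtask of the operator keeps the same copy of a global state.

Example: send an alert when the water level exceeds a threshold, where the threshold can be changed dynamically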

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @Title: OperatorBroadcastStateDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/13 21:50
 * @description: alert when the water level exceeds a threshold that can be changed dynamically
 */
public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("192.168.132.101", 7777)
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                });

        // Configuration stream: each line is a new threshold, broadcast to every subtask
        DataStreamSource<String> configDS = env.socketTextStream("192.168.132.101", 8888);
        final MapStateDescriptor<String, Integer> descriptor =
                new MapStateDescriptor<>("configDS", String.class, Integer.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(descriptor);

        // Connect the data stream with the broadcast configuration stream
        BroadcastConnectedStream<WaterSensor, String> connect = sensorDS.connect(broadcastStream);

        connect.process(new BroadcastProcessFunction<WaterSensor, String, String>() {
            /**
             * Processes the data stream; broadcast state is read-only here.
             */
            @Override
            public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(descriptor);
                Integer threshold = broadcastState.get("threshold");
                // If data arrives before any configuration, the broadcast state is empty: default to 0
                threshold = threshold == null ? 0 : threshold;
                if (value.getVc() > threshold) {
                    out.collect("Threshold exceeded, threshold=" + threshold);
                }
            }

            /**
             * Processes the broadcast configuration stream; broadcast state is writable here.
             */
            @Override
            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                BroadcastState<String, Integer> state = ctx.getBroadcastState(descriptor);
                state.put("threshold", Integer.valueOf(value));
            }
        }).print();

        env.execute();
    }
}
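14.5、State backends

The storage, access and maintenance of state are handled by a pluggable component called the state backend, which is mainly responsible for how and where local state is stored.

14.5.1、Types of state backends

State backends work out of the box and can be configured independently without changing program logic. There are two: the hash map state backend (the default) and the embedded RocksDB state backend.

  1. Hash map state backend: state lives in memory. State objects are kept directly on the TaskManager's JVM heap and stored as key-value pairs.
  2. RocksDB state backend: RocksDB is a key-value store, and state is stored on disk.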