如何在自己電腦上做網(wǎng)站服務(wù)器夜狼seo
接上文:Flink實戰(zhàn)五_狀態(tài)機(jī)制
1、需求背景
現(xiàn)在網(wǎng)絡(luò)直播平臺非?;鸨?#xff0c;在斗魚這樣的網(wǎng)絡(luò)直播間,經(jīng)常可以看到這樣的總榜排名,體現(xiàn)了主播的人氣值。
人氣值計算規(guī)則:用戶發(fā)送1條彈幕互動,贈送1個熒光棒免費道具、100個免費魚丸、親密度禮物等行為,均可為主播貢獻(xiàn)1點及以上人氣值。
我們就以這個人氣值日榜為例,來設(shè)計一個Flink的計算程序。
對于人氣值日榜這樣的功能,可以理解為是一個典型的流式計算的場景,強(qiáng)調(diào)的是數(shù)據(jù)的實時處理。因為在這個場景下,必須要及時的累計用戶的送禮物數(shù)據(jù),才能形成你追我趕的實時效果,提升用戶的參與體驗。這個場景下的實時性,雖然不要求每一條數(shù)據(jù)都及時響應(yīng),但是整體的數(shù)據(jù)延遲還是要盡量縮短的。
這種場景下,使用Flink進(jìn)行流批統(tǒng)一的計算,感覺就非常合適。
2、數(shù)據(jù)流程設(shè)計
在確定了使用Flink進(jìn)行計算后,首先就需要設(shè)計出數(shù)據(jù)的上下游流程,進(jìn)行簡單的方案可行性評估。
對于數(shù)據(jù)上游,我們這個人氣值日榜統(tǒng)計的業(yè)務(wù)場景,數(shù)據(jù)來源自然就是粉絲們的打賞行為。一方面整個平臺的打賞行為的數(shù)據(jù)量是非常大的,另一方面這些打賞行為涉及到賬戶操作,所以他的作用,更大的是體現(xiàn)在人氣值榜功能以外的其他業(yè)務(wù)過程中。基于這兩方面考慮,自然就會想到使用kafka來進(jìn)行削峰以及解耦。而Flink在DataStream/DataSet API和 Table API&SQL 兩個部分都對kafka提供了連接器實現(xiàn),所以用kafka作為數(shù)據(jù)接入是可行的。
而對于數(shù)據(jù)下游,其實可以想象,最終計算出來的數(shù)據(jù),最為重要的是要強(qiáng)調(diào)查詢的靈活性以及時效性,這樣才能支持頁面的快速查詢。如果考慮查詢的時效性,HBase和ElasticSearch都是比較理想的大數(shù)據(jù)存儲引擎。但是如果考慮到查詢的靈活性,就會想到ElasticSearch會相比HBase更適合。因為我們統(tǒng)計出來的這些粉絲人氣值度的結(jié)果,不光可以作為每個直播間人氣值榜的排名,也應(yīng)該可以作為以后平臺主播年度排名等其他業(yè)務(wù)場景的數(shù)據(jù)來源。如果想要兼顧這些查詢場景,使用HBase就會對Rowkey產(chǎn)生大量的侵入,而Elasticsearch可以根據(jù)任意字段快速查詢,就比較有優(yōu)勢。 另外,從官方文檔中可以查到,對于HBase,Flink只提供了Table API&SQL 模塊的connector支持,而DataStream/DataSet API中沒有提供支持,而ElasticSearch則支持更為全面。當(dāng)然,這跟HBase的具體場景是有關(guān)聯(lián)的,但是也可以從另一個角度認(rèn)為,使用ElasticSearch的可行性更高。
這樣,就初步確定了 kafka-> Flink -> ElasticSearch 這樣的大致數(shù)據(jù)流程。這
也是在實際開發(fā)中非常典型的一個組合方式。后續(xù)就可以著手搭建kafka集群以及ElasticSearch+Kibana的集群了。搭建的過程就略過了。
確定數(shù)據(jù)的基礎(chǔ)結(jié)構(gòu)
這一步主要是確定入口數(shù)據(jù)和出口數(shù)據(jù)的結(jié)構(gòu)。只要這兩個數(shù)據(jù)結(jié)構(gòu)確定了,那
么應(yīng)用程序模塊和大數(shù)據(jù)計算模塊就可以分開進(jìn)行開發(fā)了。是雙方主要的解耦方
式。
在數(shù)據(jù)入口處,可以定義這樣的簡化的數(shù)據(jù)結(jié)構(gòu):
public static class GiftRecord{
private String hostId; //主播ID
private String fansId; //粉絲ID
private long giftCount; //禮物數(shù)量
private String giftTime; //送禮物時間。時間格式 yyyy-MM-DD HH:mm:SS
.....
}
在kafka中,確定使用gift作為Topic,MQ的消息格式為 #{hostId},#{fansId},#{giftCount},#{giftTime} 這樣的字符串。
在數(shù)據(jù)出口處,可以定義ES中這樣簡化的索引結(jié)構(gòu):
-- 貢獻(xiàn)日榜索引
PUT daygiftanalyze
{
"mappings":{"properties": {"windowEnd":{"type": "long"},"hostId": {"type": "keyword"},"fansId": {"type": "keyword"},"giftCount":{"type": "long"}}}
}
這樣,一個簡單的設(shè)計方案就形成了。應(yīng)用程序只需要在粉絲發(fā)送禮物時往kafka中同步一條消息記錄,然后從ES中查詢主播的人氣值日榜和人氣值周榜數(shù)據(jù)即可。而我們也可以模擬數(shù)據(jù)格式進(jìn)行開發(fā)了。
3、應(yīng)用實現(xiàn)
人氣值日榜:
基礎(chǔ)數(shù)據(jù)結(jié)構(gòu):
public static class GiftRecord{private String hostId; //主播IDprivate String fansId; //粉絲IDprivate long giftCount; //禮物數(shù)量private String giftTime; //送禮物時間。時間格式 yyyy-MM-DD HH:mm:SS.....
}
在kafka中,確定使用gift作為Topic,MQ的消息格式為 #{hostId},#{fansId},#{giftCount},#{giftTime} 這樣的字符串。
ES索引:
PUT daygiftanalyze
{"mappings": {"properties": {"windowEnd": {"type": "long"},"hostId": {"type": "keyword"},"fansId": {"type": "keyword"},"giftCount": {"type": "long"}}}
}
然后運行Flink程序,com.flink.project.flink.DayGiftAna,從kafka中讀取數(shù)
據(jù)。測試數(shù)據(jù)見giftrecord.txt。計算程序會及時將十分鐘內(nèi)的粉絲禮物統(tǒng)計都存入到ES當(dāng)中。
giftrecord.txt如下:
1001,3001,100,2021-09-15 15:15:10
1001,3002,321,2021-09-15 15:17:14
1001,3003,234,2021-09-15 15:16:24
1001,3004,15,2021-09-15 15:17:13
1001,3005,264,2021-09-15 15:18:14
1001,3006,678,2021-09-15 15:17:54
1001,3007,123,2021-09-15 15:19:22
1001,3008,422,2021-09-15 15:18:37
1001,3009,566,2021-09-15 15:22:43
1001,3001,76,2021-09-15 15:21:28
1001,3001,88,2021-09-15 15:26:28
1001,3007,168,2021-09-15 15:32:29
1001,3002,157,2021-09-15 15:28:56
1001,3009,567,2021-09-15 15:27:32
1001,3004,145,2021-09-15 15:30:26
1001,3003,1656,2021-09-15 15:31:19
1001,3005,543,2021-09-15 15:36:49
1001,3001,864,2021-09-15 15:38:26
1001,3001,548,2021-09-15 15:45:10
1001,3007,359,2021-09-15 15:52:39
1001,3008,394,2021-09-15 15:59:48
com.flink.project.flink.DayGiftAna,如下:
import com.roy.flink.project.fansgift.FansGiftResult;
import com.roy.flink.project.fansgift.GiftRecord;
import org.apache.commons.lang.SystemUtils;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichAggregateFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.util.Collector;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;/*** @desc 貢獻(xiàn)日榜計算程序*/
public class DayGiftAna {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.getConfig().setAutoWatermarkInterval(1000L); //BoundedOutOfOrdernessWatermarks定時提交Watermark的間隔
// env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop01:8020/dayGiftAna"));// Checkpoint存儲到文件if(SystemUtils.IS_OS_WINDOWS){env.setStateBackend(new FsStateBackend("file:///D:/flink_file"));}else{// linuxenv.setStateBackend(new FsStateBackend("file:///home/file_file"));}//使用Socket測試。env.setParallelism(1);final DataStreamSource<String> dataStream = env.socketTextStream("10.86.97.206", 7777);final SingleOutputStreamOperator<FansGiftResult> fansGiftResult = dataStream.map((MapFunction<String, GiftRecord>) value -> {final String[] valueSplit = value.split(",");//SimpleDateFormat 多線程不安全。SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");final long giftTime = sdf.parse(valueSplit[3]).getTime();return new GiftRecord(valueSplit[0], valueSplit[1], Integer.parseInt(valueSplit[2]), giftTime);}).assignTimestampsAndWatermarks(WatermarkStrategy.<GiftRecord>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner((SerializableTimestampAssigner<GiftRecord>) (element, recordTimestamp) -> element.getGiftTime()))
// .keyBy((KeySelector<GiftRecord, String>) value -> value.getHostId() + "_" + value.getFansId()) //按照HostId_FansId分組.keyBy((KeySelector<GiftRecord, String>) value -> value.getHostId()) //按照HostId分組.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// .allowedLateness(Time.seconds(2)).aggregate(new WinodwGiftRecordAgg(), new AllWindowGiftRecordAgg());//打印結(jié)果測試fansGiftResult.print("fansGiftResult");env.execute("DayGiftAna");}//在每個子任務(wù)中將窗口期內(nèi)的禮物進(jìn)行累計合并//增加狀態(tài)后端。private static class WinodwGiftRecordAgg implements AggregateFunction<GiftRecord, Long, Long> {@Overridepublic Long createAccumulator() {return 0L;}@Overridepublic Long add(GiftRecord value, Long accumulator) {Long res = accumulator + value.getGiftCount();return res;}@Overridepublic Long getResult(Long accumulator) {return accumulator;}@Overridepublic Long merge(Long a, Long b) {return a + b;}}//對窗口期內(nèi)的所有子任務(wù)進(jìn)行窗口聚合操作。private static class AllWindowGiftRecordAgg extends RichWindowFunction<Long, FansGiftResult, String, TimeWindow> {ValueState<FansGiftResult> state;@Overridepublic void apply(String s, TimeWindow window, java.lang.Iterable<Long> input, Collector<FansGiftResult> out) throws Exception {final String[] splitKey = s.split("_");String hostId = splitKey[0];String fansId ="";if(splitKey.length>1){fansId=splitKey[1];}final Long giftCount = input.iterator().next();final long windowEnd = window.getEnd();final FansGiftResult fansGiftResult = new FansGiftResult(hostId, fansId, giftCount, windowEnd);out.collect(fansGiftResult);state.update(fansGiftResult);}@Overridepublic void open(Configuration parameters) throws Exception {final ValueStateDescriptor<FansGiftResult> stateDescriptor = new ValueStateDescriptor<>("WinodwGiftRecordAgg", TypeInformation.of(new TypeHint<FansGiftResult>() {}));state = this.getRuntimeContext().getState(stateDescriptor);}}
}
FansGiftResult,代碼如下:
public class FansGiftResult {private String hostId;private String fansId;private long giftCount;private long windowEnd;public FansGiftResult() {}public FansGiftResult(String hostId, String fansId, long giftCount, long windowEnd) {this.hostId = hostId;this.fansId = fansId;this.giftCount = giftCount;this.windowEnd = windowEnd;}@Overridepublic String toString() {if(fansId!=null && fansId.length()>0){return "FansGiftResult{" +"hostId='" + hostId + '\'' +", fansId='" + fansId + '\'' +", giftCount=" + giftCount +", windowEnd=" + windowEnd +'}';}else{return "FansGiftResult{" +"hostId='" + hostId + '\'' +", giftCount=" + giftCount +", windowEnd=" + windowEnd +'}';}}public String getHostId() {return hostId;}public void setHostId(String hostId) {this.hostId = hostId;}public String getFansId() {return fansId;}public void setFansId(String fansId) {this.fansId = fansId;}public long getGiftCount() {return giftCount;}public void setGiftCount(long giftCount) {this.giftCount = giftCount;}public long getWindowEnd() {return windowEnd;}public void setWindowEnd(long windowEnd) {this.windowEnd = windowEnd;}
}
GiftRecord,代碼如下:
public class GiftRecord {private String hostId; //主播IDprivate String fansId; //粉絲IDprivate int giftCount; //禮物數(shù)量private long giftTime; //送禮物時間。原始時間格式 yyyy-MM-DD HH:mm:ss,ssspublic GiftRecord() {}public GiftRecord(String hostId, String fansId, int giftCount, long giftTime) {this.hostId = hostId;this.fansId = fansId;this.giftCount = giftCount;this.giftTime = giftTime;}public String getHostId() {return hostId;}public void setHostId(String hostId) {this.hostId = hostId;}public String getFansId() {return fansId;}public void setFansId(String fansId) {this.fansId = fansId;}public int getGiftCount() {return giftCount;}public void setGiftCount(int giftCount) {this.giftCount = giftCount;}public long getGiftTime() {return giftTime;}public void setGiftTime(long giftTime) {this.giftTime = giftTime;}@Overridepublic String toString() {return "GiftRecord{" +"hostId='" + hostId + '\'' +", fansId='" + fansId + '\'' +", giftCount=" + giftCount +", giftTime='" + giftTime + '\'' +'}';}
}
ES查詢語句:
GET daygiftanalyze/_search
{"query": {"bool": {"must": [{"range": {"windowEnd": {"gte": 1631635200000,"lte": 1631721600000}}},{"match": {"hostId": "1001"}}]}},"aggs": {"groupByFans": {"terms": {"field": "fansId","size": 3,"order": {"giftCount": "desc"}},"aggs": {"giftCount": {"sum": {"field": "giftCount"}}}}}
}
ES中的查詢結(jié)果:
直播應(yīng)用就可以根據(jù)這個查詢結(jié)果組織客戶端查詢代碼,最終實現(xiàn)日榜排名的功能。
4、實現(xiàn)效果分析
具體的計算方案參見示例代碼,這里就不多做分析了。這里只分析一下在實現(xiàn)過程中需要注意的幾個重要的問題:
-
時間語義分析
對于網(wǎng)絡(luò)直播這樣的場景,從下午六點到第二天早上六點才是一天的高峰期,所以,在進(jìn)行統(tǒng)計時,將每一天的統(tǒng)計時間定義為從早上六點到第二天早上六點,這樣就能盡量保持高峰期的完整性。很多跟娛樂相關(guān)的場景,比如網(wǎng)絡(luò)游戲,也大都是以這樣的范圍來定義一天,而不是傳統(tǒng)意義上的從0點到24點。 -
并行度優(yōu)化
可以直接使用Flink的開窗機(jī)制,待一周的數(shù)據(jù)收集完整了之后,一次性向ES中輸出統(tǒng)計結(jié)果,這種場景下要注意累計器的持久化,以及計算程序出錯后的重啟恢復(fù)機(jī)制。 -
后續(xù)改進(jìn)方式
狀態(tài)后端、而對于人氣值日榜的計算,就不能等一天的數(shù)據(jù)收集齊了再計算了。這時是有兩種解決方案,一種是完全的流處理方式。也就是每來一條數(shù)據(jù)就往ES中更新結(jié)果。另一中方式是采用小批量的流處理方式。以五分鐘為單位,將數(shù)據(jù)拆分成一個一個小窗
口來進(jìn)行處理。顯然后一種方式對數(shù)據(jù)處理的壓力會比較小一點。雖然數(shù)據(jù)量會更
多,但是ES的存儲以及快速查詢能力可以比較好的彌補(bǔ)數(shù)據(jù)量的問題。也因此,在
設(shè)計ES數(shù)據(jù)機(jī)構(gòu)時,將人氣值日榜的文檔結(jié)構(gòu)設(shè)計成了一個一個的小范圍。