江蘇專業(yè)網(wǎng)站制作公司搜索引擎關(guān)鍵詞競價排名
一、海量數(shù)據(jù)實(shí)時去重說明
借助redis的Set,需要頻繁連接Redis,如果數(shù)據(jù)量過大, 對redis的內(nèi)存也是一種壓力;使用Flink的MapState,如果數(shù)據(jù)量過大, 狀態(tài)后端最好選擇 RocksDBStateBackend; 使用布隆過濾器,布隆過濾器可以大大減少存儲的數(shù)據(jù)的數(shù)據(jù)量。
二、海里書實(shí)時去重為什么需要布隆過濾器
如果想判斷一個元素是不是在一個集合里,一般想到的是將集合中所有元素保存起來,然后通過比較確定。鏈表、樹、散列表(又叫哈希表,Hash table)等等數(shù)據(jù)結(jié)構(gòu)都是這種思路。
但是隨著集合中元素的增加,我們需要的存儲空間越來越大。同時檢索速度也越來越慢,上述三種結(jié)構(gòu)的檢索時間復(fù)雜度分別為。
布隆過濾器即可以解決存儲空間的問題, 又可以解決時間復(fù)雜度的問題.
布隆過濾器的原理是,當(dāng)一個元素被加入集合時,通過K個散列函數(shù)將這個元素映射成一個位數(shù)組中的K個點(diǎn),把它們置為1。檢索時,我們只要看看這些點(diǎn)是不是都是1就(大約)知道集合中有沒有它了:如果這些點(diǎn)有任何一個0,則被檢元素一定不在;如果都是1,則被檢元素很可能在。這就是布隆過濾器的基本思想。
三、布隆過濾基本概念
布隆過濾器(Bloom Filter,下文簡稱BF)由Burton Howard Bloom在1970年提出,是一種空間效率高的概率型數(shù)據(jù)結(jié)構(gòu)。它專門用來檢測集合中是否存在特定的元素。
它實(shí)際上是一個很長的二進(jìn)制向量和一系列隨機(jī)映射函數(shù)。
實(shí)現(xiàn)原理
布隆過濾器的原理是,當(dāng)一個元素被加入集合時,通過K個散列函數(shù)將這個元素映射成一個位數(shù)組中的K個點(diǎn),把它們置為1。檢索時,我們只要看看這些點(diǎn)是不是都是1就(大約)知道集合中有沒有它了:如果這些點(diǎn)有任何一個0,則被檢元素一定不在;如果都是1,則被檢元素很可能在。這就是布隆過濾器的基本思想。
BF是由一個長度為m比特的位數(shù)組(bit array)與k個哈希函數(shù)(hash function)組成的數(shù)據(jù)結(jié)構(gòu)。位數(shù)組均初始化為0,所有哈希函數(shù)都可以分別把輸入數(shù)據(jù)盡量均勻地散列。
當(dāng)要插入一個元素時,將其數(shù)據(jù)分別輸入k個哈希函數(shù),產(chǎn)生k個哈希值。以哈希值作為位數(shù)組中的下標(biāo),將所有k個對應(yīng)的比特置為1。
當(dāng)要查詢(即判斷是否存在)一個元素時,同樣將其數(shù)據(jù)輸入哈希函數(shù),然后檢查對應(yīng)的k個比特。如果有任意一個比特為0,表明該元素一定不在集合中。如果所有比特均為1,表明該集合有(較大的)可能性在集合中。為什么不是一定在集合中呢?因?yàn)橐粋€比特被置為1有可能會受到其他元素的影響(hash碰撞),這就是所謂“假陽性”(false positive)。相對地,“假陰性”(false negative)在BF中是絕不會出現(xiàn)的。
下圖示出一個m=18, k=3的BF示例。集合中的x、y、z三個元素通過3個不同的哈希函數(shù)散列到位數(shù)組中。當(dāng)查詢元素w時,因?yàn)橛幸粋€比特為0,因此w不在該集合中。
優(yōu)點(diǎn)
1.不需要存儲數(shù)據(jù)本身,只用比特表示,因此空間占用相對于傳統(tǒng)方式有巨大的優(yōu)勢,并且能夠保密數(shù)據(jù);
2.時間效率也較高,插入和查詢的時間復(fù)雜度均為, 所以他的時間復(fù)雜度實(shí)際是
3.哈希函數(shù)之間相互獨(dú)立,可以在硬件指令層面并行計(jì)算。
缺點(diǎn)
1.存在假陽性的概率,不適用于任何要求100%準(zhǔn)確率的情境;
2.只能插入和查詢元素,不能刪除元素,這與產(chǎn)生假陽性的原因是相同的。我們可以簡單地想到通過計(jì)數(shù)(即將一個比特?cái)U(kuò)展為計(jì)數(shù)值)來記錄元素?cái)?shù),但仍然無法保證刪除的元素一定在集合中。
使用場景
所以,BF在對查準(zhǔn)度要求沒有那么苛刻,而對時間、空間效率要求較高的場合非常合適.
另外,由于它不存在假陰性問題,所以用作“不存在”邏輯的處理時有奇效,比如可以用來作為緩存系統(tǒng)(如Redis)的緩沖,防止緩存穿透。
假陽性概率的計(jì)算
假陽性的概率其實(shí)就是一個不在的元素,被k個函數(shù)函數(shù)散列到的k個位置全部都是1的概率??梢园凑杖缦碌牟襟E進(jìn)行計(jì)算: p = f(m,n,k)
其中各個字母的含義:
1.n :放入BF中的元素的總個數(shù);
2.m:BF的總長度,也就是bit數(shù)組的個數(shù)
3.k:哈希函數(shù)的個數(shù);
4.p:表示BF將一個不在其中的元素錯判為在其中的概率,也就是false positive的概率;
A.BF中的任何一個bit在第一個元素的第一個hash函數(shù)執(zhí)行完之后為 0的概率是:
B.BF中的任何一個bit在第一個元素的k個hash函數(shù)執(zhí)行完之后為 0的概率是:
C.BF中的任何一個bit在所有的n元素都添加完之后為 0的概率是:
D.BF中的任何一個bit在所有的n元素都添加完之后為 1的概率是:
E.一個不存在的元素被k個hash函數(shù)映射后k個bit都是1的概率是:
結(jié)論:在哈數(shù)函數(shù)個數(shù)k一定的情況下
1.比特?cái)?shù)組m長度越大, p越小, 表示假陽性率越低
2.已插入的元素個數(shù)n越大, p越大, 表示假陽性率越大
經(jīng)過各種數(shù)學(xué)推導(dǎo):
對于給定的m和n,使得假陽性率(誤判率)最小的k通過如下公式定義:
四、使用布隆過濾器實(shí)現(xiàn)去重
Flink已經(jīng)內(nèi)置了布隆過濾器的實(shí)現(xiàn)(使用的是google的Guava)
package com.lyh.flink12;import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;public class Flink02_UV_BoomFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 創(chuàng)建WatermarkStrategyWatermarkStrategy<UserBehavior> wms = WatermarkStrategy.<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {@Overridepublic long extractTimestamp(UserBehavior element, long recordTimestamp) {return element.getTimestamp() * 1000L;}});env.readTextFile("input/UserBehavior.csv").map(line -> { // 對數(shù)據(jù)切割, 然后封裝到POJO中String[] split = line.split(",");return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]));}).filter(behavior -> "pv".equals(behavior.getBehavior())) //過濾出pv行為.assignTimestampsAndWatermarks(wms).keyBy(UserBehavior::getBehavior).window(TumblingEventTimeWindows.of(Time.minutes(60))).process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() {private ValueState<Long> countState;private ValueState<BloomFilter<Long>> bfState;@Overridepublic void open(Configuration parameters) throws Exception {countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));bfState = getRuntimeContext().getState(new ValueStateDescriptor<BloomFilter<Long>>("bfState", TypeInformation.of(new TypeHint<BloomFilter<Long>>() {})));}@Overridepublic void process(String key,Context context,Iterable<UserBehavior> elements, Collector<String> out) throws Exception {countState.update(0L);// 在狀態(tài)中初始化一個布隆過濾器// 參數(shù)1: 漏斗, 存儲的類型// 參數(shù)2: 期望插入的元素總個數(shù)// 參數(shù)3: 期望的誤判率(假陽性率)BloomFilter<Long> bf = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.001);bfState.update(bf);for (UserBehavior behavior : elements) {// 查布隆if (!bfState.value().mightContain(behavior.getUserId())) {// 不存在 計(jì)數(shù)+1countState.update(countState.value() + 1L);// 記錄這個用戶di, 表示來過bfState.value().put(behavior.getUserId());}}out.collect("窗口: " + context.window() + " 的uv是: " + countState.value());}}).print();env.execute();}
}