如何編輯 wordpress 主題seo短視頻保密路線
文章目錄
- 本系列
- 漏桶限流算法
- uber的漏桶算法
- 使用
- mutex版本
- 數(shù)據(jù)結(jié)構(gòu)
- 獲取令牌
- 松弛量
- atomic版本
- 數(shù)據(jù)結(jié)構(gòu)
- 獲取令牌
- 測試漏桶的松弛量
- 總結(jié)
本系列
- 開源限流組件分析(一):juju/ratelimit
- 開源限流組件分析(二):uber-go/ratelimit(本文)
- 開源限流組件分析(三):golang-time/rate
漏桶限流算法
漏桶限流算法思路很簡單,水(數(shù)據(jù)或者請求)先進入到漏桶里,漏桶以一定的速度出水,當(dāng)水流入速度過大會直接溢出,漏桶算法能強行限制請求的處理速度
相比于令牌桶中只要桶內(nèi)還有剩余令牌,調(diào)用方就可以馬上消費令牌的策略。漏桶相對來說更加嚴(yán)格,調(diào)用方只能按照預(yù)定的間隔順序消費令牌
例如:假設(shè)漏桶出水的間隔是10ms,上次在0ms時刻出水,那么下次是10ms時刻,再下次是20ms時刻
uber的漏桶算法
uber對標(biāo)準(zhǔn)的漏桶算法做了一些優(yōu)化,加了一些松弛量,這么做了后就能應(yīng)對突發(fā)流量,達(dá)到和令牌桶一樣的效果
關(guān)于什么是松弛量,在介紹獲取令牌流程時再詳細(xì)分析
閱讀的源碼:https://github.com/uber-go/ratelimit,版本:v0.3.1
使用
下面展示一個沒有使用松弛量的漏桶使用示例:
func Example_default() {// 參數(shù)100:代表每秒放行100個請求,即每10ms放行一個請求rl := ratelimit.New(100)prev := time.Now()for i := 0; i < 10; i++ {// Take獲取令牌,返回獲取令牌成功的時間now := rl.Take()if i > 0 {// 打印每次放行間隔fmt.Println(i, now.Sub(prev))}prev = now}
}
打印結(jié)果:
1 10ms
2 10ms
3 10ms
4 10ms
5 10ms
6 10ms
7 10ms
8 10ms
9 10ms
可以看出每次放行間隔10ms,符合預(yù)期
uber的漏桶提供了mutex版本
和atomic版本
兩種實現(xiàn),主要區(qū)別在于怎么控制并發(fā),以及怎么記錄松弛量
mutex版本
數(shù)據(jù)結(jié)構(gòu)
type mutexLimiter struct {sync.Mutex// 上次請求放行的時刻last time.Time// 桶中積累的松弛量sleepFor time.Duration// 每個請求之間的間隔perRequest time.Duration// 最大松弛量maxSlack time.Durationclock Clock
}
初始化桶:
func newMutexBased(rate int, opts ...Option) *mutexLimiter {config := buildConfig(opts)perRequest := config.per / time.Duration(rate)l := &mutexLimiter{perRequest: perRequest,// config.slack默認(rèn)值=10, 例如當(dāng)perRequest=10ms時// maxSlack就是 -10 * 100ms = -1smaxSlack: -1 * time.Duration(config.slack) * perRequest,clock: config.clock,}return l
}func buildConfig(opts []Option) config {c := config{clock: clock.New(),// 最大松弛量默認(rèn)是perRequest的10倍slack: 10,per: time.Second,}for _, opt := range opts {opt.apply(&c)}return c
}
主要設(shè)置兩個變量:
-
每次放行時間間隔
perRequest
:根據(jù)參數(shù)rate計算,rate表示每單位時間per能放行多少請求- 例如當(dāng)per=1s,rate=100時,每次放行時間間隔perRequest=10ms
-
最大松弛量
maxSlack
:config.slack默認(rèn)值=10- 例如當(dāng)perRequest=10ms時,maxSlack就是
-10 * 100ms = -1s
- 例如當(dāng)perRequest=10ms時,maxSlack就是
獲取令牌
要實現(xiàn)標(biāo)準(zhǔn)的漏桶算法,其實比較簡單:
記錄上次放行時間last
,當(dāng)本次請求到來時, 如果此刻的時間與last
相比并沒有達(dá)到 perRequest 規(guī)定的間隔大小, sleep 一段時間即可
對應(yīng)代碼為(刪除了松弛相關(guān)代碼):
func (t *mutexLimiter) Take() time.Time {t.Lock()defer t.Unlock()now := t.clock.Now()// 如果第一次請求,直接放行if t.last.IsZero() {t.last = nowreturn t.last}// 當(dāng)前時間距離上次放行時間的間隔,sleepFor := t.perRequest - now.Sub(t.last)// 如果比規(guī)定的間隔小,需要sleepif sleepFor > 0 {t.clock.Sleep(t.sleepFor)t.last = now.Add(t.sleepFor)t.sleepFor = 0return t.last
}
松弛量
如果不引入松弛量,按照標(biāo)準(zhǔn)漏桶算法的流程:
假設(shè)現(xiàn)在有三個請求,req1
、req2
和 req3
,獲取令牌間隔perRequest規(guī)定為 10ms
-
req1
先到來 -
當(dāng)
req1
完成之后20ms
,req2
才到來,此時距離上次放行有20ms
,大于10ms,可以對req2
放行 -
當(dāng)
req2
放行后5ms
,req3
到來,此時距離上次放行才5ms
,不足 規(guī)定的間隔10ms
,因此還需要等待5ms
才能繼續(xù)放行req3
這種策略有什么問題?無法應(yīng)對任何突發(fā)流量,沒有彈性,定死了相鄰兩次請求放行的間隔必須大于等于規(guī)定的值
于是引入了松弛量的概念:當(dāng)較長時間(超過perRequest)沒有請求到來時,會在桶中積累一些松弛量,這樣接下來的請求可以先消耗松弛量,而不會被漏洞的獲取令牌間隔限制。直到把松弛量耗盡為止
當(dāng)然松弛量也不是無限積累,這樣當(dāng)很長時間沒有請求后,就跟沒有限流沒區(qū)別了。因此桶中規(guī)定了最多積累多少松弛量maxSlack
加上松弛量后代獲取令牌代碼如下:
-
計算
perRequest - now.Sub(t.last)
- 如果大于0,說明當(dāng)前時間距離上次放行時間不足規(guī)定間隔,需要等待,或者需要消耗松弛量
- 如果小于0,說明當(dāng)前時間距離上次放行時間超過了規(guī)定間隔,那么當(dāng)前請求一定可以放行,并且可以在桶中積累一些松弛量,給下次請求使用
- 將結(jié)果累加到t.sleepFor
-
假設(shè)累加的松弛值超過了maxSlack,修正為maxSlack,默認(rèn)為10倍的規(guī)定間隔
-
如果
t.sleepFor > 0
,說明本次請求應(yīng)該等待的時間,在使用完桶中的松弛量后還不夠,需要sleep這段時間,結(jié)束后將sleepFor置為0 -
否則
t.sleepFor <= 0
,說明桶中的存儲松弛余量可以滿足本次請求,本次請求放行
func (t *mutexLimiter) Take() time.Time {t.Lock()defer t.Unlock()now := t.clock.Now()// 如果第一次請求,直接放行if t.last.IsZero() {t.last = nowreturn t.last}// 假設(shè)perRequest=10ms,now.Sub(t.last)=5ms,那么需要睡5ms// 假設(shè)perRequest=10ms,now.Sub(t.last)=15ms,說明距離上傳請求的時間超過了漏桶的限流間隔10ms,那么sleepFor變?yōu)?5t.sleepFor += t.perRequest - now.Sub(t.last)// 默認(rèn)最多松弛 10倍的perRequest// 如果累加的松弛值超過了maxSlack,修正為maxSlackif t.sleepFor < t.maxSlack {t.sleepFor = t.maxSlack}// 需要sleepif t.sleepFor > 0 {t.clock.Sleep(t.sleepFor)t.last = now.Add(t.sleepFor)t.sleepFor = 0// 如果sleepFor <= 0,說明還有存儲的松弛余量,可以放行} else {t.last = now}return t.last
}
atomic版本
數(shù)據(jù)結(jié)構(gòu)
type atomicInt64Limiter struct {// 上次在哪個時刻發(fā)放許可state int64// 每次請求間隔perRequest time.Duration// 最大松弛量maxSlack time.Durationclock Clock
}func newAtomicInt64Based(rate int, opts ...Option) *atomicInt64Limiter {config := buildConfig(opts)perRequest := config.per / time.Duration(rate)l := &atomicInt64Limiter{perRequest: perRequest,// 最大松弛量:10倍的perRequestmaxSlack: time.Duration(config.slack) * perRequest,clock: config.clock,}atomic.StoreInt64(&l.state, 0)return l
}
獲取令牌
atomic版本的漏桶不是用一個變量存儲桶中的松弛量,而是記錄了上次在哪個時刻發(fā)放許可timeOfNextPermissionIssue
,那松弛量用什么表示呢?當(dāng)前時間now減去上次發(fā)放許可時間timeOfNextPermissionIssue就代表松弛量
怎么理解?這個值代表應(yīng)該在什么時刻發(fā)放令牌:
- 如果該值 比now大,說明需要sleep一段時間等待時間流逝,本次才能獲取到令牌
- 如果該值 比now小,說明應(yīng)該在更早的時間就可以獲取令牌,本次請求不需要等待
每次請求時會在timeOfNextPermissionIssue的基礎(chǔ)上加上perRequest,并更新timeOfNextPermissionIssue,表示本次發(fā)放許可的時間比上次前進了perRequest時間
如果較長時間沒有請求,那天然就積累了一些松弛量,因為上次發(fā)放許可時間就會比較小,即便加上本次的消耗perRequest也沒有now大,就可以直接放行
-
如果上次發(fā)放許可時間,加上固定間隔perRequest小于now,說明本次可以放行,并更新上次發(fā)放許可時間 += perRequest
- 當(dāng)然timeOfNextPermissionIssue不能比now小太多,否則限流就沒意義了。因此規(guī)定該值最多比now小10倍的perRequest,表示松弛量最多為perRequest的10倍
-
否則不能放行,需要等待時間流逝到timeOfNextPermissionIssue + perRequest為止
func (t *atomicInt64Limiter) Take() time.Time {var (newTimeOfNextPermissionIssue int64now int64)for {now = t.clock.Now().UnixNano()// 上次在哪個時刻發(fā)放許可timeOfNextPermissionIssue := atomic.LoadInt64(&t.state)switch {// 第一次請求,或者不是第一次請求,且(沒有松弛量,且now 比 上次發(fā)放許可時間 多了超過perRequest )case timeOfNextPermissionIssue == 0 || (t.maxSlack == 0 && now-timeOfNextPermissionIssue > int64(t.perRequest)):// 本次放行,本次在now時刻發(fā)放許可newTimeOfNextPermissionIssue = now// 可以有松弛量,且 當(dāng)前時間 - 上次發(fā)放許可時間 > 11倍perRequestcase t.maxSlack > 0 && now-timeOfNextPermissionIssue > int64(t.maxSlack)+int64(t.perRequest):// 本次在 now - 最大松弛量 時刻發(fā)放許可newTimeOfNextPermissionIssue = now - int64(t.maxSlack)/**1.沒有松弛量,且當(dāng)前時間距離 上次發(fā)放許可時間 超過了 不到perRequest,本次應(yīng)該在上次發(fā)放許可時間 + perRequest 再發(fā)放2.有松弛量,且 當(dāng)前時間 - 上次發(fā)放許可時間 <= 11倍perRequest,那本次應(yīng)該在上次發(fā)放許可時間 + perRequest 再發(fā)放*/default:// 更新上次發(fā)放許可時間,+= perRequestnewTimeOfNextPermissionIssue = timeOfNextPermissionIssue + int64(t.perRequest)}if atomic.CompareAndSwapInt64(&t.state, timeOfNextPermissionIssue, newTimeOfNextPermissionIssue) {break}}sleepDuration := time.Duration(newTimeOfNextPermissionIssue - now)// 沒有余量了,需要sleepif sleepDuration > 0 {t.clock.Sleep(sleepDuration)// 返回放行時的時間return time.Unix(0, newTimeOfNextPermissionIssue)}return time.Unix(0, now)
}
測試漏桶的松弛量
下面對漏桶的松弛量進行測試:如果先積累一些松弛量,就會起到因?qū)ν话l(fā)流量的效果,例如讓時間先走45ms
func Example_default() {// 每10ms放行一個請求, 最大松弛量=100msrl := ratelimit.New(100)// 在0ms發(fā)放許可rl.Take()// 此時時間來到45mstime.Sleep(time.Millisecond * 45)prev := time.Now()for i := 0; i < 10; i++ {now := rl.Take()if i > 0 {fmt.Println(i, now.Sub(prev))}prev = now}
}
打印結(jié)果:
1 1μs
2 103μs
3 4μs
4 4.79ms
5 10ms
6 10ms
7 10ms
8 10ms
9 10ms
可以看出:前4次都沒等,直接獲取到令牌,第4次等了大約5ms,之后每次放行間隔都是10ms
執(zhí)行到for循環(huán)時,上次發(fā)放許可時間10ms,當(dāng)前時刻45ms
拆解for循環(huán)中的每次take中的漏桶狀態(tài):
- i=0, 下次應(yīng)該在
10ms
發(fā)放許可,當(dāng)前時刻45ms,放行 - i=1, 下次應(yīng)該在
20ms
發(fā)放許可,當(dāng)前時刻45ms,放行 - i=2, 下次應(yīng)該在
30ms
發(fā)放許可,當(dāng)前時刻45ms,放行 - i=3, 下次應(yīng)該在
40ms
發(fā)放許可,當(dāng)前時刻45ms,放行 - i=4, 下次應(yīng)該在
50ms
發(fā)放許可,當(dāng)前時刻45ms,更新timeOfNextPermissionIssue=50,需要sleep5ms
- i=5, 下次應(yīng)該在
60ms
發(fā)放許可,當(dāng)前50ms,需要sleep10ms
,更新timeOfNextPermissionIssue=60
如下圖所示:
總結(jié)
加上松弛量后,這個漏桶和標(biāo)準(zhǔn)的令牌桶區(qū)別就不大了:
- 都能應(yīng)對一定的突發(fā)流量
- 當(dāng)桶中沒有松弛量(沒有令牌時),都按照固定時間間隔放行請求
這個庫有個問題是:對桶中等待中的請求數(shù)沒有限制,這樣當(dāng)并發(fā)量非常大時,會導(dǎo)致請求一直在桶中堆積
因此工程中要使用的話,最好對源碼進行改造:當(dāng)?shù)却械恼埱髷?shù)超過閾值就快速返回失敗