中文亚洲精品无码_熟女乱子伦免费_人人超碰人人爱国产_亚洲熟妇女综合网

當前位置: 首頁 > news >正文

手機網站注冊頁面百度一下你就知道移動首頁

手機網站注冊頁面,百度一下你就知道移動首頁,構建一個網站的步驟,做視頻網站要什么格式一、概述 消息隊列 定義 消息隊列模型:一種分布式系統(tǒng)中的消息傳遞方案,由消息隊列、生產者和消費者組成消息隊列:負責存儲和管理消息的中間件,也稱為消息代理(Message Broker)生產者:負責 產…

一、概述

消息隊列

  1. 定義
    1. 消息隊列模型:一種分布式系統(tǒng)中的消息傳遞方案,由消息隊列、生產者和消費者組成
    2. 消息隊列:負責存儲和管理消息的中間件,也稱為消息代理(Message Broker)
    3. 生產者:負責 產生并發(fā)送 消息到隊列的應用程序
    4. 消費者:負責從隊列 獲取并處理 消息的應用程序
  2. 功能:實現(xiàn)消息發(fā)送和處理的解耦,支持異步通信,提高系統(tǒng)的可擴展性和可靠性
  3. 主流消息隊列解決方案
    1. RabbitMQ:輕量級,支持多種協(xié)議,適合中小規(guī)模應用
    2. RocketMQ:阿里開源,高性能,適合大規(guī)模分布式應用

Stream

  1. 定義:Stream:Redis 5.0 引入的一種數據類型,用于處理高吞吐量的消息流、事件流等場景
  2. 功能:按時間順序 ”添加、讀取、消費“ 消息,支持消費者組、消息確認等功能

二、Stream 工作流程

  1. 寫入消息
    1. 生產者通過?XADD?向 Stream 中添加消息。每條消息自動獲得唯一的 ID,按時間順序存入 Stream。
  2. 創(chuàng)建消費者組
    1. 如果使用消費者組,首先需要通過?XGROUP CREATE?創(chuàng)建消費者組。
    2. 消費者組會根據時間順序將消息分配給組內的消費者。
  3. 讀取消息
    1. 消費者使用 XREADGROUP 命令讀取 Stream 中的消息。
    2. 消息按規(guī)則分配給不同消費者處理,每個消費者讀取到不同的消息。
  4. 確認消息
    1. 消費者在處理完消息后,使用?XACK?命令確認消息,表示該消息已成功處理。
    2. 如果消息未確認(例如消費者崩潰或超時),它將保持在?Pending?狀態(tài),等待重新分配給其他消費者。
  5. 重新分配未確認消息
    1. 如果消息在一定時間內沒有被確認,其他消費者可以讀取未確認的消息并進行處理。
    2. 可通過 XPENDING 命令查看未確認消息,或在消費者組中設置時間閾值自動重新分配。
  6. 刪除消費者組
    1. 不再需要消費者組時,使用 XGROUP DESTROY 命令刪除消費者組

三、Stream 實現(xiàn)

消費者組模式

  1. 定義:Redis Streams 的一部分,用于處理消息的分布式消費
  2. 優(yōu)點
    1. 消息分流:多消費者爭搶消息,加快消費速度,避免消息堆積
    2. 消息標示:避免消息漏讀,消費者讀取消息后不馬上銷毀,加入 consumerGroup 維護的 pending list 隊列等待 ACK
    3. 消息確認:通過消息 ACK 機制,保證消息至少被消費一次
    4. 可以阻塞讀取,避免盲等
  3. 實現(xiàn)方法 :通過 Stream 數據類型實現(xiàn)消息隊列,命令以 “X” 開頭

常用命令

XGROUP CREATE key groupName ID [MKSTREAM]

  1. 功能:創(chuàng)建消費者組
  2. 參數
    1. key:隊列名稱
    2. groupName:組名稱
    3. ID:起始 ID 標識,$ 表示隊列中最后一個消息,0 表示隊列中第一個消息
    4. MKSTREAM:隊列不存在則創(chuàng)建隊列

XGROUP DESTORY key groupName

  1. 功能:刪除指定消費者組

XGROUP CREATECONSUMER key groupName consumerName

  1. 功能:添加組中消費者

XGROUP DELCONSUMER key groupName consumerName

  1. 功能:刪除組中消費者

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

  1. 功能:讀取組中的消息
  2. gourp:消費者組名稱
  3. consumer:消費者名稱(不存在則自動創(chuàng)建)
  4. count:本次查詢的最大數量
  5. BLOCK milliseconds:當沒有消息時最長等待時間
  6. NOACK:無需手動 ACK,獲取到消息后自動確認
  7. STREAMS KEY:指定隊列名稱
  8. ID:獲取消息的起始 ID,> 表示從下一個未消費消息開始 (常用)

XPENDING key group [ [ IDLE min-idle-time ] start end count [consumer] ]

  1. 功能:獲取 pending-list 中的消息
  2. IDLE:獲取消息后、確認消息前的這段時間,空閑時間超過 min-idle-time 則取出
  3. start:獲取的最小目標 ID
  4. end:獲取的最大目標 ID
  5. count:獲取的數量
  6. consumer:獲取 consumer 的 pending-list

XACK key group ID [ ID … ]

  1. 功能:確認從組中讀取的消息已被處理
  2. key:隊列名稱
  3. group:組名稱
  4. ID:消息的 ID

表格版命令

  1. 命令

    命令功能
    XGROUP CREATE key groupName ID [MKSTREAM]創(chuàng)建消費者組
    XGROUP DESTORY key groupName刪除指定消費者組
    XGROUP CREATECONSUMER key groupName consumerName添加組中消費者
    XGROUP DELCONSUMER key groupName consumerName刪除組中消費者
    XREADGROUP GROUP groupName consumerName [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]讀取組中的消息,ID 填寫 “>” 則讀取第一條未讀消息
    XACK key group ID [ ID … ]確認從組中讀取的消息已被處理
  2. 屬性

    屬性名定義
    key隊列名稱
    groupName消費者組名稱
    ID起始 ID 標示,$ 代表隊列中最后一個消息,0 代表第一個消息
    MKSTREAM隊列不存在時自動創(chuàng)建隊列
    BLOCK milliseconds沒有消息時的最大等待時長
    NOACK無需手動 ACK,獲取到消息后自動確認
    STREAMS key指定隊列名稱

運行邏輯

while(true) {// 嘗試監(jiān)聽隊列,使用阻塞模式,最長等待 2000 msObject msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 2000 STREAMS s1 >");if(msg == null) {continue;}try {// 處理消息,完成后一定要 ACKhandleMessage(msg);} catch (Exception e) {while(true) {// 重新讀取阻塞隊列消息Object msg = redis.call("XREADGROUP GROUP group1 consumer1 COUNT 1 STREAM S1 0");if(msg == null)               // 如果阻塞隊中的消息已經全部處理則退出pending-listbreak;try {handleMessage(msg);    			// 重新處理 pending-list 中的消息} catch (Exception e){continue;                   // 如果還出錯, 則繼續(xù)重新讀取}}}
}

四、示例

  1. 目標:消息隊列實現(xiàn)數據庫異步修改數據庫,將下單 message 緩存在 redis 中,減小下單操作對數據庫的沖擊

  2. 項目結構

    1. RedisConfig?配置類:創(chuàng)建消費者組是一次性的操作,適合放在配置類中
    2. VoucherOrderHandler?內部類:消費者的邏輯和訂單業(yè)務相關,因此適合放在?VoucherOrderServiceImpl?中
    3. 多線程啟動邏輯:消費者線程的啟動與訂單業(yè)務密切相關,直接放在?VoucherOrderServiceImpl?類中更符合職責分離原則
    src/main/java
    ├── com/example
    │   ├── config
    │   │   └── RedisConfig.java                  // Redis 配置類,包含消費者組初始化
    │   ├── service
    │   │   ├── VoucherOrderService.java
    │   │   └── impl
    │   │       └── VoucherOrderServiceImpl.java  // 包含 VoucherOrderHandler 內部類
    │   ├── entity
    │   │   └── VoucherOrder.java                 // 優(yōu)惠券訂單實體
    │   ├── utils
    │   │   └── BeanUtil.java                     // 用于 Map 轉 Bean 的工具類
    │   └── controller
    │       └── VoucherOrderController.java       // 如果有 Controller
    
  3. 創(chuàng)建消費者組(config.RedisConfig)

    @Bean
    public void initStreamGroup() {// 檢查是否存在消費者組 g1try {stringRedisTemplate.opsForStream().createGroup("stream.orders", "g1");} catch (RedisSystemException e) {// 如果 group 已存在,拋出異常,可忽略log.warn("消費者組 g1 已存在");}
    }
    
  4. 創(chuàng)建消費者線程

    1. 位置:作為 VoucherOrderServiceImpl 內的預構造部分
    @PostConstruct
    public void startConsumers() {for (int i = 0; i < 5; i++) { // 5 個線程,模擬多個消費者new Thread(new VoucherOrderHandler()).start();}
    }
    
  5. 添加消息到消息隊列 (src/main/resources/lua/SECKILL_SCRIPT.lua)

    --1. 參數列表
    --1.1. 優(yōu)惠券id
    local voucherId = ARGV[1]
    --1.2. 用戶id
    local userId = ARGV[2]
    --1.3. 訂單id
    local orderId = ARGV[3]--2. 數據key
    local stockKey = 'seckill:stock:' .. voucherId          --2.1. 庫存key
    local orderKey = 'seckill:order' .. voucherId           --2.2. 訂單key--3. 腳本業(yè)務
    --3.1. 判斷庫存是否充足 get stockKey
    if( tonumber( redis.call('GET', stockKey) ) <= 0 ) thenreturn 1
    end
    --3.2. 判斷用戶是否重復下單 SISMEMBER orderKey userId
    if( redis.call( 'SISMEMBER', orderKey, userId ) == 1 ) thenreturn 2
    end
    --3.4 扣庫存 incrby stockKey -1
    redis.call( 'INCRBY', stockKey, -1 )
    --3.5 下單(保存用戶) sadd orderKey userId
    redis.call( 'SADD', orderKey, userId )
    -- 3.6. 發(fā)送消息到隊列中
    redis.call( 'XADD', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'id', orderId )
    
  6. 創(chuàng)建消費者類(ServiceImpl)

    1. 位置:作為 VoucherOrderServiceImpl 內的私有類
    // 在ServiceImpl中創(chuàng)建一個VoucherOrderHandler消費者類,專門用于處理消息隊列中的消息
    private class VoucherOrderHandler implements Runnable {@Overridepublic void run() {while (true) {try {// 1. 獲取消息隊列中的訂單信息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),StreamOffset.create( "stream.order", ReadOffset.lastConsumed()));// 2. 沒有消息則重新監(jiān)聽if (list == null || list.isEmpty() ) continue;// 3. 獲取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 創(chuàng)建訂單createVoucherOrder(voucherOrder);// 5. 確認當前消息已消費 XACKstringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch ( Exception e) {log.error("處理訂單異常", e);// 6. 處理訂單失敗則消息會加入pending-list,繼續(xù)處理pending-listhandlePendingList();}}}// 處理pending-list中的消息private void handlePendingList() {while(true) {try {// 1. 消費pending-list中的消息List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(Consumer.from("g1", "c1"),                                   // 消費者此消息的消費者StreamReadOptions.empty().count(1),                          // StreamOffset.create("stream.order", ReadOffset.from("0"))     // 從pending-list的第一條消息開始讀);// 2. 退出條件, list 為空 -> pending-list 已全部處理if(list == null || list.isEmpty()) break;// 3. 獲取消息中的 voucherOrderMapRecord<String, Object, Object> record = list.get(0);Map<Object, Object> value = record.getValue();VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);// 4. 創(chuàng)建訂單createVoucherOrder(voucherOrder);// 5. 確認消息已消費(XACK)stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());} catch (Exception e) {log.error("處理pendding訂單異常", e);try{Thread.sleep(20);     // 如果發(fā)生異常則休眠一會再重新消費pending-list中的消息} catch (Exception e2) {e.printStackTrace(); }}}}
    }
    
  7. 創(chuàng)建消息方法

    1. 目標:用戶通過這個方法發(fā)送一條創(chuàng)建訂單的 Message 給 Redis Stream
    // 創(chuàng)建Lua腳本對象
    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;// Lua腳本初始化 (通過靜態(tài)代碼塊)
    static {SECKILL_SCRIPT = new DefaultRedisScript<>();SECKILL_SCRIPT.setLocation(new ClassPathResource("lua/SECKILL_SCRIPT.lua"));SECKILL_SCRIPT.setResultType(Long.class);
    }@Override
    public void createVoucherOrder(Long voucherId, Long userId) {// 生成訂單 ID(模擬)long orderId = System.currentTimeMillis();// 執(zhí)行 Lua 腳本Long result = stringRedisTemplate.execute(SECKILL_SCRIPT,Collections.emptyList(),                    // 使用空的 key 列表voucherId.toString(), userId.toString(), String.valueOf(orderId));// 根據 Lua 腳本返回結果處理if (result == 1) {throw new RuntimeException("庫存不足!");} else if (result == 2) {throw new RuntimeException("不能重復下單!");}// 如果腳本執(zhí)行成功,則訂單消息會進入 Redis Stream,消費者組會自動處理System.out.println("訂單創(chuàng)建成功!");
    }
    

(缺陷) 單消費者模式

  1. 常用命令
    1. XADD key [NOMKSTREAM] [MAXLEN | MINID [=|~] threshold [LIMIT count] * | ID field value [field value …]
    2. XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ ID … ]
  2. 缺陷:有消息漏讀風險

五、其他消息隊列方案

(缺陷) List 實現(xiàn)

  1. 優(yōu)點
    1. 不受 JVM 內存上限限制:因為利用 Redis 存儲
    2. 數據安全 :因為基于 List 結構本身是數據存儲,基于 Redis 持久化機制
    3. 消息有序性:通過 List 結構的 LPUSH & BRPOP 命令實現(xiàn)順序
  2. 缺點
    1. 消息丟失:BRPOP 的時候如果宕機則消息會丟失
    2. 只支持單消費者

(缺陷) PubSub 實現(xiàn)

  1. 定義
    1. Publish & Subscribe 模型,一種消息隊列模型
    2. 生產者向指定的 channel 來 public 消息
    3. 消費者從 subscribe 的 channel 中接收消息
  2. 功能:支持多消費者模式,多個消費者可以同時 subscribe 一個 channel
  3. 優(yōu)點:采用發(fā)布訂閱模型,支持多生產者、消費者
  4. 缺點
    1. 不支持數據持久化
    2. 無法避免消息丟失
    3. 消息堆積有上限,超出時數據丟失

三種消息隊列對比


http://m.risenshineclean.com/news/64579.html

相關文章:

  • 阿里云輕云服務器可以放多個網站啊怎么做做網站需要準備什么
  • dreamweaver最新版本引擎優(yōu)化是什么意思
  • 響應式建站工具58同城發(fā)布免費廣告
  • 公司名logo設計圖片seow是什么意思
  • 設計公司給公司做網站用了方正字體app開發(fā)教程
  • 網站做app的軟件叫什么seo快速優(yōu)化排名
  • 個人網站系統(tǒng)優(yōu)秀網站網頁設計圖片
  • 代辦公司營業(yè)執(zhí)照長沙seo推廣公司
  • 平度網站建設網絡優(yōu)化工程師簡歷
  • 網站建設入門教程視頻抖音代運營公司
  • 站酷網站的圖是用什么做的百度指數app下載
  • 做機械的網站有哪些軟文營銷實施背景
  • 海外營銷網站設計英文seo實戰(zhàn)派
  • 網頁設計推薦網站百度推廣咨詢
  • 自己買域名可以做網站嗎百度如何做推廣
  • 和碩網站建設騰訊廣告投放平臺官網
  • php無版權企業(yè)網站管理系統(tǒng)優(yōu)化軟件下載
  • 淮安神舟建設招標網站電商平臺怎么做
  • 外匯網站模版網絡流量分析工具
  • 做任務賺錢的網站起什么名字好網站建設百度推廣
  • 網站開發(fā) 上海查排名的軟件有哪些
  • app制作平臺要多少錢seo網站優(yōu)化培訓班
  • 網站怎么做移動圖片百度一下網頁
  • 企業(yè)網站建設基本流程危機公關處理方案
  • 現(xiàn)在最流行的網站推廣方式有哪些搜索引擎優(yōu)化的簡稱是
  • 網站建設都包括哪些方面怎么做平臺推廣
  • 國外有哪些網站做推廣的比較好黃頁88網站推廣方案
  • 網站制作專業(yè)的公司叫什么win優(yōu)化大師有用嗎
  • 云南網絡公司網站萬能瀏覽器
  • wordpress for bae哪里搜索引擎優(yōu)化好