更新時(shí)間:2022-08-17 來源:黑馬程序員 瀏覽量:
消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。最簡(jiǎn)單的消息隊(duì)列模型包括3個(gè)角色:
消息隊(duì)列:存儲(chǔ)消息。
生產(chǎn)者:發(fā)送消息到消息隊(duì)列,在秒殺任務(wù)中負(fù)責(zé)判斷秒殺時(shí)間和庫存,校驗(yàn)消費(fèi)者權(quán)限是否是一人一單,發(fā)送優(yōu)惠券id和用戶id到消息隊(duì)列中。
消費(fèi)者:從消息隊(duì)列獲取消息并處理消息,接受到訂單消息之后,完成下單。
Redis提供了三種不同的方式來實(shí)現(xiàn)消息隊(duì)列:
list結(jié)構(gòu):基于List結(jié)構(gòu)模擬消息隊(duì)列。
PubSub:基本的點(diǎn)對(duì)點(diǎn)消息模型。
Stream:比較完善的消息隊(duì)列模型。
消息隊(duì)列(Message Queue),字面意思就是存放消息的隊(duì)列。而Redis的list數(shù)據(jù)結(jié)構(gòu)是一個(gè)雙向鏈表,很容易模擬出隊(duì)列效果。
隊(duì)列是入口和出口不在一邊,因此我們可以利用:LPUSH 結(jié)合 RPOP、或者 RPUSH 結(jié)合 LPOP來實(shí)現(xiàn)。
不過要注意的是,當(dāng)隊(duì)列中沒有消息時(shí)RPOP或LPOP操作會(huì)返回null,并不像JVM的阻塞隊(duì)列那樣會(huì)阻塞并等待消息。因此這里應(yīng)該使用BRPOP或者BLOPO來實(shí)現(xiàn)阻塞效果。
List的消息隊(duì)列可利用Redis存儲(chǔ),不受限于JVM內(nèi)存上限,是基于基于Redis的持久化機(jī)制,數(shù)據(jù)安全性有保證,同時(shí)也可以滿足消息隊(duì)列的有序性。但只支持但消費(fèi)者,無法避免消息丟失是它最大的問題。
PubSub(發(fā)布訂閱)是Redis2.0版本引入的消息傳遞模型。顧名思義,消費(fèi)者可以訂閱一個(gè)或多個(gè)channel,生產(chǎn)者向?qū)?yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。對(duì)應(yīng)channel發(fā)送消息后,所有訂閱者都能收到相關(guān)消息。
SUBSCRIBE channel [channel] :訂閱一個(gè)或多個(gè)頻道。
PUBLISH channel msg :向一個(gè)頻道發(fā)送消息。
PSUBSCRIBE pattern[pattern] :訂閱與pattern格式匹配的所有頻道。
基于PubSub的消息隊(duì)列采用發(fā)布訂閱模型,支持多生產(chǎn)、多消費(fèi),但也有不支持?jǐn)?shù)據(jù)持久化、無法避免消息丟失,消息堆積有上限的缺點(diǎn),超出時(shí)數(shù)據(jù)會(huì)丟失。
例如:
Stream讀取消息的方式之一:XREAD
例如:
在業(yè)務(wù)開發(fā)中,我們可以循環(huán)的調(diào)用XREAD阻塞方式來查詢最新消息,從而實(shí)現(xiàn)持續(xù)監(jiān)聽隊(duì)列的效果,偽代碼如下:
注意:當(dāng)我們指定起始ID為$時(shí),代表讀取最新的消息,如果我們處理一條消息的過程中,又有超過1條以上的消息到達(dá)隊(duì)列,則下次獲取時(shí)也只能獲取到最新的一條,會(huì)出現(xiàn)漏讀消息的問題。
STREAM類型消息隊(duì)列的XREAD命令,消息可回溯,一個(gè)消息可以被多個(gè)消費(fèi)者讀取,同時(shí)可能阻塞讀取,有消息漏讀的風(fēng)險(xiǎn)。
基于Stream的消息隊(duì)列-消費(fèi)者組
消費(fèi)者組(Consumer Group):將多個(gè)消費(fèi)者劃分到一個(gè)組中,監(jiān)聽同一個(gè)隊(duì)列。具備下列特點(diǎn):
1.消息分流:隊(duì)列中的消息會(huì)分流給組內(nèi)的不同消費(fèi)者,而不是重復(fù)消費(fèi),從而加快消息處理的速度。
2.消息標(biāo)示:消費(fèi)者組會(huì)維護(hù)一個(gè)標(biāo)示,記錄最后一個(gè)被處理的消息,哪怕消費(fèi)者宕機(jī)重啟,還會(huì)從標(biāo)示之后讀取消息。確保每一個(gè)消息都會(huì)被消費(fèi)。
3.消息確認(rèn):消費(fèi)者獲取消息后,消息處于pending狀態(tài),并存入一個(gè)pending-list。當(dāng)處理完成后需要通過XACK來確認(rèn)消息,標(biāo)記消息為已處理,才會(huì)從pending-list移除。
創(chuàng)建消費(fèi)者組:
XGROUP CREATE key groupName ID [MKSTREAM]
key:隊(duì)列名稱
groupName:消費(fèi)者組名稱
ID:起始ID標(biāo)示,$代表隊(duì)列中最后一個(gè)消息,0則代表隊(duì)列中第一個(gè)消息
MKSTREAM:隊(duì)列不存在時(shí)自動(dòng)創(chuàng)建隊(duì)列。
其它常見命令:
# 刪除指定的消費(fèi)者組 XGROUP DESTORY key groupName # 給指定的消費(fèi)者組添加消費(fèi)者 XGROUP CREATECONSUMER key groupname consumername # 刪除消費(fèi)者組中的指定消費(fèi)者 XGROUP DELCONSUMER key groupname consumername
從消費(fèi)者組讀取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消費(fèi)組名稱
consumer:消費(fèi)者名稱,如果消費(fèi)者不存在,會(huì)自動(dòng)創(chuàng)建一個(gè)消費(fèi)者
count:本次查詢的最大數(shù)量
BLOCK milliseconds:當(dāng)沒有消息時(shí)最長等待時(shí)間
NOACK:無需手動(dòng)ACK,獲取到消息后自動(dòng)確認(rèn)
STREAMS key:指定隊(duì)列名稱
ID:獲取消息的起始ID:">":從下一個(gè)未消費(fèi)的消息開始
其它:根據(jù)指定id從pending-list中獲取已消費(fèi)但未確認(rèn)的消息,例如0,是從pending-list中的第一個(gè)消息開始。
消費(fèi)者監(jiān)聽消息的基本思路:
STREAM類型消息隊(duì)列的XREADGROUP命令特點(diǎn):消息可回溯,一個(gè)消息可以只能被一個(gè)消費(fèi)者讀取。可以阻塞讀取.沒有消息漏讀的風(fēng)險(xiǎn),有消息確認(rèn)機(jī)制,保證消息至少被消費(fèi)一次。