張振宇
摘要:消息中間件是利用可靠高效的消息遞送機制幫助分布式系統進行平臺數據交互的系統軟件。分析了消息中間件的Java消息服務(JMS)標準、消息模式和發展現狀,討論了消息中間件帶來的幾個優勢和應用消息中間件帶來的若干問題,并提出了解決方案。以SpringBoot工程中應用ActiveMQ為例,通過編碼實現展示了應用ActiveMQ的主要方法。
關鍵詞:JMS;消息中間件;ActiveMQ
中圖分類號:TP393.09文獻標志碼:A文章編號:1008-1739(2022)14-43-5

云計算是分布式計算的一種,主要是通過將龐大的應用服務拆分成多個較小的子程序,再分配給多臺服務器進行處理,不同服務器的子程序之間必然需要通過網絡進行通信協作,因而可靠、安全、高效的通信基礎設施對于云計算的成功非常重要。
通常,消息通信機制分為同步通信和異步通信2種。在同步通信中,客戶端直接請求服務端,并等待服務結果返回后才繼續執行。這種實現方式結構簡單,但是會造成客戶端阻塞,導致程序服務并發訪問效率低下,降低了服務的可用性。在異步通信中,客戶端和服務端不會直接進行通信,而是客戶端把請求以消息的形式放入消息隊列中,服務端從消息隊列中獲取請求。這樣客戶端和服務端之間不會阻塞并等待對方響應,從而提高了并發訪問的效率。
異步消息通信機制使云計算各層次中內部組件之間和各層次之間解耦合,同時有利于服務的動態伸縮性。異步通信機制經過多年發展,逐步形成了以JMS標準規范為代表的消息中間件體系。
1.1 JMS
JMS是由SUN公司提出的基于Java平臺消息中間件的接口規范,定義了消息的發送和接收等操作接口、消息格式和各個模塊的功能語義[1]。JMS由生產者、消費者、服務提供者和消息等部分組成。JMS是在Java標準化組織內開發的標準,2001年6月發布了JMS1.0.2b版本,定義了點對點和發布訂閱消息模式,當前已不建議使用。2002年3月發布了JMS1.1版本,引入了統一的接口定義來實現2種類型的消息傳遞。2013年4月發布了JMS2.0版本,在1.1版本基礎上大大簡化了接口操作,使用戶能夠更加專注于業務邏輯的開發工作[2]。
1.2消息模式
在JMS中定義了2種消息模式:點對點模式和發布訂閱模式[3]。在點對點模式中,一條消息只能被一個接收者消費,是一對一的關系[4],點對點模式如圖1所示。

點對點模式有以下特點:
①一條消息僅能被一個接收者消費。
②消息并不是自動推送給接收者的,而是需要接收者主動從隊列中請求獲得。當有多個接收者監聽同一個隊列時,根據先請求先獲得的原則確定接收者。
③如果沒有接收者在監聽該消息隊列,消息會保存在該隊列中,直到被消費或超時。
④接收者收到消息后必須向隊列發送接收確認,否則消息中間件認為該消息沒有被接收,仍可被其他接收者消費。
⑤發送者和接收者運行先后順序沒有限制。
在發布訂閱模式中,一條消息可以被多個訂閱者消費,是一對多的關系[4]。發布訂閱模式如圖2所示。

發布訂閱模式有以下特點:
①消息中間件根據訂閱者所訂閱的主題將消息傳遞給訂閱者。
②訂閱同一個主題的多個訂閱者會接收到同一條消息。
③消息會自動廣播,訂閱者無需主動請求或輪詢主題獲得新消息。
④訂閱者必須先訂閱主題,然后再等待發布者發布消息。
1.3消息中間件發展現狀
目前主流消息中間件有ActiveMQ,RabbitMQ,ZeroMQ[5]。
ActiveMQ是一款由Apache組織發布的開源的(Apache License 2.0)獨立的企業級通用消息中間件,完整地實現了JMS1.1和J2EE1.4規范,同時支持集群部署、事物控制、存儲轉發和持久化等一些企業級特性。ActiveMQ可以借助JVM跨平臺的特性,可以運行于現階段所有主流操作系統中。具備JMX和Web兩種控制臺,便于開發調試和運行監控,并且所有功能和特性都可以通過XML文件進行配置[6]。
RabbitMQ是一個開源的高級消息隊列協議(Advanced Message Queuing Protocol,AMQP)實現,服務器端用Erlang語言編寫,支持Python、.NET、Java,C語言、PHP等多種客戶端;支持消息持久化和崩潰恢復,重新啟動應用程序之后消息不會丟失;用于在分布式系統中存儲轉發消息,在易用性、擴展性和高可用性等方面表現不俗。
ZeroMQ是一個開源的、跨平臺、高性能及精簡靈活的網絡消息中間件。支持新的輕量級socket風格的接口,支持多種底層協議和AMQP,支持多種平臺和CPU架構。具有獨特的非中間件的模式,不需要安裝和運行一個消息服務器或中間件。只需要引用ZeroMQ程序庫,就可以在應用程序之間發送消息,部署非常簡單。性能方面比其他消息中間件要強。但是ZeroMQ不支持消息持久化和崩潰恢復,且穩定性較差。
2.1異步處理
在分布式服務情況下,一次請求可能會調用多個子系統的接口,需要等待所有的接口都處理完畢返回響應,才能獲取最終的執行結果。這種同步接口調用的方式總耗時比較長,非常影響用戶的體驗,特別是在網絡不穩定的情況下,極易出現接口調用超時問題。
例如,用戶在注冊界面提交注冊信息后,系統需要處理注冊信息并寫入數據庫,用戶發送注冊郵件通知注冊成功,再向用戶發送注冊短信通知注冊成功,都處理完后返回用戶注冊界面提示用戶注冊完成。假設3個業務節點處理時間都是50 ms,不考慮網絡開銷等其他處理時延,用戶從提交注冊到顯示注冊成功的等待時間是150 ms,同步處理流程如圖3所示。

通過使用消息中間件將同步調用改為異步調用,將非必須的業務邏輯進行異步處理改造,能夠顯著減少系統響應時間。仍以用戶注冊為例,用戶在注冊界面提交注冊信息后,系統需要處理用戶注冊信息并寫入數據庫,再寫入消息隊列,然后直接返回用戶界面提示用戶注冊完成。向用戶發送注冊郵件通知和發送注冊短信通知都改為異步處理。假設3個業務節點處理時間都是50 ms,寫入消息隊列耗時5 ms,不考慮網絡開銷等其他處理時延,用戶注冊等待時間是55ms,異步處理流程如圖4所示。

2.2流量削峰
在類似秒殺的電商場景下,用戶請求瞬時激增,所有的請求最終都壓到數據庫,可能會導致數據庫響應變慢甚至無響應,傳統模式流程如圖5所示。

使用消息中間件之后,用戶請求能夠緩存在消息隊列中,從而起到流量削峰的作用。訂單系統接收到用戶提交訂單信息后,將請求直接發送到消息中間件,然后其他業務處理從消息中間件中獲取訂單信息進行處理和寫庫操作。由于消息隊列服務處理消息速度比數據庫快很多,如果出現請求峰值的情況,超量的消息能夠暫存在消息中間件的隊列中,訂單處理系統會按照自己的處理能力來消費消息,不會對系統的穩定性造成影響,流量削峰流程如圖6所示。

2.3系統解耦
復雜的業務系統一般都會拆分成多個子系統。以用戶下單為例,請求會先通過訂單系統,然后分別調用支付系統、庫存系統、積分系統和物流系統,系統之間耦合性太高。如果調用的任何一個子系統出現異常,整個請求都會異常,對系統的穩定性非常不利。另外,如果需要增加商品推薦系統,也需要對訂單系統進行修改,業務系統耦合關系如圖7所示。

使用消息中間件相當于在各子系統之間插入了一個基于數據的隱含接口層,各子系統都要實現這一接口,從而在保證接口不變的前提下,允許各業務系統分別進行獨立的修改。新增商品推薦系統時,也不需要修改其他業務系統,業務系統解耦后關系如圖8所示。

3.1可用性降低
引入消息中間件后,各業務系統都依賴消息隊列服務,而一旦消息隊列服務失效,會導致各業務系統之間無法通信,所以需要考慮消息服務故障導致的可用性降低問題。
針對單點故障導致的可用性降低問題,可以通過搭建消息服務集群來解決。消息中間件集群有多種實現方式,下面以ActiveMQ為例,通過Zookeeper搭建集群,消息服務集群如圖9所示。

各ActiveMQ服務器會向Zookeeper注冊,Zookeeper會為每個ActiveMQ服務器分配序列號,其中序列號最小的為主用服務器。生產者和消費者連接消息隊列服務器時,會首先從Zookeeper獲取當前主用服務器地址再進行后續訪問操作。當主用服務器失效,Zookeeper會檢測到并刪除失效節點,然后選取當前最小序號的服務器升級為主用服務器,同時通知所有客戶端,以完成主備切換,保證消息服務的高可用性。
3.2復雜性提高
系統引入消息中間件后,消息可能會被重復消費或者消息可能在消費前丟失。為了保證消息沒有被重復消費和處理消息丟失的情況,系統復雜性不可避免地會有提高。
(1)消息重復問題
解決思路是增加一張消費信息表,給所有消息分配全局唯一的標識用作唯一索引。消費者開始消費之前,先去消費信息表中查詢有沒有匹配的消費記錄,如果有就說明已經消費過,不再處理,消息重復解決流程如圖10所示。

(2)消息丟失問題
解決思路是增加一張消息發送記錄表,當生產者發完消息后,往該表寫入一條數據,其狀態標記為待確認。如果消費者讀取消息后,調用生產者的接口更新該消息的狀態為已確認。另外新增一個任務,定時檢查消息發送表,如果一定時間間隔后,仍有待確認狀態的消息,則認為該消息已經丟失,并重新發送該消息,消息丟失處理流程如圖11所示。

3.3數據一致性
消息隊列帶來的異步可以提高系統響應速度,但是萬一消息消費者在處理消息過程中沒有正確處理,會導致數據不一致的情況出現。解決思路是通過在業務邏輯中增加重試機制來保持最終一致性。
對于消息量較小的業務場景,可以采用同步重試。在消費消息時如果處理失敗,立刻重試若干次。如果還是失敗,則寫入到記錄表中。如果消息量較大,不建議采用同步重試方式。如果出現網絡異常,可能會導致大量的消息不斷重試,影響消息讀取速度,造成消息堆積。對于消息量較大的業務場景,可以采用異步重試。在消費者處理失敗之后,立刻寫入重試表,建立另外一個任務,定時讀取重試表進行處理,數據一致性處理流程如圖12所示。

本文以在Java主流開發的SpringBoot工程中引入ActiveMQ消息中間件為例,建立了一個消息生產者工程和一個消息消費者工程,同時在本機中運行5.7.0版本的ActiveMQ程序。



由上述代碼示例可以發現,在SpringBoot工程中引入ActiveMQ消息中間件僅需要增加少量的代碼。
消息中間件為應用系統提供了高效、可靠的消息通信手段,能夠很好地實現異構平臺之間的信息交互。消息中間件以其獨特的優勢為各種分布式應用的開發注入了強大的活力,極大地推動了應用系統集成和發展。
[1]陳安林.基于JMS的消息中間件的研究與實現[D].哈爾濱:哈爾濱工程大學,2006.
[2]朱方娥,曹寶香.基于JMS的消息隊列中間件的研究與實現[J].計算機技術與發展,2008(5):172-175.
[3]孫弋,溫迅.一種面向消息的中間件的設計與實現[J].物聯網技術,2019,9(3):81-84.
[4] AHUJA S P, MUPPARAJU N.Performance Evaluation and Comparison of Distributed Messaging using Message Oriented Middleware[J].Computer and Information Science, 2014,7(4):9-20.
[5]王小霞,陳亮.一種消息隊列中間件的設計與實現[J].計算機工程,2006(21):81-83.
[6] TIMOTHY B.Instant Apache ActiveMQ Messaging Application Development[M].Birmingham:Packt Publishing, 2013.