張樂

摘 要 針對現在移動互聯網復雜、信號不穩定的網絡特點,設計了一套以 RabbitMQ消息中間件的即時通訊系統,在確保服務質量的情況下,減少了消息的冗余,減少了應用間的耦合關系。使移動設備更加省電省流量,適用于當今移動互聯網。
【關鍵詞】AMQP RabbitMQ 即時通訊
1 引言
近幾年來,移動互聯網憑借其攜帶方便、接入迅速、業務內容豐富等特點,取得了前所未有的高速發展。目前,移動應用服務呈現多樣化,但是移動終端最基本的功能是滿足用戶的溝通需求,因此即時通訊類應用以其隨時隨地溝通的特點,滿足了用戶的需求。
目前常用于即時通訊的兩大協議是XMPP協議和SIMPLE協議。XMPP和SIMPLE 都是根據消息體里的消息頭尋址的,在消息體里面含有消息的發送者、接收者、消息路由用的會話標示信息等眾多頭域,使得消息體的體積變大,帶寬利用率較低。XMPP 和 SIMPLE協議都是基于字符文本的通信協議,其優點是可讀性強,便于抓包分析,但字符文本協議通信效率較低,并且,為了保證通信安全,采用TLS 等加密傳輸計算量也較大,耗能較高。
移動互聯網相對傳統互聯網最顯著的特點就是其移動性,這種移動性隨之帶來的是諸多不穩定、不可靠和隨意性,使得移動互聯網上的應用與應用服務器之間建立的網絡連接可靠性較差,很難保持長時間的連接狀態,因此傳統的通信模型,比如 SIMPLE和XMPP 中采用的消息傳遞,不適合應用于移動互聯網,移動互聯網應用與應用服務器之間可以通過松散耦合的關系來改善移動性帶來的問題。
2 AMQP及RabbitMQ
2.1 AMQP
AMQP(AdvancedMessage Queuing Protocol),即高級消息隊列是一個基于消息異步處理的應用層高級消息隊列協議,是消息中間件的開發標準。它的主要特征是面向消息、隊列、路由,且安全、可靠。AMQP是基于客戶端/代理模式,為客戶端應用與消息中間件之間提供異步、安全、高效的交互。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端和中間件不同產品、不同開發語言等條件的限制。
2.2 RabbitMQ
RabbitMQ是流行的開源消息隊列系統,由以高性能、健壯以及可伸縮性出名的erlang 語言開發,是 AMQP 的標準實現。它可以支持各種消息交換的體系結構:
(1)存儲轉發(多個消息發送者,單個消息接收者);
(2)分布式事務(多個消息發送者,多個消息接收者);
(3)發布訂閱(多個消息發送者,多個消息接收者);
(4)基于內容的路由(多個消息發送者,多個消息接收者);
(5)文件傳輸隊列(多個消息發送者,多個消息接收者);
(6)點對點連接(單個消息發送者,單個消息接收者)。
RabbitMQ的主要特性包括如下方面:
(1)定義、隊列、通道等概念,使對消息投遞的控制更加精準靈活;
(2)支持高可用,以及消息和隊列的持久化;
(3)支持集群,允許節點動態變化和節點故障;
(4)支持眾多語言的客戶端,包括Java、C/C++、php、.Netpython等;
(5)高性能、消息吞吐量大。
生產者發送消息到RabbitMQ 服務器,在服務器內部,用戶創建交換機和隊列,通過綁定規則將兩者聯系在一起。交換機負責分發消息,根據類型綁定的不同分發策略有區別。消息最后來到隊列中,等待消費者取走。
3 即時通訊系統的設計
本文提出一種以RabbitMQ為消息中間件,采用發布/訂閱模型作為消息傳輸機制,以protobuf作為數據傳輸格式。采用C/S 體系結構,將消息存儲與轉發,確保消息不重不漏,充分的適應移動互聯網的一套即時通訊的方案。
本文的消息系統可以看做是一個發布/訂閱系統,有消息生產者publisher,消息中轉者RabbitMQ Server,消息處理者Message Server,以及消息消費者Receiver.這個系統中的publisher和receiver共同構成了一個channel。
4 消息流程
4.1 發送流程
如圖1所示 。
(1)客戶端A發送一條消息內容為send_client_uId【發送者id】 , channel name ,msg time【消息發送時間,精確到秒】,msg content【消息內容】,保存在本地的SqliteSQL數據庫,然后用protobuf序列化后發發個RabbitMQ。
(2)Message服務器的消息隊列通過RabbitMQ收到來自客戶端A的消息,反序列化。
(3)Message服務器收到消息后以channel name+ msg time為key到本地消息緩存中查詢消息是否已經存在,如果存在則終止消息流程,通過RabbitMQ服務器發送"duplicate msg"這個msg ack 給客戶端A,否則繼續。
(4)Message服務器到Counter服務器(消息計數器,為每個text等類型的消息分配msg id)以channel name為key查詢其最新的msg id,把msg id自增一后作為這條消息的id。
(5)Message服務器把分配好id的消息插入本地msg cache和msg DB中。
(6)Message服務器給客戶端A返回ack, 內容為msg id , msg time , channel name。
(7)客戶端A收到ack包后終止消息流程,并刪除本地數據庫中的數據。如果在發送流程超時后仍未收到消息則轉到步驟1進行重試,并計算重試次。
(8)如果重試次數超過兩次依然失敗則提示“系統繁忙” or “網絡環境不佳,請稍后再嘗試發送”等,終止消息發送流程。
4.2 接收流程
Message服務器到RabbitMQ中檢測Channel中的receive_client是否在線,如果在線則將消息發送出去。
4.3 心跳發送流程
(1)客戶端發送心跳包,內容為{client_uId, network type, list{channel name:newest channel msg id} },即心跳包要上報客戶端所在的所有channel,以及本地歷史消息記錄中每個channel最新的消息的id;心跳包轉給專門處理心跳邏輯的心跳服務器。
(2)心跳服務器收到心跳包后到Counter服務器循環查詢每個channel的最新消息id,如果客戶端上報的id與這個id不等,就發送一條消息通知Message服務器,消息內容為{publish_uId, channel name, client newest msg id of channel【channel內的最新消息Id】}。
(3)Message服務器收到這條消息后,重新啟動消息下發邏輯,到緩存中取出所有的大于{client newest msg id of channel}的id列表。
(4)Message服務器依據list中的id到消息存儲服務器中依次取出每條消息。
(5)Message服務器把這些消息作為"未讀消息"下發給客戶端。
(6)心跳服務器給客戶端下發heartbeat ack包,數據包括其所在的每個channel的最新消息的msg id。
(7)客戶端收到heartbeat ack包后,依據每個channel的最新的msg id與本地消息緩存中對應的channel的最新消息id做對比,如果id不等,客戶端可以啟動拉取消息流程。
5 結束語
本文簡單介紹了基于AMQP協議的標準實現RabbitMQ,并提出了以開源跨平臺的RabbitMQ為消息中間件,松耦合的發布/訂閱模型為通訊方式,protocolBuffer為數據傳輸格式的一套完備即時通訊系統設計,詳細敘述了消息下發的技術流程,同時保證了消息不重不漏。從根本上解決了由于消息冗余而占用大量的帶寬利用率,使得該方案更加適合移動互聯網復雜的網絡環境。下一步的工作將著重研究在復雜的移動網絡下,用于保持連接的心跳包的發送時間間隔的方案和消息傳遞時的安全問題。
參考文獻
[1]王默涵.面向移動互聯網的Presence/IM機制的設計與實現[D].小型微型計算機系統,2015(04).
[2]高曉婷.基于AMQP的信息發布與訂閱[D].2013(10).
[3]RabbitMQ與AMQP協議詳解.http://www.cnblogs.com/frankyou/p/5283539.html
[4]Rabbit MQ[EB/OL].http://www.rabbitmq.com/.z