陳思媛,易國洪*,2,金 旺
1.武漢工程大學計算機科學與工程學院,湖北 武漢 430205;2.智能機器人湖北省重點實驗室(武漢工程大學),湖北 武漢 430205
現如今汽車的擁有量達到一個相當龐大的數字,導致交通擁堵問題日益嚴重[1],智慧停車解決城市“停車難”問題已刻不容緩。智能立體停車庫以獨特的占地面積少[2],交互敏捷等特點成為新時代解決停車問題的關鍵。其中,移動終端和應用服務器之間的通訊存在穩定性、實時性和準確性等方面的問題。參考如今市面上使用較廣的幾種開源消息中間件,例如ActiveMQ、kafka、ZeroMQ等,經過測試發現ActiveMQ 采用消息推送方式,所以最適合的場景是默認消息都可在短時間內被消費,當消息隊列流量達到峰值時,容易造成消息在消費者客戶端堆積[3],這與智能停車系統要求在短時間內完成大量的數據請求特性不符;Kafka 特有的異步刷盤機制會導致在處理海量數據時產生過高開銷甚至出錯等問題[4];而ZeroMQ 的并發是通過通訊隊列使用無鎖操作[5]實現的,不能保證線程安全。傳統的同步點對點通信由于應用程序與服務器之間一對一的通信方式無法滿足大量用戶、多臺中控同時請求通信的需求,綜合考慮,本文設計了一種智能停車異步消息中間件(smart car asynchronous messaging middleware,SCAMM)較好地解決了以上的問題。SCAMM 采用基于消息的分布式通信模式,消息生產者(客戶端)將攜帶標志的消息發布到消息代理服務器,再由消息代理服務器發向已注冊該標志的消費者(對應的服務器)。這種模式下實現了生產者和消費者之間的松耦合,同時基于Java 非阻塞(non-blocking input output,NIO)事件驅動模型的異步通信技術能滿足大量用戶同一時刻發送請求,在此基礎上對數據的粘包拆包問題進行了處理。
SCAMM 消息中間件在智能停車系統中的應用實例如圖1 所示。當用戶點擊停車按鈕時,客戶端將請求數據發送到消息中間件,隊列管理器將讀取消息并寫入對應的通道,通道以輪詢的方式發送消息至中控端,用戶停車成功則在客戶端告知停車完成,車位不足時則返回停車失敗。智能停車系統的重點在于上下班的高峰期對可停車位的資源搶奪,這一過程要求系統既要應對短時間內高并發多目標中控的指令請求,又要保證指令準確完整的送達對應中控機,SCAMM 消息中間件能夠很好解決這一問題。

圖1 SCAMM 體系結構圖Fig.1 Architecture of SCAMM
消息中間件中支持的消息模型分為兩種:點對點模型(peer to peer,P2P)和發布/訂閱模型。圖2 為兩種通信模型圖。

圖2 兩種通信模型圖:(a)點對點,(b)發布者/訂閱者Fig.2 Two-communication models:(a)P2P,(b)publisher/subscriber
P2P 模型的主體有3 個:消息的發送者,消息轉發隊列以及消息的接收者。消息產生后通過發送者發送到一個特定的消息隊列,排隊轉發給被接受者,消息被接收者接受或者過期之后會被丟出消息隊列,一個消息只能被一個接收者所接受,當接受者離線時,這條消息也會在消息隊列中被移除。發布/訂閱模式下的主題有3 個:發布者,訂閱者和主題。發布者指定消息的主題,并將消息主動發送到主題的通道中或者等待訂閱者來輪詢,訂閱者可以通過訂閱主題來使用指定通道中的消息。發布/訂閱模式有以下特點:每個訂閱者可以訂閱多個主題,每個主題中的消息可以被所有訂閱者訂閱,可以實現一對多的消息傳遞。
異步通信是指消息在生產者和消費者之間的收發不需要通過公共的時鐘信號來控制,而是采用異步應答方式來實現雙方的通信[6]。消息中間件SCAMM 采用Java NIO 異步非阻塞I/O 模型來實現異步通信,Java NIO 是在JDK 1.4 開始提供新的API[7],基于事件驅動模型開發[8],以塊的方式處理數據[9],通過雙向通道(channel)進行傳輸。異步通信機制的本質是當應用進程向CPU 發送一個I/O 請求,如果此時內核內沒有可用資源被調用,內核會向客戶端返回一個錯誤碼,客戶端進程會有可用資源的時間再次進行請求操作,這樣就避免了進程的阻塞,這種往返的操作也被稱為輪詢[10]。可以將異步通信中的非阻塞I/O 模型用以下的偽代碼來表示:

消息隊列是指消息由生產者發送到1 個擁有特定標志的虛擬通道,消費者連接到這個特定通道上獲取通道中的消息。在分布式和異構環境中使用消息隊列技術可以很大程度地減少系統的耦合,消息隊列對消息的生產者和消費者的語言、平臺不做要求,只需要在同一套編碼規范下即可完成通信。當使用消息隊列時,生產者和消費者需要知道隊列的標志,消息才能夠被發送到指定標志下的隊列。消息在隊列中會按照先進先出的順序被轉發給消費者。在這種模式下,生產者和消費者可以不必同時處在工作狀態,虛擬通道會按照先進先出的順序在消費者能夠正常工作后將消息轉發給消費者,1 條消息只能被一個消費者接受,當這條消息被使用或者過期后,將會被拋出消息隊列,當消費者接收到消息后將返回通道1 個確認信息。整個過程中生產者只需要發送1 條消息后無需等待,消費者也只需從隊列中讀取消息進行執行[11],摒棄了傳統應用中通信雙方需要同步執行的缺點。
消息中間件是1 種消息代理,是統一消息收發的接口,通過高效可靠的消息傳遞機制實現跨平臺,跨語言進行數據交換[12]。作為軟件中間層,消息中間件通過采用同步或異步的通信方式來達到穩定可靠的數據交流,在分布式環境下擴展進程間的通信[13]。本文采用形式化語言巴科斯范式(backus naur form,BNF)進行描述消息中間件,BNF 的優點在于它描述的是軟件的抽象結構,無需考慮其具體實現細節。使用BNF 語言既避免了在描述中間件語義上的二義性、模糊性,又在垂直方向上明確各部分繼承父對象的規格說明[14]。中間件的描述需要明確兩個方面的信息:中間件操作來實現,這些操作包含輸入輸出參數和約束的構成和各構成模塊的行為信息,每個部件都有一個或多個接口實現,每個接口都有一個或多個條件。以下給出了中間件的主要組成部分和各個部分需要實現的方法。
<SCAMM>::=<Function>* <InterFace>* <Message Scheduling >
<Function>::=<ChannelHandle><start><close><transport >[Heart-beat][link-protect]
<InteFace>::=<Connection><message class><Communication>[self-Protocol]
<Message Scheduing >::=<message queue|message topic>
消息中間件SCAMM 定義為3 種成分:至少1種的Function 功能(Function)、至少1 個外部接口(InterFace)和1 種確定的Message Scheduing(消息調度類型)。功能模塊包含邏輯處理模塊、啟動模塊、關閉模塊、傳輸通信協議(transmission control protocol,TCP)長連接心跳檢測模塊、鏈路保護模塊,每個子模塊實現的功能不同,協同實現消息在中間件內部分發過程。外部接口模塊包含通訊接口、連接接口以及自定義的消息格式接口,外部接口模塊是為了使消息中間件不限制于單一類型的客戶端或者通信協議,降低了系統的耦合程度同時保證了消息中間件良好的拓展性。消息中間件需要明確某1 種通信模式,通常通信模型分為消息隊列或者主題(topic)。

圖3 SCAMM 的系統功能模塊圖Fig.3 Function module diagram of SCAMM system
在智能停車消息中間件SCAMM 通信模型中使用Java NIO 事件驅動模型來實現異步通信。智能停車系統中需要考慮消息的高并發和準確性,在SCAMM 模型中采用Channel 來傳輸數據,這個通道連接起客戶端和中控端,當某1 個中控機與服務器建立連接后,在服務器設置的監聽器立即啟動連接模塊為這個中控機注冊1 個全新的Channel TCP 長連接并擁有唯一標志,有標志了發往這個中控的消息會經由Channel 轉發。在整個通信過程中,消息首先由生產者(客戶端)傳遞給SCAMM 消息中間件,通過消息管理模塊調用ChannelRead 讀取消息標志,根據消息標志寫入對應Channel 的消息隊列中,在經過自定義的消息格式編碼后通過消息轉發模塊發送給中控,中控接收到數據包后會回復1 個數據包,這個回復包再通過消息轉發模塊的接收方法獲取,解碼后從原通道返回至消息隊列,消息隊列發送給客戶端。SCAMM 消息中間件通信的系統結構如圖3 所示。
SCAMM 消息中間件服務端分為5 個模塊:連接模塊、長連接心跳監測模塊、消息封裝模塊、消息管理模塊和消息轉發模塊。連接模塊通過監聽中控服務狀態來建立與服務器的TCP 長連接,當遠程中控失去連接時,調用close()方法釋放掉已注冊的通道信息,下次遠程中控上線后重新注冊。長連接心跳監測模塊通過定時發送心跳報文監測通道是否存活,及時關閉失活通道并重新注冊該通道避免了服務端可能存在的消息丟失的情況。消息封裝模塊由SmartCarDecoder 類通過自定義通信協議,保證了數據傳輸的隱蔽性和安全性,并且通過在服務端和客戶端的pipeline 上加上相應的解碼器解決了TCP 的粘包、拆包問題;消息管理模塊的ChannelHandle 組件實現了服務器對從客戶端接受的數據的處理。消息轉發模塊通過transport類來實現消息傳遞。
2.2.1 ChannelHandle API 啟動Server 時,進行服務器初始化,這一步的操作包括讀取主機的IP 地址、監聽進行socket 通信的端口、創建ChannelHandle 組件并注冊相對應中控端(消費者)的消息通道(Channel)。通過讀取到客戶端發送過來的消息的相關參數,ChannelHandle 組件進行一系列相應的通信過程的接發處理,ChannelHandle 組件存在兩個比較重要的子接口,ChannelInboundHandle 處理入站數據和各種變化的子接口,ChannelOutboundHandle 處理出站數據并且包含各種攔截操作的命令的子接口。對于ChannelInboundHandle子接口,表1 給出了相關方法的描述。

表1 ChannelInboundHandle 的方法描述Tab.1 Methods description of channelInboundHandle class
當某個ChannelInboundHandle 的實現重寫了ChannelRead()方法時,它將負責顯式的釋放與池化的ByteBuf 實例相關的內存。最后,將已經啟動的Channel 保存在列表中,該列表保存所有當前活動的中控端(消費者)的長連接。
2.2.2 SmartCarDecoder 類 消息中間件協議不僅定義了進行信息交換的通信方式,而且間接體現了協議支持的系統結構方式[15]。當客戶端發送一條消息或者服務端接收到一條消息時,就會發生一次數據交換,通過自定義消息的編碼器與解碼器可以保證消息的獨立性與隱私性。入站消息會被解碼,解碼器將接收到的字節流轉換成中間格式json 字符流,再由字符流轉換成1 個Java 對象,出站消息的模式是相反方向的,編碼器將對象轉換成字符流,再通過字符流轉化成字節流與消費者進行網絡數據的通信。本文設計的中間件使用的是基于長度的協議的解碼器,基于長度的協議通過它的長度編碼到幀的頭部來定義幀,而不是使用某種特殊的分隔符來標記它的結束。協議開始使用標準數據包包頭整型占4 B,傳輸數據的長度為整型,占4 B,同時為了防止socket 流的攻擊,傳輸的數據長度不應該超過2 048 位。服務端消息轉發模塊通過讀取并匹配提取的幀的數據來分發消息。SmartCarDecoder 類同時解決了消息的粘包、拆包問題。消息在客戶端和服務端之間是以一種流的方式來進行轉播的。對于應用層而言,客戶端發送的是一個個獨立完整的數據包,而在數據鏈路層、傳輸層或網絡層這種底層會根據TCP 緩沖區的實際大小進行包的劃分,就可能會出現將一個完整的數據包拆分成多個包發送或者將多個包封裝成一個大的數據包發送,這就是TCP 的粘包、拆包。當出現粘包、拆包問題時,系統中的消息就會成為不可讀的消息,SmartCarDecoder 類將消息分為消息頭和消息體,消息頭中包含表示消息總長度的字段,該解碼器能夠在獲取消息頭的時候解析出消息長度,然后向后讀取該長度的內容。對于實現中間件的編解碼類,表2 給出了相關方法的描述。

表2 SmartCarDecoder類的方法描述Tab.2 Methods description of SmartCarDecoder class
2.2.3 transport 類的實現 ChannelHandle 組件通過transport 類實現消息傳遞。transport 類的方法描述如表3 所示。當服務器需要轉發一條消息給消費者時,transport 類通過其SendMessage()方法輪詢的發送隊列中的消息。消息轉發模塊線程將一直處于輪詢狀態直到調用了StopTransport()方法[11],當服務器接收到消費者的反饋時,通過ReceiveMessage()方法將反饋消息通過自定義協議封裝傳回給客戶端。
2.2.4 心跳檢測類的實現 在異步長連接中,客戶端發送正常報文時可能發生通道連接異常的情況,為了監測通道連接是否正常,SCAMM 消息中間件在服務端定時發送心跳報文,當心跳報文發送或者解析后不匹配時則認為該通道失活,服務端關閉該通道并重新注冊激活。該類使用線程避免了心跳報文與正常通信報文在同一個套接字連接中產生的資源交叉,使用線程池對心跳線程進行管理避免了通道數過多時產生的服務器資源占用過多的問題,線程間的通信可使用全局變量從而簡化程序復雜度;使用線程內部機制——靜態互斥量進行正常報文和心跳報文的同步,程序簡單可靠[16]。心跳檢測類的方法描述如表4 所示。

表3 transport類的方法描述Tab.3 Methods description of transport class

表4 心跳檢測的方法描述Tab.4 Methods description of heartbeat detection class
2.2.5 Abstract-Message 抽象消息類 Message 對象由抽象消息類創建,該類定義了一條消息需要具備的內容,包括消息的指令類型、目標地址標記、預約時長、車牌號、用戶ID 等等,通過格式化編碼將這些信息封裝成一個對象,用于消息中間件的接發。抽象消息類的參數如表5 所示。

表5 Abstract-Message 抽象消息類的參數描述Tab.5 Parametric description of Abstract-Message class
為了檢驗SCAMM 中間件是否能夠提高智能停車系統中指令接受的準確率,利用單一因素實時實驗原理進行了實驗。實驗中的計算機運行的是Microsoft Windows10 x64,具 有1 個IntelCore I5-6 200U 處理器,雙核四線程、8 GB 的RAM 和一個500 GB 磁盤。將服務端和客戶端部署在同一臺計算機上同時結束所有非必要的進程以防止對實驗結果的影響。使用測試工具postman 檢查響應數據包(Response Body)是否等于約定消息長度和內容,當Response Body 的返回值與消息完全匹配時返回true,否則返回false,每次測試重復3 次并采取平均值進行記錄。
實驗選擇socket 點對點通信、擁有高吞吐量性能的中間件kafka 和SCAMM 中間件并分別將3 種通訊方式應用于智能停車系統進行了測試。由于SCAMM 自定義協議類SmartCarDecoder 用來處理消息粘包、拆包的特點,就并發請求數量級、消息大小這兩個方面進行實驗,兩次實驗均采用單一變量原則。在消息數量對準確率的影響實驗中,固定每次發送的消息大小為256 B,依次改變并發請求總數100、500、1 000、2 000、5 000,記錄測試工具postman 提供的完全匹配成功數和匹配失敗數;在消息大小對準確率的影響實驗中,固定請求總數1 000,依次改變消息的大小256 B、512 B、1 kB、5 kB、10 kB,記錄測試工具postman 提供的完全匹配成功數和匹配失敗數。以上每次實驗前結束所有無關進程避免對實驗造成干擾,每次實驗重復3 次并計平均值。
表6 為智能停車系統中采用傳統點對點的socket 通信、使用kafka 中間件和使用SCAMM 中間件進行不同并發請求數下的對比。由表6 可知,當并發請求數<500 時,兩種情況下指令的準確率相差無幾;當并發數量>1 000 時,SCAMM 中間件能夠明顯的優于兩者。SCAMM 在不同并發請求數下的準確率為100%。
表7 為智能停車系統中采用傳統點對點的socket 通信、使用kafka 中間件和使用SCAMM 中間件在不同消息規模下通信的準確率對比。由表7可知,當消息長度<1 kB 時,3 種情況都表現了良好的準確率;當消息長度>1 kB 時,SCAMM 能夠明顯的優于傳統的socket 通信;當消息長度≥10 kB 時,SCAMM 中間件的準確率比kafka 中間件的準確率提高了14%,比socket 點對點通信提高了32%。
由實驗結果可知,在高并發請求數和大規模消息長度情況下,SCAMM 中間件能夠顯著提高消息的準確率。

表6 不同并發數下準確率實驗對比Tab.6 Comparison of accuracy rates of different requests

表7 不同消息長度下準確率實驗對比Tab.7 Comparison of accuracy rates of different message sizes
在大型企業中消息中間件的應用具有十分廣闊的前景,目前,基于消息的異步分布式通信設計的消息中間件SCAMM 已經在實例中取得良好應用,有著巨大的發展空間。相比于傳統應用中的同步點對點通信來實現消息傳遞的系統,基于消息的異步分布式通信設計的消息中間件能夠提供更穩定,擴展性更好的中間橋梁,同時使系統在高并發量請求下仍然能夠高效的處理數據。SCAMM 忽略了發送方和接收方的平臺異構性,簡化了集成分布式系統的難度,降低了維護系統的成本。SCAMM 中間件通過自定義的通信協議和心跳監測模塊,能夠有效的保持指令傳輸過程中的安全性,提高指令被接收的準確度。