陳瑤 李洋磊








摘 ?要:本文分析了ActiveMQ數據傳輸的底層原理,以解決數據突發洪峰時期的隊列數據積壓問題。利用增加并發消費者、調整消息預取值、批量消息確認等參數,實現了傳輸性能的多倍提升。最后還根據業務運行出現過的問題,優化了服務端的配置,加強了薄弱環節的監控,提升了系統的穩定性。
關鍵詞:ActiveMQ;民航數據傳輸;數據傳輸框架
中圖分類號:TP368.5 ? ? ?文獻標識碼:A 文章編號:2096-4706(2019)16-0128-04
Abstract:This paper has analyzed the underlying principle of ActiveMQ data transmission,to solve the data backlog problem during the peak period of data. By adding concurrent consumers,adjusting message prefetch values and batch message validation parameters,the transmission performance is improved many times. Finally,according to the problems in the operation of the business,the configuration of the server is optimized,the monitoring of weak links is strengthened,and the stability of the system is improved.
Keywords:ActiveMQ;civil aviation data transfer;data transfer framework
0 ?引 ?言
為保證數據及時、可靠傳輸,選用了中間件ActiveMQ,設計了一套通用的數據傳輸框架。該框架現階段主要應用于民航氣象數據的傳輸,為南方航空、華南國際商務航空等用戶提供高效、可靠的數據傳輸服務。該系統投入使用后,數據傳輸的及時性增強、可靠性提高,得到了用戶單位的認可。
在幾年的運行過程中,也發現了許多可以改進的地方。該框架現階段傳輸的民航氣象數據,具有在整點及半點數據量較大的特點,框架承載的數據量越來越多,用戶數也在增加。在數據量突發洪峰的情況下,如果某個用戶的數據獲取能力較差,ActiveMQ的數據消費能力被阻塞,容易產生大量的數據等待確認、大量數據重發的情況,嚴重影響數據的吞吐量,導致數據積壓,也給ActiveMQ服務器的運行帶來潛在的風險。
為解決該問題,對系統運行中的數據進行分析,提出適用于不同場景的消息隊列優化過程。
1 ?ActiveMQ消息的傳送機制
如圖1所示,ActiveMQ的消息由生產者(即數據發送 端)發出后,會被ActiveMQ的Broker保存,消費者(即數據接收端)已經在Broker上注冊,Broker會確保消息被發送給這些消費者,確保消息已經送達后,該消息才會被刪除。
ActiveMQ的消息傳送機制如圖1所示,通過消費者正常接收到消息后,返回一個確認接收狀態的消息——ACK消息給Broker,如果層次較為復雜,則會一層一層的返回ACK消息。
ActiveMQ中的ACK消息有以下幾種類型,定義在字段ACK_TYPE中,如表1所示。
從ACK_TYPE的值可以看出,在ActiveMQ中,消息確認的頻率是可以由開發者選擇的。可以消費一條消息返回一條確認消息,也可以選擇另外一種模式——延時確認。在消費者成功消費消息后,不立即返回ACK,而是等到這些ACK消息的條數積攢到某個閾值時,返回一個ACK消息把他們全部確認。
從這個定義也可看出,延時確認具有更好的性能。特別是在網絡擁堵的時期,N條消息只會有1條ACK消息,相比N條消息N條ACK返回,大大減輕了網絡負荷。但這樣的確認機制也存在一定的弊端,如果消費端出現異常,無法正常返回ACK,會導致N條消息重發,反而會造成網絡負擔。
并且大量消息如果不得到及時的確認,Broker需保存這些消息,并將他們放置于隊列中排隊等待確認,這將消耗Broker服務器的內存、硬盤等資源,如果該服務器的性能低下,將給Broker的運行帶來潛在的風險。
所以需要分析運行的實際情況,根據已有的資源進行靈活的配置、調優。
2 ?問題定位及分析
利用現有的框架和數據傳輸模式,模擬測試數據突發洪峰時的數據吞吐量,從分析消息包處理耗時入手,進行各個參數的調優。首先在生產端生成大量的消息,以在測試數據突發洪峰時期,每個消費者的消息處理速度、消息積壓數[2]。如表2所示。
從表2的數據可以看出,消費端的消費能力存在的差距,消費能力差的客戶端在突發數據洪峰時容易發生數據積壓。兩個消費者的網絡狀態類似,可以排除因網絡原因導致的消息積壓。通過進一步分析消費能力弱的消費端,研究其消息處理流程,發現其接收到消息后還需進行串行處理,處理過程更加復雜,導致消息處理更加耗時,消息返回ACK的時間也更長,導致了Broker需等待這個更慢的消費者。
針對消費端處理速度存在瓶頸的問題,設想通過提高消費端的處理速度入手。消費端的消息處理流程為業務需要,無法精簡處理流程來加快消費速度。那還有其他什么手段可以增加消費端對消息的消費速度?既然現存的串行等待的時間無法縮短,那是否可以通過并行多個消費者程序來提高效率?
3 ?增加并發消費者方案測試
擬通過增加并發消費者的方式,看是否能提高消息處理的速度。要使用并發消費者[3],可修改框架中Spring的JMS配置,增加多個Listener實例。配置項為Simple Message Listener Container[4],可以配置固定的實例個數,也可以配置一個實例數的區間,這樣消費者可以根據消息的壓力情況動態調整并發數。
配置文件:
<bean class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connec tionFactory"/>
<property name="destinationName" value="${jms.queue.name}"/>
<property name="messageListener" ref="message Receiver"/>
<property name="concurrency" value="10-20"/>
或者:
<property name="concurrentConsumers" value= "20"/>
</bean>
測試結果如表3所示。
4 ?消費者優化
通過測試發現,增加并行消費者后,消息的消費速度出現明顯的提升。但消費者數目大于10以后,消息處理速度不再提升,在多個消費者中,有些消費者很忙碌,需要處理大量的消息,有些消費者很空閑。為什么會出現這樣的情況?
在目標URI的定義中,有一個prefetchSize[5]參數值可配置,如下代碼所示:
String queueURI = "queueForGuest?customer.prefetchSize=100";
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(queue URI);
prefetchSize參數定義了一次有多少條消息推送給消費者,若在代碼中沒有指定prefetchSize參數值,系統將給其一個默認值,如表4所示。
從這個默認值中可以看出,使用默認值無法與現實需求吻合。如果消費者處理消息的能力很差,一次推送1000個消息給消費者,無疑會造成消費者端的擁堵。如果消費者端性能好、處理速度快,可配置較高的prefetchSize值[7]。
而本系統中消費端航空公司2的消費者就是一個慢消費者,消費消息的速度慢,如果使用默認prefetchSize值1000,一次將1000個消息推送給該消費端,剩下的推送給該消費端并行的消費者,就出現了上文中的情況,部分消費者要處理的消息很多,消費能力差,消費速度慢,這些消費者特別忙碌,甚至出現擁堵現象。而其他并行的消費者沒有消息需要處理。
所以嘗試將該消費端的prefetchSize值進行調整,提升消費端整體的性能。
為了測試系統中現有消費者的適合的prefetchSize值大小,將prefetchSize值分別配置為100、1000,并進行了對比分析測試,測試結果如表5所示。
將queuePrefetch參數修改為100后,消費端Consumer2并行且忙碌的消費者數量增加,消費10000條消息減少。
除了批量獲取多個消息可以使性能提高,批量確認多個消息也將使性能大大提升,ACK的模式有很多種,如ActiveMQ消息的傳送機制表中的說明,其中效率最高的機制為optimizeAcknowledge模式,當有prefetchSize的65%個消息被正確消費后,消費端將返回一條ACK消息,并批量確認這些消息。
這樣的模式雖然效率高,但若消費端出現異常,未正常返回這些消息的ACK,Broker將重發這些消息,這樣的模式適用于高吞吐量、對重復消息有容錯能力的系統。觀察系統運行時這樣的異常情況較少,且在消費端均做了重復消息處理,同時本系統現應用于傳輸氣象報文,對實時性要求很高,提高吞吐量對系統的運行意義重大。故這種高效模式適用于本系統。
將原來的逐條消息ACK改為optimizeAcknowledge模式后,消費端、Broker端的資源消耗降低,處理速度提高,測試結果如表6所示。
5 ?Broker優化
從表2的測試結果可以看出,不同的消費端的消費速度差異較大,系統運行中同時存在快消費者和慢消費者,在現場運行的過程中也多次發現這樣的問題,某個消費者的消費能力較慢,不能及時消費消息或者返回ACK,導致Broker必須在內存中保存這些消息,增加了內存的消耗,消息積壓過多時,需要將內存的消息寫入到磁盤中,增加了Broker端的磁盤I/O消耗。如果情況進一步嚴重,Broker將阻塞生產者,迫使其降低生產消息的速率甚至不生產消息。
一個慢的消費者不僅給Broker端的運行帶來了巨大的潛在風險,還有可能導致快的消費者也無法正常獲取消息。這是在運行環境中必須高度重視的一個問題。
保證系統運行的穩定至關重要,但與此同時,即使用戶是慢消費者,保證他們及時獲取到數據也很重要,如何滿足這個矛盾的需求,主要從以下三個方面進行了優化:
(1)關閉producerFlowControl,即使有慢消費者,先保證消息生產及快消費者消費的速度,保證消息傳輸不會因為慢消費者而終止。
(2)捕獲Broker資源消耗異常,及時進行干預、優化。
在默認情況下,producerFlowControl是開啟的,在這種模式下,如果消費者消費能力差,Broker將降低消息的生產,以保證消費端不會由于消息擁堵而資源耗盡,該模式為調節Broker來配合慢消費端。
如果選用該模式,消息的生產者也可以進行一些異常的處理,可以進行異常告警,并且生產者可以在等待設定的時間后進行重試,避免由于失敗而使發送消息的請求立即被阻塞,生產者變成假死的狀態。
(3)監控Broker資源使用情況,監控消費者消費情況,及時發現慢消費者,對異常及時進行干預、優化。監控每個消費者消費消息的情況,主要監控參數為消費者是否掉線、阻塞的消息數、等待確認的消息數、進隊列消息數、出隊列消息數等。
6 ?結 ?論
通過各個參數的調優,傳輸系統數據積壓的問題得到了解決,消息傳輸的性能得到了提高,消息傳輸的速度提升情況如表7所示。
表7 ?優化后傳輸時間變化及傳輸效率提升情況
通過方案調整及參數優化,系統的性能及穩定性都得到了較大的提高,達到了預期目標。基于中間件ActiveMQ的調優方法還有很多,例如消息傳送優先級、虛擬通道、分布式網絡、roker集群等,在進一步的研究工作中可從這些方面進一步提高性能及系統穩定性。
參考文獻:
[1] APACHE software foundation. ActiveMQ [EB/OL].http://activemq.apache.org/index.html,2018-09-10.
[2] 王鵬,從波,李國杰,等.基于ActiveMQ消息總線的性能測試方法 [J].測試技術學報,2019,33(2):147-152.
[3] 周聰.基于改進的Active MQ的通信模型的設計和實現 [D].長春:吉林大學,2017.
[4] Spring AMQP. Spring [EB/OL].https://docs.spring.io/spring-amqp/api/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.html,2019-01-01.
[5] Bruce Snyder,Dejan Bosanac,Rob Davies. Introduction to Apache ActiveMQ Green Paper from Active MQ in action [M].London:Manning,2017:20-23.
[6] Bruce Snyder,Dejan Bosanac,Rob Davies. Active MQ inaction [M].London:Manning,2005:4-5.
[7] 龐佳麗.分布式系統中基于中間件的異步通信可靠性研究 [D].杭州:浙江工業大學,2017.
作者簡介:陳瑤(1987.04-),女,漢族,湖南湘潭人,工
程師,碩士,研究方向:數據傳輸框架;李洋磊(1983.01-),男,漢族,河南洛陽人,工程師,碩士,研究方向:民航氣象信息系統設備維護。