邰宇
摘 要 互聯網的快速發展促進了海量數據的產生,而實時處理海量數據離不開具有良好性能的分布式消息隊列,可明顯提高數據處理的效率,海量數據采用何種消息隊列進行傳輸是關鍵問題之一。分析研究應用最頻繁的Apache Kafka、Rocket-MQ及Rabbit-MQ三種消息隊列特點及實現原理,在實時大數據計算場景下基于此對消息隊列分別搭建集群測試環境,比較有關結果后實現對消息隊列的性能設計優化。
關鍵詞 海量數據;消息隊列;大數據時代
中圖分類號 G2 文獻標識碼 A 文章編號 1674-6708(2018)204-0106-02
1 概述
隨著互聯網技術的迅速發展,網絡日志在主流網站及應用中每日產生的都是海量級的,其數據價值與其產生時間之間存在負相關關系。基于此研發的實時流計算系統,使這些數據體現出最大的價值。將數據向計算系統傳輸已成為對計算效率產生影響的一個主要瓶頸,結合特定業務場景科學合理地選擇分布式消息隊列更為適宜,可在一定程度上使實時計算效率明顯提高。對不同分布式消息隊列在實時計算場景下實時性、并發性、可靠性及擴展能力等方面表現出的差異比較,以確定最優性能的消息隊列。
2 消息隊列
2.1 Kafka及其基本架構
Kafka是可實現、發布及訂閱功能的分布式消息隊列系統,生產者生產消息并將指定話題的消息向消息集群中發布,消費者會對消息集群中的指定話題消息主動訂閱,中間對持久化消息的存儲稱為Broker。消息偏移量在消費者中存儲,因Kafka消息隊列無狀態,用于對Kafka中當前消費者的消費狀況進行記錄。
若某個節點在集群中宕機,系統還能提供正常服務,但容易丟失存儲在宕機節點上的信息。無狀態的Kafka需消費者定期維護消息隊列集群中消費的偏移量,詳細記錄之前的消費狀態。消息偏移量是不連續增量,在對下一個消息位置計算時,應將當前消息長度以原來偏移量為基礎進行相加計算。
2.2 Rocket-MQ及其基本架構
Rocket-MQ主要是由服務器端的NameServer、Broker和客戶端的Producer、Consumer四種節點組成,其Broker、Producer、Consumer與Kafka具有基本相同的功能。NameServer主要用于提供給Producer和Consumer生產消費的Broker地址,Rocket-MQ集群隨著啟動的Broker集群,發起連接指定NameServer的請求,Broker將以30s為周期會自動發送具有目的topic消息的一次心跳,同時NameServer每隔兩分鐘對是否存在心跳進行主動檢測,若未檢測到心跳,將自動斷開連接。若Broker掛掉,也將斷開連接,NameServer迅速感知到并將topic和broker的關系更新。但不會向客戶端主動通知,在客戶端啟動時,對部分NameServer指定具體的網址,客戶端自動與指定NameServer進行連接,若不能成功連接,客戶端就會嘗試連接其他NameServer地址,連接成功將每隔30s對路由信息進行查詢。
2.3 Rabbit-MQ及其基本架構
由Exchanges與Queues組成的Broker是Rabbit-MQ與其他2種消息隊列的主要區別,向Exchanges中push生產者生產的消息,系統利用RoutingKey將找到消息與Queue的對應存儲位置。Queue利用routing keys進行綁定,在消息傳輸中,若消費者對客戶端的發送消息正確接收并消費,系統將這條消息從Queue中刪除。多個消費者可接收發送來的同一消息,及時將數據向消費者發送后同時在隊列中將這條數據刪除。
3 實驗設計
采用本地虛擬機PC搭建測試環境,對測試主機進行網絡配置。對實驗系統進行設計,其主要過程為生產者向Broker集群中Push數據,然后對消費者Pull到Broker集群中的數據進行計算。由程序自動生成實驗數據,再向Broker集群中存儲。預先搭建好Storm實時計算系統,3種消息隊列分別與生產者和消費者建立連接,再對消息分別統計分析其生產和消費效率。
4 性能優化設計及實驗結果
4.1 創新性
為使數據計算提高準確性,采用全新Kafka消息結構,放棄消費者利用對offset的維護消費Kafka集群中的數據。消費者對數據接口的讀取和消費者對數據偏移量接口的修改分別進行重新設計,同時由消費者調用讀取數據接口和修改偏移量接口,以確保其消費端具有較高的可靠性。
4.2 消費者可靠性設計
丟失傳送消息和重新傳送消費過的消息是讀取Kafka原生的消費者端數據中比較常見的2個問題,因此,基于此采取可靠性設計方案。將主鍵Id分別添加到生產者中的每條消息中,在消費者中若檢測出重復Id則進行自動過濾,以確保不會重新消費已被消費的消息。確保不丟失傳送數據的方法主要是采取消費者同步處理數據和對數據偏移量修改,即消費者將一條數據處理完后再依次將另一條數據依次進行處理。
4.3 測試主要用例
測試主要是實現對以上3種消息隊列的磁盤 IO、吞吐量及CPU資源消耗率之間存在差距的比較。正常啟動Zookeeper和3個消息隊列,測試是將消息隊列集群與Storm計算集群啟動,push消息隊列中100萬條準備好的測試數據,對topic分別創建,計算Storm計算集群pull出消息隊列中的數據。
4.4 實驗結果
通過比較分析以實時流處理場景為基礎的吞吐量,Kafka最高,Rabbit-MQ的broker磁盤IO處于瓶頸。Rocket-MQ比較穩定,磁盤IO使用率已接近全部。Rabbit-MQ在消耗CPU資源方面較大。再對服務端處理同步發送的有關消息隊列的性能進行比較,Kafka消息隊列最高,Rabbit-MQ消息隊列最低。
5 結論
綜上所述,基于實時流處理業務場景對存儲和讀取需處理數據的消息隊列進行選擇十分必要。通過對Kafka消息結構的優化,再比較基于Storm集群實時計算場景中性能較好的3種消息隊列,研究結果顯示以極大的實時計算數據量和較低延遲的要求為基礎,綜合評價這3種消息隊列的吞吐量、磁盤IO及消耗CPU標準等有關指標,Kafka消息隊列的優勢比較明顯。
參考文獻
[1]王巖,王純.一種基于Kafka的可靠的Consumer的設計方案[J].軟件,2016(17):61-66.
[2]馬浩然.基于NS3的分布式消息系統Kafka的仿真實現[J].軟件,2015(1):94-99.
[3]張鵬,李鵬霄,任彥,等.面向大數據的分布式流處理技術綜述[J].計算機研究與發展,2014(s2):1-9.
[4]周京暉.集成消息服務和定時通知的分布式內存數據庫[J].軟件,2013,34(1):82-92.
[5]譚玉靖.基于ZooKeeper的分布式處理框架的研究與實現[D].北京:北京郵電大學,2014.endprint