顏曉蓮,章 剛,邱曉紅
(1.江西理工大學 軟件工程學院(南昌),江西 南昌 330013;2.江西北大科技園,江西 南昌 330013)
分布式消息系統作為分布式系統重要的模塊間消息傳遞組件,利用可靠、高效的與平臺無關的消息傳遞與分發,可實現分布式系統內部解耦以及分布式系統各模塊的有效集成,因而受到業界高度關注[1]。
ApacheKafka[2-5]是當前較為主流、基于發布-訂閱機制的高吞吐量分布式消息系統,前期由LinkedIn開發后由Apache基金管理并開源。其優勢包括:(1)支持上層應用端多語言開發,如C#、JAVA、PHP、Python、Ruby等;(2)支持與平臺無關的消息傳遞與分發;(3)支持準實時性的大規模消息處理;(4)支持on-line水平擴展。相對其他消息系統,Kafka憑借眾多技術優勢,已在各行業企業級應用中普及。
在Kafka中,多個Broker(服務器)組成Kafka集群,并被ZooKeeper集中管理。Producer為消息生產者,Consumer為消息消費者,Kafka將每個新產生的消息進行劃分并歸類到某個主題Topic中(Topic可理解為邏輯存儲單元)。每個Topic被劃為多個分區Partition(Partition可理解為實際存儲單元),這些分區Partition按某種規則均勻地部署到多個Broker上。根據Kafka系統定義和推薦,Producer生產的消息,依據Hash算法被分發至其所屬Topic相應的Partition上。
Consumer作為消費者訂閱其關注的主題Topic(可訂閱多個),Kafka按Range策略(即均勻分配)將Consumer分配至其關注Topic下眾多Partition之一上,由該Partition作為服務接入端,并依次消費其關注的主題Topic下所有Partition的感興趣消息。當訂閱流量或分發消息數量增加時,Kafka可通過配置文件管理增加Partition數量,實現on-line水平擴展從而提升系統性能與吞吐量。
伴隨大數據時代來臨,各行各業對大數據技術的需求越發強烈。Kafka作為消息中間件,不僅在分布式系統中扮演重要角色,同時也已成為大數據流處理框架Apache Samza的核心組件之一。但隨著Kafka應用的多樣化,其自身的一些不足逐漸顯現。
其中不足之一便是Partition過載問題(Partition overload problem,POP)。Partition在Kafka中扮演承上啟下角色,上連消息生產者Producer,下連消息消費者Consumer,Partition服務性能決定著Broker及Kafka整體性能,對POP問題研究將為后期優化Broker、Consumer乃至Kafka整個系統性能打下基礎。
在綜合衡量已有研發成果及文中所關注的重心的基礎上,認為POP問題指消息分發、消息存儲或消息消費、消息訂閱等操作造成主題Topic下Partition過度服務,并影響到支撐Partition的實際物理載體Broker的性能。
通常而言,在大型商業應用影響下,某時刻會造成某個(或多個)主題Topic源源不斷地涌入新消息,并依據Hash算法向其Partition分發,此時Partition不僅要處理消息存儲還要處理Consumer的服務請求,當新消息數量達到某閾值時,必將導致Partition過載,而這將影響到Partition的物理載體Broker的性能。
雖然,Kafka可通過配置文件增加Partition數量,緩解Partition過載現象出現,但依然存在如下問題:(1)這種由人為主觀判定及人為修改的方式,不僅準確度無法保證而且極為僵化;(2)Partition文件配置管理與基于Hash算法的消息分發相互獨立、相互分離,無法根據Partition實際情況建立協同工作機制。這些問題的存在,已使得Kafka無法滿足當前多樣化應用需求。
當前有關Kafka中Partition過載問題討論極為少見。研究成果較為常見的包括:(1)ZooKeeper集中管理機制[6-7],主要討論業務復雜化后,Broker、Consumer、Consumer Group等注冊管理,Topic與Broker映射關系以及Partition分配等管理機制,有助于提升系統整體效率;(2)Broker[8-9]負載均衡,主要討論虛擬化背景下Broker如何實現接入負載均衡,有助于提升Broker資源利用率;(3)Consumer[10-11]負載均衡,主要討論大規模數據處理環境下,傳統Kafka易造成的高開銷、高誤差率等問題,有助于降低系統耗能、提升服務質量。這些雖都對Kafka系統實現優化,但都無法解釋Partition過載問題。
針對此,提出一種改進型Partition負載優化算法(IPOOA算法),該算法實現消息分發預測以及消息分發與文件配置管理協同,從而可有效緩解Partition過載問題出現。
算法思想:新消息產生后,IPOOA算法先根據實際業務提取業務關鍵字Key,依據Hash分發規則計算分發至Partition,接著算法評估該Partition的即時服務耗量,如果即時服務耗量在閾值范圍內,則新消息被分發至該Partition,否則算法依次計算與該Partition相似度較高的候選Partition,并評估候選Partition的即時服務耗量,如果滿足閾值范圍,則新消息被分發至候選Partition,否則重復計算候選Partition,直至迭代次數超過半數Partition總量。如果依然沒有完成消息分發任務,則通知Kafka自動修改配置文件新增Partition并存儲新消息,從而能夠有效緩解Partition過載。
按照Kafka的定義,消息分發機制共包括Hash分發、隨機分發以及輪詢分發等(如圖1所示),實際中企業級應用使用范圍較廣的是Hash分發機制。

(a)Hash分發機制

(b)隨機分發機制

(c)輪詢分發機制
該機制大致過程如下:
Step1:指定消息的Key(通常選取實際業務所含關鍵字符);
Step2:基于Key實現Hash(Key);
Step3:根據mod(Hash(key))結果將消息分發至指定Partition;
Step4:返回Step1。
Hash分發機制相對其余兩種方式,其能夠較好地保證消息均勻有序分發,因而被行業廣泛普及使用。但Hash消息分發無法根據Partition實際負載情況進行有序分發,從而易加重Partition負載。
1.2.1 即時服務耗量(instant service consumption,ISC)
ISC反映當前t時刻Partition中消息消費產生的服務消耗量,對任意消息k而言,在時刻t產生的服務消耗量Cmt由t時刻消息k訂閱數CmtNum及t時刻消息k訪問連接數CmtCon線性加權組成,如式(1):
Cmtt(k)=λ1CmtNumt(k,N1)+λ2CmtCont(k,N2)
(1)
其中,λ1∈(0,1)和λ2∈(0,1)為權重系數,N1和N2分別為訂閱總數和連接總數。
t時刻Partition的ISC可表示為:
(2)
1.2.2 Partition相似度(partition similarity,PS)
PS反映某一時刻兩個Partition所存消息的相似程度,對任意Partition而言,在時刻t所存儲消息隊列表示為Partitiont={Meg1,Meg2,…,MegNUM},NUM為消息總數。則時刻t任意兩個Partition的PS可根據加權閔可夫斯基距離(Minkowski distance)計算,如式(3):
其中,p≥1為指數參數,θ∈(0,1)為權重系數。
1.2.3 算法過程
Step1:初始化Partition配置文件,載入Kafka系統中,并設置各類參數λ1,λ2,p,θ以及Θ(Cmt閾值),設定迭代次數,轉入Step2;
Step2:等待新消息導入,并根據Hash分發算法計算其分發至Partition,轉入Step3;
Step3:根據式(1)、式(2)計算該Partition的ISC值,轉入Step4;
Step4:判定該Partition的ISC值是否滿足閾值Θ,如果滿足則新消息存儲并轉入Step2.;否則轉入Step5;
Step5:根據式(3)依次計算該Partition與候選Partition的PS值,并挑選出最優PS值,轉入Step3;
Step6:如果迭代次數超過半數Partition總量,則通知Kafka自動修改配置文件新增Partition,并將新消息存儲在新增Partition上,根據實際情況轉入Step2或轉入Step7;
Step7:退出算法。
軟硬件環境:選取12個Broker(服務器)作為Kafka集群,CPU型號為Xeon E5-2620V3,內存8G,SATA硬盤300G,操作系統為SUSE Linux Enterprise Server 15。
核心參數設置:在綜合考慮文獻對參數取值的建議和基于多次重復實驗的結果,參數設定如下:p=1 ORp=2,λ1,λ2∈(0.35,0.65),θ1,θ2,…∈(0.1,0.9),Θ∈[0.5,0.65],其中實驗中所有權重系數之和都為1。
場景模擬:12個Broker服務器分成3個功能區,其中3個服務器作為Producer消息生產者不斷模擬分發消息,3個服務器作為Consumer消息消費者不斷模擬消費消息,Producer與Consumer隨機分布在不同區域,另外6個服務器作Kafka集群服務器集中管理,處理Producer消息分發以及Consumer消息消費[12-15]。
對比算法:為展示實驗的客觀性,分別選取傳統Kafka算法[2-5],基于Broker負載均衡的BL算法[8]和基于Consumer負載均衡的CL算法[10]與融合文中IPOOA算法的Kafka相比較。
測試指標:為體現實驗的全面性,將從多個維度驗證算法的性能:(1)Kafka集群CPU使用率(Kafka CPU rate,KCR);(2)Kafka服務延時率(Kafka service delay rate,KSDR);(3)Kafka系統收斂延時比(Kafka system convergence delay rate,KSCDR)。
實驗方案:
實驗1:在并發規模為2 000環境下,KCR、KSDR及KSCDR對比如表1所示。

表1 在并發規模為2 000環境下,4種算法的KCRKSDRKSCDR對比 %
實驗2:在并發規模為3 500環境下,KCR、KSDR及KSCDR對比如表2所示。

表2 在并發規模為3 500環境下,4種算法的KCRKSDRKSCDR對比 %
實驗3:在并發規模為5 000環境下,KCR、KSDR及KSCDR對比如表3所示。

表3 在并發規模為5 000環境下,4種算法的KCRKSDRKSCDR對比 %
實驗總結:在并發規模逐漸增加下,融合文中算法的Kafka系統(IPOOA_Kafka)在各項指標層面相對較優,主要原因在于IPOOA_Kafka能夠實現預測消息分發以及消息分發與文件配置協同工作,從而能緩解Partition過載問題出現,提升系統整體性能。
針對Kafka中Partition文件配置管理所存在的被動、僵化及孤立等不足,使得Partition過載問題無法有效解決,提出一種改進型Partition過載優化算法。該算法通過即時服務耗量,Partition相似度和配置文件自動修改相結合,實現消息分發預測以及消息分發與文件配置管理協同,從而可有效緩解Partition過載問題出現。實驗從Kafka集群CPU使用率、Kafka服務延時率、Kafka系統收斂延時比等幾個方面驗證了算法的有效性及合理性。未來將重點圍繞消息分發、消息訂閱及文件配置管理等多層面協同展開研究。