胡亞紅,盛 夏,毛家發(fā)
(1.浙江工業(yè)大學(xué)計算機學(xué)院,浙江 杭州 310023;2.寧波銀行股份有限公司,浙江 寧波 315100)
隨著各大數(shù)據(jù)中心、超算中心和互聯(lián)網(wǎng)公司等機構(gòu)設(shè)備的更新?lián)Q代和高性能部件(如GPU等)的引入,集群中各節(jié)點逐步變得異構(gòu)[1,2],因而各節(jié)點的綜合計算能力出現(xiàn)較大的差異,整個集群處于資源不均衡的狀態(tài)。這里的異構(gòu)性是指計算節(jié)點在CPU、內(nèi)存和IO等方面具有不同的性能而導(dǎo)致其自身的處理能力出現(xiàn)差異。由于集群中各節(jié)點的能力不同,相同任務(wù)分配到不同節(jié)點將對節(jié)點負載產(chǎn)生不同的影響。Spark默認的任務(wù)調(diào)度算法基于集群節(jié)點同構(gòu)的理想化設(shè)計,并未考慮集群異構(gòu)性及節(jié)點資源利用和負載變化的情況,因此無法滿足資源異構(gòu)模式下系統(tǒng)的效率和負載平衡等要求。
目前國內(nèi)外學(xué)者已經(jīng)提出了很多并行框架下的任務(wù)調(diào)度算法。文獻[3]指出分布式集群通常存在負載均衡問題,并對Yarn的公平調(diào)度算法進行了改進。文獻[4]指出MapReduce進行任務(wù)調(diào)度時未考慮Map和Reduce任務(wù)的差異以及集群的異構(gòu)性,提出了一種針對Hadoop平臺上截止時間約束的擴展MapReduce任務(wù)調(diào)度算法。文獻[5]為了最大限度地減少計算機的資源爭用,提出了可以感知節(jié)點資源的任務(wù)調(diào)度策略,即控制節(jié)點通過收集分析工作節(jié)點資源消耗情況來進行調(diào)度。考慮并行和分布式異構(gòu)計算系統(tǒng),文獻[6]提出了基于啟發(fā)式的任務(wù)調(diào)度算法,以進行任務(wù)優(yōu)先級的確定和處理器的選擇。文獻[7]提出一種資源感知的Hadoop調(diào)度程序,它考慮了計算資源的異構(gòu)性和云計算環(huán)境中任務(wù)分配的配置費用。文獻[8]提出的算法可動態(tài)計算任務(wù)進度并自動適應(yīng)連續(xù)變化的環(huán)境。提交作業(yè)時,該算法會將作業(yè)拆分為許多細粒度的Map和Reduce任務(wù),然后將它們分配給合適的節(jié)點。文獻[9]提出的算法通過滑動窗口技術(shù)動態(tài)監(jiān)控系統(tǒng)中執(zhí)行的作業(yè)數(shù)量,自適應(yīng)地管理系統(tǒng)負載平衡,并為不同類型的作業(yè)劃分差異化服務(wù)的優(yōu)先級,使Hadoop原始的算法得到了改進。文獻[10]針對Hadoop提出了一種新的基于數(shù)據(jù)局部性的調(diào)度器,它根據(jù)節(jié)點的處理能力分配需要處理的數(shù)據(jù)塊。文獻[11]根據(jù)異構(gòu)Hadoop集群中各個計算節(jié)點的負載動態(tài)變化和不同任務(wù)的節(jié)點性能差異,提出了一種基于動態(tài)工作負載調(diào)整的自適應(yīng)任務(wù)調(diào)度策略。在Spark方面,文獻[12]提出了一種新的調(diào)度策略以優(yōu)化Spark在異構(gòu)集群中的表現(xiàn)。新策略引入了分層調(diào)度的思想,使Spark在調(diào)度時能綜合考慮任務(wù)復(fù)雜度、節(jié)點性能及節(jié)點資源使用情況等因素,實現(xiàn)了更加高效公平的任務(wù)調(diào)度算法。文獻[13]中,作者認為Spark沒有考慮到異構(gòu)環(huán)境下節(jié)點的性能差異,因此提出了一種自適應(yīng)任務(wù)調(diào)度算法,可以通過檢測節(jié)點負載和資源利用率來提高集群的運行性能。但是,該算法考慮資源影響因素不夠全面,權(quán)值過于依賴設(shè)定的閾值,有一定的主觀性。還有一些基于人工智能及生物信息的任務(wù)調(diào)度優(yōu)化算法,如蟻群算法ACO(Ant Colony Optimization)[14,15]、遺傳算法[16 - 18]等,這些算法能夠進行多目標(biāo)優(yōu)化,優(yōu)化效果比較顯著。但是,這些算法的原理比較復(fù)雜,其實現(xiàn)計算量較大、非常耗時[19]。
現(xiàn)階段的研究大多數(shù)是針對Hadoop進行優(yōu)化,Spark異構(gòu)環(huán)境優(yōu)化方面的研究略有不足。因此,本文通過分析每個節(jié)點的計算能力,對Spark的底層調(diào)度算法進行優(yōu)化,充分考慮節(jié)點的異構(gòu)性、資源利用和負載等情況,以期提高Spark系統(tǒng)的運行效率,縮短任務(wù)整體運行時間。
本文的組織結(jié)構(gòu)如下:第2節(jié)建立了節(jié)點性能評價體系;第3節(jié)介紹了節(jié)點優(yōu)先級調(diào)整算法;第4節(jié)給出了基于節(jié)點優(yōu)先級的Spark動態(tài)自適應(yīng)調(diào)度算法;第5節(jié)對提出的算法進行了實驗驗證,并對實驗結(jié)果加以分析;第6節(jié)對本文加以總結(jié),并指出下一步的工作方向。
有效的節(jié)點性能評價是進行任務(wù)調(diào)度的基礎(chǔ)。本文采用節(jié)點優(yōu)先級反映節(jié)點能力的大小,優(yōu)先級由節(jié)點的靜態(tài)性能指標(biāo)和動態(tài)性能指標(biāo)計算得到。
定義1節(jié)點優(yōu)先級P:用于表示節(jié)點的計算能力,值越大代表節(jié)點計算能力越強,被選中執(zhí)行任務(wù)的概率越大。第i個節(jié)點的優(yōu)先級為Priorityi,簡寫為Pi。
定義2節(jié)點性能指標(biāo):用于描述節(jié)點性能的1組指標(biāo),包括靜態(tài)性能指標(biāo)S和動態(tài)性能指標(biāo)D。靜態(tài)性能指標(biāo)是與任務(wù)執(zhí)行狀態(tài)無關(guān)的指標(biāo),其值由多個靜態(tài)因素(如CPU核數(shù)、內(nèi)存大小等)決定。動態(tài)性能指標(biāo)則是指會隨著任務(wù)執(zhí)行狀態(tài)而變化的指標(biāo),其值由多個動態(tài)因素(CPU剩余率、任務(wù)隊列長度等)決定。第i個節(jié)點的靜動態(tài)性能指標(biāo)分別為Statici和Dynamici,簡寫為Si和Di。
節(jié)點優(yōu)先級P與靜態(tài)性能指標(biāo)S和動態(tài)性能指標(biāo)D的關(guān)系如式(1)所示,靜態(tài)性能指標(biāo)的計算如式(2)所示,動態(tài)性能指標(biāo)的計算如式(3)所示。
Pi=αDi+βSi
(1)
Si=n1s1(i)+n2s2(i)+…+nksk(i)
(2)
Di=m1d1(i)+m2d2(i)+…+mjdj(i)
(3)


Figure 1 Node performance evaluation index system圖1 結(jié)點性能評價指標(biāo)體系
影響節(jié)點性能的靜態(tài)和動態(tài)因素很多。經(jīng)過分析,靜態(tài)因素考慮CPU速度、CPU核數(shù)、內(nèi)存大小以及磁盤容量等;動態(tài)因素考慮CPU負載、CPU剩余率、內(nèi)存剩余率和磁盤剩余率等。
節(jié)點性能評價指標(biāo)體系如圖1所示。
靜態(tài)因素可由機器已有配置得到,但是動態(tài)因素是根據(jù)任務(wù)運行狀態(tài)變化的,因此需要實時關(guān)注集群資源利用情況以獲取相關(guān)信息。本文采用Ganglia[20,21]來進行節(jié)點資源的采集。Ganglia是1個可擴展的分布式集群資源監(jiān)控系統(tǒng),能夠?qū)崿F(xiàn)對集群內(nèi)存、CPU、硬盤和網(wǎng)絡(luò)流量等信息的監(jiān)控,并支持自定義的插件來實現(xiàn)對指定服務(wù)的定向監(jiān)控。Ganglia資源消耗小,對節(jié)點的性能影響非常小。
本文搭建的實驗集群中有5個節(jié)點,分別為Master、Slave1~Slave4。圖2給出了Ganglia采集的各節(jié)點的靜態(tài)因素值,圖3顯示了某一時刻各節(jié)點的動態(tài)因素值。

Figure 2 Static factor values of each node圖2 采集的各節(jié)點靜態(tài)因素值

Figure 3 Dynamic factor values of each node collected for the first time圖3 第1次采集的各節(jié)點動態(tài)因素值
根據(jù)圖2和圖3采集的靜態(tài)和動態(tài)指標(biāo)值,可以使用式(1)~式(3)計算得到當(dāng)前系統(tǒng)中各個節(jié)點的優(yōu)先級,如圖4所示。

Figure 4 Priorities of the cluster nodes圖4 系統(tǒng)中各節(jié)點的優(yōu)先級
在集群工作過程中,參與計算的各個節(jié)點的優(yōu)先級會發(fā)生變化。本文提出節(jié)點優(yōu)先級調(diào)整算法NPAA(Node Priority Adjustment Algorithm),將根據(jù)節(jié)點資源的使用情況和節(jié)點的狀態(tài)變化動態(tài)地調(diào)整節(jié)點的優(yōu)先值。
NPAA的基本思想是實時監(jiān)控系統(tǒng)中各個節(jié)點的動態(tài)因素指標(biāo),需要進行任務(wù)調(diào)度時Master節(jié)點計算出各Slave節(jié)點的優(yōu)先級。算法1給出了NPAA的偽代碼。
算法1節(jié)點優(yōu)先級調(diào)整算法NPAA
輸入:監(jiān)控文件。
輸出:集群節(jié)點優(yōu)先級隊列。
Step1集群啟動時,觸發(fā)監(jiān)控啟動心跳。
Step2各Slave節(jié)點監(jiān)聽檢測端口,獲取自身的參數(shù)配置信息,包括CPU速度、CPU核數(shù)、內(nèi)存大小和節(jié)點磁盤容量。
Step3各Slave節(jié)點將Step 2中獲取的信息上傳給Master節(jié)點。
Step4Master節(jié)點對收到的信息進行歸一化處理,使用式(2)計算出每個Slave節(jié)點靜態(tài)性能指標(biāo)S的值。
Step5Slave節(jié)點根據(jù)自定義配置利用Gang-lia對各自的CPU剩余率、內(nèi)存剩余率、磁盤剩余率和CPU負載情況進行檢測,獲取相應(yīng)的數(shù)據(jù)。
Step6Master節(jié)點對集群中每個Slave節(jié)點進行輪詢,將得到的節(jié)點動態(tài)因素數(shù)據(jù)存入輪詢數(shù)據(jù)庫中。
Step7當(dāng)集群發(fā)出節(jié)點排序請求時,Master節(jié)點取出輪詢數(shù)據(jù)庫中各節(jié)點當(dāng)前的CPU剩余率、內(nèi)存剩余率、磁盤剩余率和CPU負載,進行歸一化處理,并使用式(3)得到每個Slave節(jié)點的動態(tài)性能指標(biāo)D。
Step8根據(jù)每個Slave 節(jié)點的靜態(tài)性能指標(biāo)S、動態(tài)性能指標(biāo)D和式(1),Master節(jié)點計算得到其優(yōu)先級。
Step9Master節(jié)點輸出節(jié)點優(yōu)先級隊列。
以包含3個Slave節(jié)點的集群為例,節(jié)點優(yōu)先級調(diào)整算法實施架構(gòu)如圖5所示,整體架構(gòu)由集群的主節(jié)點Master、從節(jié)點Slave和基于Ganglia的監(jiān)控服務(wù)器3方組成。每個Slave節(jié)點上添加1個檢測模塊,利用Ganglia周期性獲取節(jié)點的各種資源使用情況和任務(wù)隊列長度。獲取的信息被傳輸?shù)組aster節(jié)點進行存儲和優(yōu)先級計算。

Figure 5 Architecture for node priority adjustment圖5 節(jié)點優(yōu)先級調(diào)整架構(gòu)圖
集群啟動后,各Slave節(jié)點根據(jù)本地配置文件定義的采集調(diào)度方案進行節(jié)點動態(tài)因素值的檢測,監(jiān)聽數(shù)據(jù)時使用簡單的監(jiān)聽/通告協(xié)議。Slave節(jié)點使用單播,將數(shù)據(jù)匯聚到Master節(jié)點。Master節(jié)點負責(zé)整合所有信息,它利用輪詢器對每個Slave節(jié)點進行輪詢,并將返回的數(shù)據(jù)存入各Slave節(jié)點對應(yīng)的輪詢數(shù)據(jù)庫中。
在集群節(jié)點資源不平衡的情況下,為保證任務(wù)的完成時間最短,需要準(zhǔn)確地衡量各個節(jié)點的性能,從而選擇出合適的節(jié)點進行任務(wù)分配。本文提出了一種基于節(jié)點優(yōu)先級的Spark動態(tài)自適應(yīng)調(diào)度算法SDASA(Spark Dynamic Adaptive Scheduling Algorithm)。
基于節(jié)點優(yōu)先級的Spark動態(tài)自適應(yīng)調(diào)度算法運行在Master節(jié)點上,由計算節(jié)點優(yōu)先級、優(yōu)先級排序、分配任務(wù)到Slave節(jié)點、任務(wù)調(diào)度4部分組成。SDASA偽代碼如算法2所示:
算法2SDASA
輸入:任務(wù)集TaskSet,任務(wù)個數(shù)為m;集群節(jié)點集合WorkerOffer,節(jié)點個數(shù)為h。
輸出:返回任務(wù)列表,即第i個任務(wù)分配在第j個節(jié)點上。
Step1檢測集群有無節(jié)點更新,若有,根據(jù)NPAA重新獲取各個節(jié)點的靜態(tài)因素值,計算靜態(tài)性能指標(biāo)。
Step2根據(jù)NPAA,依次計算每個Slave節(jié)點的動態(tài)性能指標(biāo)。
Step3依次計算每個Slave節(jié)點的優(yōu)先級。
Step4根據(jù)各Slave節(jié)點優(yōu)先級對節(jié)點集合WorkerOffer進行排序。
Step5從優(yōu)先級最高的Slave節(jié)點開始,依次遍歷每個節(jié)點。在每個節(jié)點輪流遍歷TaskSet中的每個Task,循環(huán)執(zhí)行Step 6。
Step6獲取Task在節(jié)點上的本地化參數(shù),并進行判斷,如果參數(shù)是最大的,執(zhí)行Step 7。
Step7分配該Task給該節(jié)點。
仍以包含3個Slave節(jié)點的集群為例,SDASA的設(shè)計架構(gòu)如圖6所示。其中每個Slave節(jié)點有1個信息收集模塊,負責(zé)定期檢測節(jié)點自身負載和資源剩余情況并傳遞給Master節(jié)點。當(dāng)需要分配Task時,Master計算出每個Slave節(jié)點優(yōu)先級的值,并對節(jié)點進行排序,選擇排序靠前的部分節(jié)點進行任務(wù)分配。

Figure 6 Framework of SDASA圖6 SDASA設(shè)計架構(gòu)圖
本節(jié)主要展示NPAA執(zhí)行時節(jié)點優(yōu)先級隨動態(tài)性能指標(biāo)而變化的情況。搭建的集群中有Master、Slave1~Slave4 5個節(jié)點。圖2和圖3分別給出了實驗開始時系統(tǒng)的靜態(tài)因素值和動態(tài)因素值。
向集群提交數(shù)據(jù)集大小為3 GB的WordCount任務(wù)。任務(wù)執(zhí)行開始,系統(tǒng)進行動態(tài)因素值的采集。圖7選取了系統(tǒng)運行17 s時第2次采集的各節(jié)點的動態(tài)因素值。圖8給出了這個時刻的節(jié)點優(yōu)先級。

Figure 7 Dynamic factor values of each node collected for the second time圖7 第2次采集的各節(jié)點動態(tài)因素值

Figure 8 New priority of nodes after job running圖8 任務(wù)運行后集群節(jié)點優(yōu)先級
對比圖3和圖7中的數(shù)據(jù),CPU剩余率和內(nèi)存剩余量的值有所變化,而磁盤剩余量不變。這是因為在這17 s中,集群開始分解并執(zhí)行WordCount,需要消耗CPU和內(nèi)存。由于沒有磁盤存儲等操作,因而沒有產(chǎn)生磁盤開銷。將圖8和圖4進行對比,可以明顯看出節(jié)點優(yōu)先級隨著動態(tài)性能指標(biāo)的變化而發(fā)生了變化。
本節(jié)分別使用Spark默認調(diào)度算法和SDASA進行任務(wù)調(diào)度,檢測Spark集群的計算能力是否得到了提升。實驗采用的節(jié)點配置如表1所示。

Table 1 Hardware configuration of cluster nodes表1 集群節(jié)點硬件配置表
實驗中進行節(jié)點優(yōu)先級計算時,靜態(tài)因素和動態(tài)因素的權(quán)值分別為0.3和0.7;各靜態(tài)因素的權(quán)值分別為:CPU速度0.05,CPU核數(shù)0.45,內(nèi)存大小0.45,磁盤容量0.05;各動態(tài)因素所用權(quán)值分別為:CPU負載0.2,CPU剩余率0.1,內(nèi)存剩余率0.6,磁盤剩余率0.1。
本文采用的負載來自于中國科學(xué)院計算技術(shù)研究所研發(fā)的基于大數(shù)據(jù)基準(zhǔn)測試的開源性程序集BigDataBench[22]。它對5個典型且重要的大數(shù)據(jù)應(yīng)用領(lǐng)域進行了模擬,包括搜索引擎、社交網(wǎng)絡(luò)、電子商務(wù)、多媒體分析和生物信息學(xué)。BigDataBench總共包含了14個真實數(shù)據(jù)集和34個大數(shù)據(jù)工作負載。本文從BigDataBench基準(zhǔn)測試套件中選擇了3種工作負載,包括WordCount、Sort和K-means。
本文進行了2種不同類型的對比實驗,即同種任務(wù)不同數(shù)據(jù)量和不同種任務(wù)同種數(shù)據(jù)量。為了避免其他因素導(dǎo)致的實驗誤差,每種實驗分別進行10次并統(tǒng)計出任務(wù)在集群上的運行時間,最后取其平均值。
5.2.1 相同任務(wù)不同數(shù)據(jù)量
本實驗處理的任務(wù)是WordCount程序,實驗使用了規(guī)模不同的數(shù)據(jù)集,分別是2 GB,4 GB,6 GB,8 GB和10 GB。實驗結(jié)果如圖9所示,圖中橫坐標(biāo)代表不同數(shù)據(jù)量,縱坐標(biāo)代表完成整個任務(wù)所需要的時間。

Figure 9 Execution time comparison with different data volumes圖9 不同數(shù)據(jù)量完成時間比較
由圖9可以看出,在相同任務(wù)下,使用SDASA比使用默認算法整體效率有所提升。在5種不同的數(shù)據(jù)量下,性能提升最高為8.27%,最低為6.24%,平均6.99%。
5.2.2 不同任務(wù)相同數(shù)據(jù)量
本實驗分別使用3種不同的任務(wù),即WordCount、Sort和K-means,實驗使用的數(shù)據(jù)量皆為4 GB。使用默認的任務(wù)調(diào)度算法和SDASA得到的實驗結(jié)果如圖10所示。

Figure 10 Execution time comparison with different applications圖10 不同任務(wù)完成時間比較
由圖10可以看出,運行WordCount、Sort和K-means任務(wù)時,使用SDASA比使用默認算法集群的運行效率分別提升了6.24%,7.06%,5.64%,平均6.32%。
從以上2個實驗結(jié)果可以看出,與默認的任務(wù)調(diào)度算法相比,SDASA針對不同的數(shù)據(jù)量和不同類型的任務(wù),都可以更好地發(fā)揮異構(gòu)Spark集群的計算能力,從而提升集群的運行效率。
針對Spark資源調(diào)度設(shè)計的不足,本文提出了基于節(jié)點優(yōu)先級的Spark動態(tài)自適應(yīng)調(diào)度算法SDASA。通過在任務(wù)運行過程中實時更新集群中各節(jié)點的優(yōu)先級,SDASA能為當(dāng)前任務(wù)找到最優(yōu)執(zhí)行節(jié)點。實驗結(jié)果證明了算法的有效性。
后續(xù)的研究內(nèi)容主要包括:
(1) 針對任務(wù)的性質(zhì)調(diào)整節(jié)點優(yōu)先級評估體系中指標(biāo)的權(quán)值,并通過實驗進行驗證,進一步增加評估的準(zhǔn)確性。
(2) 任務(wù)類型多種多樣,如CPU密集型、IO密集型等,不同類型的任務(wù)對節(jié)點性能有不同的需求。改進任務(wù)調(diào)度算法,增加對任務(wù)類型的考慮。
(3) 為了進一步驗證SDASA的有效性,下一步將搭建更大規(guī)模的集群,增加數(shù)據(jù)集的數(shù)目進行更多的實驗。