葛慶寶,陶耀東,高 岑,田月,孟祥茹
1(中國科學院大學,北京 100049)
2(中國科學院 沈陽計算技術研究所,沈陽 110168)
Spark是起源于美國加州大學伯克利分校AMPLab的大數據計算平臺,它提出的彈性分布式數據集(Resilient Distributed Dataset,RDD)概念能夠在分布式內存中快速處理大數據,因此Spark 在內存處理速度上比Hadoop MapReduce要高100倍,除此之外Spark的功能涵蓋了數據的離線批處理和實時流數據處理,機器學習,圖計算和SQL數據處理等各種類型的操作[1].這一系列因素使得Spark在大數據領域被越來越廣泛的使用.在實際使用中通常需要根據需求優化Spark平臺,但是因為Spark的底層運行機制對用戶來講是透明的,所以如何合理分配集群資源和優化Spark作業性能就成為一個值得研究的問題.性能預測是集群資源分配優化模型的基礎和核心,本文正是基于此提出了一種基于關鍵階段分析的Spark性能預測模型.通過對Spark作業運行性能進行預測從而合理分配集群資源提高作業運行效率.
近幾年,隨著各種大數據計算平臺的流行,為了高效的管理和利用集群資源,很多學者提出了一些性能預測模型和方案.文獻[2–8]只是部分針對Hadoop平臺的性能預測研究.
目前針對Spark平臺的性能預測研究還比較少.Wang KW等[9]提出了理論分析模擬Spark作業的方法預測其性能(運行時間和內存使用等),能夠獲得較好的準確度,但是需要對作業進行詳細的分析.Wang GL等[10]提出了使用機器學習的方式預測作業在不同參數下的性能表現,對比了決策樹、線性回歸、支持向量機和神經網絡方法在預測Spark性能方面的準確度,但是該方法完全采用黑盒方法,沒有結合Spark作業的運行特征,對不同類型的作業性能預測缺少支持.陳僑安等[11]提出的基于近鄰搜索算法的 Spark 參數自動優化,主要思路是通過提取 Spark作業的特征,比如運行日志和DAG圖等,然后在歷史數據庫中查詢相似作業,并把相似作業執行效率最高的配置參數作為當前作業的最優配置參數,這種方法有幾個缺陷,第一是通過圖的編輯距離來判斷DAG圖的相似性,計算性較大且圖的編輯距離方法本身已經被證明為是一個NP難問題;第二是該方法的相似性判斷不一定能夠選取出相似作業.
本文在借鑒前人工作的基礎上,通過分析大量的Spark作業運行特征,提出了Spark作業關鍵階段的概念,并基于此建立了一種Spark性能預測模型.本文組織結構如下:第3節闡述Spark原理,提出關鍵階段的概念,并對關鍵階段和Spark性能之間的關系進行分析;第4節介紹通過關鍵階段建立Spark性能預測模型;第5節通過實驗驗證該模型,證明該模型能夠得到較好的預測效果.
用戶提交的程序被Spark分解成n個作業(job),每個作業又被分解為m個階段(stage),各個階段之間串行執行.每個階段中包含若干任務(task).為了能夠并行進行計算,每個階段中的一批任務并行處理其中的可容錯彈性分布數據集(RDD).因此我們可以將Spark應用的執行時間定義為如下公式:

其中,Tapplication 表示用戶程序運行時間,Tstart 和Tend表示程序提交階段和運行結束后清理階段運行時間,Tjobi表示第i個作業的運行時間(0≤i≤n),Tstagej表示第j個階段運行的時間(0≤j≤m).
因為每個階段內部的任務是按照批次進行的,同一批次內部并行執行,并行執行受到集群的資源分配(機器核數,executor個數等)和作業類型等多個因素的影響,所以不能按照簡單批次時間累加.如果去分析所有的任務的執行時間會消耗大量的計算成本和時間成本.我們經過大量的實驗發現,在Spark作業的每個階段在不同輸入數據量的情況下對作業執行時間產生影響不同,如圖1所示.我們可以發現一個共同特點:有的階段運行時間在不同的輸入數據量下基本保持不變,有的則會產生較大變化從而影響整個作業的運行時間,還有的階段對應的運行時間為0,這是因為該階段的計算結果因為緩存到了內存之中,在后續需要使用該結果的時候可以直接取出從而不再需要計算.以圖1(a)中的WordCount程序為例,我們可以看出Stage0階段的運行時間在整個作業運行中的時間比重較大,而且隨著數據量的變化產生明顯的變化.Stage1所占比例較小,且基本穩定.我們通過實驗分析其他幾種類型的作業,比如Sort,PageRank和K-Means,均有相似性質,因此我們提出了關鍵階段的概念.
定義1.關鍵階段.對于一個Spark作業其關鍵階段需要同時滿足以下條件:
(1)該階段的運行時間大于整個作業運行時間的均值
(2)該階段的運行時間隨輸入數據量的變化而變化的幅度大于該作業每個階段運行時間變化幅度的均值
本文我們選取Spark作業的運行時間作為Spark性能的衡量指標,因此本文的目的即預測在不同的輸入數據量的情況下Spark作業的運行時間.為了建立該模型,我們需要分析關鍵階段的運行時間和和Spark作業不同輸入數據量之間的關系.

圖 1 幾種常見Spark程序處理在不同輸入數據量時的各個階段運行時間
以WordCount程序為例,我們通過實驗得出關鍵階段的運行時間在不同輸入數據量情況下的關系圖,如圖2所示.

圖2 關鍵階段運行時間和不同輸入數據量關系圖
在圖中我們可以看出在WordCount程序中關鍵階段的運行時間和數據量之間可以擬合成一種線性回歸關系,即

雖然不同類型的Spark 作業的資源消耗情況等各種運行情況不盡相同,比如 CPU 密集型作業和I/O密集型作業需要的集群資源不同,但是我們通過實驗發現多數作業的關鍵階段運行時間和數據量之間的關系
具有回歸關系.所以公式(3)可以表達為以下公式:

通過上述兩節的分析以及公式(2),我們定義對Spark作業執行時間的預測公式:

其中,Tkeystage表示關鍵階段的運行時間,Trstage表示其它階段的時間,這些階段的運行時間在不同輸入數據量的情況下基本可以認為保持不變.
具體建模步驟:
(1)運行小批量不同大小的數據集,收集作業的運行信息:階段個數,每個階段的起止時間等.
(2)根據算法1求出關鍵階段.

?

作業的運行時間均值Tmean.② 依次判斷每個階段的運行時間是否大于Tmean,并將大于均值的編號存儲到List1.③ 遍歷List1中的編號,獲取對應的每個階段運行時間的變化幅度和求出總的變化幅度的均值Dmean.④ 返回List1中大于Dmean的階段即為所求.
(3)根據步驟(1)的信息結合有關機器學習回歸算法獲取公式(4)的具體表達式.
(4)根據公式(5)(6)獲取作業的預測運行時間.
本文所使用的實驗環境的是三個節點組成的Spark分布式集群,集群采用主從架構,其中的一個節點是主節點,另外兩臺為從節點.節點的操作系統是CentOS Linux release 7.2.1511 64 bit,內存為8 G,處理器4核4線程;使用的Spark版本是2.0.1,Hadoop版本為2.7.3,并且使用Hadoop 的YARN作為分布式集群的資源調度器;java版本是1.8.0_131;基準測試程序BigDataBench[12].實驗中我們分別針對WordCount,PageRank和K-Means程序進行作業性能預測.首先使用小數據量數據集得出公式(4)的表達式,然后根據該公式預測大數據量情況下作業的運行時間.

圖3 幾種常見Spark程序在該模型下預測各階段運行時間和實際運行時間的對比
針對WordCount和K-Means程序,采用的訓練數據量是100 M~1.5 G,間隔200 M,預測5 G數據量下運行時間;針對PageRank程序,采用的訓練數據圖為節點1024到32 768,邊數為2147到99 489,預測節點數為3 048 576,邊數為4 610 034圖的處理時間.得到結果如圖3.
如圖3,實驗結果顯示預測模型能夠整體上反映Spark作業的運行態勢,而且可以較為準確的預測出Spark作業的運行時間.同時分析圖3(c)中的PageRank預測結果可以發現后面有幾個階段(18,19,20)的預測運行時間均為0,這是因為實驗中運行的小批量訓練數據集和驗證運行的大數據集的階段個數不一致導致的.因為這些階段不是關鍵階段,也就是在整個作業運行時間中占有極小的一部分比例,所以對整體運行時間的預測結果影響不大.將實驗結果和文獻9的結果相對比發現兩者所預測的作業執行時間準確度相似,但是本文所使用的方法不需要人工對作業進行詳細的分析從而能具有更好的效率.
本文首先通過實驗分析Spark作業的不同階段運行時間在整個作業運行時間所占比重的不同,提出了Spark作業的關鍵階段這個概念,然后通過分析Spark作業輸入數據量和關鍵階段運行時間之間的關系提出了一種基于關鍵階段分析的Spark性能預測模型,并且通過實驗驗證該模型預測結果較為有效.除此之外,關鍵階段分析對Spark平臺的其它資源消耗預測(比如CPU和內存等)具有參考意義.