(重慶市北碚區中醫院 重慶 400700)
Spark計算節點同構環境下Executor的內存分配優化模型
朱蓉
(重慶市北碚區中醫院重慶400700)
在對Spark云計算平臺的作業執行機制進行研究分析的基礎上,針對目前Spark處理作業時Executor的人為請求資源機制,提出了一種Spark計算節點同構環境下Executor所需內存資源的優化分配算法,該算法能夠基于處理數據的數據量對分布式內存資源進行彈性分配,達到了Spark在數據量變化情況下合理利用分布式內存資源這一目的,并對此進行了仿真實驗驗證。
Spark;Executor;內存資源;資源分配
近幾年來,云計算技術成為人們討論的熱點,隨著云計算研究的不斷深入和發展,Spark大數據處理平臺受到了越來越多的關注。通過依靠Scala強有力的函數式編程、Actor通信模式、閉包、容器、泛型等手段,借助統一資源分配調度框架Mesos和YARN[2],并融合了MapReduce和Dryad[3]等技術,使得Spark站在巨人的肩膀上,成為一個簡潔、直觀、靈活、高效的分布式大數據處理框架。
本文對Spark中任務調度和資源分配機制進行了研究,提出了一種基于計算節點同構,即計算節點硬件環境一致下的資源分配優化算法SMAO(Spark’s memory allocation optimizing),能夠基于作業的數據量對分布式內存資源進行彈性分配,達到內存資源合理分配的目的,并對此進行了仿真實驗驗證。
Spark在實時計算方面速度卓越的一個核心原因就是因為有統一的RDD(Resilient Distributed Datasets,彈性分布式數據集),Spark中使用了RDD抽象的分布式計算,即使用RDD對應的transform/action等操作來執行分布式計算;并且基于RDD之間的依賴關系組成lineage以及checkpoint等機制來保證整個分布式計算的容錯性[4]。與Hadoop不同,Spark一開始就瞄準性能,將數據(包括中間結果)放在內存中進行計算,并且將用戶重復利用的數據也緩存到內存中進而提高計算效率,因此Spark尤其適合迭代型和交互性任務的計算,雖然Spark架構下的計算需要大量的內存,但其性能可隨著機器的數目呈現線性的增長,其內存計算速度比Hadoop的MapReduce要快100倍左右[5]。下面我們先看一下RDD,再對Spark運行架構進行了解。
(一)彈性分布式數據集
在Spark中一切運算都是以RDD為基礎和核心的,而RDD的核心是RDD的分區存儲和RDD的有向無環圖的執行[6]。在提交Job后,Spark依照對Job分解后的RDD之間的依賴關系形成Spark的調度順序,并結合任務的分發、跟蹤、執行等過程,最終形成了整個的Spark應用程序的執行。
(二)Spark的應用程序執行機制
Spark應用程序的執行是基于SparkConf、SparkContext等環境信息的,在Spark 1.3.0中,可以人為設置參數spark.Executor.memory的大小來定義每一個Executor所需的內存資源生成SparkConf配置變量,并通過val sc=new SparkContext(new SparkConf())來實例化該應用程序的SparkContext上下文信息,Driver主控進程會在SparkContext中構建DAGScheduler等重要對象,負責對應用進行解析、stage切分并調度Task到Executor執行。
Executor的運行[8]是通過Spark的Akka消息機制向Master注冊程序后發送SparkConf配置信息給Worker進而觸發啟動的。在Master的消息響應中會調用schedule方法和launchExecutor方法,在schedule方法中要啟動Driver程序,然后調用Worker,而在launchExecutor內部,Master會發送消息(消息即為LaunchExecutor)給Worker節點讓它發起一個Executor,從Spark1.3.0的Executor消息處理源碼可以看出,ExecutorRunner負責分配Worker節點請求的CPU核數和Memory大小,實時地維護executor進程,并負責executor的開始和消亡。
Executor執行所占用內存基本上是Executor內部所有任務共享的,而每個Executor可以支持的任務的數量取決于Executor所管理的CPU Core資源和內存資源的多少,目前的資源分配方式是是用戶了解每個任務的數據規模的大小,主觀地推算出每個Executor大致需要多少內存和CPU。而在一般應用中,Spark需要處理的數據量大小是變化的,比如本論文的基金項目中使用Spark來分析三峽庫區的水質監測數據,我們提出一種Spark計算節點同構環境下的Executor內存資源分配的優化模型,能夠基于監測數據量對Spark計算節點Executor所需的分布式內存資源進行彈性分配,達到內存資源合理分配的目的,如下描述:
Executor所消耗的內存,除了用于RDD數據集本身的開銷,還包括算法所需各種臨時內存空間的使用,即
SE≈SR+Sn
(1)
SE表示Executor所消耗的內存,SR表示Executor中RDD數據集所占用的空間大小,Sn表示算法執行所需要的各種空間,包括JVM的臨時空間消耗等各種計算消耗。默認情況下,Spark采用整體物理內存(spark.executor.memory)的60%來管理SR產生的RDD Cache數據,這表明在任務執行期間,最多有40%的內存可以給Sn用來保證任務運行,用戶可以通過設置參數spark.storage.memoryFraction來改變這個比例大小。在使用Executor的過程中,如果數據量增大或分配的SR不夠則會報程序運行空間不足,如果分配的Sn不夠,則會影響GC效率甚至報java.lang.OutOfMemoryError:Java heap space的錯誤。用SF來表示頻繁發生Full GC時候的Sn的臨界值,在對Executor內存資源分配時候中要保證以下條件:
k*SE≥SR,且(1-k)*SE≥SF
(2)
k表示參數spark.storage.memoryFraction,用戶可以自己設置這個參數來調整SR和Sn的比例。
用SF作為Sn的臨界值的時候,公式(1)變為
SE≈SR+SF
設X為原始數據的數據量,原始數據經HDFS文件系統再轉換為Spark環境下的RDD數據集的轉換,RDD數據集所占空間即SR與原始數據的數據流大小呈現線性關系:
SR=p1X+p2
(3)
而算法執行所需要的的開銷Sn包含數據集本身的開銷及各種算法的開銷,算法不同,開銷也不同,雖然這種開銷難以統計。但是Sn與SR近似地呈現出Spark程序算法的空間復雜度的關系,也就意味著臨界值SF與原始數據X也近似地呈現Spark程序算法的空間復雜度的關系。對于SR,可以將RDD cache在內存中,從Spark的log日志輸出或者UI輸出中可以看到每個Cache分區的大小,從而計算某數據量對應的SR,對于SF,可以使用測試GC的方法來獲得。
本文提出一種Spark計算節點同構情況下的Executor執行時的內存消耗優化算法SMAO(Spark’s memory allocation optimizating),計算步驟如下:
1.根據該應用程序所運行的作業量及RDD內存消耗量的歷史記錄SR采用最小二乘法擬合得到公式(3)的參數p1和p2,進而計算下一個作業的任務量X’對應的SR’。
2.計算Spark所運行的該應用程序所消耗RDD的空間復雜度S(X),根據該應用程序所運行的歷史作業的SF,進而計算下一個作業X’,所產生的SF’。
3.由SR和SF代入公式1和公式2,計算SE’
4.在計算節點同構情況下,對Spark提交下一個作業時候可以使用SE’的資源量作為每一個Executor的內存分配請求,此時的SE’即Executor的內存優化分配的閾值。
此優化算法相比沒有使用該算法優化而言,使得Spark在處理數據時候隨著數據量的變化而彈性分配內存資源,達到內存資源合理分配的目的,下面將進行仿真實驗。
(一)仿真實驗環境及數據集
實驗環境為由搭建的3個節點組成的虛擬機集群環境,使用ubuntu-12.04的64位版本的linux操作系統,產生一個主節點nameNode和兩個從節點dataNode1和dataNode2。虛擬機nameNode主節點的內存為4GB,處理器數量為2,雙核,兩個dataNode內存為2GB,處理器數量為2,單核。Java版本為JDK1.7.0_71,Hadoop的版本為2.4.0版本,Spark版本為1.3.0,實驗的程序是根據Spark自帶的PCA程序而改寫的帶有標準化處理的PCA程序(SNPCA,Spark’s Normalized Principal Component Analysis)。
前面講過,在Executor中所消耗的內存,除了用于RDD數據集本身的開銷外,還包括算法所需各種臨時內存空間Sn的使用,SF表示頻繁發生Full GC時候的Sn的臨界值,SF受系統環境影響,我們可以假設實驗程序SNPCA引起的SF開銷符合以下空間復雜度:
S(x)=q1x3+q2x2+q3x+q4
(4)
使用最小二乘法對SF的歷史數據來進行多項式擬合,進而計算后三個測試集數據[131,143,155]MB實際的SF量為[361,379,482]MB,而依照SMAO算法計算得[414.9,498.5,599.1]MB,對比如下:

實際的SF量與依據SMAO算法計算的SF量對比圖
使用最小二乘法對SF的歷史數據來進行多項式擬合的結果,即公式4的系數為:[0.0002,-0.0347,4.0694,-17.0236],也能看出來實驗程序SNPCA算法的空間復雜度基本上還是一個線性函數。
計算出SR量和SF量后,可以依照公式1、公式2計算計算節點同構環境下的SE’,但是對于實驗的SNPCA程序,Executor中SF開銷與原始數據量呈線性關系,并且SR與數據量X(公式1)也是線性關系,所以此時Executor的分配量與數據量也呈線性關系,所以仍然可以使用公式4對數據量X’和Executor的開銷SE’進行分析,為充分起見,可以對Executor的內存分配“盈余”一定比例,比如“盈余”0.1時,取Executor內存優化分配量為1.1*SE’,來充分提供除
(二)實驗結果分析
理論上給Executor的內存分配當然是多多益善,實際受機器配置、以及運行環境、資源共享、JVM GC效率等因素的影響難以具體衡量,但是還是可以使用SR和SF的歷史數據來進行內存合理分配。
從以上可以看出,依據SMAO算法計算的SR與實際大小相比幾乎一樣,Spark在作業處理時候的內存消耗量除了RDD數據集本身的SR開銷之外,還包括所需各種臨時內存空間的使用帶來的其他消耗量SF。
需要注意的是,SMAO算法目前應用于計算節點同構的情況下,SRAO算法的使用需要預先知道Spark應用程序產生RDD數據集的空間復雜度,因此對于不同的作業程序,需要人為地事先計算空間復雜度,另外,如果Executor執行的時候頻繁發生Full GC,可以考慮減小這個spark.storage.memoryFraction比值來減少Full GC發生的次數,來改善程序運行的整體性能。
本文首先對Spark的架構和應用執行機制進行了介紹,并引出了Spark 1.3.0在Executor的內存分配方面還不太令人滿意,目前采用的是人為的任務分配機制,針對此問題,提出了基于Spark的同構計算節點環境下Executor的內存分配優化算法,并展開實驗驗證該優化方案,實驗結果顯示優化方案的有效性。
[1]http://spark.apache.org/.Accessed Oct,2015
[2]Hindman B,Konwinski A,Zaharia M,et al.Mesos:A Platform for Fine-Grained Resource Sharing in the Data Center[C].NSDI.2011,11:22-22.
[3]Isard M,Budiu M,Yu Y,et al.Dryad:distributed data-parallel programs from sequential building blocks[C].ACM SIGOPS Operating Systems Review.ACM,2007,41(3):59-72
[4]Zaharia M,Chowdhury M,Das T,et al.Resilient distributed datasets:A fault-tolerant abstraction for in-memory cluster computing[C]//Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation.USENIX Association,2012:2-2.
[5]Zaharia M,Chowdhury M,Franklin M J,et al.Spark:cluster computing with working sets[C]//Proceedings of the 2nd USENIX conference on Hot topics in cloud computing.2010:10-10.
[6]Zaharia M,Chowdhury M,Das T,et al.Resilient distributed datasets:A fault-tolerant abstraction for in-memory cluster computing[C].Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation.USENIX Association,2012:2-2.
[7]高彥杰.Spark大數據處理技術、應用與性能優化[M].北京:機械工業出版社,2014
[8]王家林.大數據Spark企業級實戰[M].北京:電子工業出版社,2015