魏占辰,劉曉宇 ,黃秋蘭,孫功星
1.中國科學院 高能物理研究所,北京 100049
2.中國科學院大學,北京 100049
隨著信息技術的快速發展,數據量呈現爆炸式的增長,海量數據給存儲和處理帶來了極大的挑戰,越來越多的應用朝著分布式系統的方向發展,因此以Hadoop[1]、Spark[2]為代表的大數據處理框架應運而生。Spark具有良好的數據處理效率,保證了系統穩定性、可擴展性和可用性,還提供交互式編程接口,非常適合迭代式、交互式或實時數據分析的場景,具有廣泛的適用性,能夠適應各類領域的大數據應用。
雖然Spark等分布式系統能夠極大提升數據處理效率和系統吞吐量,但不可避免地帶來額外的性能損耗,這些損耗包括分布式任務派發、調度、結果收集,以及算法自身在分布式場景的開銷。如果不能有效地降低額外損耗,它們可能成為分布式計算中影響性能提升的瓶頸。在部分迭代密集型計算的場景中,例如高能物理數據分析中的分波分析(Partial Wave Analysis,PWA)計算,由于每輪迭代過程只有極少量參數發生變化,迭代次數多,迭代任務短,因此在現有的Spark 機制下,框架帶來的額外損耗成為了計算過程中的主要性能瓶頸點。目前學術界對于Spark 優化方法的研究,主要集中在開發原則優化、內存優化、配置參數優化、調度優化和Shuffle 過程優化這5 個方面[3],具體研究內容有數據填充和拉取策略研究[4],內存數據緩存和替換策略的研究[5],優化數據存儲機制的研究[6],以及Shuffle過程中的數據讀寫過程優化[7]、文件寫入機制優化[8]、調度算法優化[9]、數據壓縮算法決策[10]等研究。對于迭代計算的研究,則主要有為增量型數據設計的增量式迭代計算模型[11],以及利用數據的分散性和局部性進行分級、分區域的迭代計算方法[12]等。文獻[13]提出了一個Spark 性能預測模型,文獻[14]分析了CPU、磁盤和網絡等計算資源對Spark 性能的影響,但是它們均未對執行效率及Spark框架自身進行深入分析。上述文獻對于迭代密集型應用沒有給出一個有效的優化策略和解決方案。
在系統性分析Spark 的核心機制后,本文歸納并總結了Spark 應用在執行過程中的額外消耗,據此提出一種分析Spark 執行效率的公式。通過該公式可以分析Spark 應用的性能瓶頸點,并針對迭代密集型應用提出相應優化策略。經過測試,該公式可以準確分析性能瓶頸,優化策略能夠極大提高計算效率。
在Spark中,引入了一個可并行化處理的、帶有容錯性的新型分布式數據模型——彈性分布式數據集[15](Resilient Distributed Dataset,RDD),用戶所提交的程序或啟動的交互式Shell稱為一個Application,包含若干RDD運算。RDD通過劃分分區(Partition)進行數據分片和計算并行化,通過兩類算子Transformation 和Action完成復雜運算。每個Action 算子會觸發一個真實的運算過程,稱為Job。每個Transformation算子會產生一個新的RDD,據此建立與父RDD的依賴關系。RDD的依賴關系有兩種,分別是依賴固定父RDD 分區數量的窄依賴關系和依賴全部父RDD 分區并進行數據混洗(Shuffle)的寬依賴關系。由于Shuffle 的后續數據處理過程必須等待Shuffle完成后才能計算,因此Spark在寬依賴關系處進行切分,劃分出不同的Stage,以Shuffle連接,形成有向無環圖(Direct Acyclic Graph,DAG),圖1為一個典型的Spark 單個Job 的計算流程圖,即以RDD的依賴關系形成的有向無環圖。

圖1 Spark計算流程圖
在Stage 內部全部為窄依賴關系,因此每個分區的數據可以獨立計算,Spark將按照RDD的依賴順序形成一條計算流水線,稱為Task;在Shuffle 處,Spark 將前一個Stage 產生的數據排序并序列化寫入磁盤,交由后續Stage進行處理,Shuffle過程是Spark內最容易影響性能的瓶頸點。
在一個Application 內,執行用戶數據處理邏輯、分析RDD依賴關系、生成并派發Task的組件稱為Driver,存儲數據、接收并執行Task的組件稱為Executor。用戶在Driver 中提交的Job 均會由DAGScheduler 分析RDD依賴關系,并在各Stage 內生成一組TaskSet,交由Task-Scheduler 調度并序列化為二進制數據,由Scheduler-Backend分發到Executor執行,每組Task的數量與RDD的分區數量一致,代表了計算的并行度,圖2是Spark任務調度運行的流程圖。
廣播變量(Broadcast)用于Application 內的數據共享,它在每個節點內只保存一份,因此廣播變量產生的數據副本的數量與節點數一致。累加器(Accumulator)是一種分布式變量,它在Task中進行數值更改,最后在Driver中聚合這些修改,因此它的副本數量與Task數量一致。
根據2.1節的分析,可知Spark應用在運行時不可避免地會產生額外的消耗,包括分布式系統自身的消耗和實現分布式算法所引入的額外消耗。因此,需要建立一個用于分析分布式計算額外消耗和執行效率的模型,為有針對性地優化Spark分布式程序提供相關理論基礎。

圖2 Spark任務調度運行流程圖
定義1(有效計算時間)一個算法或任務為得到相應結果而進行的必要的數據計算、處理和分析所用的時間。有效計算時間是衡量一個算法的性能和復雜度的重要指標。
定義2(分布式并行計算代價)分布式并行計算代價為Spark在執行某一任務、處理某一數據時,因框架自身和算法所需而產生的額外消耗。由于這些代價需要額外CPU 時間處理,因此本文以處理分布式并行計算代價的時間(即計算代價時間)作為其評估指標。
定義3(有效計算比)一個算法執行的總時間包含了有效計算時間和計算代價時間,有效計算時間與總時間的比值即為有效計算比。有效計算比越大,表明該數據處理過程的效率越高,越容易達到理想的并行加速比。
下面根據Spark 應用執行流程確定其有效計算時間、計算代價時間和有效計算比。根據定義1、定義2和定義3,在某個Job中,假設有m個Stage,則有(m-1)個Shuffle,則其組成可定義為Job={stage1,stage2,…,stagem}∪{shuffle1,shuffle2,…,shufflem-1} ,使用了v個Executor。由于每個Stage會產生與輸入RDD的分區數量一致的Task,若stagej輸入RDD的分區數為nj,則其任務集為Taskj={task1,j,task2,j,…,tasknj,j}。對于taski,j,設其序列化時間為di,j,傳輸到executor 的時間為ei,j,反序列化的時間為fi,j,由于Spark 的任務序列化和發送工作在Driver 節點串行執行,由此可以得到Taskj的任務準備時間Dj為:

在原有串行算法改為并行后,需要有額外的初始數據分配、額外的結果收集以及輔助算子才能完成數據處理過程。設初始數據分配時間為M,結果數據集為Result={result1,result2,…,resultnm} ,resulti的序列化時間、傳輸時間和反序列化時間分別為gi、hi、li,由于Driver接收數據及反序列化過程為串行,因此可以得到結果收集的時間R為:

使用輔助運算功能(如Accumulator、Broadcast)的計算代價與實際使用情況密切相關,并且具有一定的節點內共享特性,因此要根據實際情況測算算法帶來的計算代價E。若一個 Job 中,Accumulator 和 Broadcast 均為1 個,且它們的平均傳輸時間分別為a、b,并行度為u,executor數量為v時,可以得到一個關于E的計算公式為:

除此之外,由于Spark 在Stage 之間要產生Shuffle,設shufflej的計算代價為sj,由此可以得到一個Job中總的計算代價C為:

若taski,j的有效計算時間為ti,j,不難得出對于任務集Taskj的有效計算時間Tj和整個Job 的有效計算時間V為:

由公式(4)、(6)可以得出一個Job 的有效計算比K為:

根據公式(1)~(7),可以根據實際情況推導出整個Spark 應用的有效計算時間、計算代價時間和有效計算比,因此本文不再贅述。
由此可以看出,計算代價受到框架自身、分布式并行算法以及原始數據分布等多方面的影響。在大多數Spark大數據應用中,由于原始數據難以預測,因此很容易造成數據傾斜,將會造成大量的Shuffle。由公式(3)、(7)可知,巨大的Shuffle 過程會極大提升計算代價,嚴重拖慢Spark應用的性能,降低整個應用的有效計算比。
在Spark 效率分析公式的基礎上,可以根據實際情況進行有針對性的優化,從而提高數據分析效率。在高能物理中使用的分波分析方法是一類Spark迭代密集型應用,下面以此為例具體介紹優化過程。
分波分析是一種觀察高能物理實驗中產生的輕強子之間的共振態結構的數據分析方法,它能夠精確測量共振態參數以及其產生衰變的性質[16]。分波分析需要在高統計量樣本數據上進行數值擬合的計算,其核心過程為使用最大似然法估計待定參數,需要反復迭代以求得最優參數,因此分波分析是一類典型的大數據科學計算。
由于樣本數據的計算過程是獨立的,因此可以很容易地將該過程并行化,將數據劃分成若干區塊放入不同的RDD分區中,在各部分計算完成后由Spark框架匯總結果,并決定是否進行下一輪迭代計算。
由Spark 分波分析計算的執行流程可知,該類計算在Spark中每進行一次迭代就會產生一個Job,每次迭代的任務完全一致,只有若干參數進行更新,沒有Shuffle過程,并且每一個Job僅包含一個Stage。由于輸入數據不發生變化,因此除第一次計算外,其他迭代計算過程均沒有數據的初始分配時間。若總迭代次數為p,忽略數據分配時間,則可得到第i次迭代的計算代價時間ci以及總計算代價C為:

有效計算時間和有效計算比的計算方法與公式(6)和公式(7)一致,由此可以得到,計算代價時間與迭代次數正相關,降低迭代過程中每一輪的計算代價以及消除計算代價與迭代次數的相關性是優化該類問題的關鍵。
由于每次迭代計算過程不變,因此可以采用將多個迭代計算的Job 化簡為一個Job,即將Task 只分發一次的策略,消除冗余Task分發,從而達到將計算代價與迭代次數的相關性消除的目的,為計算過程帶來極大的性能提升。
為實現上述策略,本文基于Spark 現有運行機制設計并實現了一個迭代控制服務模塊(Iteration Control Service,ICS),將參數更新分發、迭代流程控制以及結果收集過程交由該模塊負責,從而與Spark 原有任務和數據分發機制進行一定分離,達到降低計算代價的目的。ICS由Master和Worker兩部分組成,其中Master為迭代主控制模塊,負責控制Worker 計算、分發迭代參數、收集結果;Worker為迭代計算模塊,執行具體計算任務,并緩存每輪迭代計算結果以確保數據同步和任務容錯性。由此可以很容易得出基于ICS 機制優化過后的分波分析總計算代價C′為:

由公式(10)可以看出,使用ICS消除了任務分發的冗余,使計算代價僅與參數更新和結果收集相關,而ICS模塊將允許使用者將這兩部分自行控制,根據數據分析的實際情況,進行細粒度的優化與控制。
為評估本文提出的Spark迭代計算優化方法的實際效果,本文設計了一組實驗來進行性能對比。
本文的實驗環境是一個5 節點組成的集群,使用Spark Standalone模式進行任務調度。除Spark外,還部署了Hadoop 和Alluxio 以完成分波分析的計算工作。Standalone Master、NameNode 和 AlluxioMaster 部署于主節點中,Standalone Worker、DataNode和AlluxioWorker部署于4個從節點中,實驗環境的詳細情況如表1所示。

表1 測試集群軟硬件環境
為分析RDD 迭代分波分析計算的計算代價情況,實驗選擇了約6 GB 樣本數據作為目標分析數據,完成該樣本的分析需要迭代237次,實驗過程使用全部資源進行計算,即并行度為48。
由于分波分析過程的迭代次數與實際數據的擬合過程相關,無法對其進行精確控制,因此為測試不同迭代次數情況下的Spark 分布式并行分波分析的運行性能,在現有基礎上將迭代次數修改為直接由用戶指定,以此仿真實際情況中不同迭代次數的運算過程。
本文首先測試了選取的分波分析樣例在優化前后的性能對比以及與串行程序的對比,評估指標選取了執行時間和有效計算比。執行時間越短、有效計算比越高表明計算任務的性能和效率越好。測試結果如表2所示。

表2 分波分析測試結果對比
從測試結果中可看出,優化前的Spark 分波分析程序相比于原有的串行程序,執行時間縮短了約80.2%;優化后的Spark 分波分析程序相比于串行程序,執行時間縮短了約93.7%;與優化前相比,執行時間縮短了約68.2%,單次迭代的計算代價減小了約80.5%,有效計算比提升了約0.373。由上述數據可以看出,Spark效率分析公式和依據該公式制定的優化策略能夠有效提升高能物理分波分析程序的性能和執行效率。
為測試本文提出的優化策略在不同迭代次數和不同子任務長度下對于執行效率和有效計算比的提升情況,在仿真條件下分別測試了程序的有效計算比和執行性能,其中執行性能以執行時間評估。本文設計了三組仿真作業,所有仿真作業均人為指定迭代次數,其中仿真作業1的迭代子任務與真實計算過程一致,仿真作業2和仿真作業3的迭代子任務會將計算過程反復執行若干次,從而延長有效計算時間,經測算仿真作業2的單次有效計算時間約為400 ms,仿真作業3約為1 000 ms。圖3和圖4展示了仿真作業1在迭代次數為100、200、400、800、1 600、3 200 和 6 400 時的有效計算比和執行時間,為便于圖形展示,執行時間取對數后作為圖4的縱坐標。

圖3 優化前后仿真作業1有效計算比對比圖

圖4 優化前后仿真作業1執行時間對比圖
從圖3 中可以看出,優化前的仿真作業1 有效計算比基本保持不變;而優化之后由于減少了任務的分發次數,提高了參數分發和結果收集過程的效率,使得有效計算比得到約0.066~0.520的提升。從圖4中可以看出,優化前的仿真作業1與串行版本相比,執行時間縮短了約83.4%~90.6%,而優化后與串行相比縮短了約91.4%~97.2%,與優化前相比縮短了約48.3%~69.4%。
圖5、圖6 和圖7 展示了仿真作業2 和仿真作業3 在迭代次數為100、200、400、800和1 600時有效計算比和執行時間的對比情況,為便于圖形展示,執行時間取對數后作為圖6、圖7的縱坐標。

圖5 優化前后仿真作業2、3有效計算比對比圖

圖6 優化前后仿真作業2執行時間對比圖

圖7 優化前后仿真作業3執行時間對比圖
由上述測試可以看出在單次迭代的有效計算時間較長時,本文提出的優化策略也能夠在一定程度上提高迭代計算的執行效率和有效計算比。在有效計算比方面,優化后的仿真作業2 提升了約0.038~0.073,優化后的仿真作業3提升了約0.027~0.067。在執行時間方面,優化前的仿真作業2相比于串行程序縮短了約96%,優化后與優化前相比縮短了約16.2%~20.6%,與串行程序相比縮短了約97%。優化前的仿真作業3 相比于串行程序,執行時間縮短了約96.5%,優化后相比于串行程序,執行時間縮短了約97%,相比于優化前,執行時間縮短了約6.6%~12.9%。
Spark是當前應用最廣泛的并行計算框架和模型之一,本文主要致力于Spark 運行迭代密集型應用的性能優化研究。通過深入研究Spark 分布式任務執行流程,提出有效計算時間、計算代價和有效計算比等概念,以此構建Spark 效率分析公式,為精確分析Spark 應用的效率提供理論支持。在此基礎上,本文還提出了一個針對迭代密集型應用(例如高能物理中的分波分析方法)的優化策略,將Spark多次任務分發過程簡化為一次,并優化了參數更新和結果收集等過程,從而減少了計算代價,提升了有效計算比,使執行效率得到大幅提升,在實際應用中取得了較好的優化效果。本文在實現迭代密集型應用的優化策略時設計了迭代控制服務模塊,該模塊擴展了標準Spark 的功能,保證了良好的編程接口和可擴展性,能夠為類似應用提供參考。
Spark效率分析公式能夠為開發者合理安排并行算法并行度和有針對性優化分布式并行應用程序提供理論依據和參考。在未來的工作中,將進一步研究Spark的性能瓶頸點,繼續完善和挖掘Spark的性能優化方法,提升整個系統的效率和吞吐率,擴大Spark在多個領域,特別是科學計算領域的應用。