卞 琛 于 炯 修位蓉 錢育蓉 英昌甜 廖 彬
1(新疆大學信息科學與工程學院 烏魯木齊 830046)
2 (新疆財經大學統計與信息學院 烏魯木齊 830012)
近年來,各行業應用數據規模呈爆炸性增長,大數據的4V特性發生不同程度的變化,表現出增速快、增量大、類型多樣、結構差異明顯等特征[1].傳統的并行計算系統由于其計算模型的天生缺陷,在大數據處理過程中存在IO效率低下、并發控制困難、數據處理總體性能較低等諸多問題,難以有效應對實時、即席、交互式分析的復雜業務訴求[2].因此,并行計算系統的性能優化成為大數據研究領域的熱點問題,而充分利用內存的低延遲特性改進系統性能成為并行計算新的研究方向[3-4].
通過多年的技術積淀和創新,硬件技術的發展已經突破Dennard Scaling法則.多核技術、異構多核集成技術(CPU與GPU的組合)以及多CPU的并行處理技術相繼問世,出現了多核共享內存及多處理器共享內存的新型架構.新興的存儲技術也相繼走出實驗階段,開始實現產品化.閃存、相變存儲器(PCM)、磁阻式隨機存儲器(MRAM)和電阻式隨機存儲器(RRAM),其非易失、隨機訪問延遲小、并行度高、低功耗、高片載密度等優良特性,為內存計算提供了新的支撐環境.硬件革新催生內存計算技術的發展,內存計算的研究領域也從內存數據管理技術逐漸過渡到基于內存計算的分布式系統.以Berkley研究成果Spark[5-6]為代表的內存計算框架,有效緩解了頻繁磁盤IO性能瓶頸,解放了多核CPU配合大容量內存硬件架構的潛在高性能,成為工業界一致認可的高性能并行計算系統.雖然內存計算框架的性能表現相對于傳統的并行計算系統提高了數十倍,但與大數據時代的即時應用需求相比,還存在不小的差距.因此,從計算模型的角度研究內存計算框架的性能優化方法具有一定的現實意義.
為進一步優化內存計算框架性能,提高作業執行效率,本文選取開源內存計算框架Spark為研究對象,但并不失一般性,本文的研究成果同樣適用于Flink[7],Impala[8],HANA[9],MapReduce[10]等其他類似系統.Spark是繼Hadoop之后出現的通用高性能并行計算框架,采用彈性分布式數據集(resilient distributed datasets, RDD)[11]作為數據結構,通過數據集血統(lineage)[11-12]和檢查點機制(checkpoint)[13-14]實現系統容錯,編程模式則借鑒了函數式編程語言的設計思想,簡化了多階段作業的流程跟蹤、任務重新執行和周期性檢查點機制的實現.作為新的基于內存計算的分布式系統,Spark參考MapReduce計算模型實現了自己的分布式計算框架;基于數據倉庫Hive實現了SQL查詢系統Spark SQL[15];參考流式處理系統Storm[16]實現了流式計算框架Spark Streaming[17];并面向機器學習、圖計算領域分別設計了算法庫MLlib[18]和GraphX[19].
Spark的并行化設計思想源于MapReduce,但與MapReduce不同的是,Spark可以將作業的中間結果保存在內存中,計算過程中不需要再讀寫HDFS,從而避免了大量磁盤IO操作,提高了作業的執行效率.因此,Spark更適用于需要迭代執行的數據挖掘和機器學習算法.由于能夠部署在通用平臺上,并且具有可靠性(reliable)、可擴展性(scalable)、高效性(efficient)、低成本(economical)等優點[20],Spark在大數據分布式計算領域得到了廣泛運用,并逐漸成為工業界與學術界事實上的大數據并行處理標準.雖然Spark具有眾多優點,但與其他并行計算框架一樣,寬依賴同步操作導致的作業延時問題仍是不可規避的性能瓶頸.由于Shuffle過程需要等待所有輸入數據計算完成,因此高效節點與慢任務節點的強制同步產生大量作業延時和資源浪費.為解決這一問題,本文主要做了4項工作:
1) 對內存計算框架的作業執行機制進行分析,建立資源需求模型和執行效率模型,給出資源占用率、RDD計算代價和作業執行時間的定義,證明了計算資源有效利用的相關原則.
2) 通過分析作業的任務劃分策略及調度機制,建立任務分配及調度模型,給出任務并行度、分配效能熵(allocation efficiency entropy, AEE)和節點貢獻度(worker contribution degree, WCD)的定義,并證明這些定義與作業執行效率的邏輯關系,為算法設計提供基礎模型.
3) 在相關模型定義和證明的基礎上,提出局部數據優先拉取策略的優化目標,以此作為算法設計的主要依據.
4) 設計基礎數據構建算法和局部數據優先拉取算法(partial data shuffled first algorithm, PDSF),并通過分析算法的基本屬性,證明算法帕累托最優.
內存計算技術研究的基礎領域是內存數據管理技術,工業界出現了許多相關產品.Memcached[21]是應用最為廣泛的全內存式數據存取系統,該系統通過DHT構建網絡拓撲實現數據布局及查詢方法,為上層應用提供了高可用的狀態存儲和可伸縮的應用加速服務,因其具有良好的通用性和魯棒性,被Facebook,Twitter,YouTube,Reddit等多家世界知名企業使用.與Memcached類似,VMware的Redis[22]也提供了性能卓越的內存存儲功能,支持包括字符串、Hash表、鏈表、集合、有序集合等多種數據類型,提供更加簡單且易于使用的API,相比于Memcached,Redis提供了更靈活的緩存失效策略和持久化機制.此外,還有如微軟的Hekaton[23]和開源社區的FastDB[24]等內存數據庫產品隨著需求的發展仍在不斷涌現.
近年來,高性能內存計算框架也在不斷地充實和發展,除本文的研究對象Spark外,Flink也是較為典型的兼容批處理和流式數據處理的通用數據處理平臺,支持增量迭代并具有迭代自動優化功能.Flink具有獨立內存管理組件、序列化框架和類型推理引擎,內存管理對JAVA虛擬機的依賴度很低,因此能更有效地掌控和利用內存資源.Cloudera的Impala是基于內存計算技術的新型查詢系統,實現嵌套型數據的列存儲,有效提高數據查詢效率;通過多層查詢樹結構降低系統的廣播開銷,提高查詢任務的并行度.SAP的HANA已不僅僅是一個內存數據庫,更是基于內存計算技術的高性能實時數據處理平臺.平臺中包含了內存數據庫和內存計算引擎,提供完整的內存數據存儲和分析計算服務,具有靈活、多用途、數據源無關等諸多優良特性.Apache的Storm更加注重大數據分析的實時性,通過數據在不同算子之間持續流動,達到數據流與計算同步完成的實時性目的,更適用于高響應、低延遲的業務應用場景.此外,Yahoo的S4[25]和微軟的TimeStream[26]也是內存計算框架研究領域的重要成員.
隨著內存計算框架不斷地推陳出新,一些研究成果致力于系統的擴展和完善.文獻[27]提出簡單而高效的并行流水線編程模型,文獻[28]基于BitTorrent實現了內存計算框架的廣播通信技術.文獻[29]提出關系型大數據分析的標準架構.文獻[30]提出圖計算的并行化設計方案.文獻[31]針對作業中間結果的重復利用問題,設計使用程序分析并定位公共子表達式的復用方法.文獻[32]提出集群資源的細粒度共享策略,從而使不同的應用通過相同的API發起細粒度的任務請求.實現資源在不同平臺間的動態共享.文獻[33-34]設計了統一的內存管理器,將內存存儲功能從計算框架中分離出來,使上層計算框架可以更專注計算的本身,以通過更細的分工達到更高的執行效率.文獻[35]設計了分布式數據流計算的標準化引擎.文獻[36]實現了高性能的SQL查詢系統.文獻[37-38]提出了差分數據流和及時響應應用的并行計算方法.文獻[39]設計了大數據交互式分析的聯合聚合通用模型.文獻[40]實現了內存計算集群的隱私消息通信系統.文獻[41]提出了內存計算框架的分布式調度算法,使多個應用可以非集中化地在同一集群上排隊工作,同時提供數據本地性、低延遲和公平性,極大地提升系統的可擴展性.
另外一些研究成果關注內存計算框架的性能優化.文獻[42]提出充分利用數據訪問時間和空間局部性,設計了提高本地性的數據訪問策略.文獻[43]通過分析任務并行度對緩存有效性的影響,設計適應于內存計算的協調緩存算法.文獻[44]通過監測作業的計算開銷,發現reduce任務的并行度對類MapReduce系統的性能有較大影響,由此設計了適應資源狀況的任務調度算法.文獻[45-46]針對慢任務節點問題提出了不同的優化方法,保障作業執行的持續性.文獻[47]通過批處理事務和確定性執行2種策略,使系統擁有更好的擴展性和可靠性.文獻[48]以推測worker響應時間的方式,將作業劃分為不同的區塊,采用延遲隱藏技術提高緊密同步型應用程序的執行效率.文獻[49]提出了工作節點的通信成本邊界模型,并通過調整邊界閾值的方法找到任務并行度與通信成本的最佳平衡點.
本文與上述研究成果的不同之處在于從計算模型的基本原理入手,以提高作業執行效率和改進系統性能為目的,建立了內存計算框架的局部數據優先拉取策略.通過分析作業的執行過程,建立了資源需求模型和執行效率模型.提出了資源占用率、RDD計算代價的定義,并證明了資源有效利用的相關原則.建立任務分配及調度模型,提出了分配效能熵、節點貢獻度的定義,并證明了上述2個定義與作業執行效率的邏輯關系.根據局部數據優先拉取策略的問題定義進行求解,提出了基礎數據構建算法和局部優先拉取算法,通過任務的適度傾斜分配,充分利用高效工作節點的計算能力;通過局部數據優先拉取,緩解寬依賴同步的節點空閑問題,提高工作節點的參與度,從而從整體上優化作業執行效率,改進系統性能.相比于已有的研究工作,局部數據優先拉取策略更適宜于內存計算框架的性能優化,并具有較高的普適性和易用性.
本節首先分析作業的并行執行機制,建立資源需求模型、執行效率模型和任務分配及調度模型,然后提出局部數據優先拉取策略的問題定義,為第3節基礎數據構建算法和局部數據優先拉取算法提供理論基礎.
Spark的作業執行采用了延時調度機制,當用戶對一個RDD執行Action(如count,collect)操作時,調度器會根據RDD的血統(lineage)來構建一個由Stage組成的有向無環圖(DAG),然后為工作節點分配任務執行程序.Spark任務DAG的典型示例如圖1所示,其中實線圓角方框表示RDD,矩形表示分區,虛線框為Stage.Action操作的執行將會以寬依賴分區來構建各個Stage,每個Stage都包含盡可能多的連續的窄依賴,Stage內部的窄依賴前后連接構成流水線.而Stage之間的分界則是寬依賴的Shuffle操作,各Stage同步順序執行,直到最終得出目標RDD.各工作節點的任務分配根據數據存儲本地性來確定,若一個任務需要處理的某個分區剛好存儲在某個節點的內存中,則該任務會分配給這個節點;否則,將任務分配給具有最佳位置的工作節點.

Fig. 1 Directed acyclic graph of Spark job圖1 Spark任務的有向無環圖
在并行計算集群中,資源池由一系列工作節點構成,定義工作節點集合W={1,2,…,m},每個工作節點包含多種計算資源,如CPU、內存、磁盤等.定義資源種類集合R={1,2,…,k},記r∈R.對于每個工作節點w∈W,記cw=(cw1,cw2,…,cwk)為該工作節點的可用資源向量,這里cwr為工作節點w上可用資源r的數量.不失一般性,對于集群每一類資源r進行正則化,即:
(1)
記workload={1,2,…,h}為Spark框架一個時間段同時運行的作業.對于每個作業i,記di=(di1,di2,…,dik)T為其在集群中的資源需求向量,這里dir為作業i對資源r需求量占集群資源r總量的比例,由于每個作業的資源需求都是正向的,即:
dir>0,i∈workload,r∈R,
(2)
那么所有作業的資源需求為k×n的矩陣,即:
(3)
并行計算框架要求作業執行前首先提供資源需求表,用于描述的是每個工作節點需要占用的各種資源量,如2個CPU核心、16 GB內存.并行計算框架選擇空閑資源符合資源需求表的工作節點執行作業,記Extors={1,2,…,n}為執行作業i的工作節點集合,則Extors?W,記Aiw=(Aiw1,Aiw2,…,Aiwr)為作業i在工作節點w上的資源分配向量,原則上每個執行任務的工作節點都嚴格按照資源需求表分配資源,則有:
(4)
定義1. 資源占用率.用于衡量作業使用的資源量占集群資源總量的比例.為使衡量標準更加精確,度量過程以Tcycle為1個周期,記Tjobi為作業i的執行時間.由于已對集群資源進行了正則化,因此對于作業占用的任意類型資源r,其資源占用率可表示:
(5)
定理1. 資源有效利用原則.在不影響作業執行效率的前提下,單位時間資源占用率越小,則集群任務的并發度越高,集群資源的利用率也越高.
證明. 設作業x調度時集群的空閑資源向量為B=(b1,b2,…,br),僅當所有類型資源的需求量均小于空閑資源量,即dxr
(6)
若當前周期內正在執行的作業集合為workload={1,2,…,h},根據定義1,任意類型的資源空閑量可表示為
(7)
由于整個集群資源總量恒定,Uir越小,則br越大,dxr
證畢.
根據Spark的延遲調度機制,作業在執行到Action操作時,生成由多個RDD組成的DAG,首先以寬依賴分界劃分Stage,每個Stage包括多個RDD,每個RDD又被劃分成多個分區工作節點并行計算,因此,對于每一個作業,記其Stage集合為stages={stage1,stage2,…,stagei},記每個Satge的RDD集合為stagei={RDDi1,RDDi2,…,RDDij},這里RDDij表示第i個Stage中第j個RDD,對于每個RDD,記其分區集合為RDDij={Pij1,Pij2,…,Pijk},其中,Pijk表示RDDij中的第k個分區.
定義2. RDD計算代價.Spark任務中,分區是以一個或多個父節點為輸入數據計算生成,設Parentsijk為分區Pijk的父節點集合.分區的計算首先要讀取所有的輸入數據,然后根據閉包和操作類型進行計算.因此分區Pijk的計算代價為數據讀取代價與數據處理代價之和,評估過程以分區計算時間作為衡量計算代價的指標,即:
TPijk=read(Parentsijk)+proc(Parentsijk).
(8)
RDD的所有分區由集群工作節點并行計算生成,因此其計算代價為所有分區計算代價的最大值,即:
(9)
定義3. 作業執行時間.如圖1所示,Spark以寬依賴為分界點,將作業劃分為多個Stage執行,那么每個Stage包含1個寬依賴RDD或多條流水線(每條流水線包括多個RDD的不同分區).設stagei共有m個RDD,除末尾的寬依賴RDDim外,其余RDD劃分為x條流水線,單條流水線的分區集合為pipeix={Pi1x,Pi2x,…,Pijx},那么單條流水線的執行時間可表示為
(10)
對于stagei,記其流水線集合為Pipesi={pipei1,pipei2,…,pipeix},那么stagei的執行時間應為各流水線執行時間最大值與RDDim計算時長之和,即:
(11)
若Spark將作業劃分為n個Stage,各Stage同步順序執行,因此作業的執行時間為
(12)
并行計算任務調度時,將作業按照分區劃分成任務,一個分區對應一個計算任務.在實際的任務分配過程中,數據本地性是分配的首要因素,因此當前的分區計算任務會優先分配到其前導分區所在的工作節點.依次類推,系統會將到達目標RDD的一條路徑分配給一個工作節點,因此圖1中Stage3的執行將產生如圖2中所示的任務分配方案.

Fig. 2 Task allocation of traditional Spark圖2 傳統Spark的任務分配

(13)
那么工作節點w在整個作業上的任務分配可以表示為
ATw=AT1w∪AT2w∪…∪ATiw.
(14)
任務分配滿足3個特性:1)任意工作節點上的任務分配與其他節點的分配沒有交集;2)對于工作節點w的任務分配ATw,其相同Stage中的相鄰任務必為前導后續關系;3)各工作節點的任務分配相對平均,滿足負載均衡.
定義5. 分配效能熵.用于衡量任務分配與工作節點計算能力的適應度.記TW為作業的總工作量,對于參與作業計算的工作節點集合Extors={1,2,…,n},CPS={cp1,cp2,…,cpn}表示Extors中每個工作節點的計算能力.那么所有節點任務執行時間的均值可定義為
(15)
在不考慮寬依賴同步問題的前提下,對于任意的工作節點w,其任務分配ATw的執行時間可表示為
(16)
因此工作節點任務執行時間的方差可表示為
(17)
那么節點的分配效能熵可表示為
(18)
定理2. 對于所有參與計算的工作節點,其分配效能熵越大,作業的執行時間越短,計算效率越高.
證明. 基于定義3,從任務分配的角度來看,作業的執行時間也可表示為
(19)
根據式(18),節點的分配效能熵與方差成反比,因此熵值越大,方差越小,表示節點任務完成時間越趨近均值,因此當所有工作節點的分配效能熵取最大值時,作業的執行時間最短,執行效率最高.
證畢.
定義6. 任務并行度.用于衡量同一時間并發的任務數.在內存計算框架中,系統通過文件的Block數量自動推斷并行度,稱為默認并行度.這一參數表示用戶無介入條件下執行作業的任務并發數,因此默認并行度與單個Stage內的流水線數量相同.在實際運行環境中,默認并行度僅是個理論參考值,因為劃分的多個任務能否并發,還要依賴于工作節點的數量以及每個節點分配的CPU核心數.根據2.2節資源需求模型,記作業調度時符合資源需求表的工作節點數為n,每個工作節點分配的CPU核心數為g,那么硬件環境所能支持的最大并發數為n×g,稱為物理并行度.設輸入數據的Block數量為l,那么對于默認并行度l和物理并行度n×g,應當遵循最小值優先,因此實際的任務并行度可以表示為
dpi=min{l,n×g}.
(20)
定義7. 節點空閑時間.用于表示工作節點因任務分配不均勻導致的空閑時間段.根據定義6,當默認并行度大于物理并行度,即l>n×g時,表示Stage內的流水線數大于任務并行度,那么工作節點需要被多輪分配,任務分配輪數可表示為
(21)
其中,ceiling函數表示取大于等于參數值的最小整數.通過式(21)可以看出,當l為n×g整數倍時,參與計算的每個工作節點在每輪分配中都能得到任務,而l與n×g相除余數不為0時,必有部分工作節點在最后一輪分配時空閑,輪空的工作節點數可表示為
Countbye=n×g-mod(l,(n×g)),
(22)

(23)
定義8. 節點停等時間.根據定義4描述的任務分配過程,每個參與計算的工作節點至少分配一條流水線和一個寬依賴RDD分區,流水線各工作節點并行執行,進度各有快慢.而在計算寬依賴RDD時,由于其每個分區的計算需要依賴父RDD的所有分區,而父RDD不同分區是由不同工作節點流水線中計算的.因此,在寬依賴RDD計算開始前,所有參與計算的工作節點需要先同步,即等待所有父分區計算完成后統一開始寬依賴RDD的計算.計算效率較高的工作節點執行到寬依賴RDD,需要等待慢節點的計算結果.記執行作業的工作節點集合Extors={1,2,…,n}已按分配流水線的完成順序排列,相鄰工作節點流水線完成時間差分別為T1,T2,…,Tn-1,那么對于工作節點w,其停等時間可表示為
(24)
需要說明的是,在2.2節資源需求模型的描述中,只有符合資源需求表的工作節點才能參與作業執行,理論上各工作節點可用的資源量一致,流水線執行效率也應當基本相同.但資源需求表僅對不同類型資源作模糊量化,例如CPU核心數、內存容量等(參見2.2節示例),異構云環境下工作節點的CPU及內存型號多種多樣,參數也各有不同,即使都符合資源需求表,不同工作節點的計算能力也有差異.另外,由于輸入數據locality的限制,工作節點的網絡傳輸能力也會影響流水線執行效率.
定義9. 節點貢獻度.用于衡量工作節點在作業執行過程中實際參與計算的比例.值越大則參與度越高,說明工作節點計算能力被利用的越充分.根據前面的定義,工作節點在完成作業過程中,存在空閑時間和停等時間.因此對于工作節點w,其貢獻度應為實際計算時間與作業執行時間和比值,表示如下:
(25)
節點貢獻度精確刻畫了工作節點計算能力的發揮程度,在作業計算量和工作節點計算能力穩定的前提下,貢獻度越大,工作節點計算能力的利用度越高.
定理3. 對于所有參與計算的工作節點,其貢獻度越大,則節點任務執行時間的均值越小,作業執行效率的優化度越高.

(26)
由于作業的總工作量TW為定值,節點的原始計算能力cpw也為定值,那么當節點的貢獻度Qw取最大值時,均值E為最小值,作業執行效率具有最高的優化度.
證畢.
2.2~2.4節已經對作業資源需求、任務執行效率和任務調度過程作了比較詳細的闡述,基于這些定義,局部優先拉取任務調度算法可形式化為
object
min(Tjob),
(27)
s.t.
(28)
目標是作業執行效率最大化,約束條件是資源分配量符合資源需求表,即在資源穩定的前提下尋求作業執行效率最大化的目標.然而這一目標定義的實際操作性不強,度量方法也過于粗糙.因此,根據分配效能熵和節點貢獻度的定義,可將上述問題等價于:
object
(29)
(30)
s.t.
(31)
目標是最大化分配效能熵和節點貢獻度,約束條件同上.很顯然,在任務分配中,根據節點計算能力作適度傾斜,可以使工作節點得到最大化的分配效能熵.通過削減工作節點的空閑時間和停等時間,能夠提高節點貢獻度,最終達到作業執行效率的優化目標.
本節基于模型的相關定義及定理證明,首先構建算法所需的基礎數據;然后提出局部數據優先拉取算法,并對算法的基本屬性進行分析和證明;最后對算法附加開銷進行評估.
局部數據優先拉取算法需要構建的基礎數據如下:
1) 空閑節點池freePool.用于存放已完成任務并處于等待狀態的工作節點.無論是流水線還是局部拉取任務(參見3.2節),工作節點只要計算完畢就進入freePool,因此freePool在作業執行過程中不斷變化.
2) 輸入分區表inputParts.用于保存工作節點所執行任務的計算結果.工作節點在輸入分區表中的記錄數與其分配的流水線數量相同,每條記錄只保存流水線所在路徑最近一次的計算結果.需要說明的是,計算結果加入inputParts的過程不存在數據復制,inputParts中的記錄只保存引用,實際數據仍分散在各個工作節點上.
3) 分區狀態表partsState.與輸入分區表相對應,用于標識計算結果是由哪些父分區計算生成.標識的方法采用追加式,即局部拉取使用了哪些父分區,就在記錄中追加這些分區的編號.
基礎數據為局部數據優先拉取算法的各步驟提供數據和計算支持.空閑節點池提供局部拉取任務的節點候選者,輸入分區表為局部拉取任務提供輸入數據,而分區狀態表一方面避免重復計算,另一方面為局部拉取任務提供分配依據.
算法1. 基礎數據構建算法.
輸入:工作節點列表nodes;
輸出:freePool,inputParts,partsState.
① fori=0 tonodes.Length-1 do
② ifnodes[i].Finish=true then
③freePool.add(nodes[i]);
④ ifinputParts.contains(nodes[i]) then
⑤inputParts.replace(nodes[i].LastPt);
⑥ else
⑦inputParts.add(nodes[i].LastPt);
⑧ end if
⑨partsState.update();
⑩ end if
本節求解2.5節所定義的帶約束條件的最優化問題.算法的主要思想是削減寬依賴同步產生的節點空閑,充分發揮高效工作節點的計算能力,彌補慢節點導致的作業延時,從而提高作業執行效率.本文提出基于啟發式算法的局部優先拉取調度算法,主要有4個步驟:
1) NodeGroup劃分.首輪劃分中,先后2個進入空閑節點池freePool的工作節點劃分為一個NodeGroup.生成一個NodeGroup表示輸入分區集合至少包含2個分區,因此可以開始計算基于這2個分區的Shuffle結果.當NodeGroup的局部拉取任務完成后,工作節點再次加入空閑節點池,等待下一輪NodeGroup劃分,再次劃分至少需要加入1個新的工作節點或其他NodeGroup.因為同一個NodeGroup的工作節點可能再次劃分到一個新的NodeGroup中,而這2個節點的局部拉取任務已完成,僅有這2個節點的NodeGroup屬無效劃分.
2) 中斷令牌輪轉.空閑節點池有1個中斷令牌,在生成的NodeGroup之間傳遞,中斷令牌總由最近生成的NodeGroup持有.當最后一個工作節點進入空閑節點池時,若沒有組成NodeGroup(空閑節點池中只有1個節點),則獲得中斷令牌的NodeGroup終止局部拉取任務并回溯狀態,與最后一個工作節點合并為一個新的NodeGroup,開始3方的局部拉取任務.
3) 生成局部拉取任務.每個任務需要輸入數據、操作和閉包3個要素,操作和閉包直接從寬依賴RDD繼承(所有局部拉取任務的操作和閉包與寬依賴RDD相同).輸入數據則需要根據partsState中的記錄確定.對于NodeGroup中工作節點,查詢分區狀態表partsState中的相應記錄,將這些記錄取并集記為sumParts,那么將sumParts與工作節點記錄取差集即為該節點需要的輸入數據.
4) 快慢節點任務交換.從第2輪劃分開始,每個NodeGroup生成后,NodeGroup內的工作節點進行任務交換,交換的依據為最大均量原則,即最快的節點與最慢的節點互換任務,其余工作節點的任務保持不變.任務交換完成后,每個工作節點根據自己的新任務從inputParts中讀取輸入數據執行計算.
局部數據優先拉取算法的操作過程如算法2所示:
算法2. 局部數據優先拉取算法.
輸入:空閑節點池freePool、分區表inputParts.
① iffreePool.Count>1 then

Fig. 3 Task allocation of PDSF圖3 局部數據優先拉取策略的任務分配
② iffreePool.checkNodeGroup()=true then
③waitforOtherNode();
④ else
⑤ng=freePool.createNodeGroup();
⑥freePool.clear();
⑦ng.getToken();
⑧ng.exchangeTask();
⑨ng.doPartialTask();
⑩ end if
在局部數據優先拉取算法中,工作節點不需要完全同步,只要寬依賴RDD有2個父分區計算完成,局部拉取任務即開始分配執行.采用PDSF的算法,圖2給出的作業執行時序改變為圖3所示的狀態.
在傳統的Spark框架中,作業執行到寬依賴RDD要進行強制同步,即所有輸入數據必須計算完成且劃分到不同的Bucket中才啟動寬依賴RDD的計算任務,從輸入分區的Bucket拉取數據并執行后續計算.而從圖3中可以看出,當分區E1,E2數據劃分完畢后,PDSF即啟動局部拉取任務PF1和PF2,其中PF1拉取E1,E2中Bucket3中的數據計算局部結果,PF2拉取E1,E2中Bucket4中的數據計算局部結果.若PF1,PF2完成后,任務D2也完成數據劃分,則Worker2加入NodeGroup,開始3方局部拉取任務.此時Worker3與Worker2交換任務(PF1為第1個完成的局部拉取任務),Worker2拉取PF1的局部結果,與本地保存的Bucket3的計算結果合并,生成新的局部結果.Worker3則拉取D2,E1,E2中的Bucket2,計算局部結果.Worker4的任務不變,拉取D2的Bucket4,計算局部結果.當3方局部拉取任務完成后,若任務D1也完成數據劃分,則執行最后一輪局部拉取任務,Worker4與Worker1交換任務(3方局部拉取任務中Worker4率先完成),Worker1拉取PF5的局部結果,與本地保存的Bucket4的計算結果合并,生成最終的分區F4,Worker4拉取D1,D2,E1,E2中的Bucket1,計算最終分區F1,Worker2和Worker3的任務不變,分別生成最終分區F3和F2.
在異構集群環境中,只有縮減慢節點的任務執行時間才能提高作業的執行效率,下面通過慢節點Worker1任務執行具體過程來分析算法的整體效率.在傳統Spark實現中,Worker1計算分區F1需要拉取3個Bucket的數據,計算4個Bucket的數據才能得到結果(其中1個Bucket本地存放).通過PDSF,Worker1僅需拉取3個Bucket的數據,計算1個Bucket的數據即可得到結果.另一方面,Worker4領取Worker1的任務后,需要拉取3個Bucket的數據,計算4個Bucket的數據,與傳統Spark實現相比,拉取數據量和計算數據量沒有發生變化,任務交換不會帶來更大的作業延時.需要說明的是,在評估數據拉取量和計算量時都以Bucket為單位,因此Bucket的數據量要相對平均,算法實現中采用了文獻[50]的研究成果,保障Bucket劃分的均衡性.從上述分析結果來看,慢節點的計算數據量顯著減少,而通過網絡拉取的數據量則沒有變化,優化效果并不明顯.實際上,Spark劃分Bucket的過程會將數據Spill到磁盤文件中,因此從Bucket中拉取數據的時長為磁盤IO時長與網絡傳輸時長的總和.而Worker1拉取的3個Bucket的數據實為局部拉取任務的計算結果,這些結果存儲在內存中,拉取過程僅有網絡延時沒有磁盤IO,因此慢節點的數據拉取效率也有顯著提高.此外,由于傳統Spark在同步完成后才啟動寬依賴RDD的數據拉取和計算工作,因此PDSF前幾輪的局部數據拉取所附加的網絡開銷處于網絡空閑期內,對作業執行效率并無影響.
值得注意的是,圖3表示局部數據優先拉取算法的理想狀況,即每個已劃分NodeGroup的局部拉取任務都能夠完成,不存在使用中斷令牌回溯狀態的情況,在實際應用中很難達到這種理想狀態.但對于PDSF算法,只要正常完成一次局部拉取任務,作業執行效率就能得到一定程度的優化,即便一次都沒有完成,也不會對作業執行效率產生負面影響,因為中斷令牌策略能夠保障最慢工作節點的計算連續性.
局部數據優先拉取算法符合以下原則:節點空閑時間清零原則、節點停等時間最小化原則、適度傾斜任務分配原則和數據本地性恒定原則.
定理4. 節點空閑時間清零原則.
證明. 根據定義7,在默認并行度大于物理并行度的情況下,流水線的最后一輪的分配很可能不均勻,因此輪空節點存在空閑時間.通過算法2描述,輪空節點加入空閑節點池freePool,由于最后一輪分配時,之前分配的流水線都已完成計算,inputParts已包含多個輸入分區,partsState中也存在多條記錄,那么所有輪空節點生成NodeGroup,根據partsState中的相應記錄到inputParts中獲取輸入數據,執行局部優先拉取任務,因此局部數據優先拉取算法無節點空閑時間.
證畢.
定理5. 節點停等時間最小化原則.
證明. 設執行作業的工作節點集合Extors={1,2,…,n}已按其分配流水線的完成順序排列,相鄰工作節點流水線完成時間差分別為T1,T2,…,Tn-1,那么根據定義8,對于工作節點w,其停等時間為

極限情況下,最后一個節點進入freePool時,工作節點x一個局部拉取任務都沒有完成,且被回滾與最后一個節點重組NodeGroup,此時x的停等時間與傳統調度算法的停等時間一致,因此有:
通過NodeGroup的劃分規則可知,每個節點至多再等待一個節點即開始局部拉取任務,節點具有最小的同步開銷,編號越小的工作節點停等時間的優化度越高,因此局部數據拉取算法符合節點停等時間最小化原則.
證畢.
定理6. 適度傾斜任務分配原則.
證明. 設當前NodeGroup的工作節點已按照計算能力從大到小排列,所有節點在inputParts中的記錄集合為{P1,P2,…,Pm}.根據NodeGroup的劃分規則,工作節點x已完成多輪局部拉取任務,而工作節點y還沒有執行局部拉取任務.那么對于工作節點x,在partsState中的記錄格式為“1,2,…,m-1”,表示x已完成前m-1個分區的數據局部拉取;而對于工作節點y,在partsState中的記錄格式為“m”,表示y未執行過局部拉取任務.
在任務交換前,工作節點x將要執行的局部拉取任務可定義為
工作節點y將要執行的局部拉取任務可定義為

證畢.
定理7. 數據本地性恒定原則.
證明. PDSF算法中,每個NodeGroup生成后,快慢節點交換任務,因此節點計算所需的輸入數據發生變化.但對于NodeGroup中的慢節點,無論是否發生任務交換,慢節點本地內存中都只包含輸入數據的一個分區,計算要用到的其他分區都需要從別的節點獲取,因此慢節點的數據本地性不因任務交換而改變,作業的執行效率不受影響.
證畢.
通過上述4個原則的證明可以看出,局部數據優先拉取算法滿足2.5節定義的優化目標,在作業寬依賴同步問題不可歸避的條件下,算法符合帕累托最優.
假設系統當前執行的作業共包含μ個寬依賴操作,寬依賴RDD分區數為f,根據局部數據優先拉取算法的執行過程,每2個分區計算完成可以開始1次局部拉取任務,因此至多分配μ×(f-1)個局部拉取任務,所以局部數據優先拉取算法的時間復雜度為O(μ(f-1)),在用戶請求的作業數量較大時,可以將局部數據拉取任務的分配過程交由多個空閑節點計算,當分配給k個工作節點計算時,只需要做一個簡單的同步操作,可以將時間復雜度降低為O(μ(f-1)k).
算法的存儲開銷包括空閑節點池freePool、輸入分區表inputParts和分區狀態表partsState占用的存儲空間.其中,freePool用于保存空閑節點編號,每個工作節點的編號為4 B的GUID,即使在上千節點的大型集群上,freePool最多占4 MB左右的存儲空間,更何況由于局部拉取任務的分配,節點編號會不斷地移進移出,同一時間點上的記錄總數要遠遠小于集群節點數.對于inputParts和partsState,保存的記錄數與寬依賴節點的分區數相等,每條記錄僅僅保存分區引用和編號,可以忽略不計.另外,算法中劃分的NodeGroup只是一個邏輯概念,分配一次局部拉取任務產生一個NodeGroup,但NodeGroup信息不需要持久化,因此不占用額外的存儲空間.
算法的通信開銷主要是freePool,inputParts,partsState中的記錄更新.根據算法存儲開銷的分析,3張表僅存儲簡單類型和引用,每條記錄的數據量很少,記錄更新過程僅相當于一次平衡心跳,而且記錄更新工作是由空閑節點完成,因此算法并無附加通信開銷.
由上述分析可以看出,算法具有較低的時間復雜度,無附加通信開銷,僅在Master上產生極微量的存儲開銷.因此,PDSF算法完全適應任務密集的并行計算集群.

Fig. 4 Performance of PDSF圖4 局部數據優先拉取策略的整體性能
實驗環境用1臺服務器和8個工作節點建立計算集群,服務器作為Spark的Master和Hadoop的NameNode.為體現工作節點的計算能力不同,8個工作節點由1個高效節點、6個普通節點和1個慢任務節點組成,其中普通節點的配置如表1所示,高效節點配備4顆CPU陣列、64 GB內存和4個千兆網卡,而慢任務節點僅有單核CPU、2 GB內存和1個百兆網卡.任務執行時間的數據來源于Spark的控制臺,而內存使用狀況的監控由nmon完成.在Spark框架下,任務的執行速度很快,通常會在幾秒內完成,這并不利于準確監控任務執行時間和資源使用狀況,因此實驗選擇在小型集群上進行測試,以便觀察到作業執行的更多細節.
實驗數據選取SNAP[51]提供的6個標準數據集,均為有向圖,如表2所示.作業選用PageRank算法,PageRank的每輪迭代都包含join和reduceByKey共2個寬依賴操作,因此更有利于驗證算法的有效性.

Table 1 Configuration Parameters of Worker表1 Worker節點配置參數

Table 2 Information of Datasets表2 測試數據集列表
根據2.4節定理2和定理3,局部數據優先拉取算法能夠有效提高作業執行效率.實驗選擇了4個不同大小的數據集測試算法性能,驗證理論模型的正確性.實驗結果如圖4所示:
由圖4可以看出,對于每一個數據集,傳統Spark與PDSF算法的作業執行時間都隨迭代次數的增加而增長.PDSF算法的作業執行時間明顯小于傳統Spark,從而證明PDSF算法對Spark框架的性能具有優化效果.從作業執行的總體趨勢來看,迭代次數越多,作業執行時間的優化效果越明顯.但從不同迭代次數的優化效果來看,作業執行時間的縮減比例并未隨迭代次數增加呈線性增長.一方面,在不同的時間點,工作節點的計算能力表現不同,某個系統服務運行或突發的網絡訪問,都會對節點的計算能力產生影響;另一方面,在PDSF算法中,作業執行時間的優化率取決于局部拉取任務的完成情況,中斷回溯的次數越少,優化效果越明顯.
從圖4整體的對比結果來看,在不同數據集環境下,隨著迭代次數的增加,傳統Spark任務執行時間的上升趨勢較為明顯,PDSF算法由于局部拉取任務的優先執行,提前進行部分數據的Shuffle操作,因此任務執行時間的上升趨勢相對緩和.由此可以看出,傳統Spark對寬依賴操作的敏感度很高;而對于PDSF,作業的寬依賴操作越多,局部數據拉取任務的調度概率越大,而局部數據拉取任務的完成度越高,作業執行的加速效應也越明顯.

Fig. 5 CPU utilization of high performance executor圖5 高效節點的CPU利用率
根據3.3節定理4和定理5,通過局部數據優先拉取,能夠有效減少節點空閑時間和停等時間,提高高效節點的貢獻度.本節實驗用于驗證PDSF算法對節點貢獻度的影響,實驗選取節點和連接數差異較大的2個數據集,迭代計算10次.采用nmon監控作業執行過程中各類型資源使用細節,通過觀察發現,內存和網絡的利用率對節點貢獻度的體現能力較差,而CPU利用率最能真實反映節點參與計算的具體細節,因此在作業執行過程中重點監測高效節點的CPU使用情況,實驗結果如圖5所示,圖5(a)(b)分別為數據集web-BerkStan和Higgs-Twitter的測試結果.
由實驗結果可以看出,由于PDSF算法能夠提高作業執行效率,因此PDSF的作業執行時間明顯小于傳統Spark.CPU占用方面,傳統Spark和PDSF算法在2次測試中都具有較高的CPU占用率,這是因為Spark計算框架充分發揮了內存低延遲特性,CPU計算能力得到了更有效地發揮.比較CPU占用率變化曲線,傳統Spark作業執行過程中,CPU占用率的波動幅度較大,特別是幾個波谷處的下降幅度明顯.由于實驗集群中高效節點與慢節點的計算能力差異較大,高效節點在每次寬依賴操作時,都要強制同步等待其他節點,此時CPU處于相對空閑的狀態,因此從整個作業執行過程來看,CPU占用率的變化幅度較大,空閑時段也頻繁出現.而對于PDSF算法,CPU占用率的整體幅度較為平穩,無明顯的空閑時段,因為PDSF進行適度傾斜的任務分配,只要有2個節點流水線計算完成即開始局部拉取任務,從而最小化節點空閑時間和停等時間,有效提高節點在作業執行過程中的參與度,使CPU始終保持較為穩定的高占用狀態.
雖然PDSF算法穩定的CPU占用率符合預期,但對作業執行實際的優化效果有限.在PDSF算法中,為了保障作業執行時間不高于傳統Spark,慢任務節點不能出現停等時間或空閑時間,一旦慢任務節點流水線計算完成,若空閑節點池中沒有節點與其形成新的NodeGroup,則持有中斷令牌的NodeGroup停止局部拉取任務,并與慢任務節點合并成新的NodeGroup執行下一輪任務.因此穩定的CPU高占用率也并非都屬于有效計算,也包括一些未完成的局部拉取任務被中斷回溯的過程.
為了驗證PDSF算法在多個不同類型作業并發環境下的性能表現,實驗將Spark官方示例中的多個作業組成工作集,其中包括最小二乘法、邏輯回歸、K-means聚類、SortByKey等多種作業類型.作業的資源需求則在符合集群既有條件下隨機變化,工作集中的作業按FIFO方式組成隊列,只要集群空閑資源符合作業需求,即開始執行作業.實驗監測了不同時間點的作業完成總數,對于在監測時間點未完成的作業,則用已執行時間與作業執行總時間的比值計算完成比例,實驗結果如圖6所示:

Fig. 6 Performance comparison in multi task concurrent environments圖6 多任務并發環境的性能對比
由圖6可以看出,在所有的監測點,PDSF算法的作業完成數都大于傳統Spark,從而證明了PDSF算法在多作業并發環境下仍具有良好的優化效果.通過觀察所有監測點數據發現,不同監測點作業完成量的提高程度各不相同,這是因為不同時間段內執行的作業類型不同,不同類型作業寬依賴操作個數不同,因此在作業類型隨機變化條件下,PDSF算法的優化效果無明顯規律.
此外,通過定理1的證明,作業執行時間的減少能夠提高資源的有效利用率,從任務調度的角度來看,同一時間段內調度的作業數量越多,資源的有效利用率越高,因此在多作業并發實驗中提取了作業調度概率的累積分布,用于反映資源有效利用率的變化.記p(r)為資源需求為r的作業被調度的概率,記F(r)=P(p(r)=1)為資源需求為r的所有作業被調度的累積分布函數,圖7顯示了傳統Spark與PDSF算法作業調度概率的累積分布.從圖7中可以看出,PDSF算法具有良好的效果,作業調度概率的分布趨勢明顯優于傳統Spark.因此,PDSF算法在優化作業執行效率的同時,提高了集群資源的有效利用率.

Fig. 7 CDF of job scheduled probability圖7 作業調度概率累積分布
本文針對內存計算框架寬依賴同步操作的作業延時問題,對內存計算框架的作業執行機制進行深入分析,建立資源需求模型和執行效率模型,給出了資源占用率、RDD計算代價和作業執行時間的定義,證明了計算資源有效利用原則.通過分析作業的任務劃分策略及調度機制,建立了任務分配及調度模型,給出了任務并行度、分配效能熵和節點貢獻度的定義,并證明這些定義與作業執行效率的邏輯關系,為算法設計提供基礎模型.在相關模型定義和證明的基礎上,提出了局部數據優先拉取策略的優化目標,并對評估指標加以細化,以此作為算法設計的主要依據.根據模型和優化目標,設計了基礎數據構建算法和局部數據優先拉取算法,通過分析算法的基本屬性,證明了算法的帕累托最優性.最后,通過不同的實驗證明算法的有效性,實驗結果表明,PDSF算法提高了內存計算框架作業執行效率,并使集群資源得到有效利用.
工作主要集中在4個方面:
1) 異構集群多作業并發環境下,研究工作節點計算能力利用率最大化的任務分配策略.
2) 分析內存計算框架不同類型操作資源需求的一般規律,設計適應作業類型的資源分配和任務調度策略.
3) 隨著GPU計算技術的發展,利用GPU提高作業執行效率變得可行.通過構建CPU+GPU的多核集成協同計算架構提升系統性能是今后的一個研究方向.
4) 高速緩存也是影響內存計算框架性能的重要因素,針對高速緩存低命中率引發的偽流水線問題,設計高速緩存級別的優化策略是今后研究的另一個方向.