李俊麗
(晉中學院 計算機科學與技術系,山西 晉中 030619)
隨著大數據時代的到來,數據量以驚人的速度增長。大數據應用的出現給數據處理帶來了巨大的挑戰[1,2],越來越多的高效并行計算平臺,如MapReduce[3]和Spark[4-6],被廣泛采用來處理大數據。互信息是對兩個隨機變量之間共享的信息量的度量。互信息的計算量很大,特別對于處理大規模的類別數據。互信息可以廣泛應用于數據挖掘[7,8]算法中。為了提高互信息計算的效率,Spark內存計算模型是最好的選擇,但要面對Spark數據傾斜的性能優化問題。針對Spark中的數據傾斜問題,近年來提出了很多算法和模型。例如,文獻[9]提出了Spark平臺上基于特征分組的并行離群挖掘算法。SCID算法[10]設計了一種Pond-sampling算法來收集數據分布信息,并對總體數據分布進行估計。在數據劃分過程中,SCID實現了Bin-packing算法對Map任務的輸出進行桶狀處理。此外,在分區過程中,還會進一步切割大型分區。SP-Partitioner算法[11]將到達的批次數據作為候選樣本,在系統抽樣的基礎上選擇樣本,預測中間數據的特征。該方法根據預測結果生成參考表,指導下一批數據的均勻分布。文獻[12]優化了笛卡爾(笛卡兒積)算子。由于計算笛卡爾積需要連接操作,因此可能會出現數據傾斜。文獻[13]提出了SASM(Spark adaptive skew mitigation),通過將大分區遷移到其它節點,同時平衡各任務之間的大小,來緩解數據傾斜問題。與這些現有的方法不同,DVP算法針對文獻[9]中并行互信息計算中出現的數據傾斜問題進行研究和改進。DVP算法探索了數據虛擬劃分,其中虛擬前綴附加在一個大分區中的所有鍵之前,然后是一個輔助散列。DVP中的虛擬分區確保消除了大分區。DVP算法是在Spark計算平臺上設計并實現的一種數據虛擬劃分的方案,主要針對數據傾斜情況下大規模類別數據的互信息并行計算,解決了數據分布不均勻導致的數據傾斜問題。
互信息是信息論中對兩個隨機變量關聯程度的統計描述,可以表示為這兩個隨機變量概率的函數。
假設DS是一個包含n個對象的數據集,每個對象都由m個特征表示。我們使用H(yi,yj)和MI(yi;yj)分別表示集合DS上計算的特征yi和yj之間的熵和互信息。熵可以表示如下

(1)
其中,Pij(yi=vik∧yj=vjl)為特征yi和yj分別等于vik和vjl的概率。式(1)中di和dj為特征yi和yj的取值個數;vik和vjl可以在集合D(yi)和D(yj)中找到,其中D(yi)={vi1,…,>vidi},D(yj)={vj1,…,>vjdj}。熵H(yi,>yj)是概率Pij和logPij的乘積的函數。
MI(yi;yj)作為特征yi和yj之間的互信息。我們將互信息MI(yi;yj)表示為

(2)
其中,概率Pij,特征yi和yj,值vik和vjl,域di和dj,集合D(yi)和D(yj)與式(1)中的相同,Pi和Pj分別為特征yi和yj等于vik和vjl的概率。
互信息可以廣泛應用于數據挖掘算法中,DVP算法中的互信息是作為度量指標來量化類別數據特征之間的相似性。
在Spark Shuffle階段,Spark必須將相同的鍵從每個節點拉到節點上的任務中。這樣的過程可能會給單個節點帶來沉重的負載。此時,如果某個鍵對應的數據量特別大,就會出現傾斜。
圖1描述了分區2總體上比分區1和分區3大。由于輸入數據分布不均勻,使用系統的默認哈希分區可能導致子RDD中每個分區的大小存在較大差異,從而導致數據傾斜。當遇到數據傾斜問題時,整個Spark作業的執行時間由運行時間最長的任務控制,這使得Spark作業運行得相當慢。在最壞的情況下,由于最慢的任務處理了過多的數據,Spark作業可能耗盡內存。

圖1 Spark Shuffle數據分布
接下來建立了一個數據傾斜模型來量化由Spark創建的分區之間的數據傾斜度引起的問題。
圖1描述了Spark集群中默認的哈希分布機制,該機制執行以下3個步驟。首先,Map任務檢索輸入數據。然后,這些數據由Map任務處理,Map任務生成以鍵值對格式組織的中間結果。最后,使用鍵將中間結果分組到分區中。最后一步中的一個障礙是,由于數據傾斜,這些分區的大小不均勻。
假設根據鍵值聚合數據時有p個唯一的鍵,我們設K表示鍵,K={k1,>…,>kp}。我們把V表示為集合k中所有鍵的值

(3)
假設有p個分區,每個分區中的值共享一個鍵。值得注意的是,所有分區的大小可能不同。例如,第i和第j個分區的大小分別為li和lj。這兩個分區的大小可能不同(即li≠lj)。

現在使用域dom(ki)的大小來度量鍵ki的第i個分區的大小,它的形式是|dom(ki)|。平均分區大小由|dom(K)|avg表示,|dom(K) |avg由平均域大小來衡量,具體如下表示
(4)
分區之間的數據傾斜度定義為分區大小的偏差(即,|dom(ki)|)。設(ki)為第i個分區或域dom(ki)的傾斜度。在形式上,域dom(ki)的傾斜度s(ki)如下表示
(5)
聚合操作符是Spark Shuffle階段的性能瓶頸。并行計算互信息的一個關鍵挑戰在于countByKey或reduceByKey操作符(參見算法1第(4)行和第(12)行),它引入了包含兩個階段的shuffle。在shuffle過程中,第一階段執行shuffle write操作分區數據。具有相同鍵的已處理數據被寫入相同的磁盤文件。
一旦countByKey或reduceByKey操作符執行,第二階段中的每個任務都會執行shuffle read操作。執行此操作的任務提取屬于前一階段任務節點的鍵,然后對同一鍵執行全局聚合或連接操作。在這個場景中,鍵值被累積。如果數據分布不均勻,就會發生數據傾斜。
數據虛擬劃分是一種針對shuffle操作(例如,reduceByKey)可能引起數據傾斜而進行的虛擬分區機制。為了減少shuffle操作中的數據傾斜,DVP算法只在統計單個特征的取值時進行虛擬分區,因為特征對的取值不容易發生數據傾斜。圖2描述了虛擬分區的過程。

圖2 虛擬分區過程
在這里,首先為RDD中的每個鍵添加一個隨機前綴,然后是reduceByKey聚合操作。通過向同一個鍵添加隨機前綴并將其更改為幾個不同的鍵,一個任務最初處理的數據被分散到多個任務中,以便進行本地聚合。這種虛擬分區的策略減少了單個任務處理的過量數據。刪除每個鍵的前綴后,再次執行全局聚合操作以獲得最終結果。
DVP算法主要由以下基本步驟完成:首先,使用關鍵字val定義一個可變長數組doubleCol用于存放特征對的計算結果。其次,使用map映射操作將RDD數據datapre轉換為鍵值對的形式,即pair((x(m);x(n));1)。值得注意的是,((x(m);x(n))是特征對m和n的取值;1表示特征對的值出現一次,并且記錄每一對特征對取值的整體出現情況。然后,使用關鍵字val定義另一個可變長數組singleCol用于存放單個特征的計算結果,由于在計算單特征值時容易出現數據傾斜,為了緩解數據傾斜問題,最后需要對單個特征的取值進行數據虛擬劃分。
具體算法如下:
算法1:DVP算法
輸入:數據集DS(nobjects ×mfeatures),由數據集生成的名為 datapre的RDD
輸出:兩個變長數組
(1) val doubleCol = new Array [ArrayBuffer [Map [(String, String), Long] ] ](dimension) //關鍵字val定義了一個可變長度數組,即doubleCol
(2) for (m= 0;m≤dimension;m++)
(3) for (n= 0;n≤dimension;n++)
(4) doubleCol(m)(n)+ = datapre:map(x≥((x(m);x(n)); 1))>.countByKey()>.toMap //將RDD數據的datapre轉換為pair ((x(m); >x(n));>1)使用映射轉換,并計算特征對取值的整體出現情況
(5) end for
(6) end for
(7)val singleCol = ArrayBuffer[Map[String,Long]]() //關鍵字val定義了一個可變長度數組,即singleCol
(8) for (k= 0;k≤dimension;k++)
(9) singleCol+= datapre.map
(10) val random:Random = new Random()
(11) val prefix:Int = random.nextInt(10)
(12) prefix+-+x(k),1)).reduceByKey(-+-).map (line ≥(line.-1.split("-")(1), line.-2)).reduceByKey (-+-).collectAsMap().toMap // 數據虛擬劃分
(13) end for
DVP算法在一個配備了24個節點的Spark集群中實現并驗證,每個節點都有一個Intel處理器(即,E5-1620 v2系列3.7 GHz),4芯16 GB RAM。主節點硬盤配置為500 GB;其它節點的磁盤容量是2 TB。集群中的所有數據節點都通過千兆以太網連接;使用SSH協議保證節點之間的通信。我們在Spark的standalone模式下實現了DVP算法。
在DVP實現中使用的編程語言是Scala,這是一種在Java虛擬機(JVM)上運行的函數式面向對象語言。Scala無縫集成了現有的Java程序。利用集成開發環境IntelliJ IDEA開發了DVP算法。
表1中列出了DVP算法中所用到的Spark集群的配置情況。

表1 Spark集群中的軟件配置
DVP使用人工合成類別數據集來進行性能評估。為了評估DVP算法,構造了兩種類型的合成數據集:均勻分布數據集和正態分布數據集。通過以下兩個步驟生成數據集。首先,創建一個相對較小的類別屬性數據集。接下來,不斷復制第一步中創建的數據集,以擴大數據集的大小。合成數據集包含100個特征,這些數據集的大小分別為8 GB、16 GB、24 GB和32 GB。數據集見表2。

表2 人工合成數據集
DEFH是使用最廣泛的哈希算法,是Spark中的一種默認機制。當鍵值呈現均勻分布時,可以獲得較好的性能。
(1)不同數據大小下的執行時間:圖3為DVP和DEFH算法處理不同數據大小的均勻分布數據和正態分布數據所使用的運行時間。分別將數據大小設置為8 GB、16 GB、24 GB和32 GB。計算節點的數量配置為24個。

圖3 不同數據大小下均勻分布和正態 分布數據的執行時間
由圖3(a)可以看出,在分布均勻的數據集中,由于虛擬分區的副作用,DVP算法的運行時間比DEFH稍長。然而,從圖3(b)可以看出,對于正態分布的數據集,DVP算法優于DEFH。這是預期的結果,因為正態分布數據集包含了分布不均勻的數據,導致了數據傾斜,從而耗費了時間。而由虛擬分區支持的DVP算法可以很好地處理傾斜數據。
另外,從圖3(a)和圖3(b)還可以看出,增加數據量會導致所有算法的運行時間增加。直觀地說,這是因為處理大規模數據需要更長的時間。
(2)不同計算節點下的執行時間:圖4展示了DVP和DEFH算法在不同數量的計算節點上處理均勻分布數據和正態分布數據所使用的時間。節點的數量分別配置為4、8、16和24。數據大小設置為8 GB。圖4(a)顯示,由于虛擬分區的開銷,我們的DVP算法在均勻分布數據中的運行時間要比DEFH的運行時間長。這一趨勢與圖3(a)所示一致。圖4(b)顯示,在不均勻分布的情況下,正態分布數據集中DVP算法的性能要明顯優于DEFH。DVP在DEFH上的性能改進歸功于數據虛擬劃分,它有效地緩解了數據的傾斜。同樣,這些結果和圖3(b)所描述是一致的。

圖4 不同數量節點下均勻分布和正態 分布數據的執行時間
另外,從圖4(a)和圖4(b)還可以看出,隨著計算節點數量的不斷增加,兩個算法的運行時間都有所減少。這主要是因為集群計算能力的不斷增加。
(3)數據傾斜度的影響:由于均勻分布數據集中不會發生數據傾斜,因此選擇正態分布數據集進行實驗。從處理時間的角度對數據傾斜度的影響進行了評價。
圖5顯示了不同數據傾斜度下DVP和DEFH算法的處理時間,傾斜度從1到3不等,增量為0.5。我們觀察到,DVP算法的處理時間對數據傾斜度的敏感性小于DEFH。例如,當我們將傾斜度從1.5提高到3時,DVP和DEFH算法的處理時間分別增加了7.2%和28.4%。實驗結果表明,DVP算法利用數據虛擬劃分有效地緩解了數據傾斜帶來的性能問題。因此,在不平衡數據集中,DVP算法優于DEFH。在較高的數據傾斜度下,DVP算法對數據傾斜的改善更為顯著。

圖5 數據傾斜度對DVP和DEFH處理時間的影響
(4)Shuffling-Cost分析:通過改變計算節點的數量來比較DVP和DEFH算法的Shuffling-Cost成本。以節點的shuffle-write-size作為對算法的Shuffling-Cost進行監控。以正態分布數據集(8 G)為例,圖6是兩個算法Shuffling-Cost對比。

圖6 不同數量計算節點上的DVP和DEFH的 Shuffling-Cost
顯而易見,所有測試用例中DVP算法的shuffle-write-size都明顯小于DEFH。更重要的是,隨著計算節點數量的不斷增加,兩種解決方案之間的shuffle-write-size差距也在擴大。DEFH依賴于Spark的默認哈希分區,導致任務頻繁地跨多個節點訪問數據。例如,當節點數量從4個更改為24個時,DEFH算法的shuffle-write-size從667.0 MB上升到1590.0 MB。與DEFH算法不同的是,DVP算法利用了數據虛擬劃分技術來減少Spark環境中的數據傳輸量。因此,DVP算法的shuffle-write-size僅僅從260.0 MB跳到628.0 MB。就shuffling-cost而言,這比DEFH的情況要好得多。
(5)可擴展性分析:在這組實驗中,通過增加計算節點的數量(分別設置為4、8、16和24)和調節數據集大小(配置分別為8 GB、16 GB、24 GB和32 GB),對DVP算法進行可擴展性分析,評估DVP算法在集群系統中處理大規模數據的能力。
圖7(a)顯示了Spark集群中節點數量對并行互信息計算時間的影響。由圖7(a)可以看出,隨著計算節點數量的增加,DVP算法的執行時間明顯減少。大數據集(如32 GB)的下降趨勢非常明顯。當數據集很小(例如4 GB)時,集群擴展性能提高很微弱。結果表明,DVP是一種對大數據集具有高擴展性的并行計算方法。

圖7 DVP的可擴展性分析
圖7(b)展示了計算節點數量對系統加速的影響。從圖7(b)可以看出,對于大多數數據集來說,DVP的加速率接近線性。例如,在32 GB的情況下,DVP的加速性能幾乎與線性性能相當。結果表明,我們的并行計算算法能夠保持大規模高維類別數據集的計算性能。
上述DVP的高可擴展性主要歸功于以下幾個因素。首先,并行互信息計算的時間在很大程度上取決于任意兩個特征之間的互信息計算,這種互信息計算時間與分配給節點的數據對象數量成正比。其次,所有計算節點都獨立地并行計算。最后,由于數據虛擬劃分,DVP在所有節點之間保持了良好的負載平衡性能。
本文基于Spark平臺開發了DVP算法,在大規模類別數據的背景下并行計算互信息。DVP的核心是數據虛擬分區方案。更具體地說,虛擬分區技術緩解了shuffle過程中出現的數據傾斜問題。最后在一個24節點驅動的Spark集群上采用人工合成類別數據集驗證了DVP算法。
大量的實驗結果表明,該算法在效率和負載均衡等方面優于Spark集群默認的DEFH算法。此外, 在Spark處理大型類別數據集時,DVP能夠很好地減輕數據傾斜,從而優化網絡性能。
在未來的工作中,將重點從內存資源的角度優化shuffle性能。當數據分布變得不均勻時,分配給分區的數據量就不平衡。因此,任務所需的內存空間本質上是不同的。如果給每個任務分配固定比例的內存空間,任務中頻繁的內存溢出將是不可避免的。這樣的內存資源問題會對Spark的整體性能產生負面影響。打算研究一種分配內存資源的方法,以進一步優化所有任務之間的shuffle進程。該技術有望從內存計算的角度提高Spark應用程序的性能。