王鵬,王睿婕
(長春理工大學 計算機科學技術學院,長春 130022)
隨著計算機網絡通信技術的迅速發展,如今已經進入了大數據時代。大體上講,大數據(Big Data)是指在一定時間內,不能夠使用常規計算機和軟硬件工具對其進行感知、獲取、管理、處理和服務的數據集合[1]??焖僭鲩L的大數據,在處理復雜度與存儲規模上對現有的計算機體系架構提出了嚴峻的考驗,傳統的關系型數據庫并行管理技術在擴展性方面遇到了巨大的障礙[2]。MapReduce編程模型則可以很好地處理這一問題,提高數據挖掘算法的執行效率。劉義等人[3]提出了MapReduce框架下R-樹的k-近鄰連接算法,顧榮等人[4]提出了MapReduce框架下預測復雜網絡鏈路的實現,饒君等人[5]提出了高效可擴展語義推理引擎的MapReduce模型實現。MapReduce是Google提出的可在Hadoop分布式云集群上并行處理海量數據集的編程模型[6],具有良好的容錯性與擴展性,Map和Reduce的概念,即映射與化簡,是借鑒于函數式的編程語言,這也是該模型的重要思想。它將大數據“分而治之”地分解為多個小規模數據,并分布到多個集群節點中去,共同完成對大數據的處理。這樣可以提高運行效率,降低每一部分的運算強度,最終達到節省時間的目的。
聚類分析是處理大數據的常用算法,常常被用于市場分析、圖像分割、語音識別等科研領域,為了提高對大規模數據的聚類效率,很多并行算法相繼被提出,呂奕清等人[7]提出了基于MPI的并行PSO混合K-均值聚類算法,張軍偉等人[8]提出了二分K-均值聚類算法,陶冶等人[9]提出了基于消息傳遞接口的并行K均值算法。本文將K-均值聚類算法應用于MapReduce編程框架中,并在Hadoop分布式云計算平臺上進行對大規模數據聚類實驗。
MapReduce模型因其簡單、高效、易寫等優勢,近年來被廣泛運用于并行處理海量數據集,可實現于Hadoop開源分布式云計算平臺之上。在Hadoop集群中,執行MapReduce任務的節點分為Job-Tracker和 TaskTracker兩個角色[10],JobTracker負責分配、調節任務,TaskTracker負責執行任務,一個集群中只有一個JobTracker節點。每一個MapReduce任務會經歷兩個階段:Map階段與Reduce階段。用戶可按需定義Map函數和Reduce函數,數據文件首先被分割成多個split,以<key,value>對的形式輸入至Map函數,經多個Map函數并行處理后,得到新的<key,value>對,作為中間結果,系統將包含相同Key值的<key,value>對合并后排序,形成諸如<key,list<value>>形式的中間結果,并輸入到Reduce函數,最后將各個Reduce函數并行處理后形成的結果整合,得到最終結果。MapReduce計算流程圖如圖1所示。
K-均值算法簡潔和高效的優點,使之成為最著名的劃分聚類算法,并獲得了廣泛的應用。用戶給定一個數據點集合D和需要的聚類數目K,在對樣本類別不清晰的情況下,根據已有樣本特征向量對樣本進行類別劃分,K-均值算法根據距離函數反復的將數據點劃分成K類,并返回給用戶劃分結果[11]。

其中k表示聚類數目,Cw表示第w個聚類數目,mw是聚類Cw的聚類中心,即Cw聚類中所有數據點均值向量,dist(x,mw)表示點x與聚類中心mw的距離。
K-均值算法適用于均值能被計算的數據集
定義1 包含在某一聚類中所有數據點的均值,即為聚類中心。
定義2 設數據點的集合為D,D={x1,x2,x3,…,xn},其中 xi=(Xi1,Xi2,Xi3,...,Xir),是實數空間X?Rr中的向量,r表示數據空間的維度,在這里指數據點的屬性數目。該算法將給定的數據點集劃分為K個聚類,每個聚類有一個聚類中心,通常用聚類中心來表示這一聚類,又因為共劃分為K個聚類,所以該算法的名字為K-均值聚類算法。
算法的描述如下:
(1)Algorithm K-均值(K,D)
(2)隨機選得K個數據點作為聚類中心
(3)Repeat
(4)For 獲取集合D中的每一個數據點X do
(5)計算數據點x到K個聚類中心的距離
(6)選擇與x距離最小的聚類,將x歸為此聚類
(7)End for
(8)在當前劃分的情況下,計算新的聚類中心
(9)Until停止規則出現 返回結果
其中停止規則可以是以下任何一個:
a.不再有數據點被重新分配,或者有在規定很小數量內的數據點被重新分配;
b.不再有聚類中心發生變化,或者有在規定很小數量內的數據中心發生變化;
c.誤差平方和(SSE)局部最小。誤差平方和的計算公式如下:上。在歐式空間中,可以用下面的公式計算聚類均值:

圖1 MapReduce計算流程圖


MapReduce框架并不適合所有算法,只有具備可伸縮性的算法才適用。MapReduce框架在處理大規模數據時采用分-總的方式進行并行計算,Map函數負責對數據進行“分布計算”,各個Mapper節點在工作時不能夠實時的交互,只有中間結果生成完畢才能得到最終的結果,這一點也要求必須保證分到每一個Mapper節點的工作相互獨立且相同,無論任務大小,工作方法都不改變。Reduce函數負責對中間結果進行處理,以得到最終結果。完整的MapReduce框架程序由Main函數、Map函數和Reduce函數組成。
K-均值算法是迭代式運行,數據運行完一遍后,判斷是否滿足終止條件,從而決定是否停止下一次迭代,本算法以第一種終止規則為例,即判斷是否存在重新分配的點,若不存在,則停止迭代。
該算法運行過程如下:
(1)Main函數負責生成隨機的聚類中心,并發送到每一個Mapper節點上。
(2)Map函數負責計算數據點與聚類中心的距離。輸入<Key,Value>對應于<數據分片中的行號,數據記錄>。輸出<Key,Value>對應于<數據點的唯一標識,聚類中心編號+該數據點與該聚類中心的距離>。Map函數偽代碼如下:
輸入:數據點集;
輸出:數據點與各個聚類中心的距離;
1.Key:數據文件行號 Value:數據記錄
2.For each datapoint in DataPointDS
3.For i←0 To K//K為聚類數
4.設置Key:數據點 Value:i_distance(數據點,cluster[i])
//使用公式3,求出數據點到每個聚類中心的距離
5.輸出(Key,Value)
(3)Reduce函數負責比較出距每個數據點最近的聚類中心。輸入<Key,Value>與Map函數的輸出一致。輸出<Key,Value>對應于<數據點的唯一標識,與該數據點距離最近聚類中心編號>。Reduce函數偽代碼如下:
輸入:數據點到各個聚類中心的距離列表
輸出:數據點、與該數據點距離最近的聚類中心
1.Key:數據點 Values:Iterator(1_distance_1,2_distance_2,…,K_distance_K)
2.設置MinDistance:MAX_VALUE
3.For i←0 To K
4.IF distance_i<MinDistance
5.設置MinDistance:distance_i
6.設置Value:MinDistance
7.輸出(Key,Value)
(4)Main函數最后將數據點分配給各個聚類,判斷是否有數據點被重新劃分,若劃分情況沒有改變,則說明算法已完成,程序結束,返回聚類中心及分類情況,否則重新計算聚類中心,啟動下一次迭代。
根據上述思路,設計出基于MapReduce框架的分布式K-均值聚類算法,運行過程流程圖如圖2所示。

圖2 K-均值算法分布式流程圖
MapReduce編程模型運行所在的Hadoop平臺使用HDFS分布式文件系統作為數據存儲方式,將大規模待聚類數據分片存儲,降低了計算機存儲量,K-均值聚類算法中每迭代一次都要計算出數據點與每個聚類中心的距離,對于屬性數量較多的數據集,是個龐大的工作量。而在Hadoop集群中,每個節點僅需對所分配Split數據進行處理,少量數據被裝入CPU進行計算,減少了CPU的工作量,提高了數據處理效率,設集群中TaskTracker工作節點數為n,忽略啟動MapReduce任務所耗時間,則集群節點CPU工作量為原單機環境下CPU工作量的1/n,處理效率提高了n倍。
該試驗運行所需的云平臺由實驗室5臺電腦組成,每臺電腦裝有3臺虛擬機,共15個節點。Hadoop分布式云計算集群中各點配置如下:Intel Core i7-2630CPU、3.1GHz、2048MB Memory,并采用Centos6.0操作系統、jdk-8u20-linux-i586版本的jdk、hadoop-1.1.2版本的Hadoop。其中一個作為Master節點,其余作為Slave節點。
實驗樣本仿真于淘寶用戶購買商品類別記錄,設商品種類為8種(服飾、學習用品、廚房用品等),每個樣本有8個屬性,每個屬性值為購買某類商品的數量。分別生成106(數據大小為20M)、107(數據大小為200M)和108(數據大小為2000M)個樣本構造成數據集,現將用戶分為7類(初級主婦,在校學生,高級廚神等),即聚類數K設置為7。本實驗主要內容為:比較K-均值聚類算法在相同硬件環境支持下單機運行時間,與在Hadoop不同數量節點集群上運行時間的長短、加速比。實驗結果如圖4所示。
由圖3(a)可看出,單機運行時間遠遠少于并行運行的時間,這歸因于以下兩個方面:
(1)MapReduce類庫將文件分為大小為64M得塊,由于該數據量小于64M,所以無需分割,每個Mapper節點可運行兩個Mapper任務,在此只需執行一個Mapper任務,所以直接分給一個Mapper節點運行,節點數的增加與否并不影響執行時間。
(2)單機環境下運行小規模數據集時較為靈活,而初始MapReduce任務需要耗費一定的時間,因此單機比并行所花費的時間少。由此可得出結論,處理小規模的數據不適合將其在MapReduce框架中并行計算。
由圖3(b)可看出,隨著數據量的增大,并行計算的優勢慢慢顯露出來,200M會分為4個分片,因此需要兩個節點來進行并行計算。節點數在增加到2時,加速比有了明顯的提高。節點數大于2后,計算節點數的增加對執行時間已無影響。
由圖3(c)可以看出,當數據規模足夠大時,運行時間隨著集群中節點數目的增加在不斷減少,這里數據大小為2000M,可分為32個分片,也就是說當節點數達到16個時,運行時間最短。
比較得出,當數據量較小時,使用云計算平臺執行任務沒有單機環境下執行效率高,當數據規模足夠大時,并且每一個數據分片都在進行處理工作時,集群的效率最高。
由實驗可得出結論,將算法改造為MapReduce模型算法后,運行結果與理論性能相符合,降低了CPU的工作量,提高了處理數據效率,降低了對大數據的聚集工作復雜度。
本文首先介紹了當前處理大數據的重要性,為了提高大規模數據處理效率,提出了解決該問題的思路,即對傳統算法進行MapReduce編程框架的實現,本文將K-均值算法應用于MapReduce編程模型中,并用實驗驗證該方法的可行性。當數據量較小時,單機處理相對比較簡單快捷,但數據量較大時,用此方法可以提高運行效率,節省運行時間。

圖3 K-均值聚類算法在不同數量級的樣本運行時間及加速比
將算法并行運行于Hadoop集群中雖然提高的執行效率,但仍然存在一些問題需要改進。首先,K-均值算法需要反復迭代,對于大規模數據而言,每一次迭代都要花費很長時間,如何控制迭代次數需要好好研究。其次,在分布式計算中,根據特定樣本,如何選取初始聚類中心也是一個很關鍵的問題,這將直接影響運行的效率與結果的準確性,需要在未來進一步研究,加以完善。
[1]李國杰,程學旗.大數據研究:未來科技及經濟社會發展的重大戰略領域——大數據的研究現狀與科學思考[J].中國科學院院刊,2012,27(6):647-657.
[2]覃雄派,王會舉,杜小勇,等.大數據分析——RDBMS與 MapReduce的競爭與共生[J].軟件學報,2012,23(1):32-45.
[3]劉義,景寧,陳犖,等.MapReduce框架下基于R-樹的k-近鄰連接算法[J].軟件學報,2013,24(8):1836-1851.
[4]顧榮,王芳芳,袁春風,等.YARM:基于 MapReduce的高效可擴展的語義推理引擎[J].計算機學報,2014,37(24):1-13.
[5]饒君,吳斌,東昱曉.MapReduce環境下的并行復雜網絡鏈路預測[J].軟件學報,2012,23(12):3175-3186.
[6]Zhang Shufen,Yan Hongcan,Chen Xuebin.Proceedings of 2010 International Conference on Future Information Technology and Computing(FITC 2010)[C].Beijing University,Intelligent Information Technology Application Research Association,Beijing,2010.
[7]呂奕清,林錦賢.基于MPI的并行PSO混合K均值聚類算法[J].計算機應用,2011,31(2):428-431+437.
[8]張軍偉,王念濱,黃少濱,等.二分K均值聚類算法優化及并行化研究[J].計算機工程,2011,37(17):23-25.
[9]陶冶,曾志勇,余建坤,等.并行k均值聚類算法的完備性證明與實現[J].計算機工程,2010,36(22):72-74.
[10]Polo J,Carrera D,Becerra Y,et al.Performance-Driven task co-scheduling for MapReduce environments[J]//Network Operationsand Management Symposium(NOMS):IEEE,2010:373-380.
[11]Bing Liu.Web數據挖掘[M].北京:清華大學出版社,2012:89-91.