翟俊海, 田石, 張素芳, 王謨瀚, 宋丹丹
(1. 河北大學 數學與信息科學學院 河北省機器學習與計算智能重點實驗室, 河北 保定 071002;2. 中國氣象局 氣象干部培訓學院河北分院,河北 保定 071000)
聚類分析是數據挖掘的基本任務之一,已成功應用于許多領域,聚類算法可大致分為2類:層次聚類算法和劃分聚類算法[1].K-means算法是著名的劃分聚類算法,在2006年舉行的國際數據挖掘學術會議上評出的十大數據挖掘算法中[1],K-means名列第2位.K-means算法是一種硬聚類算法,具有非此即彼的性質,即一個樣例只能屬于一個簇.Bezdek[2]對K-means算法進行了改進,提出了模糊K-means算法.模糊聚類建立了樣例對類別的不確定性描述,更能客觀地反映現實世界,往往比清晰聚類能獲得更好的聚類效果.不管是清晰聚類還是模糊聚類,都容易受聚類個數與初始聚類中心的影響,研究人員提出了許多自適應確定聚類個數的方法.針對K-均值聚類算法,Gupta等[3]提出了一種利用最小化聚類中心之間距離快速自動確定聚類個數的方法.受人類觀察視野范圍內山峰峰頂的啟發,Masud等[4]提出了一種確定聚類個數的新方法,該方法用混合高斯模型對觀測點和對象之間距離的分布進行建模.Lord等[5]提出了一種利用對象的穩定性確定聚類個數的方法,定義了一個聚類中對象的穩定性指標.Yu等[6]提出了一種利用決策理論粗糙集自動確定聚類個數的方法,文獻[7]和[8]對確定聚類個數的方法進行了全面深入的綜述.隨著大數據時代的到來,聚類算法在大數據環境中的可擴展性研究引起了研究人員的極大關注,已有一些大數據聚類方面的研究成果.例如,Havens等[9]研究了FCM(Fuzzy C-Means)算法在大數據環境中的可擴展性問題,對3種FCM的擴展方法(抽樣方法、增量方法和核化方法)進行了比較研究,得出了一些有價值的結論.Ludwig[10]研究了FCM算法并行化實現問題,提出了基于MapReduce的FCM算法,并討論了其穩定性.Bharill等[11]提出了一種可擴展的迭代抽樣FCM算法,可對大數據進行聚類,并用大數據開源平臺Spark實現了該算法.Wu等[12]研究了模糊一致性聚類問題,定義了模糊一致性度量指標,提出了基于模糊一致性的大數據聚類算法,并在Spark平臺上實現了該算法.王磊等[13]也研究了基于Spark的大數據聚類及系統實現.李應安[14]和陽美玲[15]分別研究了基于MapReduce的大數據聚類算法.基于動態信任評估.張彬等[16]討論了針對政務大數據分析的云服務平臺設計問題.高學偉等[17]討論了基于Hadoop大數據建模問題.吳信東等[18]對MapReduce與Spark用于大數據分析進行了全面的比較.宋杰等[19]對MapReduce大數據處理平臺與算法的研究進展進行了全面而深入的分析.翟俊海等[20]提出了基于Spark和SimHash的大數據K-近鄰算法.張素芳等[21]對大數據與大數據機器學習研究進行了全面的綜述,具有很高的參考價值.然而,目前還未見到基于Hadoop和Spark的大數據模糊K-means算法的比較研究,從算法執行時間、同步次數、文件數目、容錯性能、資源消耗這5方面進行比較,得出的結論對從事大數據研究的人員具有較高的參考價值.
模糊K-均值算法[2]是K-均值算法的改進,它允許一個樣例點以不同的隸屬度屬于不同的簇.模糊K-均值算法最小化下面的目標函數:
(1)
其中,N是樣例數,m是大于1的參數,uij是樣例xi屬于第j個簇的隸屬度,cj是第j個簇的中心.與K-均值算法一樣,模糊K-均值算法也是一個迭代算法,在迭代過程中,模糊隸屬度uij用公式(2)更新,簇中心cj用公式(3)更新.
(2)
(3)
算法的迭代停止條件為
(4)
在公式(4)中,ε∈(0,1),n表示迭代次數.
基于MapReduce的模糊K-均值算法包括下面2個MapReduce過程.
第1個MapReduce過程用于計算聚類中心.這一過程包括下面3步:
1) 首先,系統會從HDFS中讀取數據并隨機生成每個數據隸屬于每一簇的隸屬度.然后,對數據進行分片,每個分片傳入一個Map節點進行處理.并且Map任務在計算完畢后會對中間數據進行排序,使其分區內有序.
2) combiner會將每個Map的計算結果進行合并以減少數據傳輸,提高運行效率.
3) Reduce會將combiner傳來的數據進行合并,從而計算出聚類中心.
第2個MapReduce過程用于計算隸屬度.這一過程系統依然會從HDFS中讀取數據,進行分片,傳遞給Map節點并利用之前計算出的聚類中心重新計算隸屬度,多個Map節點都會將計算結果傳遞給Reduce節點進行比較隸屬度是否收斂,不收斂則繼續迭代,收斂則終止運行.
不同的Map階段、combiner階段、Reduce階段之間互不干擾,都是一個獨立的任務,只有前面的任務全都完成之后才能進行后面的任務,并且每一個任務的計算結果都要寫入磁盤.圖1通過一個例子展示了MapReduce一次迭代的流程.

圖1 MapReduce一次迭代的流程Fig.1 Schematic diagram of one iteration by MapReduce
基于MapReduce的模糊K-均值算法包括4個作業、16個任務和4次Shuffle.下面給出基于MapReduce的模糊K-均值算法的偽代碼.
1) class Mapper
2) method Map(key ,value ,context)
3) for(i= 1 tok)
4) 隸屬度的2次方
5) for(j= 1 ton)
6) 數據與隸屬度的2次方相乘
7) end
8) end
9) class Combiner
10) method combiner(key ,value ,context)
11) 具有相同鍵的數據將其值進行相加
12) class Reducer
13) method Reduce(key ,value ,context)
14) 具有相同鍵的數據將其值進行相加
15) 計算聚類中心
16) class Mapper
17) method Map(key ,value ,context)
18) 根據聚類中心重新計算隸屬度
19) class Reducer
20) method Reduce(key ,value ,context)
21) 計算前后2次隸屬度之差的絕對值
22) 判斷隸屬度之差的絕對值的最大值是否小于閾值
Spark是一種基于內存計算的大數據處理平臺,它用RDD(resident distributed dataset)將數據組織在內存中,并通過RDD算子對數據進行操作.基于Spark的模糊K-均值算法需要5次轉換操作,共產生5個RDD.圖2通過一個例子展示了5個RDD的轉換過程.

圖2 Spark一次迭代的流程Fig.2 Schematic diagram of one iteration by Spark
Spark在一次迭代過程中需要進行2次Shuffle操作,可以劃分成3個階段.基于Spark的模糊K-均值算法迭代2次被劃分為4個階段,包括8個任務和3次Shuffle.與MapReduce相比,減少了任務數和Shuffle次數,提高了運行效率.下面給出基于Spark的模糊K-均值算法的偽代碼.
1) rdd = sc.textFile(Data) //讀取名為Data的文件
2) memdeg[n][k] //初始化隸屬度
3) memdegRDD = sc.parallelize(memdeg) //并行化
4) while (x是否大于閾值)
5) 計算隸屬度的2次方
6) 將數據與隸屬度的2次方進行相乘
7) 計算聚類中心
8) 更新隸屬度
9) 計算前后2次隸屬度之差的絕對值的最大值x
受文獻[18]的啟發,下面從算法執行時間、同步次數、文件數目這3方面,對2種大數據模糊K-means算法進行理論比較.
1)算法執行時間
基于MapReduce模糊K-means算法的執行時間主要用于中間數據排序和文件傳輸.中間數據排序只存在于MapReduce中,這是為了對數據進行初步的歸并操作,避免因為傳輸文件數量過多給網絡造成重大壓力.在運算時,MapReduce需要執行2次MapReduce操作.在第1次操作中,每個Map需要比較的次數為3,每個Combiner需要比較的次數為1;在第2次操作中,每個Map需要比較的次數為1.因此在一次迭代任務中,MapReduce任務需要比較4次,而Spark在處理數據時不需要中間結果有序,所以不需要比較.
在MapReduce中,假設平均每個Map任務處理m條數據,平均每個Reduce處理r條數據,可以得出排序時間為O(mlogm),而在Spark中,排序時間為O(0).
關于中間數據的傳輸時間,在不考慮網絡傳輸性能帶來的差異的情況下,假設其傳輸速度為Cr,需要處理的數據大小為D,所以傳輸時間為D×Cr.在MapReduce任務中,數據是由執行Map任務的節點發送到Reduce節點的,需要執行2次Map-Reduce任務,2次發送的數據量D1=8,D2=8,所以1次迭代時間為16Cr,2次迭代時間為32Cr.Spark任務的執行過程與MapReduce相同,需要發送2次數據,第1次傳送數據D1=4,D2=8,所以1次迭代時間為為12Cr,2次迭代時間為為24Cr.
2)文件數目
如果文件數目過多,在執行計算時就會有大量I/O操作,嚴重影響系統性能.所以應該盡量減少數據文件的數量.
在MapReduce處理數據的過程之中,每個Map節點都會產生一個文件,即使是不同的分區,數據也會存儲在一個文件之中,這是由MapReduce的數據存儲原理決定的,即每個數據都是由鍵值對的形式存儲,面對不同的分區可以設置不同的鍵.而在Spark中,不需要對數據進行預排序,因此需要對數據進行分區存儲.
因為MapReduce一次迭代共產生4個Map任務,所以產生2+2=4個文件;在Spark中,每個Map任務被劃分為2個分區,從而共產生2*2+2*2=8個文件.
3)同步次數
因為在MapReduce計算模型中,只有所有的Map任務完成后才可以進行Reduce任務,所以MapReduce是同步模型.而Spark可以看作為異步模型,在需要網絡傳輸時,它才需要同步.在算法執行的過程中,同步次數執行的越少,算法的執行速度就越快.
在執行模糊K-均值聚類時,MapReduce與Spark的1次迭代同步次數都為2次,2次迭代MapReduce的同步次數為4次,Spark為3次.隨著算法迭代次數的增多,Spark比MapReduce的優勢會越來越明顯.
4)容錯性能
在實際應用中,難免遇到用戶代碼錯誤、進程崩潰、機器宕機等情況,良好的容錯性能能使系統快速地從故障中恢復過來,合理的容錯機制是算法執行的保證[20].使用MapReduce的好處是它有很強的容錯能力,使你的Job能夠成功完成.MapReduce通過冗余備份實現容錯,雖然會產生大量的I/O操作,但是在計算出現問題時容易恢復,安全性比較高.而Spark通過血緣關系實現容錯,文獻[19]系統分析了MapReduce和Spark的容錯能力,證明了MapReduce的容錯性能確實比Spark要好.
5)資源消耗
在處理大數據問題時,不同的算法對計算機性能的要求是不同的,如排序和查詢算法需要有大量的I/O操作,對網絡帶寬要求比較高;而執行K-均值、PageRank等迭代算法時需要做大量的計算,對CPU要求比較高.當使用MapReduce與Spark運行模糊K-均值算法時,MapReduce在內存、網絡以及磁盤的占用率上都要低于Spark.顯然,MapReduce對硬件要求比Spark更低,成本更低.
實驗所用的數據集包括3個人工大數據集和4個UCI數據集,對基于2種開源平臺的大數據模糊K-means算法從運行時間和迭代次數2方面進行了實驗比較.為描述方便,基于MapReduce的模糊K-均值算法記為MK-Means,基于Spark的模糊K-均值算法記為SK-Means.4個UCI數據集分別是Covtype、HT_Sensor、Poker-hand-testing和SUSY;3個人工大數據集都是用高斯分布生成的.第1個人工數據集GAUSS1是一個2類包含100萬個點的數據集,每類包含50萬個樣例.2類服從的高斯分布為p(x|ωi)~N(μi,∑i),i=1,2,參數列于表1中.

表1 GAUSS1的參數
第2個人工數據集GAUSS2是一個包含3類樣例的數據集,每一類包含40萬個樣例,服從的概率分布分別為

第3個人工數據集GAUSS3是一個4類包含100萬個點的3維數據集,每類250 000個點,每類服從的高斯分布為p(x|ωi)~N(μi,∑i),i=1,2,3,4,參數如表2所示.

表2 GAUSS3的參數
實驗客戶端開發環境為Eclipse,JDK版本為jdk-7u80-windows-x64.實驗所用集群開發環境主、從節點的基本信息如表3所示.

表3 實驗環境的基本信息
在人工數據集的實驗中,隱去數據集的類別標簽,并設置聚類數據K為實際的類別數,實驗結果列于表4中.

表4 2種算法實驗比較的結果
從表4可以看出,相同的數據在不同平臺迭代次數并不完全相同,是因為數據的初始隸屬度的確定具有隨機性,具有一定的差異.其中,GAUSS3,HT_Sensor與SUSY差異較大一些,是因為GAUSS3,HT_Sensor數據集的類別數較多,SUSY的數據量較大,隨機性也相對大一些,迭代次數差異相較于其他數據自然也大一些.
本文對基于MapReduce和Spark的2種大數據模糊-K均值算法進行了比較研究,得出如下結論:1)從算法執行時間上看,基于MapReduce的模糊K-均值算法較慢,基于Spark的模糊K-均值算法較快;2)從需要緩存的文件數目方面來看,基于MapReduce的模糊K-均值算法需要緩存的較少,基于Spark的模糊K-均值算法需要緩存的較多;3)從同步次數方面來看,基于MapReduce的模糊K-均值算法需要的同步次數較多,基于Spark的模糊K-均值算法需要的同步次數較少;4)從容錯性能方面來看,基于MapReduce的模糊K-均值算法較強,而基于Spark的模糊K-均值算法較弱,這種容錯性能差異是由大數據開源平臺決定的,而不是由模糊K-均值算法決定的;5)從資源消耗方面來看,基于MapReduce的模糊K-均值算法資源消耗較少,基于Spark的模糊K-均值算法資源消耗較多.