王永貴,郭昕彤
遼寧工程技術大學 軟件學院,遼寧 葫蘆島 125105
在如今這個數據呈指數級增長的互聯網時代,每天的這些海量數據之中不乏有很多具有重要價值的信息,而關聯規則挖掘則可以發現大量數據中項集間的關聯[1]。分布式和并行計算正是為了解決在實際應用中大數據量的存儲和計算效率問題[2]。Spark 則為最常用的分布式計算框架。Spark是在借鑒了MapReduce的基礎之上發展而來,相比較于MapReduce它繼承了分布式計算的優點[3]并對過大的I/O開銷、低效率,以適用于多迭代算法不夠靈活等問題都做出了重大改進。
Qiu等人[4]提出YAFIM算法,這是首次在Spark上實現了Apriori 算法的并行化。它引進了Spark 的彈性分布式數據集RDD 的抽象且支持DAG 圖的分布式并行計算的編程框架,減少了迭代過程中數據的落地,這很好地解決了MapReduce 框架存在的問題[5]。該算法的實驗結果也說明了在Spark上實現傳統算法的效率相比基于Hadoop上的有很大改善。但算法在執行過程時自連接存在難題,得到的頻繁集會多次掃描已有的數據庫,當候選集數目龐大時,會很大地降低算法的效率。Rathee 等人[6]在YAFIM 算法的基礎上消除了候選集的生成過程并且引入了布隆過濾器,利用布隆過濾器相比較于哈希樹的空間和時間的優勢代替了哈希樹,與YAFIM 算法相比具有更高的擴展性以及加速比,但是算法生成過多無用項集,同時算法泛化性較差。Luo等人[7]基于Spark提出稀疏布爾矩陣分布式頻繁挖掘算法FISM,算法通過事務與項目構成的矩陣減少了掃描數據庫的次數,算法性能有了提高但還是有大量重復性操作。而后謝志明等人[8]提出了Apriori_MMR算法,該算法也是引入了布爾矩陣但結合數據分片進行了并行化實現,通過矩陣化使事務數據庫得到進一步的壓縮,在大數據集上挖掘時其效率及性能上均有較大提升。但其中布爾矩陣無法將數據存儲在某小節點上垂直劃分,所以還存在著一定的局限性。Krishan 等人[9]提出了一種新的混合頻繁項集挖掘算法,該算法利用數據集的垂直布局來解決每次迭代中數據集的掃描問題,垂直數據集攜帶信息以查找每個項目集的支持度使得計算起來變得簡單,但同時也占用過多的內存。Karim 等人[10]提出了一種利用Spark 平臺挖掘最大頻繁模式的有效方法,利用一個基于素數數據轉換技術。Chon等人[11]充分利用CPU 的計算能力執行大量數據,進而提升算法候選集的生成以及支持度計數的速度。
雖然上述從不同角度上提出了改進方法,一定程度上提高了算法性能,但存在挖掘模式單一、計算支持度的方法復雜、計算許多無用數據、數據量大時受限于RDD的存儲和計算效果。
Spark SQL是Apache Spark中的一個新模塊,提供了關系處理的豐富集成[12]。它用于處理結構化數據的模塊,包括一個基于成本的優化器、列存儲和代碼生成,以加快查詢速度[13],與此同時使用Spark 引擎擴展到數千個節點和多小時查詢,提供了完整的中間查詢容錯。它使用聲明性DataFrame API擴展Spark以允許關系加工,提供諸如自動優化等優點,讓用戶編寫混合了關系和復雜分析的管道[14]。
RDD是分布式的Java對象的集合,DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點是提升執行效率、減少數據讀取以及執行計劃的優化,比如filter下推、裁剪等。因為它規定了具體的結構對數據加以約束。由于DataFrame具有定義好的結構,Spark可以在作業運行時應用許多性能增強的方法。例如,定制化內存管理:數據以二進制的方式存在于非堆內存,節省了大量空間之外,還擺脫了GC的限制;優化的執行計劃:查詢計劃通過Spark catalyst optimiser 進行優化。Spark 對于DataFrame在執行時間和內存使用上相對于RDD有極大的優化[15],Catalyst優化引擎使執行時間減少75%、Project Tungsten Off-heap內存管理使內存使用量減少75%。
本文算法基于SparkSql的分布式數據結構DataFrame進行編程,解決了RDD內存資源和計算速度受限問題,使得本文算法在標準數據集上得到更高效的結果。數據集存儲在RDD和DataFrame上的區別如圖1所示。

圖1 數據集基于RDD和DataFrame存儲的區別
倒排索引[16]源于實際應用中需要根據屬性的值來查找記錄。這種索引表中的每一項都包括一個屬性值和具有該屬性值的各記錄的地址。由于不是由記錄來確定屬性值,而是由屬性值來確定記錄的位置,因而稱為倒排索引。本文引入倒排索引的思想來優化支持度的計算,取代通過判斷數據集中包含項集的事務個數來計算支持度的方法,避免每次支持度都要訪問一次數據集,產生巨大的I/O 開銷[17]。本文在計算支持度時的具體優化如下。
(1)將數據集加載到SparkSql,以DataFrame結構存儲,其事務列為包含對應項集列中項集所在的事務集合,集合的長度即所求支持度。
(2)候選集的k個項目對應事務列的集合交集,即k個項目共同存在的事務集合,也就是包含候選集的事務集合。計算候選集k項集的支持度時,通過執行Spark-Sql語句對DataFrame中對應k個項目的事務列求交集,得到集合的長度,即候選集的支持度。
本文算法挖掘所有頻繁集只需要讀取一次數據集,同時每次計算候選集支持度,只需計算候選集的k個項目而不必計算整個數據集;計算支持度的過程中,利用SparkSql語句求交集的方法,較基于RDD編程計算支持度的方式提升執行效率、減少數據讀取以及執行計劃的優化。支持度計算過程如圖2所示。

圖2 支持度計算示例圖
布隆過濾器是一種概率型數據結構,特點是高效地插入和查詢,可以用來判斷某個元素或集合是否存在,它是用多個哈希函數,將一條數據映射到位圖結構。例如,判斷一個項目是否在數據集中,布隆過濾器將數據集中每個項目都通過k個不同的Hash函數隨機散列到數組的k個位置上,并將這些位置置為1。現要判斷項目w是否在數據集中,通過Hash函數將事務w映射成位陣列中的點,判斷這些點是否都為1,可以得到元素是否存在于數據集。示例如圖3所示。

圖3 布隆過濾器示例圖
布隆過濾器隨著k的個數增加,可以提升布隆過濾器的存儲效果,但是也會增大內存訪問次數和計算復雜度,降低算法效率。本文算法通過添加存儲集合元素的輔助信息,即唯一ID,改進后訪問一次相當于布隆過濾器的兩次訪問,同時輔助信息中存儲了項目對應的事務位置,可以有效地減少頻繁集挖掘過程中查找項目屬于某個事務所浪費的計算資源,并減少訪問內存的次數,提升運行速度。
構建階段分三步進行。(1)設K2+1個獨立的Hash函數,H(x)={h0(x),h1(x),…,hk2(x)}使其具有均勻分布的輸出數據的能力,構造一個m位大小的數組B,其中每個比特被初始化成0;(2)存儲集合S的元素信息,計K2+1個哈希值與此同時還需要計算集合S的元素e的偏移值為每個元素的輔助信息O(e);(3)將K2+1 位B[h1(e)%m],B[h2(e)%m],…,B[hk2(e)%m]置為 1,同時將K2+1 位B[h1(e)%m+O(e)],B[h2(e)%m+O(e)],…,B[hk2(e)%m+O(e)]置為1。設B[hi(e)%m]為第j個 bit(一個字節的位j,1 ≤j≤8),讀取它之前需要讀j-1 位。為了一次訪問內存讀取B[hi(e)%m]、B[hi(e)%m+O(e)],需要一次內存讀取訪問j-1+wˉ位,所以j-1+wˉ<w當j取最大值8時,wˉ<w-7,則為保證在一次內存讀取中獲得B[hi(e)%m]、B[h1(e)%m+O(e)]需要wˉ<w-7(64 位系統=64)。O(e)≠0,若O(e)=0則B[hi(e)%m]、B[h1(e)%m+O(e)]兩位在數據的相同位置。構建階段最大哈希函數的數量為K2+1。查詢是項目e是否存在于集合B:用一次內存讀取元素位置和唯一ID的偏移位置,如果兩位同時為1,讀取下兩位,如此遍歷。當所有位都為1,得出元素存在于集合中。判斷一個項目是否存在的最大內存訪問次數比布隆過濾器減少了一半,從而減少了計算時間。改進后的布隆過濾器的數據結構如圖4所示。

圖4 改進的布隆過濾器的數據結構
本文提出一種基于SparkSql 的分布式編程的算法。首先,利用SparkSql 存儲數據集和計算支持度、改進后的布隆過濾器存儲迭代過程生成的項目集;其次,通過精簡步得到頻繁項目、候選集、事務集;最后,提出兩種迭代的方法和選擇條件,自適應數據集選擇最優方式進行頻繁集挖掘的算法。本文算法分兩個階段進行。第一階段從數據集中挖掘出所有的頻繁一項集。第二階段從數據集中迭代挖掘出所有頻繁項集。
本文基于先驗定理在每次迭代過程進行精簡事務集、項目和候選集。基于SparkSql 的DataFrame 數據結構的高效性對項目、事務進行精簡;針對改進后布隆過濾器上精簡后的候選集支持度的計算進行簡化。算法通過精簡條件的前三步刪除不能生成頻繁集的事務、項目、候選集,減少頻繁集挖掘過程需要計算的數量同時第四步優化支持度的計算方法,進而達到提升頻繁集挖掘效率并節省計算資源的目的。具體精簡過程如下。
(1)頻繁k集必須由k(k-1)/2 個頻繁子集生成,刪除頻繁子集個數不等于k(k-1)/2 的候選集。
(2)頻繁k項集必須有k個項目組成,刪除頻繁項目數小于k的事務。
(3)頻繁k項集必須由頻繁項目組成,刪除不頻繁的項目。
(4)算法基于項目-事務-計數的DataFrame,通過對所求項目集中每個項目對應的事務列求交集,得到支持度。
例如圖5 是數據集加載到SparkSql 的Dataframe 結構,最小支持度為50%挖掘頻繁集。第一次迭代,除了a、b都為頻繁一項集、第二次迭代的事務至少有兩個項目,必須都是頻繁項目,所以刪除T1、T6、a、b,最后對候選二項集對應T列求交集得到頻繁二項集cg、dg、fg。第三次迭代,頻繁三項集必須有3 個頻繁子集,候選集cdg、cfg、dfg,都只有2個頻繁子集,所以沒有頻繁三項集,不必計算,挖掘過程結束。

圖5 精簡示例圖
第k次迭代,事務個數t,每個事務中的平均項目個數n,頻繁k-1 項集數g,頻繁項目個數i,頻繁候選集個數c,布隆過濾器中存儲和搜索一個元素花費的時間為1。由先驗定理可以得出公式(1):

第一種方法迭代時的三個步驟:第一步頻繁k-1項集存儲到布隆過濾器中的時間Ts。第二步生成和精簡候選集的時間Tn。第三步生成鍵值對的時間Tj。
第k次迭代花費的時間為Ta:

存儲頻繁k-1 項集到布隆過濾器中的時間為:

生成頻繁候選集和精簡候選集的時間為:

生成精簡候選集的鍵值對時間為:

第k次迭代的時間為:

第二種方法迭代時所用時間的三個步驟:第一步生成頻繁事務和項目的時間Tg。第二步存儲頻繁k-1項集的項目在布隆過濾器中的時間Tq。第三步生成鍵值對的時間Tj。方法2進行第k次迭代:

生成頻繁事務和項目的時間:

頻繁k-1 項集中的頻繁項目存儲在布隆過濾器中的時間:

生成鍵值對的時間:

第k次迭代的時間:

精簡的候選集c一定小于等于候選集個數 |Ck|,得到如下條件。如果滿足Tb >Ta就用方法1,反之用方法2。

第一階段從數據集挖掘出所有頻繁一項集。首先,定義一個模式字符串“Item Tid Count”,根據模式字符串,生成模式(Filed)。其次,用Schema 描述模式信息,模式中包含Item、Tid、Count三個字段。最后,數據集加載到DataFrame,注冊為臨時表,通過sql 語句生成頻繁一項集。頻繁一項集的挖掘過程如圖6 所示。頻繁一項集的挖掘應用示例如圖7所示。

圖6 頻繁一項集的挖掘過程

圖7 頻繁一項集挖掘的應用示例
第二階段獲取頻繁k+1 項集。迭代地使用k項頻繁集來生成k+1 項頻繁集,直到沒有頻繁集生成或達到終止條件。基于當前迭代數據集和頻繁k項集的特征,使用一個動態方法選擇生成候選集的方法即方法1或者不生成候選集的方法即方法2來完成第k次迭代,如果該次迭代滿足條件用方法2進行迭代,否則用方法1算法進行挖掘頻繁k項集。
第k次迭代,如果不滿足條件,使用方法2 完成本次迭代。首先,將頻繁集中的頻繁k-1 項集中的項目存儲在布隆過濾器;其次,應用intersection 算子于DataFrame 進行修剪,刪除不存在于布隆過濾器中的頻繁項目;再次,應用map 算子,刪除項目事務DataFrame中項目數小于k的事務,應用map算子于布隆過濾器中的頻繁項目,映射生成可能出現的k項集;最后,通過sql 語句得到k項集中每個項目對應的事務集合的交集,從而計算支持度,進而得到頻繁集。
如果滿足條件,使用方法1 進行本次迭代。首先,將頻繁集中的頻繁k-1 項集中的項目集合存儲在布隆過濾器。其次,連接前k-2 項相同的頻繁k-1 項集,生成候選k項集。再次,刪除頻繁k-1 項子集不等于k(k-1)/2 的候選集。最后,通過sql 語句得到k項集中每個項目對應的事務集合的交集,從而計算支持度,進而得到頻繁集。頻繁k項集的挖掘流程如圖8 所示。應用示例如圖9所示。

圖8 頻繁k 項集挖掘過程

圖9 頻繁二項集生成的應用示例圖
實驗數據是UCI 中六個用于驗證關聯規則算法性能,并且特點不同的數據集,具體如表1所示。
實驗的軟件環境為64 位Ubantu 14.04 Linux 操作系統、JDK-1.8 Hadoop-2.7.1 Scala-2.12.1 Spark-1.6.1Python-3.6.3 Anaconda-2.0;硬件環境為Intel corei7-6500U 處理器,4 個 3.10 GHz 處理器核、8 GB RAM 和500 GB HD。采用20 臺計算機搭建Spark 集群運行環境,集群共20個節點,把其中兩臺計算機設為master節點,其他18臺機器設為slave節點。

表1 實驗數據集
實驗采用增加節點的個數M和數據集的復制倍數N為變量,用SpeedUp、SizeUp、ScaleUp 等評估指標驗證算法的性能。選用Retail 數據集、Musroom 數據集、Kosarak 數據集、BMSWebView2 數據集四個不同數據集,在最小支持度為0.1%的條件下對算法進行驗證。
隨著集群節點數M增加,加速比變化情況如圖10所示。算法在不同數據集中隨節點數的增加,加速比總會達到最優值,并且加速比最終都呈穩定趨勢,進而證明該算法可以應用于更大的集群規模。

圖10 隨著節點增加,算法SpeedUp的變化
在節點數為6的條件下,隨著數據集復制倍數的增加,觀察算法應用在不同數據集的SizeUp 的變化情況如圖11 所示。隨著不同數據集的倍數增長,算法的SizeUp呈緩慢的增長趨勢,說明本文算法具有處理更大數據集的能力。
隨著節點數M和數據集復制倍數N的同時同倍數增加,觀察算法應用在不同數據集的SizeUp 的變化情況如圖12 所示。隨著數據集復制倍數和集群節點數同向增長時,觀察算法對不同數據集的ScaleUp 一直在0.9 附近穩定波動,進而證明該算法當應用到大規模計算時,具有較好的高效性和泛化性。綜上所述,本文算法可以應用到更大規模數據和集群條件下挖掘頻繁集。

圖11 隨著數據量增大,算法SizeUp的變化

圖12 隨著節點和數據同時增大,算法ScaleUp的變化
將本文提出的兩種頻繁集挖掘方法和最終算法基于Musroom 數據集和Kosarak 數據集,分別在最小支持度為0.3、0.1%的條件下方法1、方法2 和本文算法的比較,如圖13 和圖14 所示。實驗結果表明本文算法每次迭代會選擇最優的迭代方法,從而提升每一次迭代的計算效率,進而提升算法的高效性。與此同時,本文算法在不同數據集和支持度時總是選擇最優的迭代方法,有著很好的泛化性。
本文選用基于Spark框架的R-Apriori算法和YAFIM算法進行對比實驗。將上述兩種算法和本文算法基于T10I4D100K 數據集、BMSWebView2 數據集和 Retail 數據集,分別在支持度為0.1%、0.15%、0.5%的條件下進行對比,如圖15和圖16、圖17所示。實驗結果表明本文算法在不同數據集和支持度的條件下,挖掘頻繁集的每次迭代過程相較于兩種對比算法都表現出更高效的計算效率,進而證明本文算法有更好的高效性和泛化性。
將本文算法和對比算法基于Kosarak 數據集和BMSWebView2 數據集上,分別在支持度為0.1%、1%、10%的條件下進行對比,結果如圖18 和圖19 所示。實驗結果表明本文算法不同數據集和支持度的條件下,在挖掘所有頻繁集的總運行時間上優于兩種對比算法,進而證明本文算法在挖掘大數據中所有的頻繁集上,有更好的高效性和泛化性。
本文基于不同數據集,不同支持度進行對比實驗,通過上述實驗可以得到以下結論。在總挖掘效率上優于兩種對比算法;在挖掘頻繁集的每一次迭代根據數據集的特點總是選擇最優的方法;在挖掘頻繁集的每一次迭代都較好于兩種對比算法;綜上所述,本文算法有著較好的高效性和泛化性。

圖13 基于Musroom數據集、min_sup=0.3的條件,算法對比

圖14 基于Kosarak數據集、min_sup=0.1%的條件,算法對比

圖15 基于T10I4D100K數據集、min_sup=0.1%的條件,算法對比

圖16 基于BMSWebView2數據集、min_sup=0.15%的條件,算法對比

圖17 基于Retail數據集、min_sup=0.5%的條件,算法對比

圖18 基于Kosarak數據集不同支持度下算法對比

圖19 基于BMSWebView2數據集不同支持度下算法對比
本文算法圍繞如何提升算法計算效率進行改進。首先,基于SparkSql分布式編程,減少算法對單臺服務器造成的壓力,將數據集加載到DataFrame,解決了RDD內存資源和計算速度受限問題;其次,充分利用改進后的布隆過濾器存儲過程數據的時間和空間優勢來滿足挖掘過程中的存儲、查詢項集的需求;再次,基于先驗定理對事務、項目和項集進行精簡,同時通過對項集中項目對應事務集合求交集的方式計算項集支持度,進而提升支持度計算效率和避免大量不必要的數據存儲;最后,提出了兩種迭代算法和選擇條件,增強本文算法對各種數據集的泛化性。本文進行了多組性能實驗和對比實驗,實驗結果表明,算法有效提升挖掘所有頻繁集的效率,同時較好地提升每次迭代效率;每次迭代總是根據數據集特點選擇最優的迭代方式;通過多個并行算法評估指標驗證本文算法有著較好的泛化性和高效性,可以應用到更大規模的數據集和集群。