梁璦云,袁 丁,嚴 清,劉小久
(四川師范大學 計算機科學學院,四川 成都 610101)
關聯規則算法是數據挖掘算法中的一個經典算法,其目的在于從海量的、價值密度低的數據中挖掘出潛在的、高價值的關聯關系[1],用以輔助用戶獲取有效信息。傳統的Apriori算法設計簡單、結構清晰、易于實現,盡管如此,該算法仍然有值得優化的地方:①大量臨時形成的無效項集引發內存溢出;②多次訪問數據庫引發系統I/O負荷失衡,甚至系統宕機;③采用單機環境實現,執行效率低、花耗時間長、實效性低。值得注意的是,現有的單機串行的算法運行模式已經無法滿足當前大數據時代背景下的需求。此時,MapReduce分布式并行處理框架的出現,正好為關聯規則算法的研究提供了新的思路和研究方向。本文主要針對關聯規則算法所存在的無效候選集多、單機運行時間短等問題,提出基于Spark平臺的Apriori并行優化算法(本文簡稱Apriori_MC_SP算法),利用矩陣結構,壓縮事務數據庫的存儲空間,減少了算法的掃描次數;同時利用矩陣存儲頻繁k項集,采用分組模式,直接生成局部頻繁k項集,簡化了頻繁k+1項集的生成過程;利用Spark分布式計算框架實現并行化處理,增加系統的可擴展性,提高了算法的執行效率。
隨著大數據分析技術發展越來越成熟,研究學者們開始嘗試將關聯規則算法與并行化技術相結合,針對已有的并行Apriori算法所存在的問題,提出了一系列的解決方案。
在國外,文獻[2]用MapReduce思想提出了一種的Apriori并行化算法,該算法伸縮性較好,但索引能力較差,僅適合處理小量的結構化數據集。文獻[3]提出一種有效的關聯規則算法——ScadiBino,將離散化的數據集轉換為二進制編碼,通過執行多個Map操作和執行一個Reduce操作,實現算法的并行化處理,提高算法的運行效率。文獻[4]提出一種基于Spark平臺的關聯規則算法YAFIM,該算法引入HashTree概念,對候選集存儲進行優化,節省存儲空間,提高算法的運算效率。
在國內,Guo Fangfang等[5]利用水平分割的思想,提出一種基于多叉樹的并行Apriori算法,該算法利用多叉樹生成候選項集,有效減少了中間節點產生候選項集的數量,降低了數據庫的訪問次數以及各節點間的通信次數,有效地節省了數據存儲的空間。Wang Qing等[6]針對在Hadoop平臺下MapReduce模型處理節點失效、基于磁盤讀取數據緩慢等缺陷,將優化后的Ariori算法移植到Spark平臺進行實現,該優化方法在一定程度上節省了內存存儲空間,但在Map過程中產生的大量中間節點存儲在內存中,造成資源浪費。Yan Mengjie等[7]基于Spark平臺提出一種改進算法——IABS算法,對事務數據庫的轉化過程和候選集產生過程進行優化,并與文獻[4]中提出的YAFIM算法進行比較,該優化算法的性能明顯提高。Niu Hailing等[8]提出一種AMRDD算法,利用Spark平臺基于內存計算的優勢,運用局部剪枝和全局剪枝策略,縮減生成候選項集的數量,但該算法仍然沒有解決產生大量無效候選集的問題。Xie Zhiming等[9]基于Hadoop平臺利用MapReduce框架提出一種Apriori_MMR算法,該算法結合數據劃分的思想,利用矩陣的特性,簡化候選項集的生成過程,從而提高算法的性能。
本文采用部分符號標識及其相關定義見表1。
本文對關聯規則算法進行優化,引入了矩陣的概念,其相關定義如下:
定義1 事務布爾矩陣B,矩陣的行表示事務數據集D中的記錄,列表示所有數據集中項的集合[15],其形式如式(1)所示
(1)


表1 部分數學表達式定義
定義2 某頻繁k項集{Ii,Ij,…,Ip} 的內積運算方式,詳細定義參見文獻[15],如式(2)所示
(2)
其中,(Ii,Ij,…,Im)?I,B(Ii)表示布爾矩陣中的某Ii列的數據。
定義3 某1項集{Ii}的支持度計數[14]sup_count滿足如式(3)所示

(3)
定義4 某多項集{Ii,Ij,…,Ip}支持度計數的求和運算如式(4)所示

(4)
定義5 支持度sup,即項集Ii在整個數據庫中所得比例,詳細定義見參見文獻[15],計算方式如式(5)所示
sup(Ii)=sup_count(Ii)/counts(D)
(5)
其中,counts(D)表示數據庫D的總事務數。
Apriori算法主要采用簡單的迭代技術,利用已知的頻繁k項集“自連接”生成候選k+1項集,通過逐層搜索統計候選k項集出現的次數對候選項集進行“剪枝”。其具體實現流程見表2。

表2 傳統Apriori算法的實現流程
從表2中可以看出,傳統的Apriori算法不僅會多次掃描初始數據庫,而且在頻繁k-1項集“連接”過程中會形成大批無用的候選k項集,占用內存空間,拖慢執行進度。為適應當前大數據時代背景,設計高效的算法優化方案是非常必要的。
本文所提出的Apriori優化算法利用“分治”思想,將大型數據庫切割成多個均勻的數據塊,針對各個數據塊轉化為布爾矩陣。該算法主要對頻繁k(k=2)項集生成頻繁k+1項集的過程進行優化。算法具體實現流程如下:
步驟1 掃描事務數據庫D,對數據庫中的數據進行均勻分塊,分割成n個相同大小的數據塊。
步驟2 當k==1時,將n個數據塊轉化為n個不同的事務布爾矩陣Bi,形如式(1)所示,生成局部頻繁1項集l_fi1,返回n個數據(Bi,l_fi1),i取值為1,2,3…,n。
在對數據塊進行轉化為布爾矩陣時,需注意,當生成局部布爾矩陣Bi,對矩陣進行簡約化處理,即判斷矩陣的每行、列是否均滿足所給條件(行的求和結果不小于k,列的求和結果不小于局部支持度計數),滿足則保留,不滿足則刪除,如圖1所示。

圖1 事務布爾矩陣生成過程
步驟3 當k==2時,連接局部頻繁1項集,直接生成局部頻繁項集l_fi2,采用矩陣進行存儲,結果返回(Mi2,l_fi2)。
對頻繁1項集中的各項進行自連接生成2項集的過程中,運用式(2)、式(3)計算某2項集的計數是否不小于局部支持度計數,滿足條件,則將該項集保存到二維矩陣l_fi2中,最后將l_fi2擴展為1維數組,去重后對分塊矩陣進行行列篩選,形成新的矩陣。
步驟4 當k>2時,首先將頻繁k-1項集進行排序后按照前k-2項進行劃分,并采用前k-2項屬性值作為塊標號;其次采用下述連接方法,利用式(2)、式(4)形成頻繁k項集,避免無效項集的產生,占用資源,返回(Mik,l_fik)。對該步驟進行迭代,直至滿足條件k,或出現頻繁項集為空時返回k-1時的結果值。
自連接方式主要為兩種:塊內自連接、塊間自連接。
(1)塊內自連接:當塊內項集的個數不小于1時,采用該連接方式執行,如圖2所示,以標號為“I2”的塊進行解析,(I2,I5), (I2,I7)連接生成(I2,I5,I7),采用式(2)、式(3)運算其支持度計數;滿足條件,則添加到頻繁k項集矩陣中。
(2)塊間自連接:當總塊數不小于1時,采用該連接方式執行,圖2中以(I2,I5)為例(此處k=3),采用后1個項作為查找依據,即“I5”去查找對應的分塊標號,然后連接生成(I2,I5,I7)、 (I2,I5,I8)兩個項集,在進行自連接的同時,需判斷該項集是否存在于l_fck中,若不存在,則需對矩陣中的相應列進行相關運算,運算結果滿足條件則保留至二維矩陣l_fck中。如圖3所示,當k=4時,劃分頻繁3項集后,應采用前兩項作為塊表示,后兩項作為連接依據。

圖2 生成局部頻繁3項集

圖3 局部頻繁4項集的形成過程
步驟5 根據相同的“健”(即局部頻繁項集)進行“值”(即局部頻繁項集的求和結果)相加,合并所有分塊的局部頻繁項集,生成全局候選頻繁項集fk,再根據最小支持度計數,進行全局剪枝。
Spark框架是專為大規模數據處理所設計的通用并行框架,引入彈性分布式數據集[10](resilient distributed dataset,RDD)概念,RDD是一個具有容錯性的數據集合,該數據被分成多個分片,存儲在集群的各個節點的內存中,當內存容量不夠時,這些數據會自動寫入磁盤。RDD具有兩種類型的操作模式:transformation(轉換)和action(動作),前者的主要目的是從現有的RDD數據對象中重新創建或重新計算,從而獲得一個新的RDD數據對象,例如map()、flatMap()、filter()、distinct()、groupByKey()等。這些轉換操作具有惰性,即在對RDD進行轉換操作時,該操作并不會立即執行,僅僅記住這些操作;后者則通過某種特定的方法將多個RDD對象進行合并,生成一個最終結果,如count()、collect()、foreach()、reduce()等函數。在程序執行過程中,當RDD調用Action函數時,才會執行相關的Transformation函數進行計算。
Spark系統以MapReduce框架為核心,主要針對Hadoop存在的問題而設計的一套框架[11],具有使用簡單、自動容錯、負載均衡、擴展性強、可靠性高等特點[1],相較于Hadoop框架,它主要有以下優勢:
(1)Spark實時性強,擅長處理流數據,而Hadoop則擅長處理批量數據。當隨機訪問數據時,Hadoop執行效率明顯低于Spark。
(2)Spark最大的優勢在于面向內存進行計算[12]。Hadoop平臺面向磁盤存儲數據,在調用數據時,Hadoop需要先將數據從磁盤中調入內存進行運算,而Spark直接在內存中進行計算,無需進行磁盤I/O操作,相比之下,內存處理速度明顯高于磁盤速度。
(3)Spark改進了shuffle過程,提升了Spark的運行速度。Shuffle是Map階段和Reduce階段交流的紐帶,在Hadoop平臺上,shuffle過程強制要求數據按key值進行排序后分發到Reducer上,而Spark平臺上的shuffle過程則是用戶在有需求的情況下才會對結果數據進行排序(即調用sortBy()或sortByKey())。兩者相比,Spark的設計更加合理有效。
基于以上幾點,本文將改進后的Apriori_MC算法移植到Spark平臺,通過執行k個Map操作以及一個Reduce操作過程,實現算法的并行化處理。在Map階段,Spark將數據分成多個分塊(分塊數個數視集群情況而定),將各個分塊分別部署到不同的工作階段中去,按照第2.3節中描述的Apriori_MC算法的執行流程進行部署實現;在Reduce階段,將在Map階段產生的分塊結果按照鍵值進行求和,獲得全局候選k項集;最后將通過filter()函數對全局候選項集進行篩選過濾,從而獲得全局頻繁k項集。整個算法的流程如圖4所示。
Apriori_MC_SP算法核心偽代碼如下:

圖4 Apriori_MC_SP算法執行流程
輸入: 存儲路徑path, 最小支持度min_sup, 所求頻繁項集數k, 分塊數n
輸出: 頻繁k項集fk
(1)某事務數據庫被存儲在path路徑下的文件中, Master工作節點利用textfile()讀取文件, 讀取后創建一個新的RDD, 該RDD中的數據包含n個數據塊。
rdd←sc.textfile(path, n)
(2)Map階段:
for t←1 to k do:
if t==1 then :
rdd← rdd. mapPartitions( fc1(split, min_sup, k))
if t==2 then :
rdd← rdd.map( fc2(split, min_sup, k))
if t>2then :
rdd←rdd.map( fcn(split, min_sup, k))
(3)Reduce階段:
fk←rdd.flatMap(_).reduceByKey(_+_).filter(_>= min_sup_count)
上述過程中主要涉及以下3個函數:
(1)函數fc1(split, min_sup, k):
該函數目的是構造簡約化布爾矩陣M1, 生成頻繁1項集f1, 返回結果(M1,f1), 相關實現如下:
begin
L1←sorted( unique ( flatten ( split) ) )
#擴展數據庫為1維數組, 同時進行去重、 排序
w ← len(split) # w為分片長度
h ← len(L1) # h為L1長度
sup_count←min_sup * w
MatrixM←zero(size=(w, h) ); #構建零矩陣
for m←0 to h do :
for m←0 to w do :
if L1[m] in split[n] then :M[m, n] ←1
for m←0 to h do :
sums←sum(M[:,m]) #對矩陣每列進行求和
if sums >=sup_count then: f1.add(L1[m])
M1← Mat_process (M,f1, k)
# 調用Mat_process為篩選行/列的函數
return (M1,f1)
end
(2)函數fc2(split, min_sup, k):
該函數的目的在于生成頻繁2項集, 返回結果(M2,f2), 相關描述如下:
begin
f2,f1,M← [], split [1], split [0]
for m←0 to len(f1)-1 do :
for n←m+1 to len(f1) do:
sums←sum(M[:,m]&M[:,n] )
l←[ [f1[m] ,f1[n]] , sums] #二維數組
if sums>=sup_count and l not inf2then:
f2.add( l)
M←Mat_process(M,f2, k)
return (M,f2)
end
(3)函數fcn(split, min_sup, k):
該函數主要針對當k>2時, 采用塊內自連接和塊間自連接兩種方式, 返回結果(Mn,fn), 相關描述如下:
begin
f,M, result ,fn←split [1][:,0], split [0], 1, []
gid=unique(f[:,0:k-2]) #對二維矩陣進行分塊標記
for each id in gid do :
group← select(f2[:,0:k-2]=id)
for each i←0 to len(gid)-1 do:
#塊內連接
for each p←i+1 to len(group) do:
l←sorted(group[i,:] ∪group[p,:])
for u in A:result*=M[:,u]
if sums(result)>sup_count then :
fn.add( [l, sums(result)])
#塊間連接
dev ← group[i][-(k-2):]
Group←select(f2[:,0:k-2]= dev)
for ecah item in Group do:
l← sorted( group[i,:]∪item )
for u in l: result*=M[u]
sums←sum(result)
if(sums>= sup_count) and [l,sums] not infn:
fn.add([l, sums(result) ])
M← Mat_process (M,fn,k)
return (M,fn)
end
假設在Spark平臺上單個分塊的事務數為D,其中事物項集的數量為M,共N個這樣的分片。
在單個分片中,采用傳統的Apriori算法,掃描一次數據庫以及獲得頻繁1項集的耗時O(MD),在頻繁k-1項集生成候選k項集的“自連接”過程中,所需耗時O(Lk-1*(Lk-1-1)),“剪枝”生成頻繁k項集所需耗時O(Ck)。遍歷整個數據庫計算候選頻繁項集Ck所需耗時O(Ck*D),傳統的Apriori算法平均情況下,所需的時間復雜度為

(6)
對Apriori_MC算法來說,當k==1時,其轉換矩陣以及生成頻繁1項集所耗時O(MD);當k==2時,“自連接”步驟直接生成頻繁k項集所耗時O(Lk-1*(Lk-1-1));當k>=3時,對頻繁二項集進行分組操作,在最壞的情況下,其時間復雜度為O(Lk-1*(Lk-1-1)),則該算法在最壞的情況下所需的時間復雜度為

(7)
對比式(6)與式(7),明顯傳統Apriori算法的時間復雜度高于Apriori_MC算法的時間復雜度。其原因是Apriori_MC算法掃描數據庫操作僅進行一次,忽略了局部“剪枝”過程,減少中間結果的產生,釋放部分內存,縮短了算法的時間復雜度。
本實驗部分將采用兩種數據集進行實驗測試,數據集特征見表3。
(1)數據來源于頻繁項集挖掘數據集知識庫[13]的數據集:Connect。
(2)數據有IBM數據生成器隨機生成數據。

表3 數據集特征
本文采用的實驗環境:6臺CPU Inter Core4、4G內存、1T、磁盤ubuntu14.04的操作系統的臺式計算機,每臺計算機通過交換機構建小型局域網,并利用操作系統提供的SSH服務進行免密通信。在搭建Spark Standalone集群的過程中,需要JDK1.8、Scala2.10、Hadoop2.6以及Spark1.6等軟件包的支持,本文實驗主要采用Python3.5進行實現,其依賴Pyspark、Numpy等庫文件。
本文實驗部分將采用3種不同的Apriori方案進行驗證:傳統的Apriori算法、來自文獻[14]的優化方案、以及前文設計的Apriori_MC方案,將這3種方案移植到Spark平臺部署實現,分別命名為:Apriori_SP、MApriori_SP、以及Apriori_MC_SP算法。
實驗1:支持度閾值的設定對算法的影響
支持度閾值的設置對關聯規則算法的執行速度和頻繁項集的數量會帶來一定的影響。若支持度閾值設定過高,產生頻繁項集數量過少,則無法更好展現算法的執行效果;若支持度閾值設置的過低,生成的頻繁項集過多,容易造成集群的中間節點運行失敗后再次計算,增加了集群的工作量。本實驗主要目的在于驗證支持度發生變化時不同方案的執行效率驗證以及輸出結果。該實驗采用Connect數據集進行驗證,輸出結果要求返回頻繁3項集,3種方案的執行效率見表4,頻繁3項集的數據見表5。

表4 支持度變化時3種方案的性能對比

表5 支持度變化時3種方案的輸出結果對比
在表4中可以得出,在數據集一直地情況下,隨著支持度的減小,3種方案的執行時間隨之增加。在表4中,傳統Apriori算法的執行時間最長,運行效率最低。將MApriori_SP與Apriori_MC_SP方案進行對比,當支持度sup>=0.4時,兩種方案的執行時間相差不大;當sup<0.4時,Apriori_MC_SP算法的運行效率明顯優于MApriori_SP方案。
從表5中可以得出,隨著支持度的降低,頻繁項集的數量也相應增加;同時,本文提出的Apriori_MC_SP方案與傳統的Apriori算法的頻繁項集在數量上保持一致。
因此,支持度的選擇是影響關聯規則算法執行效率的一個重要因素。
實驗2:不同數據集下算法的執行效率對比實驗
本實驗將采用100萬條的隨機數據集中分別提取20萬、40萬、60萬、80萬以及100萬條數據集進行測試,由于該數據集的事務數量過大,為避免結果產生的頻繁項集過少的情況,將最小支持度閾值設置為0.1,挖掘頻繁3項集,在數據集量不同的情況下,3種方案的執行效果如圖5所示。

圖5 不同數據規模下3種方案的性能對比
在圖5中可以得出,隨著事務數據集的規模逐步擴大,算法的執行時間也隨之增長,同時也反映出傳統Apriori算法比其它兩種方案的執行時間長,并不滿足當前大數據時代對數據分析的要求。當數據集的數據量小于40萬時,Apriori_MC_SP與MApriori_SP方案的執行時間相差較小,但當數據量越來越小時,Apriori_MC_SP方案的執行效率則快于MApriori_SP算法。
實驗3:Spark集群的工作節點數對算法的影響
該實驗的目的是驗證集群的工作節點是否影響算法的運行效率。該實驗選取隨機生成的60萬條數據作為驗證對象,最小支持度閾值設置為0.1,分別驗證不同工作節點下Apriori_MP_SP方案的執行狀況,實驗結果如圖6所示。

圖6 不同工作節點下的執行效率
從圖6中可以得出,在數據集不變的情況下,工作節點增加,集群可使用的內存資源加大,使得集群處理大數據量的能力加強,從而加快了算法的執行時間。圖中,當工作節點數大于等于3時,執行效率下降的速度明顯減緩。
綜上所述,支持度的設定、數據規模、Spark集群規模均會影響算法的執行效率。本文提出Apriori_MC_SP方案在保證輸出結果與傳統Apriori算法一致地情況下,在很大程度上提升了關聯規則算法的執行效率,且該算法更適用于大數據量的關聯規則提取。
本文提出的Apriori_MC算法,運用分治思想,基于Spark集群框架,實現了關聯規則算法的并行化計算,有效解決了數據庫掃描次數多、系統I/O負載量大、無效的中間局部頻繁項集多等問題,提高了關聯規則算法的運行效率。當然,在算法的實現過程中仍然存在一些缺陷,在實驗過程中,隨著數據量的增長,算法產生的中間結果越多,在采用Spark集群進行測試的過程中,未充分考慮實驗過程中數據傾斜所帶來的問題,造成各工作節點運行時間不穩定的現象。后期將著重對這些問題進行深層次的研究,以期獲得更理想的效果。