白玉辛,劉曉燕
(昆明理工大學 信息工程與自動化學院,昆明 650500)
大數據時代背景下,信用卡交易、傳感器測量、機器日志、網站或移動應用程序上的用戶交互等各種類型的數據都是作為事件流產生的,傳統數據挖掘算法處理這些大規模數據集的能力出現瓶頸,因此需要將傳統數據挖掘算法和現有大數據框架進行結合,改變傳統數據挖掘算法無法高效處理大規模數據集的現狀.支持向量機[1](Support Vector Machine,SVM)基于統計學習理論,在模式識別、文本分類和圖像識別等眾多領域表現出優異實踐性能的機器學習算法.SVM相比較其他常用的數據挖掘算法而言,在算法訓練過程中很少會出現過度擬合、屬性特征過多造成的維數災難對算法性能影響微乎其微、對核函數運用巧妙,可以讓算法處理數據集線性不可分的情況.但是,當傳統SVM算法處理大規模數據集時,會出現訓練速度慢,內存溢出,運行崩潰等性能低下問題[2-4].
針對傳統單機SVM算法面對大規模數據集處理效率低下等問題,最近幾年數據挖掘領域的專家主要使用兩種方法改進傳統支持向量機算法:1)采用“分而治之”的思想處理大規模數據集,該思想是將一個完整的數據集切分成若干訓練子集,在每一個訓練子集上訓練局部支持向量并剔除大量非樣本邊界樣本點,使其并行訓練,達到算法并行化訓練的目的;2)利用GPU或FPGA強大的矩陣運算能力來提高收斂速度.如Li等人[5]基于GPU設備設計實現了一個并行SVM智能分類系統,提高SVM預測及訓練階段算法執行的效率.胡福平等人[6]使用FPGA實現SVM并行計算結構,借助Verilog HDL程序設計語言完成了各模塊的結構設計,該結構實現的SVM的分類性能要略優于Libsvm.丁宣宣等人[7]利用MapReduce和Bagging的并行組合支持向量機訓練算法,采用Bagging集成算法的思想,結合隨機次梯度SVM算法對剩余的支持向量訓練,以提高算法的分類精度.劉澤燊[8]等人使用基于內存計算的分布式處理框架—Spark,實現基于Spark的并行SVM算法,克服了并行SVM算法在MapReduce模型中迭代計算的不足.李坤等人[9]和何經緯等人[10]則研究如何在Spark框架提高SVM參數尋優的時間.通過這些研究一定程度上可以解決傳統支持向量機訓練效率低下等問題,但隨著電子商務的高速發展,互聯網電商平臺的數據呈指數級別的增長,鑒于實時機器學習的思想提出,因此,需要使用實時處理框架來對傳統SVM算法做出改進,使其能夠適應框架,最終達到線下訓練,線上實時預測的目的.
2008年,柏林理工大學研發出一個大數據處理平臺,此為Flink[11]的前身,隨后在2014年成為了ASF(Apache Software Foundation)的頂級項目之一.Apache Flink是一個框架和分布式處理引擎,用于對無邊界和有邊界數據流上進行有狀態的計算,并且它可以用于Google數據流模型[12],同時Flink能在所有常見集群環境中運行,以內存速度和任意規模進行計算.目前互聯網領域的實時搜索、數據分析和機器學習等任務都可以在Flink平臺上運行.
本文對Cascade SVM算法和Flink計算框架進行了深入研究分析,采用數據切分的思想,實現了基于Flink平臺并行SVM算法,有助于向線下訓練,線上實時預測模型準確度.具體工作主要有:1)深入研究CascadeSVM算法的訓練結構及合并策略,設計基于Flink計算框架的并行SVM算法;2)對部分并行操作算子進行優化,通過廣播變量的方式將訓練過程中的局部支持向量發送給下一級算子,優化了原本中間結果需要寫到本地磁盤的過程.實驗對比表明,該算法能夠加快模型訓練的速度,保證精度損失較小的前提下,提高了算法執行效率.
Apache Flink(1)http://flink.apache.org一款能夠同時支持高吞吐、低延遲、高性能的分布式處理框架,在2010年-2014年間,由柏林工業大學、柏林洪堡大學和哈索普拉特納研究所聯合發起名為“Stratosphere:Information Management on the Cloud”研究項目,并于2014年4月,貢獻給Apache基金會孵化器項目,期間改名Flink.自2015年9月發布第一個穩定版本到現在為止,更多的社區開發成員逐步加入,現在Flink在全球范圍內擁有350多位開發人員,在國內比較出名的互聯網公司如阿里巴巴,滴滴等都在大規模使用Flink作為企業的分布式大數據處理引擎.在不久的將來,Flink也將成為企業內部主流的數據處理框架,最終成為下一代大數據處理的標準.
Apache Flink是一個集眾多具有競爭力的特性于一身的第3代流處理引擎.它支持精確的流處理,能同時滿足各種規模下對高吞吐和低延遲的要求,尤其是以下功能使其在同類系統中脫穎而出:1)同時支持事件時間和處理時間語義.事件時間語義能夠針對無序事件提供一致、精確的結果.處理時間語義能夠用在具有極低延遲需求的應用中;2)提供精確一次(exactly-once)的狀態一致性保障;3)在每秒處理數百萬條事件的同時保持毫秒級延遲,同時基于Flink的應用可以擴展到數千核心之上;4)層次化的API在表達能力和易用性方面各有權衡;5)支持高可用性配置(無單點失效),如Apache Kafka、Apache Cassandra、JDBC、Elasticsearch以及分布式文件系統(HDFS和S3)等.
現實世界中,所有的數據都是以流式的形態產生的,根據現實的數據產生方式和數據產生是否含有邊界(具有起始點和終止點)角度,將數據分為兩種類型的數據集,一種是有界數據集,另一種是無界數據集.Flink正是用于這兩種數據集上進行有狀態計算,其核心模塊是一個數據流執行引擎,主要是通過Java代碼實現的.其中有界數據集具有時間邊界,在處理過程中數據一定會在某個時間范圍內起始或結束,對有界數據集的數據處理方式被稱為批處理(Batch Process);對于無界數據集,數據從一開始生成就持續不斷地產生新的數據,因此數據是沒有邊界的,對無界數據集的數據處理方式被稱為流處理(Streaming Process).Flink最近提出了本地閉環迭代操作符[13]和基于成本的自動優化器,它能夠重新排序操作符,并更好地支持流,所以Flink 在處理迭代和實時數據處理問題時,能夠在保證穩定性和狀態一致性的同時提高系統的運行速度,支持數據高吞吐和低延遲等優點.
支持向量機SVM算法是建立在統計學習理論和結構風險最小化原理基礎上,其基本模型是在特征空間上尋找具有最大邊緣距離(Maximum Marginal)分類超平面的線性分類器,有效克服了傳統機器學習算法中出現維數災難、過度擬合和局部最小值等缺點.算法描述如下:給定線性可分訓練集T={(x1,y1),(x2,y2),…,(xN,yN)},其中xi∈X=Rn,yi∈Y={+1,-1},i=1,2,…,M;對于給定的訓練集,構造并求解約束最優化問題,然后引入拉格朗日乘子,對其參數求偏導,得到與原問題對應的對偶問題,如式(1)所示:


αi≥0,i=1,2,…,N
(1)

對于訓練樣本集線性不可分的情況,設置懲罰函數C,使分類器模型容忍部分噪聲點的同時避免了過擬合學習的情況,求解線性分類問題,線性分類支持向量機是一種非常有效的方法,但如果訓練數據線性不可分時,對偶問題和決策函數中有樣本點的內積計算,通過引入核函數將樣本從低維空間映射到高維空間,等價于隱式地在高維空間中學習非線性支持向量機,這樣的方法稱為核技巧,優點是實質的分類效果表現在高維空間上,使原問題變成線性可分,通過使用核技巧(knernel trick)及軟間隔最大化,學習非線性支持向量機.核函數包括多項式核、高斯核和線性核,一般的核函數表達式為:
K(x,z)=<σ(x)·σ(z)>
(2)
Deng Kun等人和Grafs等人提出了分組訓練和層疊訓練算法,目的是為了解決單機SVM處理大規模數據集出現訓練速度慢,內存溢出,運行崩潰等性能低下問題,但文獻[14]指出分組訓練SVM模型在分組數目分配過多的情況下,會導致訓練子集中的局部支持分布與原樣本集的分布差別過大,導致準確率下降;層疊訓練SVM模型在第一層訓練過程中剔除大部分非支持向量,隨后層級訓練過程中消耗大量的時間,且過濾的非支持向量較少,效率低,所以將這兩種算法結合使用訓練模型.算法基本思路是采用3層固定串聯訓練結構,層與層之間采用隨機合并的方式,首先將訓練樣本集隨機劃分成大小相等的樣本子集(TD1-TD8),在樣本子集上并行訓練SVM,將得到的支持向量合并再隨機劃分成大小相等新的樣本子集(TD9-TD12),同樣進行并行訓練,最后一次迭代將這4個訓練子集合并進行全局訓練得到全局支持向量機模型.算法的詳細流程圖如圖1所示,TDi表示第i個樣本子集,SVi表示在TDi樣本子集上訓練得到SVM模型.

圖1 層疊分組并行SVM訓練算法流程
第1層:根據大數據“分而治之”的思想,將樣本隨機劃分成大小相等的樣本子集(TD1-TD8),在各個訓練子集上并行訓練SVM模型,本層將濾掉大量非支持向量,同時剔除非邊界樣本.
第2層:將上層得到的局部支持向量先合并,然后再隨機劃分成大小相等新的訓練子集(TD9-TD12),并在各自樣本子集上并行訓練SVM模型.
第3層:將第2層得到4個訓練樣本子集合并,得到一個訓練集(TD13),然后進行全局支持向量的訓練,得到全局支持向量機模型,同時判斷是否滿足迭代停止條件.如果不滿足,將結果送回反饋,重復以上步驟,直到滿足收斂條件.
深入研究層疊分組SVM算法訓練策略,結合Flink計算框架的優勢,設計基于Flink的并行SVM算法.算法的訓練流程如圖2所示,在訓練模型之前需將訓練集樣本及測試集樣本上傳到Flink集群上,Flink設置批處理執行環境ExecutionEnvironment,通過readTextFile()方法讀取文件,Flink中的JobManager在集群中創建任務同時負責集群任務的調度以及資源管理.算法在Flink集群上進行分層的并行SVM訓練將局部支持向量進行合并,直至最后訓練完成,得到全局支持向量模型輸出到本地文件系統.

圖2 基于Flink的并行SVM的訓練流程
算法在Flink分布式框架上的并行實現首先搭建Flink集群,設置集群模式為standalone,上傳訓練數據集到集群,通過readTextFile()方法讀取訓練集并轉換成DataSet[String]數據模型,同時設置任務并行度setParalleism(int),代表將大規模數據集隨機切分成大小適中的獨立數據塊并分區.然后使用mapPartition()方法并行SVM模型訓練,過濾掉樣本子集中大量非邊界樣本同時保留局部支持向量SVi,大幅度減少后續訓練樣本數據的大小,加快了算法的訓練速度,同時第一層訓練完以后將局部支持向量合并再隨機劃分為大小相同的數據塊TDi,同時以廣播變量的方式發送給下次迭代,作為下一次迭代的輸入,最后得到一個數據集GTDi,在GTDi上訓練SVM,得到全局支持向量模型GModel.基于Flink的并行SVM算法的描述如算法1所示.
算法1.基于Flink的并行SVM算法
輸入:Training Datasets,FilePath
輸出:SVi,GModel
1.val env = ExecutionEnvironment.getExecutionEnvironment
2.val env.setParalleism(8)
3.val DataSet[String]←env.readTextFile(FilePath)
4.Do i=1,…,L
5.val SVi←DataSet.mapPartition(Train_SVM).setBroadcast()
6.val TDi←SVi.rebalence()
7.val SVi←TDi.mapPartition(Train_SVM).setBroadcast()
8.val GTDi←SVi.rebalence()
9.val GModel←GTDi.mapPartition(Train_SVM)
10.Until滿足收斂條件Return GModel
Flink有3種數據分區模式:Rebalance,Hash-Partition和Range-Partition并行操作算子.其中Rebalance根據輪詢調度算法,將數據均勻地分發給下一級節點,平衡了有些節點局部支持向量較少,有些節點局部支持向量過多的情況,有效利用集群資源,達到均衡負載的目的.其次Flink中廣播變量(Broadcast Variable)是算子的多個并行實例間共享數據的一類方法.廣播變量以集合的方式定義在某個需要共享的算子上,算子的每一個實例可以通過集合訪問共享變量,將局部支持向量以廣播變量的方式發送給下一級算子,減少兩個算子的網絡通信消耗,提高了算法訓練速度,但廣播變量存儲在TaskManager的內存里,其大小不易過大,所以需要通過第一層剔除大量非邊界樣本點后得到部分支持向量,才能將其以廣播變量的方式發送給下一級算子.
實驗使用LibSVM3.24工具,利用5臺PC機構建Flink集群,包括1臺Master節點和5個Slave節點(其中一臺機器即是Master節點又是Slave節點),集群的任務調度模式為standalone.硬件配置:CPU雙核,內存4GB,硬盤20G.軟件配置:集群搭建使用Flink-1.7.2-bin-scala_2.11,Scala選用Scala2.11,Java選用JDK1.8(Linux版本),操作系統選擇CentOS7.訓練集采用a9a,covtype.Binary兩個標準數據集(2)https://www.csie.ntu.edu.tw/~ cjlin/libsvmtools/datasets.a9a數據集是用來預測一個成年人的年收入,該數據集是一個二分類問題,類標號分別是-1和+1,該數據集擁有123個屬性、32562個訓練樣本和16281個測試樣例;covtype.binary數據集原始為森林覆蓋率數據,有581912個樣本,每個樣本具有54維特征.
5.2.1 使用廣播變量性能分析
首先使用covtype.binary數據集進行實驗,將數據集切分成8個大小相同的數據塊,然后再相同條件下使用廣播變量和不使用廣播變量的情況下,訓練不同規模的樣本集,相關結果展示在圖3中,可以看出,隨著樣本數量的增加,兩者的訓練時間都在增大,但是當樣本數量超過20萬時,使用廣播變量將局部支持向量轉發給下一級算子的訓練時間明顯少于不使用廣播變量,這是因為將局部支持向量保存為廣播變量是存儲在內存中,不用寫到本地磁盤,下一級算子直接從內存中讀取上一層局部支持向量繼續計算,減少了中間結果寫到本地磁盤和從本地磁盤讀取中間結果的時間,算法整體效率更高,極大地減少了合并的時間.因此最后采用廣播變量的方式將局部支持向量發送給下一級算子.

圖3 使用廣播變量
5.2.2 訓練結果與計算節點數量的關系
為了評估算法在不同計算節點數量下訓練時間和準確率的變化情況下,選用了a9a數據集進行實驗,不同規模的集群(集群中Slave節點的數量)在訓練時間取3次訓練的平均時間,準確率是通過訓練出的SVM模型對相應測試集進行預測.最終結果如圖4所示.
從圖4可以看出,隨著計算節點數目的增加,算法的訓練時間逐步下降,這是由于多個taskManager同時對算法并行訓練,雖然有效的降低模型訓練時間,但是同時增加了分區數據傳輸的網絡開銷.開始時,可以看見算法訓練時間急劇下降,這是單機與并行訓練最大的不同,當計算節點達到一定數量后,集群中每個taskManager計算時間趨于平穩,所以整個訓練時間下降會變得平緩;算法的測試準確率隨著計算節點數量的增加而降低,這是因為將數據集切分過多的數據塊,每個小數據塊與原始數據集的支持向量分布差別過大,所以導致訓練出來的全局支持向量與單機SVM訓練的結果不一致.平衡集群的計算能力與數據集切分大小的關系是至關重要的,我們不能一味追求計算速度的提升而忽略算法的準確率.

圖4 計算節點對測試精度和訓練時間的影響
5.2.3 算法運行時間對比
算法運行時間對比實驗采用covtype.binary數據集,因為此數據集數據量大,可以有效看出單機SVM算法、Cascade SVM算法和FL-SVM算法訓練的模型所需時間的差異,圖5展示了3種算法在不同規模的數據集上訓練所耗費的時間.當數據集較小時,3種算法訓練時間差別不是很大.隨著樣本規模增大,并行算法在集群上運行的優勢也就明顯,說明在大規模數據集下訓練支持向量的任務初始化和網絡開銷占整個作業運行時間的比重小,同時并行SVM算法會將樣本隨機均勻劃分,同時分發給集群中的計算節點上,從而節省大量的訓練時間.Flink的任務調度方式以及作業“流水線”的執行等等,使得FL-SVM算法在效率上要明顯優于Cascade SVM算法,能夠有效在大規模數據集上訓練中減少訓練時間并提高分類效率.

圖5 算法運行時間對比
5.2.4 算法準確率對比
為了驗證改進后的FL-SVM算法模型的準確率,本次實驗采用a9a和covtype.binary數據集進行驗證實驗,使用單機SVM算法、Cascade SVM算法和Fl-SVM算法對這兩個數據集進行訓練,并得到支持向量機模型,然后使用測試數據集對模型進行評測,得到測試準確率如圖6所示,從中可以看出FL-SVM算法在2個數據集的準確明顯高于Cascade SVM算法但略低于單機SVM算法,且誤差范圍不超過0.01.總體而言,FL-SVM算法的精度損失在我們能忍受的范圍內,同時其能有效減少訓練時間,避免內存溢出的等情況,所以采用FL-SVM算法處理大規模數據集是可行的.

圖6 算法的測試精確度對比
支持向量機算法從問世至今已有20年的歷史,在大數據時代背景下的今天,傳統的支持向量機算法已無法高效處理大規模數據集.本文對分組層疊SVM算法的并行訓練策略深入的研究,結合Flink計算框架,實現了一種基于Flink的并行SVM訓練算法,有效克服單機SVM算法在處理大規模數據集出現內存溢出,訓練效率低,訓練時間慢等問題.實驗結果表明FL-SVM算法在保證一定的準確率的情況下,能大幅度提高訓練速度,是SVM算法解決大規模數據集的一種有效的解決方案.
以后的工作中重點著手于Flink平臺的性能優化,以及實時機器學習等方面,將支持向量機算法結合Flink實時計算的低延遲、高吞吐、高性能的優勢,達到線下訓練模型,線上實時預測模型.同時,也會深入研究Flink平臺的參數調優,以達到更好的訓練效果目的.