郭乃網 覃 晟 談子敬 曹滿亮
1(國網上海市電力公司電力科學研究院 上海 200437)
2(復旦大學 上海 200433)
隨著數字化技術的廣泛應用,為了保證在市場經濟環境下系統能安全、可靠運行,各種管理信息系統(MIS)、地理信息系統(GIS)、電網運行的實時信息系統等的廣泛應用,產生了海量的工商業運行數據。面對這種海量數據的增加,數據挖掘技術可以發現更深層次的規律。數據質量問題是數據挖掘過程中經常面臨的挑戰。目前的研究大多著眼于數據挖掘算法的探討,而忽視了數據分析前對數據質量處理的研究。一些比較成熟的算法對其處理的數據集合都有一定的要求,比如數據完整性好、數據的冗余性少、屬性之間的相關性小。然而,實際系統中的數據通常存在各種問題,很少能直接滿足數據挖掘算法的要求。同時海量的數據中無意義和錯誤的成分也普遍存在,嚴重影響了數據挖掘算法的執行效率。因此,如何提高數據質量已經成為數據挖掘技術應用中的關鍵問題,引起了學術界的廣泛關注[1-2]。
基于數據約束的數據修復技術通過將數據集修改為滿足數據約束的形式,來提高和改進數據質量。本文重點研究基于字典序次序依賴(Lexicographical Order Dependency)的數據修復問題。字典序次序依賴[1-2]定義屬性列表間的次序關系,因而在SQL的Order by等優化中取得了良好的效果。要充分發揮字典序次序依賴的作用,數據集必須滿足給定的依賴定義。這提示我們需要通過數據修復將存在錯誤和誤差的現實數據修復成為正確的滿足約束的形式。然而,面對大量數據的修復問題,已有的集中式修復算法在運行效率方面有所欠缺。特別是在面對電力數據等海量規模的數據時,已有方法難以在可接受的時間內完成修復任務。通過分布式修復技術,我們一方面旨在處理由于單機的存儲限制,無法有效處理的大數據集,也希望利用分布式的計算方法,通過利用更多計算資源來提升算法在大數據下的運行效率。
針對海量字典序次序依賴的數據修復問題,本文提出一種基于Spark分布式計算框架的字典次序依賴數據修復算法,為數據質量的提高提供一種高效可行的處理方式。該方法充分利用分布式計算的特點,通過將數據和計算分布到不同計算節點,以提高運行效率。
本文提出的分布式計算在通用的真實數據集上進行實驗驗證。實驗結果顯示本文算法可以有效地提高計算效率:實際計算時間消耗與數據規模成亞線性關系;并行度的提高顯著地減少了計算時間消耗。

字典序次序依賴是定義在屬性列之上,在依賴的左右兩側,不像函數依賴等那樣是屬性的集合,而是屬性的有序列,因此字典序次序依賴性質非常特別。我們首先通過表1中的例子,來引出字典序次序依賴的具體內容。

表1 考慮如下與環球旅行有關的數據表r

根據語義,可以得出如下三個次序依賴:



第一條表示,隨著編號的遞增,航班的日期也會遞增;第二條表示,航班的日期遞增,會使得總消費遞增;而第三條則是函數依賴“城市決定國家”City→Country的次序依賴表述形式,將函數依賴的左側屬性作為字典序次序依賴的左側屬性列,而將函數依賴的右側屬性加在左側屬性之后,作為次序依賴的右側屬性列,則可以得到由字典序次序依賴表示的函數依賴。
根據這三條字典序次序依賴,可以看出表1中的部分元組并不符合這三條次序依賴。例如第五行的[Month,Day]與其他四條元組都同時違背了OD1和OD2。第三行和第四行的City相同,而Country不同,在OD3上產生了沖突。若將第五行的[Month]改為3,并將第四行的[City]改為BUR,則三條字典序次序依賴都得到滿足。

修復過程中修改一個元組中一個屬性的值,需要的代價為1。為了保證修復后,左側屬性值相同的元組右側屬性值也相同,可以在左側屬性上構建等價類,即所有在A上相同值的元組,構成一個等價類,并且從小到大可以賦予編號ecno。在同一個等價類中,右側屬性B上的值可能不同。如果有w個元組的B值相同,則在最終修復后,這w個元組的B值要么都被修改為同一個新值,要么不變,前者對應修復代價為w,后者對應修復代價為0。而根據次序依賴的定義,最終需要在A上遞增時,B上也要遞增。A上的遞增可以由ecno來反映,而B就無法簡單用編號來衡量或者做標記,因此B上的值需要保留在具體的修復過程之中。算法第一部分,需要將分布式的數據,轉變為由相同左右屬性值的元組組成的最小單元minUnit的集合,每個minUnit中需要含有三個元素:
(1)value=v:表示這些元組的B值。
(2)value=e:表示這些元組的A值在從小到大排列下的序號。
(3)weight=w:表示這些元組的個數,即修復這部分元組需要的代價。
其計算流程如圖1所示。輸入:數據表(這里每個元組由兩個屬性值A和B,用空格分隔);輸出:分布式下的minUnit。示例參加見圖2。

圖1 Spark下disMake_minUnit流程

圖2 Spark下disMake_minUnit示例圖
Spark的特點是對數據不斷進行變化操作,因此偽代碼和圖1都較抽象,因此這里給出一個示例圖。圖2給出RDD的數據流的詳細變化過程,并對每一步操作作出解釋。
(1)-(2):從HDFS中讀入元組,并轉化成鍵值對(ai,bi)的形式。
(3):將相同a值的元組進行聚合,轉化為(ai,[bi1,bi2,…])。
(4)-(5):根據a值排序,并給出等價類編號,轉化為(ai,[bi1,bi2,…],ecnoi),此時ecnoi的大小已經可以代替a的大小順序。
(6):重新將聚合的bi數組拆開,附上ecnoi的信息,轉化為(ecnoi,bij,1),最后的1為了后一步的統計做準備。
(7):對所有相同的(ecno,b)的元組進行聚合,并且更改內部順序為(b,ecno,weight)的格式,由于這里的b對應的是minUnit的value值,所以此時已經產生了需要的minUnit單元格式(value,ecno,weight)。
(8):最后對所有的minUnit進行排序,先根據value值從小到大,value相同的則根據ecno從小到大。具有相同value和ecno的元組會被匯聚在一起,不會存在兩個minUnit的value和ecno均相同,minUnit的順序固定且唯一。
至此,minUnit分布式存儲于各個Partitions之中等待后續使用。
算法的第二步是在構建的minUnit中找到若干個“可靠的”數據,以方便在第三步中根據不動單元進行數據分割,在分割后的每一段數據上進行修復。“可靠的”數據是指“極有可能未被污染的干凈數據”,因此有以下特征:
(1) 所有不動單元上的value隨著ecno單調遞增,即滿足次序依賴。
(2) 每一個不動單元的權重盡可能大。
(3) 不動單元在整個數據集上的分布盡可能平均。
集中式算法可以采用動態規劃,而在分布式啟發式算法中,我們采用遞歸算法和二分算法進行替代。
計算流程如圖3所示。輸入:分成p個部分的minUnit,均衡系數α,篩選層數f,value最小值ub,value最大值lb;輸出:不動單元序列L。

圖3 Spark下disMake_fixedUnit流程

(2) 在每一個需要進行篩選的Partition之中,挑選value在最小值和最大值之間,并且具有最大weight的minUnit作為這個部分的最佳minUnit。如果有多個minUnit都有最大weight,則隨機挑選一個;如果沒有minUnit的value符合上述條件,則可以無候選minUnit。
(3) 將分布式下各個部分得到的候選不動單元收集到主節點。
(4) 模仿(2)中的算法,在收集后的候選不動單元中,找到weight最大的,作為此次算法找到的不動單元FixedUnit,其value值為vf;在多次遞歸之后,可能會出現不存在不動單元的情況,此時說明所有的數據都需要進行調整,而調整的值可以由ub或者lb確定,因此可直接返回。
(5) 將原先的分布式minUnit分成兩部分,排在FixedUnit之前的作為upper_minUnit,而排在FixedUnit之后的作為lower_minUnit,兩部分的Partition數量可以適當減少。對于兩部分拆開的minUnit,分別遞歸執行disMake_fixedUnit算法,算法參數為:對于upper_minUnit,其value最小值為ub,value最大值為vf,篩選層數為f-1;對于lower_minUnit,其value最小值為vf,value最大值為lb,篩選層數為f-1。兩部分同時進行并行操作。
(6) 在兩部分分別得到不動點序列后,將兩部分序列和選出的FixedUnit有序合并,得到不動點序列,進行返回。
算法初始輸入中ub和lb為負無窮大和正無窮大,只要保證在B列的值域兩側即可。初始α可以決定不動點的分布,α偏大,則不動點更集中于中間,時間上會得到優化,但是有可能會使得原本最佳的不動點被直接過濾;α偏小,則二分遞歸時,時間可能不平均。初始的f決定了最后不動點的數量,由于運行到后期時,可能得不到不動點,所以最終程序返回的不動點序列,其數量小于2f。
在得到n個不動單元之后,可以對數據進行重新分組,根據ecno在不動單元之間的位置,將所有的minUnit分成n+1組,除了首尾兩組,每一組均存在于兩個不動單元之間。根據不動單元給出的上下界和文獻[5]中給出的集中式修復算法,對minUnit進行修復;而對于第一組數據,沒有下界,可視作下界為負無窮;對于最后一組數據,沒有上界,可視作上界為正無窮。算法1給出修復部分的算法。
算法1OD修復DisFix
輸入:不動點列表,分布式minUnit。
輸出:修復后的minUnit。
1. Repartition minUnits according to FixedUnit
2. //parallelly,ineach partition
3. rep←α NULL List
4. for mU∈mniUnits do
5. if (mU value 6. mU value←lowerbound 7. remove mU and add mU into rep 8. else if (mU value>upperbound) then 9. mU value←upperbound 10. remove mU and add mU into rep 11. use Fix to repair mniUnits 12. Return rep+mniUnits 算法1主要側重的是后半部分的修復,而Spark的RDD數據流操作只在前兩行出現,后續的步驟都是并行運行于每一個Partition之中,因此這里給出偽代碼。第1行minUnit進行重新分組,之后的第3至12行在每一個Partition運行。首先建立一個空列表(第3行),用于存放數值超越上下界的minUnit,對于這部分,直接修改成對應的值即算修復完畢,從原列表移動至rep之中(第5至10行)。這里要注意,重新分組后分成的n+1組數據中,除了首尾兩組,其余數據都有上下界,而對于第一組數據,沒有下界,可直接跳過第5至7行;對于最后一組數據,沒有上界,可直接跳過第8至10行。對于列表中剩下的數據,值處于上下界之間,因此在該節點之中,可以視作為單機的od修復,因此可以使用文獻[5]中的修復算法。 實驗使用的是一個真實數據集,關于美國航班情況的數據集,這個數據集也在文獻[2,5]中被使用。本次實驗使用的分布式環境為Spark,版本為2.0.0,主要啟動參數見表2。一共進行了三組實驗,分別探討時間與元組數目、分塊數目的關系,以及均衡系數α對于整個修復算法的影響。默認實驗條件為,元組數目20萬行,屬性4列,在實驗前,對5%的數據添加誤差,觀察最終的修復情況;而在分布式的參數中,默認分塊數目p=10,均衡系數α=0.2。由于分布式下的修復算法并沒有其他替代算法,因此只是對自身的實驗結果進行分析。 表2 Spark啟動參數表 實驗1探究的是,不同元組數目下,算法需要的時間。元組數目從10萬行增加至40萬行時,需要的時間變化情況如圖4所示。由于數據量非常大,并且分布式下的算法為啟發式算法,所以無法用文獻[5]中的集中式算法進行時間和效果上的比較。 圖4 實驗1的實驗結果 可以看出,元組數目增加會導致總時間的增加,但是增加的時間并非線性增加,而是獲得了亞線性效果。主要原因有以下幾點:首先算法復雜度理論上是線性,計算時間隨著元組數目增加,線性增加。但是隨著實際數據集的變大,在構建等價類的這一步之中,可能會隨著重復數據的增加,改變了每一個minUnit的權重,而對minUnit的數目并沒有產生太大的影響,由此造成整體的曲線偏離線性關系。另一方面,當進入算法的第三部分后,需要依賴于文獻[5]中的Fix算法,這個算法復雜度是O(nlogn),不過在此之前,有部分的minUnit會因為處在這一個分塊的數據范圍之外而被直接修改,因此實際參與到Fix的數據個數也有限。如果對于一些數據,雖然在初始時增加了誤差,但因為加了誤差之后還是沒有違背次序依賴,也就不需要被修復,因而總時間下降。 實驗2探究的是在選擇不同的并行分塊個數時,需要的時間變化情況。分塊數目從5增加至40,時間與分塊數目之間的關系如圖5所示。 圖5 實驗2的實驗結果 可以看出,分塊數目增加,計算時間減少,基本呈線性下降的關系。這種線性關系是當前大數據分布式計算框架的重要特征,因此說明本分布式算法的設計是合理的。從并行的角度來說,并行數目的增加,可以導致每個機器上的運算量減少,并且數據分配時,數據可以分配得更均勻。而在本文算法中,并行數目的增加,首先使得算法第一部分minUnit的生成速度加快,另一方面,第三部分的數據修復需要和分塊數目同樣大小的不動單元,而不動單元的數目在第二部分中會隨著遞歸層數的增加指數級上升,因此當并行數目上升時,只需在第二部分中增加很少的遞歸深度,便能夠滿足第三部分的不動單元的數目要求。顯然,計算時間并不會隨著并行數目的增加無限制地減少,甚至可以根據并行算法的時間來猜測,繼續增加并行數目,甚至有可能使得并行時間不降反升。主要原因在于忽略了一些基礎時間并不會完全被并行算法分擔,例如算法中,主節點上的不動點的篩選;同時,并行數目的增加會使得中間數據轉移過程中的數據量增加。 實驗3研究均衡系數α對于算法時間和修復質量的影響。實驗3分成兩個部分,當α從0增加至0.4時,測試整個算法的時間;另一方面,由于實驗前在數據中增加了一定的誤差,因此統計在修復算法結束以后,被修改的單元格數目,作為整個算法的修復質量。數據共有20萬行,4個屬性,而有5%的數據被加入了誤差,因此理論上的錯誤單元格數量為40 000。但要注意的是,這里的錯誤數據可能被加入到了次序依賴的左側屬性列之中,而修復算法只是修改右側的數據,使得修復后的結果滿足給出的次序依賴。圖6為實驗3的結果。 (a) α與時間的關系 從圖6(a)中可以看到,隨著α的變大,整體的修復時間是變小的。當α較小時,α的增加會大幅縮減算法時間。這與算法中的相關分析是相符的。設計均衡系數α的初衷,便是使得在二分確定不動單元時,盡可能使得不動單元靠近整體數據的中間,一方面能夠使得下一部的二分算法的兩個部分數據規模相近,另一方面也是使得算法中,在每個子機器中使用Fix算法修復數據時,各個子機器分得的數據規模相近。在分布式中,并行時間取決于自節點的并行時間最大值,因而合理分配各個子機器的數據量是非常重要的。在增加α的同時,可能會造成數據修復質量的下降,因為α的增大有可能會造成可靠的不動單元由于處于數據的兩端而直接被篩去,因此需要考慮α對于修復質量的影響,探討α的增大是否會增加被修改的單元格數目。不過實際情況下,從圖6(b)中可以看出,α的增加并沒有使得更改的單元格數目大幅上升,而是幾乎保持不動。由此可以推斷,雖然α增加,但是每一次選擇的不動點單元依舊保持著比較高的質量,沒有被污染。從這里可以體現出,算法對于數據誤差的高容納性,并且修復效果很好。另外注意到雖然有40 000個單元格被污染,但實際的單元格修改數目大致為47 016,超過最小修改值約17%。這樣的結果是可以被接受的,因為算法是針對右側屬性列進行修改,而如果某條元組的次序依賴左側屬性值被誤差干擾,則可能會需要修改多條元組的右側屬性值來消去此誤差對于次序依賴的影響。 為了提高數據質量中字典序次序依賴修復計算的效率,本文基于SPARK分布式計算框架提出新的字典序次序依賴分布式修復算法,通過數據和計算的有效分布,算法有效改善了大數據集上的運行效率。文章通過實驗驗證了本文方法的有效性,并說明了算法參數的意義和效果。4 實 驗
4.1 實驗數據

4.2 時間與元組數目

4.3 時間與分塊數目的關系

4.4 均衡系數的影響

5 結 語