高澤+李常寶+楊淙鈞+劉忠麟+艾中良


摘 要: 多表關聯查詢是進行數據挖掘與分析的有效技術手段。隨著大數據時代的到來,當前的數據分析技術在進行海量數據多表聯查操作時存在明顯的性能瓶頸,為此提出一種基于MapReduce計算模型的多表聯查算法UGS用以提升多表關聯查詢效率。實驗表明,在海量數據背景下,該算法的查詢效率明顯優于大數據領域的SparkSQL,Hive及關系型數據庫的MySQL。
關鍵詞: MapReduce; 多表聯查; 關聯空間剪枝; Spark
中圖分類號: TN911?34 文獻標識碼: A 文章編號: 1004?373X(2015)14?0081?04
在當今的生產生活中,圍繞著每個人每件事都會產生大量的數據,而這些數據往往是分布在不同的數據文件中,想對這些數據進行處理分析就必然要用到多表聯合查詢,聯合查詢在實際的生產生活中非常有必要。當前的多表聯合查詢主要通過兩種方式實現:一種是基于傳統數據庫的表JOIN方式,這種方式存在數據規模瓶頸問題,無法支撐大規模數據關聯;另一種是基于大數據技術的多源數據融合[1]方式,雖然能夠解決關聯查詢在數據規模方面的瓶頸問題,但在運行效率方面存在較大的優化空間,目前難以滿足交互式查詢需求。因此,針對當前多表關聯查詢領域存在的問題,本文提出了一種基于MapReduce計算模型[2]的新型多表聯查方法;實驗表明,在解決多表關聯數據規模瓶頸的基礎上,較當前大數據領域的多表關聯模式能夠顯著提升運行效率。
1 相關工作介紹
當前多表關聯查詢主要借助2種方式實現:關系型數據庫方式和分布式并行計算方式。下面通過一個關聯查詢實例對2種實現方式進行復雜性分析。假設2張待聯查的表table1和table2,其中table1的數據量為C1條,table2的數據量為C2條。要求輸出table1.Key =table2.Key的條件下2張表的所有行,即:“SELECT * FROM table1 INNER JOIN table2 ON table1.Key=table2.Key”。
1.1 關系型數據庫的實現
傳統關系型數據庫的多表關聯查詢采用基于關聯條件的集合相乘思路實現[3],針對table1的每行數據,在table2中對其關鍵字(Key)進行查找,如果找到滿足條件的數據,那么把它們組合成一條新數據存儲到結果數據集中。此時數據庫需要處理的條數為C1C2,也就是說時間復雜度為O(C1C2)。在此模式下,兩表規模均為10萬條數據時其響應時間已經達到5 min以上,兩表規模達到百萬條時,運行2 h仍未得到結果。
1.2 分布式處理引擎的實現
分布式并行計算實現方式主要基于大數據技術體系中的MapReduce模式展開,目前方法主要有Hive[4],Spark[5]兩種方式。
1.2.1 MapReduce編程模型概述
MapReduce編程模型以(Key,Value)元組為基本單位展開數據處理,整個處理過程分為Map、Reduce兩個階段:Map階段處理輸入數據并將處理結果基于Key值通過哈希計算映射到Reduce處理節點;Reduce階段處理本地數據并輸出結果。由于相同Key值的哈希計算結果是確定的,因此,每個Reduce處理節點上完整保存了該key值的所有數據,編程人員在只需在每個Reduce節點處理本地數據即可完成對全局數據的處理。
1.2.2 分布式處理引擎執行多表聯查
Hive提供SQL查詢接口,通過對用戶輸入的查詢任務進行語法樹解析,將SQL查詢轉化成Hadoop的MapReduce任務集,基于MapReduce展開數據處理[6],由于MapReduce存在中間數據磁盤讀寫瓶頸,從而在很大程度上限制了Hive的執行效率。Spark分析引擎針對Hadoop的MapReduce中間數據磁盤讀寫瓶頸基于內存計算展開優化,使得同樣功能的任務在大部分情況下比Hadoop執行效率更優,Spark在執行多表關聯查詢時采用優化的笛卡爾積關聯算法,雖然性能較傳統的笛卡爾積有所優化,但是復雜度依舊為笛卡爾積的O(C1C2),并且空間復雜度為O(C1C2)。Hive在進行數據關聯查詢時,單作業單機數據規模超過2 000 000×10 000 000時,執行時間在1 min以上,存在較大的優化空間;Spark對Hive的執行過程進行了基于內存的執行效率優化,但關聯計算過程存在內存占用不可控的問題,當單作業單機數據規模超過20 000 000[×]100 000 000時,會因內存溢出導致關聯查詢無法完成,數據規模相對較小時也存在一定的運行效率優化空間。
因此需要設計一種空間膨脹相對可控,并且時間復雜度更低的算法來提高海量數據多表關聯效率,從而提升海量分析能力。
2 算法的設計與實現
2.1 算法思路
本算法主要借助MapReduce計算模型展開,在Map階段,對各表記錄添加來源標記,并將各表數據采用相同的散列算法進行映射分發,使各表相同的Key值被集中到相同的處理節點上;在Reduce階段,基于各表標記進行關聯結果篩選,本地化獲取關聯查詢結果集。算法介紹如下:
算法名:UGS(Union Group and Segmentation)算法。
輸入參數:參與關聯查詢的表路徑及關聯條件集,關聯查詢結果輸出路徑。
輸出數據:關聯查詢結果。
執行步驟:在上述實例中,算法在集群上的執行過程如下:
(1) 在Map階段通過數據格式變換,將參與關聯查詢的各表數據統一為相同格式。將聯合查詢條件中的Key單獨抽取出來,其他數據存放在OtherRecord中,并添加標記以記錄來源的TableID,Map階段輸出為 (Key,TableID,OtherRecord)。
(2) 在Reduce階段,輸入Map階段的輸出結果,對Key值相同的記錄進行關聯篩選,如果某個Key存在于所有表中,那么是1條或多條(可能存在1個Key在某一table下存在多行)有效的結果。并將結果按表格式處理后輸出。
算法首先需要遍歷數據,對每條數據通過Key計算出Reduce標識,Reduce端完成數據收集后,在每個Reduce內通過排序將Key相同的記錄整合在一起然后進行檢索條件的完備性判斷。在這種計算模式下,假設有N張表參與聯查,第i張表的數據量為[Ci],共有M個Map和R個Reduce參與并發計算。令[Sum= 1NCi],那么算法的期望時間復雜度為[O(SumM+SumR×log2SumR)],空間復雜度為O(Sum)。如在兩表聯查下,時間復雜度為[OC1+C2M+C1+C2R×log2C1+C2R],空間復雜度為O(C1+C2)。在最壞情況下,即多張表內的Key列所有數據只有一個相同的值X,那么此方法的結果會退化為笛卡爾積結果,時間復雜度會退化到[O(1NCi)]。但是,在實際條件下很難有這種情況發生,并且如果關鍵字完全相同,那么結果數據集的數據量為[1NCi]條,此次聯查不論在任何方法下復雜度都不會小于[O(1NCi)]。可以看到,由于復雜度數量級不同,在表規模較大,并且關鍵字離散的條件下,本算法的執行時間相較于笛卡爾積優化算法會大幅縮短,并且很好地解決了空間膨脹問題。
2.2 基于Spark的算法實現
本文基于大數據分析引擎Spark展開算法實現,首先介紹Spark相關的幾個概念和操作:
SparkContext:Spark程序的入口,可以在聲明時定義各種系統參數,如集群主節點位置,單個任務使用的最大內存量,需要核心數等等。
RDD(Resilient Distributed Datasets):彈性分布式數據集,它是Spark系統提供的一種分布式內存抽象,可以支持基于工作集的應用,同時具有數據流模型自動容錯,位置感知調度和可伸縮性的特點。它允許用戶在執行任務時顯示的將工作集緩存在內存中,后續的操作能夠重用工作集,極大地提升了執行速度。
TextFile:讀取本地或者分布式文件系統的數據并生成RDD。使用方法為RDD=sparkContext.textFile(FilePath)。其中FilePath為字符串類型,可以為本地文件路徑或者hdfs路徑。
union:將相同格式的兩個RDD合并為一個,使用方式為RDD.union(OtherRDD)。
GroupByKey:是將數據按Key排序,并將相同Key的所有其他數據合并為一個List。使用方式為RDD.GroupByKey()。
算法實現如下:
輸入:結果輸出路徑OutputPath,多張表詳細信,每張表以(表路徑,關鍵字列的列號)二元組形式描述。
輸出:以文件形式返回分布式文件系統。
實現步驟:
(1) 讀取數據并將每張表的數據處理為統一格式。使用Spark調用hdfs數據的系統接口TextFile從分布式文件系統中讀取數據,對于每張表所對應的文件,生成文件的惟一標識(TableID)并添加到文件的每行數據之內,再通過Map操作處理為固定格式的數據,即RDD(String, (String, String)),存儲的數據為(Key, (TableID, OtherRecord))。
(2) 將多張表的數據合并到一起。由于經過步驟(1)處理后數據格式相同,可以使用RDD的union操作來進行合并,這樣合并后的數據可以使用Spark本身提供的方法GroupByKey來對數據進行處理。
(3) 使用GroupByKey將關鍵字相同的數據合并為一條記錄。即將Key相同的數據行中的(TableID,Record)放在一個List下。
(4) 檢索數據,剔除不滿足條件的數據。對步驟(3)執行過GroupByKey操作的數據,對每一行數據根據用戶需求的連接方式進行數據的整理刪除,如INNER JOIN就是對每一個Key判斷該Key對應的數據是否包含所有表的內容,如果是則是滿足條件的結果,如果缺少某張表的數據,那么便不滿足要求,對其進行刪除操作。而LEFT JOIN和RIGHT JOIN等則只要存在指定表的數據就會被保留下來。
(5) 將符合條件的數據拆分還原。由于某些表中,相同的Key可能存在多條數據與之對應,需要將這種數據還原、補全成多條。如在在學校學生的數據庫中,同一姓名“A”可能對應著多個學生,這樣在與其他表進行以姓名為關鍵字的聯查時,“A”的結果數據應該為多行,而由于GroupByKey操作會將這些數據化為1行,所以需要進行拆分,將之還原為多行“A”。而在實現上,對每行數據生成若干的ArrayBuffer,然后將這些ArrayBuffer進行全乘就可以獲得拆分后的結果。
(6) 將最終數據存入文件系統中。
3 對比實驗
實驗環境:
集群硬件:5臺實驗機組成的集群環境,其中主節點擁有4核心16 GB內存,4臺從節點擁有4核心4 GB內存,每個核心擁有3.4 GHz的主頻。
軟件部署:操作系統為Ubuntu 12.04;MySQL為MySQL Ver14.14 Distrib 5.5.29;Hadoop集群為Hadoop?2.2.0;Spark集群為Spark?1.1.0;Hive為0.12;Scala為2.10.4。
實驗方案:本算法需要與現有的關系型數據庫、傳統分布式文件系統處理方案進行橫向對比,在關系型數據庫可接受的數據范圍內,做出數據量從小到大的對比實驗;并在數據規模較大的前提下,與SPARKJOIN[7]和Hive進行對比實驗。
與關系型數據庫對比實驗:設計5組不同的數據規模的數據,每組數據由2張表構成。其中關系型數據庫使用INNER JOIN命令進行查找。由于聯查需要生成2個表條數相乘的中間數據集,所以在規模分別為10萬條與100萬條數據的兩表進行聯查時,會生成[1011]規模的數據,并在1 h內無法返回結果。故5組聯查數據數據量分別為(1 000[×]1 000),(1 000[×]10 000),(10 000[×]10 000),(10 000×100 000),(100 000×100 000)。
與傳統分布式文件系統處理方案的比較:由于數據分發、I/O等條件的限制,分布式文件系統處理數據有一定的數據傳遞時間,所以在小規模數據處理方面,數據分析時間占比較少,所以需要在一定規模的數據下進行橫向對比。因此設計5組數據進行對比實驗。聯查數據規模分別為(104[×]104),(105[×]105), (106[×]106),(106[×]107),(107[×]107)。2張表的格式如表1所示,其中待聯查列均是ID列。
表1 表結構
3.1 UGS算法與常用關系型數據庫比較
UGS算法與MySQL比較如圖1所示。表2為UGS算法與MySQL執行時間對比。
可以看到,由于磁盤I/O、網絡I/O、任務劃分、數據分發收集需要占用一定時間,故在數據量較少的情況下,傳統的關系型數據庫仍有著較大的優勢,但是在數據量增大時,中間數據集每增大10倍,關系型數據庫所需處理時間都會增大約10倍,在對一張100 000條記錄的表與1 000 000條記錄的表進行聯查時,MySQL運行了1 h仍未返回結果。而本文的UGS算法在數據量較小時,雖然也需要進行幾秒的查詢,但是增長穩定,在2張100 000條與100 000條的表進行聯查時,效率比MySQL提升了將近100倍,在數據量繼續增長的條件下,將會有著更大地提升。
圖1 UGS算法與MySQL比較
表2 UGS算法與MySQL執行時間對比
3.2 UGS算法與其他大數據平臺實現的比較
UGS算法與Hive,SPARKJOIN比較如圖2所示。而對于傳統的大數據方案,SPARKJOIN相對于Hive優化了任務分發收集等步驟,所以效率相差穩定為10~20 s之間,而UGS算法相對SPARKJOIN和Hive來講,由于算法復雜度的優化,所以隨著數據規模增大,處理效率相較于Hive和SPARKJOIN有著較大地提升。
3.3 UGS算法在多表聯查下與當前大數據實現方案的比較
對于多表聯查而言,由于UGS算法本身的復雜度為[O(SumM+SumR×log2SumR)],導致添加一張表所需的時間開銷較少;而當前的大數據實現方案中,復雜度為[O(1NCiR)],每添加一張表復雜度都會提升一個數量級。所以在多表聯查下,UGS算法相較于當前大數據實現方案優勢更加明顯。
圖2 UGS算法與Hive,SPARKJOIN比較
表3 UGS算法與SPARKJOIN處理時間
對于3張數據規模均為1 000萬條的表,以相同的Key列進行聯查,SparkJoin使用了182.170 s得出結果,Hive使用了207.281 s獲取結果,而UGS算法僅僅需要56.494 s就可以得出結果,可以看到由于增加表之后增加了任務的并發程度,并且更好的數據本地化降低了系統I/O開銷,導致了處理時間相對于2張表聯查增加了僅10 s。實驗表明,在多表聯查(表數大于等于3)的條件下,UGS算法相對于當前的大數據解決方案效率提升更高。
4 結 語
本文提出了一種基于MapReduce的多表聯查算法用于實現海量多源數據的快速關聯查詢。實驗表明,在數據量為10萬條與100萬條的兩表聯查中,UGS算法相較于傳統關系型數據庫有著7~8倍的提升,在每張表數據量均為100萬條的兩表聯查中,相較于關系型數據庫有著100倍的性能提升,隨著數據量提升UGS算法的優勢有著更明顯的體現。
在基于大數據技術的實現方案比較中,當參與關聯的單表數據規模達到1 000萬級時,UGS相對于SPARKJOIN性能提升了約1倍,相對于Hive提升了1倍有余,并且隨著數據規模增大、待聯查表數量增多性能提升將更為明顯。
參考資料
[1] WHITE T. Hadoop: the definitive guide [M]. 3rd ed. BeiJing: OReilly Media, 2013.
[2] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large clusters [J]. Communications of the ACM: 50th Anniversary Issue, 2008, 51(1): 107?113.
[3] VARDI M. The complexity of relational query languages [C]// Proceedings of the fourteenth annual ACM symposium on Theory of computing. USA: ACM, 1982: 137?146.
[4] THUSOO A, SARMA J S, JAIN N, et al. Hive: a warehousing solution over a map?reduce framework [J]. Proceedings of the VLDB Endowment, 2009, 2(2): 1626?1629.
[5] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: A fault?tolerant abstraction for in?memory cluster computing [C]// Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation. [S.l.]: USENIX Association, 2012: 2?12.
[6] YANG H, DASDAN A, HSIAO R L, et al. Map?reduce?merge: simplified relational data processing on large clusters [C]// Proceedings of the ACM SIGMOD International Conference on Management of Data. New York: ACM, 2007: 1029?1040.
[7] LUO Yi, WANG Wei, LIN Xuemin. Spark: A keyword search engine on relational databases [C]// Proceedings of 2013 IEEE 29th International Conference on Data Engineering (ICDE). [S.l.]: IEEE, 2008: 1111?1118.
[8] CYGANIAK R. A relational algebra for SPARQL, HPL?2005?170 [D]. Bristol: Digital Media Systems Laboratory, 2005.