張敬偉,尚宏佳,錢俊彥,周 萍,楊 青+
1.桂林電子科技大學 廣西可信軟件重點實驗室,廣西 桂林 541004
2.桂林電子科技大學 廣西云計算與大數據協同創新中心,廣西 桂林 541004
3.桂林電子科技大學 廣西自動檢測技術與儀器重點實驗室,廣西 桂林 541004
非均勻數據分布下的MapReduce連接查詢算法優化*
張敬偉1,2,尚宏佳1,錢俊彥1,周 萍3,楊 青3+
1.桂林電子科技大學 廣西可信軟件重點實驗室,廣西 桂林 541004
2.桂林電子科技大學 廣西云計算與大數據協同創新中心,廣西 桂林 541004
3.桂林電子科技大學 廣西自動檢測技術與儀器重點實驗室,廣西 桂林 541004
MapReduce分布式計算框架有助于提升大規模數據連接查詢的效率,但當連接屬性分布不均勻時,其簡單的散列策略容易導致計算節點間負載不均衡,影響作業的整體性能。針對連接查詢操作中的數據傾斜問題,研究了MapReduce框架下大規模數據連接查詢操作的優化算法。首先對經典的改進重分區連接查詢算法進行實驗分析,研究了傳統MapReduce計算框架下連接查詢操作的執行流程,找出了基于MapReduce計算框架的連接查詢算法在數據分布不均勻時的性能瓶頸;進而提出了組合分割平衡分區優化策略,設計并實現了基于組合分割平衡分區優化策略的改進型連接查詢算法。實驗結果表明,提出的優化策略在大規模數據的連接查詢處理上很好地解決了數據傾斜帶來的性能影響,具有好的時間性能和可擴展性。
連接查詢;MapReduce;數據傾斜
多樣化的應用促進了數據的快速積累,GB、TB級的數據分析已成為常態。在互聯網領域,全球最大中文搜索引擎百度,收錄了全世界萬億個網頁,數據總量接近1 000 PB(http://chgcx.sirt.edu.cn/a/2015/ 12/06/2015120619241012299.html)。在零售業,沃爾瑪每小時都會積累2.5 PB的消費數據(http://news. hexun.com/2016-05-13/183843576.html)。在金融行業,國內“銀聯”銀行卡發行量接近40億張,每天需處理超過600億次交易,年新增的數據量也達到數十PB。上述的大規模數據及其分析應用中,連接查詢操作是最頻繁使用的算子之一,連接查詢操作的性能對大規模數據分析效率具有重要影響。
MapReduce計算架構具有較好的可擴展性、高可用性以及容錯性,被廣泛地應用于大規模數據分析相關工作中。然而,大規模數據中經常會出現數據分布不均勻的情況,采用MapReduce計算框架的連接查詢操作并不總是十分有效,會導致各計算節點負載不均衡,降低連接查詢操作效率,影響大規模數據分析性能。因此,本文將致力解決大規模數據連接操作過程中數據分布不均勻導致作業性能下降的問題,提升大規模數據連接查詢的效率。
連接查詢操作是關系型數據庫的核心操作算子,在日志分析、聯機分析處理等領域也被頻繁使用,大規模數據的連接查詢需借助MapReduce計算框架來提升性能。目前,針對MapReduce計算框架下的連接查詢操作及其優化工作可歸納為以下3類。
(1)基于傳統MapReduce框架的連接優化研究。該類研究不需要對數據重新進行組織,實現過程也較為簡單,但應對復雜連接查詢操作時,往往需要多個連續的MapReduce作業,執行過程較為復雜。文獻[1]設計了兩表等值連接的標準重分區連接算法,在云數據管理系統中有較好的應用,但當數據量較大時,Reduce階段可能會出現內存溢出。文獻[2]為了解決標準重分區連接算法內存消耗較大的問題,設計了改進標準重分區連接算法,一定程度降低了Reduce階段對內存容量的要求。文獻[1]設計了當兩表數據量相差很大時,僅需一個無Reduce階段的MapReduce作業且具有很高效率的廣播連接算法,但當兩張表數據量都較大時,Map階段可能會出現內存溢出。文獻[2]在廣播算法的基礎上設計了半連接算法,對較小表進行過濾,減少了廣播過程中數據傳輸量和Map階段的內存消耗,但需連續的3個作業才能完成,執行過程較為復雜。文獻[2]設計了分片半連接算法,更細粒度地對較小表進行過濾,進一步減少了廣播過程中數據傳輸量和Map階段的內存消耗,但同樣存在執行過程較為復雜的不足。文獻[3]設計了冗余重分區算法來處理兩表非等值連接,利用二維矩陣較為簡潔地完成了復雜的非等值連接操作,但在數據混洗階段網絡傳輸代價較大。文獻[4]設計了兩表相似度連接的算法,濾掉不可能成為最終結果的數據,有效地減少了網絡傳輸代價,但其應用范圍僅僅限于文本字符串的相似性連接。文獻[5]設計了利用一個MapReduce作業處理星型連接與鏈式連接的多表等值連接算法,很大程度地簡化了實現多表連接操作的復雜性,但隨著連接表數目的增加,其中間數據量將急劇增加。文獻[6]提出了網絡感知多路連接算法,在多路連接方面具有較高效率,但僅適用于兩張大表和多個放在同一個Reducer節點的小表,且兩張大表之間必須有連接屬性。文獻[7]設計了基于數據本地化計算的連接查詢處理算法,一定程度地提高了連接查詢效率。
(2)基于改進MapReduce框架的連接優化研究。該類研究從集群層面對MapReduce計算框架進行改進,減少了連接查詢過程中MapReduce作業的數目和中間數據量,但增加了算法的實現難度。文獻[8]設計了Map-Reduce-Merge新型編程框架,在Reduce階段后面附加一個Merge操作,加強了原有的Map-Reduce計算框架,可以方便地實現關系數據庫中的連接和笛卡爾積操作。文獻[9]將索引結構引入到Map-Reduce-Merge計算框架上,借助索引技術在Map-Reduce-Merge上實現數據剪枝預處理,縮小了待處理數據空間。文獻[10]設計了Map-Join-Reduce的編程模型,在Map和Reduce之間增加一個Join操作,從多個數據源讀取數據,利用一個MapReduce作業就可以完成多表連接操作。文獻[11]為MapReduce增加了一個全局節點,用于接收、存儲和更新少量的全局信息,對Mapper生成的中間結果進行過濾,減少了混洗代價和網絡傳輸。
(3)基于數據索引的連接優化研究。該類研究利用索引對數據進行有效過濾,一定程度地提高了連接查詢的效率,但需要對數據進行重新組織。文獻[12]在Hadoop和Hive的基礎上,設計了HadoopDB系統,充分利用傳統關系型數據庫中的索引及查詢優化機制提高連接查詢的效率,但其索引的容錯性較弱。文獻[13]利用MapReduce提供的用戶自定義函數構建索引設計了Hadoop++系統,使用寄宿索引技術提高數據查詢和連接效率,但其索引構建代價較高。文獻[14]針對Hadoop++建立索引代價較高的不足,提出了HAIL,給每個文件的不同備份建立相應的索引,提高數據查詢和連接的效率。文獻[15]基于垂直分組設計了多表連接的混合系統Llama,將多表連接查詢分解為無數據耦合的多個子查詢,大大地減少了MapReduce作業數。文獻[16]提出了CoHadoop系統,改變副本放置策略來提高數據連接查詢效率,但不具有普遍適用性。文獻[17]提出了Tenzing系統,在MapReduce框架上融合了ColumnIO、BigTable、GFS、MySQL等系統實現對SQL的支持,對底層數據進行數據過濾和數據索引,提高連接查詢效率。
上述研究工作均致力于基于MapReduce計算框架的大規模數據連接操作及其優化,優化出發點均在集群層面上,對數據分布不均勻給大規模數據連接查詢操作帶來的影響考慮不夠充分。本文將充分研究數據不均勻分布對大規模數據連接效率的影響及優化,最大程度地提升數據連接查詢的性能,進而提高大規模數據分析的效率。
連接查詢在數據分析中非常重要,也是關系數據庫中最主要的操作之一,主要包括內連接、外連接和交叉連接等。由于內連接較常用,本文圍繞內連接展開研究。
3.1 問題的描述
TPC-H基準數據集是查詢和事務處理常用性能測試數據集[18],為了簡化描述且不失一般性,以TPCH中CUSTOMER和ORDERS兩張表的連接查詢操作為例,設計連接查詢用例,本文研究的連接查詢算法將圍繞該查詢用例進行討論。給定關系表CUSTOMER和ORDERS,其中表CUSTOMER包含屬性custkey、custname等,表ORDERS包含屬性orderkey、custkey等;兩表通過屬性custkey進行連接,CR和CS分別表示表CUSTOMER和ORDERS相關的選擇條件,查詢用例的SQL描述如下:
SELECT ORDERS.orderkey,CUSTOMER.custname
FROM ORDERS INNER JOIN CUSTOMER
ON ORDERS.custkey=CUSTOMER.custkey
WHERE CRAND CS
3.2 問題的定義
設參加連接查詢的兩張表分別為R和S,R約定為主表,S約定為從表,ri和si分別為R和S的屬性,nr和ns分別為R和S的屬性個數,則表R和S的屬性集合R′和S′可表示為:

其中,屬性x∈R′∩S′,y∈R′,z∈S′。不失一般性,連接條件約定為R.x=S.x;查詢條件約定為CR和CS;投影屬性約定為P;連接操作可以描述為σR.x=S.x(R×S);投影操作可以描述為πP(R×S)。本文的連接查詢可以定義為:

上述給出的SQL查詢用例的關系代數表達式如下:

首先基于傳統MapReduce計算框架,實現了改進重分區連接查詢(improved repartition join query,IRJQ)算法并展開實驗分析;接著,針對IRJQ算法在數據分布不均勻下各計算節點負載不均衡導致效率低下的問題,設計了組合分割平衡分區優化策略(combination and division partition strategy,CDPS),進而實現了基于組合分割平衡分區優化策略的改進型連接查詢算法(IRJQ+CDPS)。
4.1 基于傳統MapReduce連接查詢算法
改進重分區連接查詢算法是借助于傳統Map-Reduce計算框架連接查詢操作的一種典型實現方式,該算法僅僅需要一個MapReduce作業就能完成連接查詢操作,特別是在Reduce端較小內存消耗,使得它被廣泛地應用于大規模數據分析中。在Map階段完成對連接屬性的解析和標記,以HashPartition為核心完成Shuffle過程,在Reduce階段完成連接操作。圖1給出了IRJQ算法的計算框架和執行流程。
IRJQ算法的運行過程主要分為Map、Shuffle和Reduce共3個階段。其中Map階段完成兩表的連接屬性的解析和標記操作,以及查詢屬性的解析;Shuffle階段負責相同hash值分組從Map端到Reduce端的傳遞;Reduce階段則將來自不同表的連接屬性和查詢值進行連接。

Fig.1 Computation framework and implementation process of IRJQ algorithm圖1 IRJQ算法的計算框架和執行流程
4.1.1 Map階段的屬性提取和標記過程
MapTaski任務獲取輸入分片InputSpliti,讀取Input-Spliti中源表屬性和所有記錄。對記錄MapRecordij根據不同的源表屬性,解析出對應的連接屬性join_keyij和查詢屬性query_valueij。在連接屬性join_keyij前加上源表標記tag組成復合連接屬性composite_keyij,將<composite_keyij,query_valueij>以key/value形式輸出,完成屬性提取和標記。
如果輸入分片InputSpliti來自ORDERS表,從MapRecordij中解析出custkeyij和orderkeyij,在custkeyij的前面加一個數字“1”組成復合的輸出鍵“1”+custkeyij,將<“1”+custkeyij,orderkeyij>以key/value鍵值對的形式輸出,完成對記錄中屬性的提取和標記過程。同樣,如果輸入分片InputSpliti來自CUSTOMER表,從MapRecordij解析出custkeyij和custnameij,在custkeyij的前面加一個數字“0”組成復合的輸出鍵“0”+custkeyij,以key/value鍵值對的形式將<“0”+custkeyij,custnameij>輸出。其中,在來自于CUSTOMER表中記錄的連接屬性custkeyij前面加標記“0”,而來自于ORDERS表中記錄的連接屬性custkeyij前面加標記“1”,是為了在Shuffle過程中,使得CUSTOMER表中記錄排在ORDERS表中記錄的前面,這樣在Reduce階段只用緩存CUSTOMER表中的記錄,減少Reduce階段對內存的消耗。其算法描述如下:
算法1屬性解析和標記算法


4.1.2 Shuffle階段的哈希分區過程
當所有輸入分片完成屬性的提取和標記后,需要將分組劃分到合適的分區,以保證各Reducer節點擁有相等的分組數目。在傳統MapReduce框架中對分組的劃分過程是以HashPartition為核心完成的,讀取分組Groupi中的連接屬性join_keyi,計算連接屬性join_keyi對應的哈希值HashCodei,將HashCodei和分區的數目做取余運算得到對應的分區PartitionNumi。其中,連接屬性join_keyi一般是以字符串的形式出現,為了保證所分區擁有相等數目的分組,連接屬性join_keyi的哈希值HashCodei計算策略較為重要,默認情況下使用JDK中String類的hashcode()方法。計算字符串join_keyi的長度length,讀取字符串join_ keyi在內存中的存儲地址并保存在字符數組val[]中,最后做一個length次的迭代運算HashCodei=31*h+ val[i++]得到哈希值HashCodei,即為對應的分區。其算法描述如下:
算法2哈希分區算法


4.1.3 Reduce階段的連接過程
ReduceTaski任務從多個MapTask的本地磁盤中拉取屬于自己的中間數據,將數據進行排序、分組和合并等操作,得到多個分組組成的集合GroupSeti。分組Groupij中的記錄以<composite_key,query_value_ List>鍵和多個值組成的序列對形式存在,且每一個分組內所有來自于CUSTOMER的記錄都排在來自ORDERS表的記錄之前。對分組Groupij中的記錄ReduceRecordp,讀取復合連接屬性composite_keyp,從復合連接屬性composite_keyp中解析出連接屬性join_ keyp和源表標記tag,遍歷查詢屬性的序列query_ value_Listp。
如果tag為“0”,表明該記錄來自于CUSTOMER表,將查詢屬性序列query_value_Listp中的每一個cust_query_valueq分別和連接屬性join_keyp以<join_ keyp,cust_query_valueq>鍵值對的形式保存于緩存集合ReduceBufferi中。如果tag為“1”,表明該記錄來自于ORDERS表,對序列query_value_Listp中的各查詢屬性order_query_valueq分別在緩存集合ReduceBufferi中查詢其連接屬性join_keyp是否存在。如果存在,就從緩存集合ReduceBufferi中讀取該連接屬性join_ keyp對應的查詢屬性cust_query_valueq,并將<order_ query_valueq,cust_query_valueq>以鍵值對的形式輸出;如果緩存ReduceBufferi中查詢其連接屬性join_ keyp不存在,則讀取序列query_value_Listp中的下一條查詢記錄order_query_value(q+1),直到序列query_ value_Listp中的所有記錄遍歷完成。其算法描述如下:
算法3連接查詢算法


4.1.4 算法性能分析
實驗平臺環境和實驗數據集詳細見5.1節,實驗的評價標準為整個作業運行時間。連接條件定義為CUSTOMER.custkey=ORDERS.custkey,實驗采用測試用例SQL描述如下:
SELECT ORDERS.orderkey,CUSTOMER.custname
FROM ORDERS INNER JOIN CUSTOMER
ON CUSTOMER.custkey=ORDERS.custkey
為了較全面評估IRJQ算法的性能,將CUSTOMER的連接率設定為0.1%、20%和50%,ORDERS數據的傾斜率設定為0.2、0.5和0.8。其中,連接率定義為CUSTOMER中有購買記錄的用戶所占的比率;傾斜率定義為某一分組數據量在整個數據集中所占的比率。
實驗1數據傾斜時不同連接率下IRJQ算法時間性能分析。固定CUSTOMER表中的數據量,不斷增加ORDERS中的數據量,對比分析IRJQ算法在數據傾斜時不同連接率下的時間性能。其中,CUSTOMER中記錄數目固定為8 000萬條,連接率分別取0.1%、20%和50%;ORDERS中的記錄數目分別取10億、20億、30億、40億、50億、60億、70億和80億,傾斜率分別取0.2、0.5和0.8。實驗結果如圖2所示。
實驗結果表明,ORDERS中的連接屬性不均勻分布對IRJQ算法時間性能影響較大,隨著ORDERS中的數據量及傾斜率增大,其時間性能大幅度下降。這主要是因為傳統MapReduce框架為了保證所有的分區有相同數目的分組,以哈希分區策略完成對分組的劃分。
假設ORDERS共有m條記錄,傾斜率為α,且傾斜分組數目為1;CUSTOMER共有n條記錄,連接率為β;Reduce階段共有k個分區。則每個分區的分組數目為,傾斜分組中的記錄數目為m×α,非傾斜分組中的記錄數目約為,傾斜分區中的記錄數目約為,非傾斜分區中的記錄數目約為,傾斜分區與非傾斜分區間的記錄數目差約為。可以很清楚地看出,隨著ORDERS中記錄數目m或者傾斜率α的增加,傾斜分組的記錄數目m×α變得越來越大,傾斜分組和非傾斜分組間的數據量差也會越來越大。當α→1或者m→∞時,,即ORDERS中數據分布嚴重不均勻或者數據量較大會導致多個分區間的數據量相差巨大,造成Reduce階段負載嚴重不均衡,最終影響整個作業的時間性能。
4.2 基于改進型MapReduce連接查詢算法
IRJQ算法在數據分布均勻的情況下擁有較好的時間性能和穩定性,然而當數據分布不均勻時,Reduce階段會因為多個分組數據量相差較大導致負載不平衡,嚴重影響了算法的時間性能和穩定性。針對IRJQ算法的這種不足,本文設計和實現了組合分割平衡分區優化策略,形成MapReduce計算框架下基于組合分割平衡分區優化策略的改進型連接查詢算法(IRJQ+CDPS)。圖3給出了IRJQ+CDPS算法的計算框架和執行流程。
IRJQ+CDPS算法以改進重分區連接查詢算法為基礎,分為3個階段Map、Shuffle和Reduce。Map和Reduce階段同IRJQ算法一致,其核心改進體現在Shuffle過程中的組合分割平衡分區優化策略,保證了在數據傾斜時Reduce階段的負載均衡。
4.2.1 ORDERS表連接屬性頻率分布統計

Fig.2 Time performance comparison of IRJQ algorithm under skewed data圖2 數據傾斜下IRJQ算法時間性能對比分析
為了解決數據傾斜導致連接算法時間性能下降的不足,本文對嚴重傾斜的連接屬性分組采用分割分區策略,不嚴重或不傾斜的連接屬性分組采用組合分區策略。其中,首先需要解決的問題就是要得到ORDERS表中連接屬性的頻率分布情況。該過程由一個獨立的MapReduce作業完成,MapTaski任務獲取對應的輸入分片inputspliti,讀取記錄MapRecordij,解析出MapRecordij的連接屬性join_keyij,然后將<join_ keyij,1>以key/value的形式輸出到對應的分組中。多個擁有不同連接屬性的分組經過Shuffle過程的分區操作,被從Map端傳送到Reduce端。這里使用的分區策略是MapReduce計算框架默認的哈希分區策略。ReduceTaski任務對來自多個Map端的分組進行排序、分組和合并等操作得到對應的分區,讀取分組Groupi的連接屬性join_keyij和value_Listij。其中value_ Listij為從Map階段傳送過來的具有相同連接屬性的多個“1”組成的序列,將value_Listij中所有的“1”求和得到對應連接屬性join_keyij的頻率frequencyij。最后將<join_keyij,frequencyij>以key/value的形式輸出,完成對ORDERS表中連接屬性出現頻率的統計。

Fig.3 Computation framework and implementation process of CDPS+IRJQ algorithm圖3 IRJQ+CDPS算法的計算框架和執行過程
4.2.2 嚴重傾斜分區和不嚴重或不傾斜分區劃分
在得到了ORDERS表中連接屬性的頻率分布后,一個非常重要的問題就是如何準確地找到那些連接屬性嚴重傾斜的分組。一個常見的方法就是計算所有連接屬性分組中key/value鍵值對數目的平均值AVG,每個分組通過和平均值AVG的比較來確認是否嚴重傾斜。如果某個分組中key/value鍵值對數目小于平均值AVG,那么就認為該分組為不嚴重或不傾斜分組,將對其采用組合分區策略;同樣,如果某個分組中key/value鍵值對數目大于或等于平均值AVG,那么就認為該分組為嚴重傾斜分組,將對其采用分割分區策略。所有分組的key/value鍵值對數目平均值AVG計算方式如下:

其中,|Groupi|表示第i個分組中key/value鍵值對的數目;m表示分組Group的個數。很明顯,key/value鍵值對數目的平均值AVG決定了分組是否嚴重傾斜。如果嚴重傾斜分組太多,AVG也會變得更接近嚴重傾斜分組,從而保證了較為準確地劃分出嚴重傾斜分組和不嚴重或不傾斜分組。
4.2.3 組合分割平衡分區優化策略
針對數據分布不均勻導致的傾斜問題,將那些不嚴重傾斜或不傾斜連接屬性的分組,組合成較大的分組,然后再將組合后的大分組劃分到各Reducer節點中,而那些嚴重傾斜連接屬性的分組等劃分到各Reducer節點中。
不嚴重傾斜或不傾斜屬性分組的組合分區策略,根據每個分組中數據量的大小組合成較大的分組,在組合過程中最大程度地保證組合后的多個大分組數據量大致相等,然后將組合后的分組分別傳遞到Reduce端,使所有的Reduce端具有近似相等的負載量,達到負載均衡的目的。這種組合分區策略是典型NP難解問題,采用啟發式的方法得到次優解,即優先分配較大分組,接著在剩下的分組中選擇數據量最多的分組分配到負載最小的分區上。根據每個分組{G1,G2,…,Gm}的數據量大小進行降序排列;然后,將前n個分組分配給{r1,r2,…,rn}n個Reducer節點,這樣rn的負載量最小;接著,選擇{r1,r2,…,rn}中當前負載量最小的ri,將Gn+1分配給ri;重復上一步,依次將Gj分配給負載量最小的ri,只到所有的分組分配完成,達到Reduce階段的負載均衡。
嚴重傾斜連接屬性分組的分割分區策略,組合分區策略應對傾斜不是很嚴重的分組時往往具有較好的負載均衡效果,當面對那些嚴重傾斜的分組時,有些較大分組數據量的大小要比其他多個較小分組組合之后的數據量還要大,從而導致無論怎么分配組合都無法達到在Reduce端的負載均衡,離人們所期望的效果相差甚遠。假設GSet={G1,G2,G3,G4,G5}= {2 000,700,360,150,80},PSet={P1,P2,P3},使用組合分區策略:{G1→P1},{G2→P2},{G3→P3},接下來即使把G4、G5都分配到P3上也達不到負載平衡。但是,可以將這些嚴重傾斜連接屬性分組等劃分成n份,將這n等份分別發到n個分區中,在Reduce端就可以得到一個比較好的負載平衡效果。
算法4組合分割平衡分區優化算法

在大規模數據分析過程中,數據分布不均勻對連接查詢操作的性能有非常大的影響。本文設計和實現了IRJQ+CDPS算法和IRJQ算法,并以實驗的方式對兩者的時間性能和Reduce階段最大負載量進行對比與分析。
5.1 實驗平臺環境和數據集
實驗平臺由16臺高性能服務器構成,1臺設為主控節點,15臺設為計算節點,分布在兩個機架上面,每個機架有獨立的路由器。每個節點配有2個處理核心,2.4 GHz主頻,8 GB內存和1.5 TB的本地存儲磁盤,操作系統為Red Hat Linux 5.6。使用Hadoop 1.1.2版本的系統作為集群環境,每個節點配置2個MapTask任務和2個ReduceTask任務,HDFS的塊大小設定為128 MB,每一個數據塊的副本數設置為3,其他各項參數均采用默認設置。
使用TPC-H基準測試集生成工具產生用于連接查詢操作實驗的數據集,并采用其中的CUSTOMER和ORDERS兩個數據表來做連接操作。其中,CUSTOMER表中的數據量分別取1 000萬條、2 000萬條、3 000萬條、4 000萬條、5 000萬條、6 000萬條、7 000條和8 000萬條;ORDERS中數據量分別取10億條、20億條、30億條、40億條、50億條、60億條、70億條和80億條。為了更好地評估IRJQ+CDPS算法的性能,將ORDERS數據傾斜率設定為0.2、0.5和0.8。
實驗的評價標準為時間性能和Reduce端最大負載量。其中,時間性能簡單設定為整個作業運行的總時間;Reduce端的最大負載量設定為所有分區中最大那個分區的數據量。
5.2 實驗與結果
為了和帶有區間范圍的連接查詢和低選擇性的連接查詢相區分,將一般性連接查詢定義為全表范圍的連接查詢。
5.2.1 全表范圍的連接查詢對比實驗
對兩種連接查詢算法全表范圍連接查詢的性能進行對比分析,連接條件設定為CUSTOMER.custkey=ORDERS.custkey,測試用例SQL描述如下:
SELECT ORDERS.orderkey,CUSTOMER.custname
FROM ORDERS INNER JOIN CUSTOMER
ON CUSTOMER.custkey=ORDERS.custkey
實驗2全表范圍連接查詢的不同傾斜率下時間性能對比實驗。固定ORDERS表中數據量,不斷增加CUSTOMER中的數據量,對比分析IRJQ算法和IRJQ+CDPS算法在全表范圍連接查詢的不同傾斜率下的時間性能。其中,CUSTOMER中記錄數固定為8 000萬條,連接率為100%;ORDERS中記錄數分別取10億、20億、30億、40億、50億、60億、70億和80億,傾斜率分別取0.2、0.5和0.8。實驗結果如圖4所示。
實驗結果表明,當ORDERS中數據量較少時,IRJQ算法擁有較好的時間性能;隨著ORDERS中數據量的增加,IRJQ算法的時間性能快速下降,而IRJQ+CDPS算法的時間性能逐步轉好;當ORDERS中數據量增加到一定程度,IRJQ+CDPS算法擁有非常好的時間性能,且整個過程中IRJQ+CDPS算法擁有較好的穩定性。這主要是因為IRJQ+CDPS算法相對于IRJQ算法較為復雜,當ORDERS中數據量較少時,數據分布不均勻導致的Reduce階段的負載不均衡不是很明顯,IRJQ+CDPS算法負載均衡優化策略帶來的時間性能優勢相對于復雜的執行流程帶來額外的時間開銷還是太小;隨著ORDERS中數據量的增加,數據分布不均勻導致的Reduce階段的負載不均衡越來越明顯,復雜的執行流程帶來額外的時間開銷相對于IRJQ+CDPS算法優化策略帶來的時間性能優勢可以忽略不計,IRJQ+CDPS算法的時間性能優勢也越來越明顯。
實驗3全表范圍連接查詢的不同傾斜率下Reduce階段最大負載量對比實驗。固定ORDERS中的數據量,不斷增加CUSTOMER中的數據量,對比分析IRJQ算法和IRJQ+CDPS算法在全表范圍連接查詢的不同傾斜率下Reduce階段的最大負載量。其中,CUSTOMER中記錄數目固定為8 000萬條,連接率為100%;ORDERS中記錄數目分別取10億、20億、30億、40億、50億、60億、70億和80億,傾斜率分別取0.2、0.5和0.8。實驗結果如圖5所示。
實驗結果表明,在整個實驗過程中,與IRJQ算法相比,IRJQ+CDPS算法在Reduce階段的最大負載量一直較低;并且隨著ORDERS中數據量和傾斜率的增大,IRJQ+CDPS算法的這種負載均衡優勢越來越明顯。這主要是因為IRJQ算法在Shuffle階段采用的是哈希分區,只能保證Reduce階段多個分區間擁有相等的分組數,而無法保證每個分組擁有相等的數據量,更加無法保證Reduce階段的負載均衡;而IRJQ+CDPS算法在Shuffle階段采用了組合分割平衡分區優化策略,對不嚴重或不傾斜的分組采用組合分區策略,嚴重傾斜的分組采用分割策略,每個分組擁有近似相等的數據量,很好地保證了Reduce階段的負載均衡。

Fig.4 Time performance comparison of full-join queries under different data skewed rates圖4 不同傾斜率下全表連接查詢時間性能對比
5.2.2 帶有區間范圍的連接查詢對比實驗
對兩種連接查詢算法帶有區間范圍連接查詢的性能進行對比分析,連接條件設定為CUSTOMER. custkey=ORDERS.custkey,選擇條件設定為207 290<CUSTOMER.custkey<291050,測試用例SQL描述如下:
SELECT ORDERS.orderkey,CUSTOMER.custname
FROM ORDERS INNER JOIN CUSTOMER
ON CUSTOMER.custkey=ORDERS.custkey
WHERE 207290<CUSTOMER.custkey<291050
實驗4帶有區間范圍連接查詢的不同傾斜率下時間性能對比實驗。固定ORDERS表中的數據量,不斷增加CUSTOMER中的數據量,對比分析IRJQ算法和IRJQ+CDPS算法在帶有區間范圍連接查詢的不同傾斜率下的時間性能。其中,CUSTOMER中記錄數固定為8 000萬條,連接率為100%;ORDERS中記錄數分別取10億、20億、30億、40億、50億、60億、70億和80億,傾斜率分別取0.2、0.5和0.8。實驗結果如圖6所示。

Fig.5 Maximum load of Reduce comparison for full-join queries under different data skewed rates圖5 不同傾斜率下全表連接查詢Reduce端最大負載量對比

Fig.6 Time performance comparison of range queries under different data skewed rates圖6 不同傾斜率下范圍查詢時間性能對比
實驗結果同實驗2類似,當ORDERS中數據量較少時,IRJQ算法擁有較好的時間性能;隨著ORDERS中數據量的增加,IRJQ算法的時間性能快速下降,而IRJQ+CDPS算法的時間性能逐步轉好;當ORDERS中數據量達到一定程度時,IRJQ+CDPS算法擁有非常好的時間性能,且整個過程中IRJQ+CDPS算法擁有較好的穩定性。同時,相比于實驗2,實驗4中的IRJQ+CDPS算法的組合分割平衡分區策略帶來的時間性能優勢稍微降低。這主要是因為,相比于全表范圍的連接查詢,帶有區間范圍連接查詢的查詢范圍較小,相同的輸入數據量下,Reduce階段需要緩存的數據量較小,一定程度減少了數據分布不均勻對整個連接查詢算法時間性能的影響。
實驗5帶有區間范圍連接查詢的不同傾斜率下Reduce階段最大負載量對比實驗。固定ORDERS表中的數據量,不斷增加CUSTOMER中的數據量,對比分析IRJQ算法和IRJQ+CDPS算法在不同傾斜率下Reduce階段的最大負載量。其中,CUSTOMER中記錄數固定為8 000萬條,連接率為100%;ORDERS中記錄數分別取10億、20億、30億、40億、50億、60億、70億和80億,傾斜率分別取0.2、0.5和0.8。實驗結果如圖7所示。
實驗結果同實驗3類似,在整個實驗過程中,IRJQ+CDPS算法在Reduce階段的最大負載量一直較低,并且隨著ORDERS中數據量和傾斜率的增大,IRJQ+CDPS算法的這種負載均衡優勢越來越明顯。這主要是因為,IRJQ算法在Shuffle階段采用的是哈希分區,在數據分布不均勻時,只能保證Reduce階段多個分區間擁有相等的分組數,而無法保證每個分組有相等的數據量,更加無法保證Reduce階段的負載均衡;而IRJQ+CDPS算法在Shuffle階段采用了組合分割平衡分區優化策略,每個分組擁有近似相等的數據量,很好地保證了Reduce階段的負載均衡。
5.2.3 低選擇性的連接查詢對比實驗
對兩種連接查詢算法低選擇性連接查詢的性能進行對比分析,連接條件設定為CUSTOMER.custkey=ORDERS.custkey,選擇條件設定為CUSTOMER. custkey=23 698,測試用例SQL描述如下:
SELECT ORDERS.orderkey,CUSTOMER.custname
FROM ORDERS INNER JOIN CUSTOMER
ON CUSTOMER.custkey=ORDERS.custkey
WHERE CUSTOMER.custkey=23698
實驗6低選擇性連接查詢的不同傾斜率下時間性能對比實驗。固定ORDERS表中的數據量,不斷增加CUSTOMER中的數據量,對比分析IRJQ算法和IRJQ+CDPS算法在低選擇性連接查詢的不同傾斜率下的時間性能。其中,CUSTOMER中記錄數固定為8 000萬條,連接率為100%;ORDERS中記錄數分別取10億、20億、30億、40億、50億、60億、70億和80億,傾斜率分別取0.2、0.5和0.8。實驗結果如圖8所示。
實驗結果同實驗4類似,當ORDERS中數據量較少時,IRJQ算法擁有較好的時間性能;隨著ORDERS中數據量的增加,IRJQ算法的時間性能快速下降,而IRJQ+CDPS算法的時間性能逐步轉好;當ORDERS中數據量達到一定程度時,IRJQ+CDPS算法擁有非常好的時間性能,且整個過程中IRJQ+CDPS算法擁有較好的穩定性。同時,相比于實驗4,實驗6中的IRJQ+CDPS算法的平衡分區帶來的時間性能優勢有所降低。這是主要是因為,相比于帶有區間范圍的連接查詢,低選擇性連接查詢的查詢范圍較小,相同的輸入數據量下,Reduce階段需要緩存的數據量較小,一定程度減少了數據分布不均勻對整個連接查詢算法時間性能的影響。
實驗7低選擇性連接查詢的不同傾斜率下Reduce階段最大負載量對比實驗。固定ORDERS表中的數據量,不斷增加CUSTOMER中的數據量,對比分析IRJQ算法和IRJQ+CDPS算法在低選擇性連接查詢的不同傾斜率下Reduce階段的最大負載量。其中,CUSTOMER中記錄數固定為8 000萬條,連接率為100%;ORDERS中記錄數分別取10億、20億、30億、40億、50億、60億、70億和80億,傾斜率分別取0.2、0.5和0.8。實驗結果如圖9所示。

Fig.7 Maximum load of Reduce comparison for range queries under different data skewed rates圖7 不同傾斜率下范圍查詢Reduce端最大負載量對比
實驗結果同實驗5類似,在整個實驗過程中,IRJQ+CDPS算法在Reduce階段的最大負載量一直較低,并且隨著ORDERS中數據量和傾斜率的增大,IRJQ+CDPS算法的這種負載均衡優勢越來越明顯。這主要是因為,IRJQ算法在Shuffle階段采用的是哈希分區,在數據分布不均勻時,只能保證Reduce階段多個分區間擁有相等的分組數,而無法保證每個分組有相等的數據量,更加無法保證Reduce階段的負載均衡;而IRJQ+CDPS算法在Shuffle階段采用了組合分割平衡分區優化策略,每個分組擁有近似相等的數據量,很好地保證了Reduce階段的負載均衡。

Fig.8 Time performance comparison of low-join-rate queries under different data skewed rates圖8 不同傾斜率下低連接率查詢時間性能對比

Fig.9 Maximum load of Reduce comparison for low-join-rate queries under different data skewed rates圖9 不同傾斜率下低連接率查詢Reduce端最大負載量對比
5.3 結果分析
從上面的實驗結果中可以看到,不論是在全局范圍的連接查詢操作,還是帶有區間范圍的連接查詢操作,亦或是低選擇性連接查詢操作,相對于IRJQ算法,IRJQ+CDPS算法在時間性能和Reduce端最大負載量兩方面均具有非常大的優勢。
在時間復雜度方面,當ORDERS中數據量較少時,IRJQ算法擁有較好的時間性能;隨著ORDERS表中數據量的增加,IRJQ算法的時間性能快速下降,而IRJQ+CDPS算法的時間性能逐步轉好;當ORDERS表中數據量達到一定程度時,IRJQ+CDPS算法擁有較好的時間性能。這主要是因為,IRJQ+CDPS算法相對于IRJQ算法較為復雜,當ORDERS中數據量較少時,數據分布不均勻導致的Reduce階段的負載不均衡不是很明顯,IRJQ+CDPS算法的優化策略帶來的時間性能優勢相對于復雜的執行流程帶來額外的時間開銷還是太小;隨著ORDERS中數據量的增加,數據分布不均勻導致的Reduce階段的負載不均衡不是越來越明顯,復雜的執行流程帶來額外的時間開銷相對于IRJQ+CDPS算法的優化策略帶來的時間性能優勢可以忽略不計,IRJQ+CDPS算法的時間性能優勢也越來越明顯。
在Reduce端的最大負載量方面,整個實驗過程中,IRJQ+CDPS算法在Reduce階段的最大負載量一直較低,并且隨著ORDERS中數據量和傾斜率的增大,IRJQ+CDPS算法的這種負載均衡優勢越來越明顯。這主要是因為,IRJQ算法在Shuffle階段采用的是哈希分區,在數據分布不均勻時,只能保證Reduce階段多個分區間擁有相等的分組數,而無法保證每個分組有相等的數據量,更加無法保證Reduce階段的負載均衡;而IRJQ+CDPS算法在Shuffle階段采用了組合分割平衡分區優化策略,很好地保證了Reduce階段的負載均衡。
連接查詢是大規模數據分析的核心操作算子之一,數據傾斜在大規模數據分析中普遍存在,且對借助于MapReduce計算框架的連接查詢算法的效率具有較大影響。本文主要針對連接查詢操作中的數據傾斜問題,研究MapReduce框架下大規模數據連接查詢操作的優化算法。首先,以較為常見的改進重分區連接查詢算法為例,研究借助于傳統MapReduce計算框架連接查詢操作的執行流程,找出基于Map-Reduce計算框架連接算法在數據分布不均勻時的性能瓶頸;進而,提出了組合分割平衡分區優化策略,形成了MapReduce計算框架下基于組合分割平衡分區優化策略的改進型連接查詢算法。實驗結果表明,提出的優化策略在大規模數據連接查詢處理上很好地解決了數據傾斜對其性能的影響,具有較好的時間性能和可擴展性。
[1]Olston C,Reed B,Srivastava U,et al.Pig a not-so-foreign language for data processing[C]//Proceedings of the 2008 International Conference on Management of Data,Vancouver, Canada,Jun 9-12,2008.New York:ACM,2008:1099-1110.
[2]Blanas S,Patel J M,Ercegovac V,et al.A comparison of join algorithms for log processing in MapReduce[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data,Indianapolis,USA,Jun 6-11, 2010.New York:ACM,2010:975-986.
[3]Okcan A,Riedewald M.Processing theta-joins using Map-Reduce[C]//Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data,Athens,Greece, Jun 12-16,2011.New York:ACM,2011:949-960.
[4]Vernica R,Carey M J,Li Chen.Efficient parallel set similarity joins using MapReduce[C]//Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data, Indianapolis,USA,Jun 6-10,2010.New York:ACM,2010: 495-506.
[5]Afrati F N,Ullman J D.Optimizing multiway joins in a Map Reduce environment[C]//Proceedings of the 13th International Conference on Extending Database Technology,Lausanne,Switzerland,Mar 22-26,2010.New York:ACM,2011: 1282-1298.
[6]Slagter K,Hsu C H,Chung Y C,et al.SmartJoin:a networkaware multiway join for MapReduce[J].Cluster Computing, 2014,17(3):629-641.
[7]Zhao Yanrong,Wang Weiping,Meng Dan,et al.Efficient join query processing algorithm CHMJ based on Hadoop[J]. Journal of Software,2012,23(8):2032-2041.
[8]Yang H C,Dasdan A,Hsiao R L,et al.Map-Reduce-Merge: simplified relational data processing on large clusters[C]// Proceedings of the 2007 ACM SIGMOD International Conference on Management of Data,Beijing,Jun 11-14,2007. New York:ACM,2007:1029-1040.
[9]Yang H C,Parker D S.Traverse:simplified indexing on large Map-Reduce-Merge clusters[C]//LNCS 5463:Proceedings of the 14th International Conference on Database Systems for Advanced Applications,Brisbane,Australia,Apr 21-23, 2009.Berlin,Heidelberg:Springer,2009:308-322.
[10]Jiang D,Tung A K H,Chen Gang.Map-Join-Reduce:toward scalable and efficient data analysis on large clusters[J]. IEEE Transactions on Knowledge and Data Engineering, 2011,23(9):1299-1311.
[11]Ding Linlin,Wang Guoren,Xin Junchang,et al.ComMap Reduce:an improvement of MapReduce with lightweight communication mechanisms[C]//LNCS 7239:Proceedings of the 17th International Conference on Database Systems for Advanced Applications,Busan,Korea,Apr 15-19,2012.Berlin,Heidelberg:Springer,2012:150-168.
[12]Abouzeid A,Bajda-Pawlikowski K,Abadi D,et al.Hadoop-DB:an architectural hybrid of MapReduce and DBMS technologies for analytical workloads[C]//Proceedings of the 35th International Conference on Very Large Data Bases,Lyon, France,Aug 24-28,2009.NewYork:ACM,2009:922-933.
[13]Dittrich J,Quiané-Ruiz J A,Jindal A,et al.Hadoop++:making a yellow elephant run like a cheetah(without it evenoticing) [C]//Proceedings of the 36th International Conference on Very Large Data Bases,Singapore,Sep 13-17,2010.New York:ACM,2010:515-529.
[14]Dittrich J,Quiané-Ruiz J A,Richter S,et al.Only aggressive elephants are fast elephants[C]//Proceedings of the 38th International Conference on Very Large Data Bases,Istanbul, Turkey,Aug 27-31,2012.NewYork:ACM,2012:1591-1602.
[15]Lin Yuting,Agrawal D,Chen Chen,et al.Llama:leveraging columnar storage for scalable join processing in the Map-Reduce framework[C]//Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data, Athens,Greece,Jun 12-16,2011.New York:ACM,2011: 961-972.
[16]Zhang Yanfeng,Gao Qixin,Gao Lixin,et al.Priter:a distributed framework for prioritized iterative computations [C]//Proceedings of the 2nd ACM Symposium on Cloud Computing,Cascais,Portugal,Oct 26-28,2011.New York: ACM,2011:1-13.
[17]Chattopadhyay B,Lin Liang,Liu Weiran,et al.Tenzing a SQL implementation on the MapReduce framework[C]//Proceedings of the 37th International Conference on Very Large Data Bases,Seattle,USA,Aug 29-Sep 3,2011.New York: ACM,2011:1318-1327.
[18]Zhu Haitong.Efficient star join for column-oriented data store in the MapReduce environment[D].Shanghai:East China Normal University,2012.
附中文參考文獻:
[7]趙彥榮,王偉平,孟丹,等.基于Hadoop的高效連接查詢處理算法[J].軟件學報,2012,23(8):2032-2041.
[18]祝海通.MapReduce環境中基于列存儲的一種高效的星型連接方法[D].上海:華東師范大學,2012.

ZHANG Jingwei was born in 1977.He received the Ph.D.degree from East China Normal University in 2012. Now he is an associate professor at Guilin University of Electronic Technology,and the member of CCF.His research interests include Web data analysis and management,query optimization technologies,massive data management and storage,etc.
張敬偉(1977—),男,山東蓬萊人,2012年于華東師范大學獲得博士學位,現為桂林電子科技大學計算機與信息安全學院副教授,CCF會員,主要研究領域為Web數據分析與管理,查詢優化技術,海量數據管理和存儲等。

SHANG Hongjia was born in 1989.He is an M.S.candidate at School of Computer and Information Security,Guilin University of Electronic Technology.His research interests include database technology and distributed computing,etc.
尚宏佳(1989—),男,湖北隨州人,桂林電子科技大學計算機與信息安全學院碩士研究生,主要研究領域為數據庫技術,分布式計算等。

QIAN Junyan was born in 1973.He received the Ph.D.degree from Southeast University in 2008.Now he is a professor at Guilin University of Electronic Technology,and the senior member of CCF.His research interests include software engineering,program analysis and verification,information security and VLSI fault tolerance technologies,etc.
錢俊彥(1973—),男,浙江嵊縣人,2008年于東南大學獲得博士學位,現為桂林電子科技大學計算機與信息安全學院教授,CCF高級會員,主要研究領域為軟件工程,程序分析與驗證,信息安全,VLSI容錯技術等。

ZHOU Ping was born in 1961.She is a professor at Guilin University of Electronic Technology.Her research interests include speech signal processing and intelligent control,etc.
周萍(1961—),女,河北唐山人,桂林電子科技大學電子工程與自動化學院教授,主要研究領域為語音信號處理,智能控制等。

YANG Qing was born in 1976.She is an associate professor at Guilin University of Electronic Technology.Her research interests include massive data management and large-scale intelligent information processing,etc.
楊青(1976—),女,廣西恭城人,桂林電子科技大學電子工程與自動化學院副教授,主要研究領域為海量數據管理,大規模智能信息處理等。
Join Query Optimization Based on MapReduce under Skewed Data*
ZHANG Jingwei1,2,SHANG Hongjia1,QIAN Junyan1,ZHOU Ping3,YANG Qing3+
1.Guangxi Key Laboratory of Trusted Software,Guilin University of Electronic Technology,Guilin,Guangxi 541004,China
2.Guangxi Cooperative Innovation Center of Cloud Computing and Big Data,Guilin University of Electronic Technology,Guilin,Guangxi 541004,China
3.Guangxi Key Laboratory of Automatic Measurement Technology and Instrument,Guilin University of Electronic Technology,Guilin,Guangxi 541004,China
+Corresponding author:E-mail:gtyqing@hotmail.com
ZHANG Jingwei,SHANG Hongjia,QIAN Junyan,et al.Join query optimization based on MapReduce under skewed data.Journal of Frontiers of Computer Science and Technology,2017,11(5):752-767.
MapReduce,a classic distributed computing environment,can improve the performance of join query on large-scale data,but when the join attributes do not follow a uniform distribution,the pure hash strategy in traditional MapReduce will lead to load imbalance over computing nodes,which will reduce the performance of overall task.Aiming at the data skew problem in the join query,this paper studies the join query optimization based on MapReduce computing framework.Firstly,this paper conducts experimental analysis for the improved repartitioning join query algorithm,studies the execution phases of join query based on traditional MapReduce computing framework,and finds the performance bottlenecks of join query on MapReduce computing framework when data do not follow a uniform distribution.Based on the above,this paper designs and implements an improved join query optimization algorithm,which is based on an execution strategy by integrating the combination segmentation method and equilibrium partitioning method.The experimental results show that the proposed optimization method provides a good solution for distributed join query on large-scale skewed datasets,and presents an excellent time performance and scalability.
join query;MapReduce;skewed data
10.3778/j.issn.1673-9418.1604022
A
TP311.130
*The National Natural Science Foundation of China under Grant Nos.U1501252,61363005,61462017(國家自然科學基金);the Natural Science Foundation of Guangxi under Grant Nos.2014GXNSFAA118353,2014GXNSFAA118390,2014GXNSFDA118036(廣西自然科學基金);the High Level Innovation Team of Colleges and Universities in Guangxi and Outstanding Scholars Program Funding (廣西高等學校高水平創新團隊及卓越學者計劃);the Program of Guangxi Cooperative Innovation Center of Cloud Computing and Big Data(廣西云計算與大數據協同創新中心基金項目);the Guangxi Cooperative Innovation Center of IOT and Industrialization (廣西物聯網技術與產業化推進協同創新中心資助項目).
Received 2016-04,Accepted 2016-06.
CNKI網絡優先出版:2016-06-27,http://www.cnki.net/kcms/detail/11.5602.TP.20160627.0929.006.html