趙麗梅 黃小菊 宮學慶
(華東師范大學軟件工程學院 上海 200062)
Apache Spark[1]是一個通用的并行查詢引擎,能夠支持對鍵值數據源的數據分析處理,擴展鍵值數據庫[2-3]大規模復雜查詢分析的能力,例如對鍵值數據源的Join查詢。實際應用場景中常搭建的Spark-over-HBase架構[4],利用Spark查詢引擎支持HBase數據源[5-6]的復雜查詢,其存儲層利用HBase集群對海量數據進行持久化存儲,計算層采用Spark查詢引擎來執行大規模查詢分析。該架構實現存儲層與計算層分離,解決了HBase數據庫僅能通過鍵來進行簡單查詢的不足,擴展了其支持復雜查詢的能力。如圖1所示,該架構底層采用HBase集群來進行數據持久化存儲,HBase集群包括一個HMaster主節點和若干個HRegionServer節點,利用Zookeeper集群來進行分布式協調。數據分布存儲在多個HRegionServer節點,通過主節點來協調各HRegionServer的負載均衡并維護集群的狀態;上層利用Spark集群來實現對于數據的并行計算和查詢響應,Spark集群包括一個Master主節點和若干個Worker節點,通過主節點來管理Worker節點,用戶提交應用程序啟動Driver進程來觸發集群工作。當接收到一個SQL請求后,啟動Driver進程,Spark查詢引擎生成對應的任務調度,通過數據訪問接口提取HBase中的數據到各Worker節點進行數據并行處理,響應查詢請求。

圖1 Spark-over-HBase架構
Spark-over-HBase架構擴展了HBase數據庫的大規模查詢分析能力,但是對于復雜查詢分析中較常用、開銷較大的Join操作,Spark查詢引擎的處理過程仍然有以下兩方面不足:
(1) 網絡傳輸開銷大。根據文獻[7]可知,因為存儲模式簡單,所以大多數的鍵值數據庫不具備在存儲層進行謂詞下推和投影來過濾數據,掃描數據效率低下。所以,大多情況下Spark查詢引擎需要提取全表數據進行處理。對于Join操作,Spark底層實現了Broadcast Hash Join算法、Repartition Join算法[8]。針對兩個大表進行Join的Repartition Join算法,是將兩表的全表數據進行Shuffle操作[9]。Spark當前采用的Shuffle策略是Sort Based Shuffle,這涉及大量的磁盤I/O開銷和計算開銷,尤其是網絡傳輸開銷。對于寬表或者網絡通信效率低下的情況,該部分開銷占比更大。
(2) 并行度設置不合理。在硬件資源滿足條件的情況下,并行處理大數據量往往能夠極大地提高工作性能。在Spark執行Join查詢時,Join執行的并行度與進行Shuffle操作時重分區的個數有關,即(1)中Reduce任務的個數。Spark提供spark.sql.shuffle.partitions參數來設定Shuffle時分區的個數。一般情況下,各Join實現算法取該參數的默認值。同時,用戶也可根據生產環境配置手動調整該參數,但是手動調整人工代價大且難以做出正確估計,很難給出最佳參數配置。
如何減少網絡傳輸代價、合理設置Join的并行度,提升現有的Spark-over-HBase平臺下的大表Join性能對于大規模數據查詢分析意義重大。
基于此,本文重點從優化算法流程以及動態設定并行度兩方面來提升Join操作性能,具體的貢獻如下:
(1) 借鑒Semi Join算法[11]的思想,首先提取左表Join列數據構建HashMap,利用該HashMap對右表進行過濾,過濾掉右表Join列數據不在HashMap中的元組,即不符合連接條件的元組。通過該預處理流程,可以減少右表參與Shuffle操作的數據量和進行Join匹配的數據量,降低網絡傳輸開銷和相關的CPU開銷。
(2) 考慮集群的CPU核數配置來動態設置Join操作的并行度,盡可能充分利用集群資源,提升Join操作效率。
基于以上的優化方法,本文進行了理論分析和對比實驗驗證。實驗結果顯示,對于兩張表Join列數據不完全匹配的情況,右表與左表匹配數據量越少,本文所提方案優化效果越明顯。
Join操作是大規模查詢分析中最常見且開銷最大的操作之一,在Spark查詢引擎中利用Broadcast Hash Join算法、Shuffle Hash Join算法和Sort Merge Join算法來實現。Broadcast Hash Join算法局限性大,主要適用于兩表數據量相差極大,且小表數據量小于規定閾值的場景,本文不予討論。后兩種算法均屬于Repartition Join,主要是在進行Join之前對兩表數據進行Shuffle操作。Repartition Join主要處理參與Join的兩個表數據量都很大的場景,通過Shuffle操作實現兩表分區數據有效匹配,但是Shuffle操作的磁盤I/O開銷、網絡通信開銷和內存處理開銷很大。
目前,針對各類查詢引擎進行大表Join操作的查詢優化已經有了很多的研究。
文獻[12]提出了基于Hadoop框架的大數據集的Join優化算法,算法主要利用Hadoop的分布式緩沖機制來優化MapReduce框架的Reduce Side Join。對于參與Join操作的兩個表,算法先提取出其中一個表的連接屬性,然后利用Bit-map數據結構壓縮成小數據文件存入磁盤中,通過Hadoop的分布式緩存機制將小數據文件傳輸到各個分布式節點。然后,在Map階段,利用讀取到的小數據文件對另一個表的數據進行過濾,過濾掉不在該小數據文件中的元組,即不滿足Join連接條件的元組。最后,在Reduce階段將兩表連接屬性值相同的元組執行Join操作。該優化算法利用其中一個表的連接屬性對另一個表的數據進行預過濾,可以減少Shuffle階段的數據量,降低網絡傳輸開銷。但是該算法利用Bit-map數據結構進行壓縮,對另一個表數據過濾時存在一定的誤判率,對數據的過濾性不好。而且,該算法需要利用Hadoop的分布式緩存機制將小數據文件存入磁盤,額外增加了I/O開銷,影響了最終的Join優化效果。
文獻[13]提出了一種基于Bloom Filter數據結構的Spark大表等值連接的優化算法。該算法首先對兩張數據表抽取連接屬性并進行去重,然后利用Bloom Filter數據結構對去重后的連接屬性分別進行壓縮得到兩個位數組,對兩個位數組進行“與”運算,生成BF位數組。利用這個BF位數組再分別對兩張表進行過濾,即過濾掉不符合連接條件的記錄。最后,對過濾后的兩張表執行Hash Join算法,得到連接結果。該優化算法利用Bloom Filter數據結構,同樣是過濾掉兩個表中不符合連接條件的元組,減少Shuffle操作的數據量。但是對兩張表的連接屬性進行去重時涉及Shuffle操作,且隨著連接屬性值的增多,該部分開銷隨之增加。而且Bloom Filter的數據結構壓縮效率沒有Bit-map數據結構好,且為了降低誤判率,位數組的長度還需適當增加,如何選定合適的誤判率以及對應的位數組大小仍需進行優化。
文獻[14]在Spark平臺上針對大維表的等值連接提出了優化算法。算法主要包括以下幾步:(1) 對事實表Fact的連接屬性值Key進行去重,得到無重集FactUK,FactUK中元組不僅包括Key鍵,也包括其在Fact表中的存儲位置。(2) 將FactUK與維表Dim進行預連接,執行Partition Join。其中,重分區的個數按照Fact和Dim的大小進行動態設定,并利用一致性哈希算法來進行重分區,避免了因數據傾斜產生的連接負載不均的問題,然后在各個分區上對FactUK和Dim按照Key進行cogroup分組并過濾掉不能匹配上的Key。(3) 將預連接結果按照Fact的分區號進行重分區,在各個分區將預連接結果與Fact表通過zipPartition操作進行組裝,返回完整的連接結果。該優化算法主要在于結合了Partition Join與Semi Join的優勢,對兩表數據進行重分區和預連接,減少了對于事實表全表數據進行重分區的Shuffle開銷,同時也優化了連接執行的并行度、采用一致性哈希來進行數據分區,以此獲得更好的連接性能。但是該算法也增加了對事實表連接屬性值去重的Shuffle開銷,并且該算法假設事實表和維度表的數據可以完全緩存到內存中,而在實際生產環境中,很難將所有數據完全緩存到內存中。
本方案中各分區的連接算法仍采用Sort Merge Join,主要利用Semi Join思想對參與Join的兩表數據進行預處理,并且動態設定Join操作的并行度,以獲得更好的優化效果。假設參與Join的兩個表分別為R表、S表,連接條件為R.A=S.B,其中A為R表key鍵,B為S表列族中的對應列。定義該優化方案為Semi Sort Merge Join,其對應的Join執行流程如下所示。
基于Semi Join算法進行數據預處理的流程如下:
(1) 提取R表Join列。對R表中的元組進行投影,只保留Join列的信息,結果定義為joinSet數據集。因為HBase數據庫中Key鍵的唯一性,所以該Join列數據沒有重復值。并且只提取單列數據,數據量少。
(2) 構建HashMap。對(1)中的joinSet數據集構建HashMap。為了盡可能減少HashMap的內存開銷,設定HashMap的Key鍵為Join列值,value值統一設定為null。
(3) 過濾S表數據。利用(2)中HashMap的Key鍵匹配S表的Join列,對S表數據進行過濾,剔除掉S表中B列數值不包含在HashMap中的元組。
利用Semi Join算法的思想對數據進行預處理,在執行Join操作前,在內存中構建R表Join列數據的HashMap,利用該HashMap對S表數據進行精確過濾,過濾掉S表數據的Join列在R表中沒有相關匹配值的元組,可以減少后續操作中對S表的無用數據進行Shuffle操作的磁盤I/O開銷、網絡開銷和相關的CPU開銷,也減少了后續參與Sort Merge Join的數據量。
對于Repartition Join的實現,Shuffle操作就是按照設定的重分區的個數對R表和S表數據按照Join列數據的Hash值進行重分區。在Spark SQL中,Shuffle操作重分區的個數主要由參數spark.sql.shuffle.partitions決定,默認值是200。因為重分區的個數直接關系到Join操作執行的并行度,所以合理設置重分區的個數尤為重要。如果該數值設定過小,會導致集群處理性能低且資源利用不合理,未發揮集群優勢;如果設定過大,則網絡連接超負荷、任務調度開銷大,也不利于提升集群的處理性能。
所以在本文方案中,考慮利用動態優化的思想,在執行過程中根據集群資源來動態設定重分區的個數,以此來優化Join操作的并行度。定義重分區個數為Partitionnum,其計算公式如下:
Partitionnum=w×corenum
(1)
式中:corenum表示集群中executor實例總的CPU核數。因為集群的計算能力受制于集群中CPU核數的個數[15],因此用w×corenum表示集群資源的限制。如果分區個數等于corenum,即Join操作的并行度等于corenum,w=1,則可能某些運行較快的任務較早運行完,空閑出相應的CPU核;如果設定w過大,則可能任務調度過于頻繁,開銷過大。本文設置多組實驗,設定不同的重分區個數,測試得出w的最優值為2。
綜上所述,優化后該步驟進行Shuffle操作的具體流程如下所示:(1) 通過配置—num-executors、—executor-cores分別獲取Spark集群中每個節點上executor的實例數和每個executor所分配的CPU核數,則集群中executor實例總的CPU核數為兩者的乘積;(2) 按照式(1)設定spark.sql.shuffle.partitions參數;(3) 利用Spark的Sort Based Shuffle策略對兩表數據進行重分區。
通過Shuffle操作,將兩表的數據分為具有相同個數的多個分區,然后對兩表具有相同分區號的分區數據進行合并,執行Sort Merge Join操作,主要流程如下:(1) 在各分區上,對兩表數據按照相同排序規則進行排序;(2) 分別順序遍歷兩表數據,按照Join連接條件進行匹配。
綜上所述,該優化方案的整體執行計劃如圖2所示。(1) 利用TableScan獲取R表數據,通過Map映射Join列數據,并利用CollectAsMap算子構建Join列的HashMap數據結構;(2) 利用TableScan獲取S表數據,通過Map映射好各屬性后,利用R表Join列數據的HashMap對S表數據進行過濾,過濾掉S表Join列數據不在HashMap中的元組;(3) 通過Map映射R表的各屬性;(4) 按照應用配置,計算集群中executor實例總的CPU核數,并根據Join列的Hash值以及2×corenum對R表和S表數據進行重分區;(5) 在對應分區上分別對兩表數據進行Sort Merge Join。

圖2 Semi Sort Merge Join算法執行計劃
Spark并行處理框架是基于分布式共享內存進行計算處理的,即在任務執行過程中,數據是緩存在內存中進行計算處理的,必要情況下需要將中間結果存入磁盤,例如Shuffle操作。為了簡化分析思路,本節方案分析不考慮將中間結果存入磁盤的情況,同時假設內存充足,所有中間結果可以有效緩存在內存中并進行內存計算。
本節的方案分析利用代價模型[16-17]進行代價估算。代價估算基于第2節中的例子,定義代價估計中各參數及參數意義如表1所示。

表1 代價模型中各參數及意義
為了方便分析,定義Filterability(過濾性)表示通過R表的A列對S表進行過濾后,S表過濾掉的元組數占原表元組數的比例,其計算公式如下:
(2)

對于分布式數據處理,主要考慮I/O代價、網絡傳輸代價和CPU的計算代價這三方面,總的代價估計如式(3)所示。
異丙托溴銨聯合布地奈德混懸液霧化吸入治療上呼吸道感染后慢性咳嗽的效果……………………………… 陳衍秋 陳英俊(3)335
Costtotal=CostI/O+CostComm+CostCPU
(3)
I/O代價主要考慮讀取參與Join的表所耗費的時間,因為優化前后均是對數據進行全表掃描,所以該部分代價不進行對比分析。在分布式環境下,網絡傳輸開銷占比較大,相對而言CPU的計算開銷占比很小,所以本節重點分析優化方案的網絡傳輸代價。優化方案的網絡傳輸代價主要包括對R表Join列構建的HashMap的網絡傳輸時間和Shuffle過程中進行數據重分區所耗費的時間,該部分的代價主要與網絡傳輸的數據量有關,數據量越大,網絡傳輸代價越大。具體如式(4)所示。
Costcomm=(sizeof(H)×|H|+
max(sizenR,sizenfilterS))×comm
(4)

本文方案主要是在Spark實現的Sort Merge Join算法基礎上進行改進的,在各分區上仍舊采用Sort Merge Join算法執行Join操作,所以本節主要對比Semi Sort Merge Join算法與Sort Merge Join算法的網絡通信代價,對比情況如下:
Sort Merge Join算法未對數據進行過濾,通過TableScan后直接將兩表數據進行Shuffle操作,涉及的網絡傳輸代價如式(5)所示。
Costcomm=max(sizenR,sizenS)×comm
(5)
在Semi Sort Merge Join算法中,如果對R表Join列投影后行數和寬度都相對較小,而右表S表是寬表且行數較多,則sizeof(H)×|H|的網絡傳輸代價可以忽略不計。此時,過濾性對網絡傳輸代價影響較大,FT越大,其過濾數據量越多,網絡傳輸數據量越少,對應網絡傳輸代價越小。相較于文獻[12-13],如果誤判率較高,對S表數據的過濾性不好,不符合連接條件的數據產生的網絡傳輸量就比較大。但是,如果對R表Join列投影后行數和寬度都相對較大的情況下,sizeof(H)×|H|的網絡傳輸代價不容忽視。例如R表和S表全表匹配的情況下,生成HashMap以及將其進行網絡傳輸的開銷會降低Semi Sort Merge Join的執行效率。
綜上所述,對于R表和S表數據不完全匹配的情況下,Semi Sort Merge Join算法的性能提升主要在于通過R表的Join列構建HashMap后,過濾掉S表不符合連接條件的元組,從而減少Shuffle操作的數據量,減少Shuffle write和Shuffle read的開銷。
本實驗使用三臺服務器搭建HBase集群和Spark集群,服務器的硬件配置以及相關的軟件版本如表2所示。

表2 相關配置及軟件版本
本文的實驗均使用TPC-H Benchmark數據集[18]。TPC基準是被全球數據庫廠商公認的性能評價標準,其中的TPC-H測試基準是一組決策支持基準,可測試系統執行復雜、高并發查詢的能力。
TPC-H數據集總共包括8張表,根據表之間的關聯性,選擇orders表(訂單表)和lineitem表(訂單明細表)來進行Join查詢性能的實驗。實驗數據表使用TPC-H提供的數據生成器生成數據,數據大小由比例系數SF決定的,根據SF的不同大小生成不同數量的測試數據集。
本節實驗主要通過兩表Join查詢來測試Spark查詢引擎提供的Sort Merge Join算法的查詢處理性能以及本文方案的性能。實驗主要利用orders表和lineitem表進行等值連接,連接條件為orders.O_ORDERKEY=lineitem.L_ORDERKEY。
本文設置的每組實驗均執行5次,實驗結果取平均值。
本節主要測試優化方案中利用Semi Join算法進行數據預處理對Sort Merge Join算法的提升效果。實驗對應的lineitem表的行數為1 200萬,為了測試兩表之間匹配程度對算法的影響,設置orders表的數據量分別為1 000、10萬、100萬、150萬、300萬,查詢對應lineitem表中有效匹配元組數分別為4 000、40萬、400萬、600萬、1 200萬。
實驗的結果如圖3所示。其中橫坐標表示orders表的行數,縱坐標表示查詢執行時間,單位為秒。對比兩個算法的執行時間,當orders表行數小于100萬時,可以明顯看到經過Semi Join預處理后Join執行時間更短。而且,隨著orders表行數的減少,對應linetime表中有效匹配元組數逐漸減少,過濾掉的數據量逐漸增多,通過Semi Join算法預處理后對于Spark的Sort Merge Join算法的提升效果更加明顯。但是,隨著orders表數據量逐漸增加,對應linetime表中有效匹配元組數也在增加,過濾掉的數據量也逐漸減少,利用Semi Join進行預處理反而不利于Sort Merge Join的執行,主要是受提取orders表的Join列數據構建HashMap的內存和網絡傳輸開銷的影響。

圖3 Sort Merge Join算法經過Semi Join預處理與未經Semi Join算法預處理響應時間對比
所以,Join之前利用Semi Join算法進行數據預處理主要適用于左表與右表Join列值不完全匹配的場景。右表與左表Join列值匹配數越少,過濾掉的右表數據量越多,則參與Shuffle操作以及Sort Merge Join的數據量越少,對Join操作的性能提升越有幫助,提升效果越顯著。
由4.4節可知,對于左表與右表Join列值不完全匹配的場景,經過Semi Join預處理后,Sort Merge Join算法的提升效率顯著。在此基礎上,進一步測試Join操作的并行度對Sort Merge Join算法的提升效果,即對比本文所定義的Semi Sort Merge Join算法的完整優化方法與Sort Merge Join算法的性能。
在本節實驗中,選用100萬行的orders表和1 200萬行的lineitem表進行實驗。在搭建的Spark集群中,集群Worker節點數為2,每個節點上分配2個executor,對每個executor分配2個核,則集群中的總核數為8。在實驗設計中,調整參數spark.sql.shuffle.partitions來設定shuffle時不同的重分區個數,從而影響到Join操作的并行度,以此來測試Join操作的并行度對執行時間的影響,結果如圖4所示。

圖4 Join操作并行度對查詢時間的影響
可以看出,當設定重分區個數為16,即Join操作的并行度為16時,Semi Sort Merge Join算法執行時間最短。所以,當設定重分區個數為Spark集群的CPU核數的2倍時,Join執行性能最優。因此,對于2.2節中的式(1),設定w系數為2。
4.4節中僅測試經過Semi Join預處理后對于Sort Merge Join的提升效率,未測試Join操作的并行度優化對算法的影響。在100萬行的orders表與1 200萬行的lineitem表進行Join操作的場景中,未經過Semi Join預處理的執行時間為266 s,經過Semi Join預處理的執行時間為249 s,優化后算法性能提升了6.39%。在本節中進一步設置好最佳并行度后,Semi Sort Merge Join算法的執行時間為222 s,相較于未經過Semi Join預處理的Sort Merge Join算法性能提升了16.54%。可見,通過動態設定連接并行度對Join操作的查詢也有很大的幫助。
Spark支持大規模數據處理,對任務進行分布式并行執行。但其涉及開銷較大的Join操作,一直是大數據查詢分析的瓶頸。本文對Spark現有的大表Join實現算法進行了研究,發現其未考慮兩表Join列數據匹配關系對Shuffle操作的影響。因此,本文基于Semi Join,根據兩表Join列之間的匹配關系,提出了一種改進的Join實現算法。該算法利用左表Join列數據所構建的HashMap對右表數據進行過濾,主要適用于兩表Join列數據不完全匹配的情況,且右表與左表匹配數據量越少,該算法優化效果越明顯。