楊彥彬,干禎輝
(1.中通服軟件科技有限公司,上海 200000;2.杭州東方通信軟件技術有限公司,浙江 杭州 310013)
隨著大數據時代到來,關系型數據結構逐漸被NOSQL所替代。但是,由于傳統SQL語法方便易學且普及率高,因此直接廢棄難度很大,最終以Hive SQL及Spark SQL為代表的大數據組件也轉向支持傳統SQL。這些組件提供了SQL解析為分布式運算引擎的功能,但在如何提升執行效率方面則沒有更多論述。本文以此為切入點,一方面討論了SQL遷移后出現的問題原因,另一方面給出了簡單實用的解決技巧,以利于在未來生產實踐中推廣。
Spark是一個日常數據處理框架,它在接受job的時候,內部會對其進行細致劃分,分為邏輯執行計劃和物理執行。邏輯執行計劃是將一個RDD切分成不同的Stage,并產生一系列依賴關系,也就是Task之間窄依賴和寬依賴,其中寬依賴部分形成了Shuffle[1]。大部分Shuffle處后續會切分成多個Stage提交節點后執行Action操作,稱之為物理執行。
Spark的Task執行可以分為兩種計算形式:流水線性計算和非流水線性計算,前者直接計算完成,有效減少內存空間,典型的是filter()或者map()等操作。而后者則需要借助內存空間完成,典型的是Groupbykey()或者Reducebykey()等操作。因此流水線性計算速度要快于非流水線性計算。圖1是Spark整體轉換流程[2]。

圖1 Spark整體轉換流程
Hive SQL及Spark SQL這些組件出現,實現了將收集后的海量數據按照原有業務模型進行計算的可能,但是這個過程中也帶來了很多問題,最典型的無疑是數據傾斜。所謂數據傾斜,即海量數據的主鍵執行一對多關聯后由于分配節點計算量不均勻,導致一個節點還在執行計算時候,其他節點已經完成,都在等待該節點結束運行[3]。圖2左側就是數據傾斜的原因圖示,明顯節點1計算量遠大于節點2和3。數據傾斜在實際工作當中的外在表現是某一個Task進度長時間徘徊在99%左右。而在最終結果集WEB UI中明顯看到某節點執行時間與其他差異。圖2右側WEB UI中,紅框的節點計算時間遠大于其他節點。

圖2 數據傾斜產生原因和表現
常規優化SQL手段就是簡化其復雜程度,將聚合、分組函數多次拆分,形成若干個簡單SQL,以此降低Task之間的join操作,同時單個SQL盡量利用流水線模式加快計算速度。但是該方法對數據傾斜幾乎產生不了實質作用,因為簡化的SQL的無法解決數據分布不均的問題。
數據傾斜產生的核心原因在于相同的業務主鍵聚集于一個計算節點,這是分布式計算模型特點所決定的。因此如果能將主鍵打散,并以打散的主鍵進行數據關聯,通常是首選解決方案。實踐當中我們一般將主鍵按照一定規則編碼,形成新主鍵,并進行關聯。圖3描述了主鍵規則編碼前后的變化。左側以10000作為主鍵,各節點分布不均。右側則是通過主鍵編碼:分別形成10000-1、10000-2、10000-3,此時任務被均勻分布到各個節點。但是需要指出,該方法也會增加任務分區和Task數量,加大了資源調度難度。因此使用時要進行斟酌[4]。

圖3 按規則編碼主鍵前后的變化
另外,某些時候即使采用主鍵編碼也很難解決Spark在最后階段Reduce過程中的傾斜,因而在此基礎上需要配合廣播join持續優化。
廣播join的實質在于將較小的表通過Driver端分發到各計算節點,將原來計算方 式,即各個分區計算完成后再與小表進行join操作變化為小表直接在分區join,從而避 免了海量數據主鍵在最后階段Reduce過程時一對多出現場景[5]。圖4描述了廣播join的原理。

圖4 廣播join的實現原理
典型的廣播join用在Hive表,一般能夠提前確認表大小,避免廣播后出現錯誤。在非Hive表上則需要通過強制廣播join實現,Spark通過broadcast()方法來完成。但是由于Spark無法提前確認分發表大小,在使用該方法的時候,當Driver端內存不足會出現OOM現象。同時過大的表亦不適合廣播join方法。因此使用前盡量確認分發表大小。
使用前述方法優化之后性能一般有明顯提升。圖5是優化前后比對圖。以1.8億數據量測試,50 G內存提交。優化前計算用時1 229 585 ms約等于20.5 min,優化后用時877 460 ms約等于14.6 min,相比優化前提速約30%。圖5紅圈是同一個計算節點,未優化之前明顯的數據傾斜,優化之后Task分布更為均勻,數據傾斜也相應消失。

圖5 優化前后的對比
隨著各類技術的不斷研究,相信在不久的將來會出現更多基于大數據的SQL優化方法和手段,為進一步提升大數據計算應用提供了堅實的保證。