摘 要:針對電信大數據在流動人口統計中的處理需求,采用Intel?Hadoop發行版,設計Hive數據倉庫并進行優化,重點對性能影響較大的join連接和數據傾斜問題進行了優化。實驗表明,對于TB級數據,簡單統計如count、sum等可在10分鐘以內完成,聚合統計如join、group by等可在30分鐘左右完成,能有效支撐大數據環境下的流動人口統計和監測。
關鍵詞:Hive;優化;join;數據傾斜
中圖分類號:TP301 文獻標識碼:A
1 引言(Introduction)
電信運營商在移動通信業務運營過程中,獲取了大量客觀、真實的用戶歷史數據,這些歷史數據可以客觀反映用戶的消費行為,也可以反映影響用戶消費行為的內外部因素的變化情況[1]。根據移動通信客戶的來話與去話等話務信息,結合客戶身份資料,可以實現對特定區域人口的流入、流出情況及流動類型等進行分析。
然而,基于移動通信數據的流動人口統計面臨諸多挑戰:①數據源多樣化:CDR(語音、SMS、GPRS、3G、4G等)、計費信息、客戶信息、基站參數等;②數據量大:高達360TB原始數據(某省電信公司);③數據增長快速:2TB/天。通信數據呈現出大數據的特征,既有的技術架構和路線,已不能處理如此海量的電信數據。
近年來,涌現出了眾多的大數據處理架構,其中Hadoop開源架構應用最廣泛,在移動、電信等部門通過部署Hadoop架構開展電信大數據服務取得了一定的成效。本文針對電信大數據在流動人口統計中的處理需求,采用Intel?Hadoop發行版,設計Hive數據倉庫并進行優化,重點對性能影響較大的join連接和數據傾斜問題進行了優化,實現海量數據的高效查詢和統計,滿足流動人口的快速統計和分析。
2 Hive數據倉庫設計(Hive data warehouse design)
移動通信大數據的流動人口業務需求分析:移動通信數據的抽取、轉換和導入;基于日、月、年的報表統計和數據規模;數據倉庫30TB數據。現方案采用10臺服務器,以實現數據的高速裝載、查詢和統計分析,如圖1所示。
圖1 Hive數據倉庫設計
Fig.1 Hive data warehouse design
Hive是一個建立在Hadoop之上的數據倉庫,用于查詢和分析結構化海量數據。采用HDFS進行數據存儲和Map/Reduce進行數據操作。基本特點包括:
(1)提供類似于SQL的查詢語言。
(2)高擴展性(scale-out),動態擴容無須停機。
(3)針對海量數據的高性能查詢和分析系統。
(4)提供靈活的擴展性。
(5)復雜數據類型,擴展函數和腳本等。
在運行count、sum等聚合函數進行統計計算時發現,將數據從普通數據庫導入Hive中,分區的個數以及各分區數據量的均衡性會影響Hive的性能。解決辦法就是給導入的表增加一個自增的int類型的字段,用這個字段來進行數據分割,最后得到的分區就是均衡的,如圖2所示。
圖2 數據分區
Fig.2 Data partition
3 Hive性能分析和優化(Hive performance analysis
and optimization)
Hadoop的分配優化主要包含以下三個層面:①底層Map和Reduce的參數調優;②Hive內部邏輯優化;③SQL代碼邏輯優化。
3.1 Map/Reduce端的優化
Map/Reduce端的優化主要通過分析各個可調參數在Map/Reduce任務運行過程中起到的作用,通過改變參數大小優化底層分配策略。
表1 Map side調優參數表
Tab.1 Tuning parameter table of map side
選項 類型 默認值 描述
io.sort.mb Int 100 緩存Map中間結果的buffer大小(in MB)
io.sort.record.percent float 0.05 io.sort.mb中間來保存Map output記錄邊界的百分比
io.sort.spill.percent float 0.80 Map開始做spill操作的閥值
io.sort.factor Int 10 做merge操作時同時操作的stream數的上限
min.num.spill.for.combine Int 3 Combiner函數運行的最小spill數
Mapred.compress.map.output Boolean False Map中間結果是否采用壓縮
Mapred.map.output.compression.codec Class name Org.apache.Haddoop.io.compress.defaultcodec Map中間結果的壓縮格式
表2 Reduce side調優參數表
Tab.2 Tuning parameter table of Reduce side
選項 類型 默認值 描述
Mapred.reduce.parallel.copies Int 5 每個Reduce并行下載Map結果的最大線程數
Mapred.reduce.copy.backoff Int 300 Reduce下載線程最大等待時間(in sec)
io.sort.factor Int 10 同上
Mapred.job.shuffle.input.buffer.percent Float 0.7 用來緩存shuffle數據的Reduce task heap百分比
Mapred.job.reduce.input.buffer.percent Float 0.0 Sort完成后Reduce計算階段緩存數據的百分比
Mapred.job.shuffle.merge.percent Float 0.66 緩存占內存多少百分比后做merge操作
3.2 Hive內部邏輯優化和代碼邏輯優化
Hive使用HQL(Hibernate Query Language),HQL不僅提供了類似標準SQL語句的查詢方式,而且提供更加豐富靈活、更為強大的查詢能力,允許用戶自定義Mapper和Reducer來處理更為復雜的查詢分析任務。導致Hive性能不佳的原因有兩個:①沒有索引支持,查詢需要暴力掃描全表;②在處理小量數據時Map/Reduce框架耗費資源比例過大,即Map/Reduce框架本身具有較高的延遲,導致基于此框架下的HQL查詢也體現高延遲性。優化思路:
由于Hive的HQL語言是自動轉化為Map/Reduce程序進行執行的。每個job對應一個Map/Reduce框架,所以盡可能減少job的個數可以減少執行時間。
Map/Reduce有其數據特性,Hive也有優化約定,所以編寫Hive語言時需注意一些規則,才能提高查詢效率。
本文對性能影響較大的join多表連接和數據傾斜等問題實施優化。
3.2.1 join優化
Hive只支持等值連接(equality joins)、外連接(outer joins)和左半連接(left semi joins)。
join時,每次Map/Reduce任務的執行過程如下:reducer會緩存join序列中前面所有表的記錄,然后通過最后一個表將結果序列化到HDFS。這有助于減少在reduce端內存的使用量。無論是外關聯outer join還是內關聯inner join,如果join的key相同,無論jion多少個表都會合并成一個Map/Reduce任務。
查詢時,應該盡量將小表放在join的左邊,否則會因為緩存浪費大量內存。例如:
SELECT x.val,y.val,z.val FROM x JOIN y ON(x.key=y.key1)JOIN z ON (z.key=y.key1)
三個表使用同一個join key,生成一次Map/Reduce任務計算。Reduce端先緩存x表和y表的記錄,然后每次取得z表中的一個記錄就計算一次join結果。
兩次Map/Reduce任務:
SELECT x.val,y.val,z.val FROM x JOIN y ON(x.key=y.key1)JOIN z ON (z.key=y.key2)
生成兩次Map/Reduce任務:第一次緩存x表,用y表序列化;第二次緩存第一次Map/Reduce計算的結果,然后用z表序列化。
Hive不支持where子句中的子查詢,SQL常用的IN/EXISTS子句需要改寫。IN/EXISTS子查詢在HIVE中一個更高效的實現是利用LEFT SEMI JOIN重寫子查詢語句。LEFT SEMI JOIN的限制是,JOIN右邊的表不能在WHERE子句、SELECT子句或其他地方過濾,只能在ON子句中設置過濾條件。
SELECT x.key, x.value FROM x WHERE x.key in (SELECT y.key FROM y)
被重寫為:
SELECT x.key, x.val FROM x LEFT SEMI JOIN y on (x.key=y.key)
對于多個子查詢SQL無關且計算量過大的SQL,可以開啟并行執行MR任務,減少計算壓力。
hive.exec.parallel[=true]
hive.exec.parallel.thread.number[=8]
hive.exec.parallel可以控制一個SQL中多個可并行執行的job的運行方式。當hive.exec.parallel為true的時候,同一個SQL中可以并行執行的job會并發的執行。參數hive.exec.parallel.thread.number就是控制對于同一個SQL來說同時可以運行的job的最大值,該參數默認為8。此時最大可以同時運行8個job。通過修改參數hive.exec.parallel和hive.exec.parallel.thread.number測試不同情況的執行速度,實現性能優化和負載均衡。
3.2.2 數據傾斜問題的解決
數據傾斜表現:任務進度長時間維持在99%(或100%),查看任務監控頁面,發現只有少量Reduce子任務未完成。因為其處理的數據量和其他Reduce差異過大。單一Reduce的記錄數與平均記錄數差異過大,通常可能達到三倍甚至更多。最長時長遠大于平均時長。造成數據傾斜的主要原因:①key分布不均勻;②業務數據本身的特性;③建表時考慮不周;④某些SQL語句本身就有數據傾斜。
表3 數據傾斜
Tab.3 Data skew
關鍵詞 情形 后果
Join 其中一個表較小,但是key集中 分發到某一個或幾個Reduce上的數據遠高于平均值
大表與大表,但是分桶的判斷字段0值或空值過多 這些空值都由一個Reduce處理,非常慢
group by group by維度過小,
某特殊值過多 處理某值的Reduce非常耗時
Count Distinct 某值的數量過多 處理此特殊值的Reduce耗時
(1)參數調節
hive.map.aggr=true
Map 端部分聚合,相當于Combiner。
hive.groupby.skewindata=true
數據傾斜聚合優化,設置參數hive.groupby.skewindata=true,控制生成兩個MR Job,第一個MR Job中,Map的輸出結果會隨機分配到reduce做一次預匯總,減少某些key值條數過多或某些key值條數過少而造成的數據傾斜問題。
(2)SQL語句調節
如何Join:關于驅動表的選取,選用join key分布最均勻的表作為驅動表,做好列裁剪和filter操作,以達到兩表做join的時候,數據量相對變小的效果。
大小表Join:使用map join讓小的維度表先進內存。在map端完成reduce。
大表Join大表:把空值的key變成一個字符串加上隨機數,把傾斜的數據分到不同的reduce上,由于null值關聯不上,處理后并不影響最終結果。
count distinct大量相同特殊值:count distinct時,將特殊值單獨處理。如果還有其他計算需要進行group by,可以先將特殊值的記錄單獨處理,再和其他計算結果進行union。
group by維度過小:采用sum() group by的方式來替換count(distinct)完成計算。
舉例:空值產生的數據傾斜
日志中,常會有信息丟失的問題,比如log中的user_id和users表中的user_id關聯,會碰到數據傾斜的問題。
解決方法1:user_id為空的不參與關聯
select * from log x join users y on x.user_id is not null and x.user_id=y.user_id union all select * from log x where x.user_id is null;
解決方法2:空值的key變成一個字符串加上隨機數形成新的key值
select * from log x left outer join users y on case when x.user_id is null then concat(‘hive,rand() ) else x.user_id end=y.user_id;
方法2比方法1效率更高,IO和作業數都少了。方法1中log讀取兩次,jobs數是2,方法2中job數是1。以上優化方法適合無效id,比如-99,“”,null等產生的傾斜問題。
4 結論(Conclusion)
針對移動通信數據中的流動人口統計業務需求,設計Hive數據倉庫并進行優化,重點對性能影響較大的join連接和數據傾斜問題進行了優化,實現了數據的高速查詢和統計。簡單統計,如count,sum等10分鐘以內完成;聚合統計,如join、group by等30分鐘左右完成,高效完成了流動人口的統計。
參考文獻(References)
[1] 智勇.基于移動通信信息資源的人口流動趨勢研究[J].山東社 會科學,2013(5):102-105.
[2] 王大力.基于移動通信數據處理的公安流動人口管理系統設 計與研究[D].同濟大學,2012.
[3] 朱珠.基于Hadoop的海量數據處理模型研究和應用[D].北京 郵電大學,2011.
作者簡介:
周天綺(1976-),男,碩士,講師.研究領域:大數據處理.