徐劍等
摘 要: Hadoop系統在處理多表鏈接問題時,每輪都會將大量的中間結果寫入本地磁盤,從而嚴重降低了系統的處理效率。為解決該問題,提出一種“替換?查詢”方法,該方法通過對鏈接表建立索引,將預輸出的元組集替換為索引信息輸出到中間結果,以索引的形式參與多表鏈接,以此減少中間結果的I/O代價。運用緩沖池、二次排序和多線程技術對索引信息進行優化管理,加快索引查詢速度。最后在TPC?H數據集上,設計了與原Hadoop的對比實驗,結果表明該方法可減少35.5%的存儲空間,提高12.9%的運行效率。
關鍵詞: 多表鏈接; 替換?查詢; 索引; 緩沖池; 二次排序
中圖分類號: TN911?34; TP311 文獻標識碼: A 文章編號: 1004?373X(2014)06?0090?05
0 引 言
隨著互聯網應用的快速發展,海量數據的存儲與處理成為研究人員面臨的嚴峻挑戰。近年來,谷歌提出的分布式文件系統GFS、并行編程框架Map?Reduce。基于此,開源社區Apache的Hadoop項目實現了分布式文件系統HDFS和并行編程框架Hadoop Map?Reduce。Hadoop因其良好的可擴展性、高可用性以及容錯性,廣泛地應用在IBM,百度,360,阿里巴巴等互聯網公司的海量數據存儲與分析應用中,是目前使用最為廣泛的云計算平臺之一。
針對分布式計算環境中多表鏈接運算的問題,文獻[1?2]提出一種在單個Map?Reduce任務中同時鏈接多個關系表的優化算法,文獻[3]提出一種基于預排序的多表鏈接算法。這些算法都存在中間結果快速增加的問題,一方面,為了防止鏈接屬性缺失現象的出現,本次鏈接要盡可能的將下一步或者下幾步鏈接相關的屬性信息輸出到中間結果;另一方面,每一步鏈接操作之后,中間結果的屬性列增加,伴隨著鏈接次數的增加,輸出到中間結果的屬性列越來越多;最后,子鏈接的中間結果要被多次復制,溢寫磁盤或者網絡傳輸到其他節點。這些因素使得中間結果快速膨脹,從而導致大量的內存占用,過多的磁盤溢寫和不必要的網絡傳輸代價,嚴重降低了系統性能。
為了解決多表鏈接中間結果快速增加的問題,本文提出“替換?查詢”方法,該方法具體如下,在每一次子鏈接結果集輸出到中間結果之前,系統對結果集中的記錄元組建立非侵入式的Hadoop分布式文件索引(Non?invasive Index of Hadoop Distributed File System,NIHDFS),在保證結果集信息完整性的前提下,用索引替代記錄元組信息并輸出到中間結果,記錄元組以索引的形式參與以后的鏈接過程;當需要記錄元組信息時,該方法能夠通過查詢操作快速的實現元組信息恢復。在索引的平均長度比記錄元組的平均長度短或者鏈接表的數量較多的情況下,該方法能夠有效地減少中間結果的數據規模并降低其增長速度。
在Hadoop?1.1.2上,修改系統源碼并實現“替換?查詢”多步鏈接的處理方法,本文將改造后的系統命名為Hadoop?1.1.2?NIHDFS。在與原系統的對比實驗中,“替換?查詢”的多表鏈接處理方法,能夠有效的提升Hadoop處理多表鏈接的性能。
1 Hadoop?1.1.2?NIHDFS系統整體設計
原Hadoop在處理多表鏈接問題時,Map階段對輸入的記錄元組進行鍵?值解析,解析出真實的記錄元組之后便直接進行Map端的鏈接操作。Hadoop?1.1.2?NIHDFS系統則不然,在執行Map端鏈接操作之前,系統要對輸入的信息依照“替換?查詢”方法進行處理。該處理包含3個部分,分別是對鏈接表建立NIHDFS索引,替換鏈接表元組信息以及元組信息的快速查詢。他們分別由索引化模塊,替換模塊和查詢模塊來完成,如圖1所示。
T3是原始鏈接表,當該鏈接表進入系統后,先后進行索引化和替換操作,并用記錄元組的索引信息和本次鏈接的屬性信息替換原始記錄元組,然后再進行Map端鏈接操作。Output_1是子鏈接的鏈接結果,IndexT1_1表示鏈接表T1中某個記錄元組的索引,IndexT1_1:IndexT2_1表示鏈接表T1和T2的一條鏈接結果,其中只存儲著2條記錄對應的索引信息。當Output_1作為該系統的輸入時,該系統需要對其中的索引信息進行查詢,恢復出原始記錄元組并進行解析,然后再進行Map端的鏈接操作。Map端和Reduce端鏈接操作的操作內容由具體的多表鏈接算法來決定。
2 NIHDFS索引
HDFS(Hadoop Distributed File System)是被設計成適合運行在通用硬件上的分布式文件系統。本文以HDFS為基礎,建立非侵入式的Hadoop分布式文件系統索引,即NIHDFS。它是根據鏈接表各記錄元組在HDFS存儲的物理位置信息建立起來的預鏈接索引。
NIHDFS主要包含文件的統一資源定位符(File URL),數據塊間偏移量(Block Offset),塊內記錄偏移量(Record Offset),以及記錄元組長度(Record Length)4個部分,結構如圖2所示。統一資源定位符(File URL)是文件在HDFS中存儲的邏輯路徑,該路徑可以選擇系統默認值,也可以由用戶定義。數據塊偏移量(Block Offset)是索引對應記錄所在的數據塊在文件中的起始偏移量。針對特定數據塊中的不同記錄,它們的長度和存儲的物理位置各不相同,本文用塊內記錄偏移量(Record Offset)來存儲記錄在數據塊中的位置信息。記錄元組長度(Record Length)指記錄元組的大小。
如在HDFS中存儲著一個維表,該表在HDFS中的邏輯路徑是URL;它被HDFS切分成3個數據塊,分別是Block_A,Block_B,Block_C,起始的偏移量分別是Offset_A,Offset_B,Offset_C;某條記錄存儲在Block_B中,塊內偏移量是Offset_rc,該記錄所對應索引是“URL+ Offset_B + Offset_rc + Record_tmp.Length”。
由于圖2所示的索引中只包含一個數據塊信息,當維表中的記錄元組被切分到兩個數據塊上時,根據圖2建立的索引只能夠恢復記錄元組的部分信息,這顯然是不正確的。這種情況下,本文特別設計了如圖3所示的索引,該索引中包含有2個數據塊偏移量,塊內記錄元組偏移量和記錄元組的長度信息,因此它能夠完整的存儲記錄的位置信息。由于記錄元組的后半部分數據存儲在第2個數據塊的起始位置上,所以記錄在第2個數據塊的塊內記錄偏移量的值是零。
3 基于NIHDFS的“替換?查詢”方法
“替換?查詢”的多表鏈接處理是通過索引化,替換和查詢3個模塊來實現的,如圖1所示。
3.1 索引化
在處理多表鏈接問題時,Map?Reduce任務的輸入有2種類型,一種是前面某個Map?Reduce任務產生的鏈接結果,一種是原始鏈接表。由于鏈接結果中存儲的是記錄元組的索引信息,所以無需進行索引化操作。當輸入是原始鏈接表時,要根據NIHDFS索引結構對表中的記錄元組建立索引。當被索引化的記錄元組被切分到2個數據塊時,建立圖3中所示的索引。
3.2 替換
根據索引化過程中對記錄元組建立的NIHDFS索引,用索引信息和本次鏈接的鏈接屬性組成的鍵?值對替換掉原始的記錄元組,并保證替換操作的正確性和索引信息的完整性。在之后的鏈接操作中,如其他經典的多表鏈接算法一樣,鏈接屬性主導著整個鏈接過程,索引信息作為附屬信息或被輸出到中間結果,或被發送到其他的節點。
3.3 查詢
當Map?Reduce任務的輸入是鏈接結果時,因為鏈接結果中存儲的是記錄元組的一系列索引信息,并不包含本次鏈接的鏈接屬性信息。此時要對NIHDFS索引執行查詢操作,到達獲取鏈接屬性信息的目的,系統將啟動專門線程來完成該查詢操作。
查詢過程如圖4所示,TaskTracker是多表鏈接操作的任務執行節點,該節點中運行著多個Map或Reduce任務,這些任務提出查詢請求并將該請求和索引信息打包發送到系統的文件管理中心,即NameNode。
NameNode對索引信息進行解析,獲取文件名;查詢HDFS中隸屬于該文件的所有數據塊及副本信息,并從中隨機選擇一個數據塊來響應查詢請求;將該數據塊的ID以及NIHDFS索引信息發送給數據塊所在的機器節點,即查詢服務節點。
查詢服務節點收到查詢請求信息之后,首先根據請求信息中的數據塊ID和路徑信息在本地定位到該數據塊;其次,對NIHDFS索引信息進行解析,依據塊內偏移量,準確定位記錄元組的物理位置;最后,根據記錄元組的起始位置和記錄長度,獲取完整的元組信息,并將其回送到請求節點。查詢請求節點對接受到的記錄元組進行解析,從中獲取本次鏈接所需的鏈接屬性信息,從而完成整個查詢流程。
當遇到如圖4所示的索引時,需進行兩次查詢操作,將兩次查詢結果按照先后順序整合起來,得到完整的記錄元祖信息。
4 整體系統優化
4.1 NIHDFS優化
在對NIHDFS索引執行查詢操作是,File URL是不可或缺的。隸屬于同一個文件的所有記錄元組都對應著一個相同的File URL,這導致File URL被多次重復存儲,由于File URL相對較長,多次重復存儲所占用的磁盤空間是不可忽略的。為了解決該問題,本文用哈希方法把文件標示符集合S與File URL集合做成一一映射,用文件標示符S替代File URL,從而減少File URL長度,降低磁盤消耗。
采用預處理添加分隔符的方法對索引中的塊內記錄元組長度信息(Record Length)進行優化,具體方法如下,在上傳HDFS之前,對鏈接表進行預處理。一方面,將一條完整的記錄元組信息存儲為一行,行與行之間的回車換行符作為元組與元組之間的分隔符,用分隔符替代塊內記錄長度。當元組被載入內存進行索引化操作或者查詢服務節點響應查詢請求獲取記錄元組時,都以分隔符作為元組記錄結束的標志,而不必關心各記錄元組的長度。將記錄長度信息從索引中去掉,達到簡化索引結構的目的。另一方面,在不同屬性之間添加分隔符,降低Map?Reduce任務解析屬性信息的難度。
4.2 查詢過程優化
4.2.1 緩沖池法
鏈接過程中,部分記錄元組在短時間內會被多次重復查詢,每次都要執行完整的查詢過程,這是不必要的。本文采用緩沖池法對其進行優化,經常被查詢的記錄元組存放于內存緩存池中,當查詢請求到來時,先檢查緩存池中是否存在該記錄元組。若存在,則直接從緩存池中提取,否則才執行完整的查詢操作,獲取元組信息。采用LRU作為緩存池中記錄元組信息的更新算法。
4.2.2 二次排序法
在數據塊中定位記錄元組時,文件指針的移動操作是很頻繁的。由于文件指針的單向向下的移動特性,當訪問的內容不是按照偏移量由小到大有序時,指針無效移動次數會相當高。本文采用二次排序的方法對系統進行優化。在原Hadoop系統上添加二次排序功能模塊,用以對中間結果中索引信息進行排序。排序策略如下:依照數據塊間偏移量的大小,對數據塊從小到大進行排序,即一次排序;同一個數據塊內的記錄,依照塊內記錄元組偏移量的大小,按從小到大順序進行排序,即二次排序。有序的中間結果能夠大幅減少查詢的過程中文件指針無效移動的次數,從而提高定位效率。由于Map?Reduce過程中會對記錄元組進行排序,二次排序在此基礎上進行,開銷不大。
4.2.3 并行執行
鏈接過程中,查詢記錄所在的物理位置集中
于數據塊的某些區域,并且在一定的時間范圍內該區域被查詢訪問的頻率很高,本文把這樣的區域稱為查詢熱區。采用并行執行的思想,把單個數據塊查詢熱區的數量上限設置為M,對于含有N個查詢熱區的數據塊來說,如果N≤M,那么就開啟N個線程,每一個線程中設置一個指向對應熱區的文件指針,該指針只響應本熱區的記錄元組查詢請求。當N>M時,采用隨機的方法選取兩個相鄰的熱區并將其合并成一個熱區,同時對N執行減一操作,直到N≤M,然后采取與N≤M時相同的操作。查詢熱區的并行執行方法,一方面使得數據塊擁有同時響應多個查詢請求的能力,提升查詢操作的并行度,在查詢請求集中爆發時依然能夠保證查詢效率;另一方面,它將指針的定位操作集中到數據塊的查詢熱區上,從而避免了熱區之間大量的無效指針移位,提升定位效率。
5 實驗部分
5.1 實驗環境
本實驗運行在由五臺服務器搭建的集群上,機器均為同等配置。服務器硬件環境是:1.6 GHz的八核CPU,16 GB內存,150 GB硬盤。軟件環境:ubuntu10.10,JDK1.6,Hadoop?1.1.2,以及Hadoop?1.1.2?NDFSI。實驗數據是TPC?H的基準數據集,選用nation, region,customer,orders四張表作為輸入數據,數據規模為397.6M。
5.2 實驗內容以及結果分析
兩個系統分別執行了四張表的鏈接操作,鏈接的具體過程如圖5所示。
鏈接算法是經典的Reduce Join算法,兩個系統分別執行了五遍該算法,將各個階段的統計信息整合求取平均值,并將其填入表1中。由于Hadoop?1.1.2?NIHDFS系統的最終結果是一系列NIHDFS索引信息,需要將其恢復為記錄元組,該階段消耗的時間是t4,產生的中間結果是size4,原系統在該階段不需要做任何操作。
Hadoop?1.1.2?NIHDFS系統產生的中間結果size1,size2,size3遠比原系統要小,鏈接操作所消耗的時間也比原系統要小很多,空間和時間消耗對比如圖6和圖7所示。與原系統相比,新系統空間性能提升35.5%,時間性能提升12.9%,時間和空間整體對比如圖8所示。本實驗表明,“替換?查詢”的多表鏈接處理方法能夠有效地減少中間結果的數據規模和增長速度,提升系統效率。
6 結 語
本文通過對記錄元組建立NIHDFS索引,用索引替代冗長的記錄元組信息參與鏈接,并對索引結構進行優化,從而大大減少中間結果的數據規模。
運用緩沖池,二次排序以及并行執行的方法對查詢過程進行優化,該方法適用于鏈接表屬性較多,記錄元組長度較長的場合。
參考文獻
[1] AFRATI F, ULLMAN J. Optimizing joins in a map?reduce environment [C]//Proceedings of 2010 EDBT. New York: ACM, 2010: 99?110.
[2] JIANG Da?wei, TUNG A, CHEN Gang. Map?join?reduce: towards scalable and efficient data analysis on large clusters [J]. IEEE Transactions on Knowledge and Data Engineering, 2010, 23(9): 1299?1311.
[3] LIN Yu?ting, AGRAWAL D, CHEN Chun, et a1. Llama: leveraging columnar storage for scalable join [C]// Proceedings of 2011 ACM SIGMOD International Conference on Management of Data. New York: ACM, 2011: 861?972.
[4] 趙保學,李戰懷,陳群,等.基于共享的MapReduce多查詢優化技術[J].計算機應用研究,2013(5):74?76.
[5] 趙保學,李戰懷,陳群,等.可擴展Hadoop任務分配模塊的研究與實現[C]//第29屆中國數據庫學術會議論文集(B輯)(NDBC2012).合肥:知識與數據工程實驗室,2012:83?85.
[6] 林大云.基于Hadoop的微博信息挖掘[J].計算機光盤軟件與應用,2012(1):7?8.
[7] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large cluster [J]. Communications of the ACM, 2008, 51(1): 107?113.
[8] 王珊,王會舉,覃雄派,等.架構大數據:挑戰、現狀與展望[J].計算機學報,2011,34(10):1741?1751.
5 實驗部分
5.1 實驗環境
本實驗運行在由五臺服務器搭建的集群上,機器均為同等配置。服務器硬件環境是:1.6 GHz的八核CPU,16 GB內存,150 GB硬盤。軟件環境:ubuntu10.10,JDK1.6,Hadoop?1.1.2,以及Hadoop?1.1.2?NDFSI。實驗數據是TPC?H的基準數據集,選用nation, region,customer,orders四張表作為輸入數據,數據規模為397.6M。
5.2 實驗內容以及結果分析
兩個系統分別執行了四張表的鏈接操作,鏈接的具體過程如圖5所示。
鏈接算法是經典的Reduce Join算法,兩個系統分別執行了五遍該算法,將各個階段的統計信息整合求取平均值,并將其填入表1中。由于Hadoop?1.1.2?NIHDFS系統的最終結果是一系列NIHDFS索引信息,需要將其恢復為記錄元組,該階段消耗的時間是t4,產生的中間結果是size4,原系統在該階段不需要做任何操作。
Hadoop?1.1.2?NIHDFS系統產生的中間結果size1,size2,size3遠比原系統要小,鏈接操作所消耗的時間也比原系統要小很多,空間和時間消耗對比如圖6和圖7所示。與原系統相比,新系統空間性能提升35.5%,時間性能提升12.9%,時間和空間整體對比如圖8所示。本實驗表明,“替換?查詢”的多表鏈接處理方法能夠有效地減少中間結果的數據規模和增長速度,提升系統效率。
6 結 語
本文通過對記錄元組建立NIHDFS索引,用索引替代冗長的記錄元組信息參與鏈接,并對索引結構進行優化,從而大大減少中間結果的數據規模。
運用緩沖池,二次排序以及并行執行的方法對查詢過程進行優化,該方法適用于鏈接表屬性較多,記錄元組長度較長的場合。
參考文獻
[1] AFRATI F, ULLMAN J. Optimizing joins in a map?reduce environment [C]//Proceedings of 2010 EDBT. New York: ACM, 2010: 99?110.
[2] JIANG Da?wei, TUNG A, CHEN Gang. Map?join?reduce: towards scalable and efficient data analysis on large clusters [J]. IEEE Transactions on Knowledge and Data Engineering, 2010, 23(9): 1299?1311.
[3] LIN Yu?ting, AGRAWAL D, CHEN Chun, et a1. Llama: leveraging columnar storage for scalable join [C]// Proceedings of 2011 ACM SIGMOD International Conference on Management of Data. New York: ACM, 2011: 861?972.
[4] 趙保學,李戰懷,陳群,等.基于共享的MapReduce多查詢優化技術[J].計算機應用研究,2013(5):74?76.
[5] 趙保學,李戰懷,陳群,等.可擴展Hadoop任務分配模塊的研究與實現[C]//第29屆中國數據庫學術會議論文集(B輯)(NDBC2012).合肥:知識與數據工程實驗室,2012:83?85.
[6] 林大云.基于Hadoop的微博信息挖掘[J].計算機光盤軟件與應用,2012(1):7?8.
[7] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large cluster [J]. Communications of the ACM, 2008, 51(1): 107?113.
[8] 王珊,王會舉,覃雄派,等.架構大數據:挑戰、現狀與展望[J].計算機學報,2011,34(10):1741?1751.
5 實驗部分
5.1 實驗環境
本實驗運行在由五臺服務器搭建的集群上,機器均為同等配置。服務器硬件環境是:1.6 GHz的八核CPU,16 GB內存,150 GB硬盤。軟件環境:ubuntu10.10,JDK1.6,Hadoop?1.1.2,以及Hadoop?1.1.2?NDFSI。實驗數據是TPC?H的基準數據集,選用nation, region,customer,orders四張表作為輸入數據,數據規模為397.6M。
5.2 實驗內容以及結果分析
兩個系統分別執行了四張表的鏈接操作,鏈接的具體過程如圖5所示。
鏈接算法是經典的Reduce Join算法,兩個系統分別執行了五遍該算法,將各個階段的統計信息整合求取平均值,并將其填入表1中。由于Hadoop?1.1.2?NIHDFS系統的最終結果是一系列NIHDFS索引信息,需要將其恢復為記錄元組,該階段消耗的時間是t4,產生的中間結果是size4,原系統在該階段不需要做任何操作。
Hadoop?1.1.2?NIHDFS系統產生的中間結果size1,size2,size3遠比原系統要小,鏈接操作所消耗的時間也比原系統要小很多,空間和時間消耗對比如圖6和圖7所示。與原系統相比,新系統空間性能提升35.5%,時間性能提升12.9%,時間和空間整體對比如圖8所示。本實驗表明,“替換?查詢”的多表鏈接處理方法能夠有效地減少中間結果的數據規模和增長速度,提升系統效率。
6 結 語
本文通過對記錄元組建立NIHDFS索引,用索引替代冗長的記錄元組信息參與鏈接,并對索引結構進行優化,從而大大減少中間結果的數據規模。
運用緩沖池,二次排序以及并行執行的方法對查詢過程進行優化,該方法適用于鏈接表屬性較多,記錄元組長度較長的場合。
參考文獻
[1] AFRATI F, ULLMAN J. Optimizing joins in a map?reduce environment [C]//Proceedings of 2010 EDBT. New York: ACM, 2010: 99?110.
[2] JIANG Da?wei, TUNG A, CHEN Gang. Map?join?reduce: towards scalable and efficient data analysis on large clusters [J]. IEEE Transactions on Knowledge and Data Engineering, 2010, 23(9): 1299?1311.
[3] LIN Yu?ting, AGRAWAL D, CHEN Chun, et a1. Llama: leveraging columnar storage for scalable join [C]// Proceedings of 2011 ACM SIGMOD International Conference on Management of Data. New York: ACM, 2011: 861?972.
[4] 趙保學,李戰懷,陳群,等.基于共享的MapReduce多查詢優化技術[J].計算機應用研究,2013(5):74?76.
[5] 趙保學,李戰懷,陳群,等.可擴展Hadoop任務分配模塊的研究與實現[C]//第29屆中國數據庫學術會議論文集(B輯)(NDBC2012).合肥:知識與數據工程實驗室,2012:83?85.
[6] 林大云.基于Hadoop的微博信息挖掘[J].計算機光盤軟件與應用,2012(1):7?8.
[7] DEAN J, GHEMAWAT S. MapReduce: simplified data processing on large cluster [J]. Communications of the ACM, 2008, 51(1): 107?113.
[8] 王珊,王會舉,覃雄派,等.架構大數據:挑戰、現狀與展望[J].計算機學報,2011,34(10):1741?1751.