[摘 要] 時下,大數(shù)據(jù)分析已成熱門行業(yè),隨著物聯(lián)網(wǎng)等新興技術的崛起,海量的數(shù)據(jù)正等待著系統(tǒng)進行消化,如何從海量數(shù)據(jù)中挖掘出有價值的信息是眾多企業(yè)所面臨的新的難題和挑戰(zhàn)。在此應用背景下,Apache開源組織推出了Hadoop框架,用以實現(xiàn)海量數(shù)據(jù)的分布式存儲和高效率可擴充的分布式計算。
[關鍵詞] 大數(shù)據(jù)Hadoop;分布式存儲;分布式計算;MapReduce HDFS
[中圖分類號] G258.6 [文獻標識碼] A
1 Hadoop的產(chǎn)生
大數(shù)據(jù)的概念最早是由麥肯錫這家公司提出的,他們指出:“數(shù)據(jù),已經(jīng)滲透到當今每一個行業(yè)和業(yè)務職能領域,成為重要的生產(chǎn)因素,人們對于海量數(shù)據(jù)的挖掘和運用,預示著新一波生產(chǎn)率增長和消費者盈余浪潮的到來。”
進入2012年,大數(shù)據(jù)的概念開始被越來越多的提及,甚至已經(jīng)上了紐約時報和華爾街日報的專欄封面。隨著系統(tǒng)信息化的建設,海量數(shù)據(jù)正呈現(xiàn)井噴式增長,如何消化和分析這些數(shù)據(jù)從中提煉出有價值的信息是企業(yè)所要面臨的新的難題和挑戰(zhàn)。
首先面臨的挑戰(zhàn)來自于系統(tǒng)硬件,雖然硬盤的存儲容量不斷增加,但是磁盤的尋址效率卻沒有隨之提高,因此當數(shù)據(jù)處于一個較大規(guī)模的時候,數(shù)據(jù)的定位和讀取會變得非常緩慢。這是硬件本身的局限性,很難從軟件方面得到突破點,然而同樣從硬件的角度考慮,雖然磁盤的尋址效率發(fā)展緩慢,但是傳輸效率卻相對迅速一些,因此基于大數(shù)據(jù)的處理,我們可以嘗試使用另外一種訪問方式——基于流式讀取[1]。
兩種方式有什么區(qū)別,或許有些人還不太清楚,基于磁盤尋址最典型的應用便是關系數(shù)據(jù)庫,首先定位到數(shù)據(jù)的存儲地址,然后從這個地址開始做局部的數(shù)據(jù)處理;而基于流式訪問首先獲取數(shù)據(jù)的輸入流,通過該流來讀取所有數(shù)據(jù),做全局的數(shù)據(jù)分析,沒有數(shù)據(jù)尋址的過程。
基于流的訪問方式雖然可以不用考慮磁盤的尋址時間,但是缺陷也十分的明顯,比如會消耗很多的計算機資源(cpu,內(nèi)存等),針對這些缺陷,我們能想到的最直接的處理方式便是將數(shù)據(jù)源進行切分,分散到多臺機器上進行并行的讀取,這樣不但加快了數(shù)據(jù)的讀取效率,也緩解了單臺機器性能的不足。但是,經(jīng)過這種方式處理之后,又會帶來新的難題和挑戰(zhàn):
首先,數(shù)據(jù)分散部署之后,有可能會帶來數(shù)據(jù)遺失的風險。
其次,對數(shù)據(jù)片段的解析需要有一個聚合的操作,來匯總最后的結(jié)果。
如何解決這些難題,正是hadoop框架的功能職責。
Hadoop框架提供了一種簡單的編程模型,用來對大數(shù)據(jù)集進行分布式處理;它的處理能力是可擴充的,由一臺機器擴充到成千上萬臺的集群,集群中的每臺機器都會參與存儲和計算[2]。從功能角度來看hadoop主要具備兩方面的特性,存儲和計算。存儲邏輯用到的是HDFS子框架,計算邏輯用到的是MapReduce子框架,每個子框架分別解決了上述難點。
2 HDFS子框架
2.1 體系結(jié)構
由圖片可以看到HDFS主要包含這樣幾個功能組件
Namenode,存儲文檔的元數(shù)據(jù)信息,還有整個文件系統(tǒng)的目錄結(jié)構。
DataNode,存儲文檔塊信息,并且文檔塊之間是有冗余備份的。
這里面提到了文檔塊的概念,同本地文件系統(tǒng)一樣,HDFS也是按塊存儲的,只不過塊的大小設置的相對大一些,默認為64M。如果一個文件不足64M,那么它只存儲在一個塊中,而且并不會占用64M的磁盤空間,這一點需要注意,HDFS不適用于小文件存儲的原因并不是因為小文件消耗磁盤空間,而是因為小文件占用了太多的塊信息,每個文檔塊的元數(shù)據(jù)是會存儲在namenode的內(nèi)存里的,因此當文檔塊較多的時候會十分消耗namenode的內(nèi)存。
從功能結(jié)構來看,namenode提供了數(shù)據(jù)定位的功能,datanode提供數(shù)據(jù)傳輸,也就是客戶端在訪問文件系統(tǒng)的時候是直接從datanode里面讀取數(shù)據(jù)的,而不是namenode。
2.2 IO操作
2.2.1 hdfs讀取文件流程
首先,連接到分布式文件系統(tǒng),從namenode里獲取要訪問的文件由哪些塊組成,每一個塊的存儲地址是多少。
然后,定位到指定的datanode去讀取文件。
注意:每個塊的存儲地址是在hadoop啟動之后才加載到namenode的內(nèi)存里的,而不是持久化存儲到namenode本地。namenode和datanode具備心跳通信的功能,它會定時從datanode那里收到一些反饋,包括block的存儲地址信息等等。
2.2.2 hdfs寫文件流程
首先,同樣是連接到分布式文件系統(tǒng),向namenode發(fā)送創(chuàng)建文件的命令。
namenode保存文檔的元數(shù)據(jù)信息之后會調(diào)度具體的datanode來執(zhí)行數(shù)據(jù)流的寫入操作,寫入成功后,需要執(zhí)行冗余備份,將Block復制多份,每一分存儲到不同的機器節(jié)點中,防止單點故障的出現(xiàn)。
使用HDFS來存儲數(shù)據(jù),每個block至少要備份一份,默認是3份,如果沒有指定備份,或者備份的過程中出現(xiàn)了異常,則文件的寫入操作不會成功。
2.3 hdfs不適用的場景
2.3.1 低延遲的數(shù)據(jù)訪問
HDFS主要針對大文件來設計的,多用于線下的數(shù)據(jù)分析,對于線上應用并且及時性要求較高的系統(tǒng),可嘗試使用Hbase。
23.2 大量小文件
消耗namenode內(nèi)存,可以使用SequenceFile或MapFile來作為小文件的容器
2.3.3 多線程寫入,隨機寫入
HDFS系統(tǒng)中,每個文件只能并發(fā)開啟一個Writer,并且寫入操作只能在文件的末尾進行。
3 MapReduce子框架
MapReduce的大體流程是這樣的,如圖所示:
由圖片可以看到mapreduce執(zhí)行下來主要包含這樣幾個步驟:
(1)首先對輸入數(shù)據(jù)源進行切片。
(2)master調(diào)度worker執(zhí)行map任務。
(3)worker讀取輸入源片段。
(4)worker執(zhí)行map任務,將任務輸出保存在本地。
(5)master調(diào)度worker執(zhí)行reduce任務,reduce worker讀取map任務的輸出文件。
(6)執(zhí)行reduce任務,將任務輸出保存到HDFS。
若對流程細節(jié)進行深究,可以得到這樣一張流程圖:
角色描述:
JobClient,執(zhí)行任務的客戶端。
JobTracker,任務調(diào)度器。
TaskTracker,任務跟蹤器。
Task,具體的任務(Map OR Reduce)。
從生命周期的角度來看,mapreduce流程大概經(jīng)歷這樣幾個階段:初始化、分配、執(zhí)行、反饋、成功與失敗的后續(xù)處理。
每個階段所做的事情大致如下
3.1 任務初始化
3.1.1 JobClient對數(shù)據(jù)源進行切片
切片信息由InputSplit對象封裝,接口定義如下:
public interface InputSplit extends Writable {
long getLength() throws IOException; String[] getLocations() throws IOException;
}
可以看到split并不包含具體的數(shù)據(jù)信息,而只是包含數(shù)據(jù)的引用,map任務會根據(jù)引用地址去加載數(shù)據(jù)。
InputSplit是由InputFormat來負責創(chuàng)建。
public interface InputFormat
InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
RecordReader
JobClient通過getSplits方法來計算切片信息,切片默認大小和HDFS的塊大小相同(64M),這樣有利于map任務的本地化執(zhí)行,無需通過網(wǎng)絡傳遞數(shù)據(jù)
切片成功后,JobClient會將切片信息傳送至JobTracker
3.1.2 通過jobTracker生成jobId。
JobTracker.getNewJobId()
3.1.3 檢查輸出目錄和輸入數(shù)據(jù)源是否存在。
輸出目錄已存在,系統(tǒng)拋出異常。
輸入源目錄不存在,系統(tǒng)拋出異常。
3.1.4 拷貝任務資源到jobTracker機器上(封裝任務的jar包、集群配置文件、輸入源切片信息)。
3.2 任務分配
JobTracker遍歷每一個InputSplit,根據(jù)其記錄的引用地址選擇距離最近的TaskTracker去執(zhí)行,理想情況下切片信息就在TaskTracker的本地,這樣節(jié)省了網(wǎng)絡數(shù)據(jù)傳輸?shù)臅r間。
JobTracker和TaskTracker之間是有心跳通信的邏輯的,通過彼此間不停的通信,JobTracker可以判斷出哪些TaskTracker正在執(zhí)行任務,哪些TaskTracker處于空閑狀態(tài),以此來合理分配任務。
3.3 任務執(zhí)行
TaskTracker接到任務后開始執(zhí)行如下操作:
3.3.1 將任務jar包從HDFS拷貝到本地并進行解壓
3.3.2 創(chuàng)建一個新的JVM來執(zhí)行具體的任務,這樣做的好處是即使所執(zhí)行的任務出現(xiàn)了異常,也不會影響TaskTracker的運行使用。
如果所執(zhí)行的任務是map任務,則處理流程大致如下:
首先加載InputSplit記錄的數(shù)據(jù)源切片,通過InputFormat的getRecordReader()方法,獲取到Reader后,執(zhí)行如下操作:
Kkey=reader.createKey();
V value=reader.createValue();
while(reader.next(key,value)){//遍歷split中的每一條記錄,執(zhí)行map功能函數(shù)。
mapper.map(key,value,output,reporter);
}
3.4 執(zhí)行反饋
mapreduce的執(zhí)行是一個漫長的過程,執(zhí)行期間會將任務的進度反饋給用戶。
任務結(jié)束后,控制臺會打印Counter信息,方便用戶以全局的視角來審查任務。
若執(zhí)行成功:
清理MapReduce本地存儲(mapred.local.dir屬性指定的目錄)。
清理map任務的輸出文件。
若執(zhí)行失敗:
(1)如果task出現(xiàn)問題(map或者reduce)
錯誤可能原因:用戶代碼出現(xiàn)異常;任務超過mapred.task.timeout指定的時間依然沒有返回
錯誤處理:
首先將錯誤信息寫入日志
然后jobtracker會調(diào)度其他tasktracker來重新執(zhí)行次任務,如果失敗次數(shù)超過4次(通過mapred.map.max.attempts和mapred.reduce.max.attempts屬性來設置,默認為4),則job以失敗告終。
如果系統(tǒng)不想以這種方式結(jié)束退出,而是想通過Task成功數(shù)的百分比來決定job是否通過,則可以指定如下兩個屬性:
mapred.max.map.failures.percentmap任務最大失敗率
mapred.max.reduce.failures.percent reduce任務最大失敗率
如果失敗比率超過指定的值,則job以失敗告終。
(2)如果是tasktracker出現(xiàn)問題
判斷問題的依據(jù):和jobtracker不再心跳通信jobtracker將該tasktracker從資源池中移除,以后不在調(diào)度它。
(3)jobtracker出現(xiàn)問題
jobtracker作為系統(tǒng)的單點如果出現(xiàn)問題也是最為嚴重的問題,系統(tǒng)將處于癱瘓。
參考文獻:
[1]TomWhite.Hadoop權威指南(第二版)[M].著清華大學出版社2011,7.
[2]chuckLam.Hadoop實戰(zhàn)[M].人民郵電出版社,2011,10.
作者簡介:單士華(1971.4-),女,本科,講師,研究方向:計算機應用。
曹社香(1971.2-),女,本科,講師,研究方向:計算機應用。