田燕軍 王莉
(山西經濟管理干部學院 山西省太原市 030024)
MapReduce 是一種大規模數據處理的編程模型,用于大規模數據集的并行運算[1]。遵循著“分而治之,迭代匯總”的思想,即將海量的數據拆分到分布式文件系統中的不同機器上,讓各個機器直接訪問和處理拆分后的數據并獲得局部結果,再匯總、合并這些局部結果形成全局結果。MapReduce 并行程序模型簡單,易于編寫,寫一個分布式程序,只要簡單的實現一些接口,就可以完成,同寫一個簡單的串行程序一模一樣的,這個程序之后可以分布到大量的廉價的pc 機器上運行。MapReduce 也有不錯的容錯能力,因MapReduce 要運行在廉價的PC 機器上[2],這些機器隨時可能出故障,在出了故障時,MapReduce 可自動把計算任務轉移到另一個節點上運行,不需要人工參與。MapReduce 也有良好的擴展性,當程序運行起來硬件資源得不到滿足時,可以通過增加機器來添加硬件資源。
HBase[3]中表的數據非常龐大,最高可達PB 級,如用傳統串行程序處理HBase 表中數據,只能利用單機資源,會極大的降低處理HBase 表數據的效率; 但用MapReduce 分布式思想程序來處理HBase 表中的海量數據時,可利用集群資源,快速批量完成數據的處理,在實際工作中,對HBase 的操作也大多是與MapReduce 共同進行。
使用三臺PC 機搭建分布式計算平臺,3 臺PC 機分別都安裝CentOS6.8 64 位操作系統,并按下邊2.1 進行大數據集群環境準備,再按2.2 進行大數據平臺軟件安裝。
修改/etc/sysconfig/network 文件,分別設置三臺PC 機的主機名為:node01、node02、node03;
修改/etc/hosts 文件,分別綁定三臺PC 機的主機名與IP 地址;使用setenforce 0 臨時關閉selinux, 修改/etc/selinux/config 文件永久關閉selinux;使用service iptables stop 臨時關閉防火墻,使用chkconfig iptables off永久關閉防火墻;使用ssh-keygen -t rsa, sshcopy-id node01 對三臺PC 機互相做免密登錄;三臺PC 機分別使用yum源安裝ntp(網絡時間服務),并進行配置,使三臺PC機時間同步。
三臺PC 機上分別新建/export/servers 目錄用于存放安裝程序,新建/export/softwares 目錄用于存放軟件安裝包;接下來三臺PC 機分別安裝jdk(版本為jdk-8u144-linux-x64.tar.gz),jdk 是 Java語言的軟件開發工具包,包含了JAVA 的運行環境、JAVA 工具和JAVA 基礎的類庫;安裝mysql(版本為mysql-connector-java-5.1.38.jar),mysql 是基于SQL 查詢的開源跨平臺數據庫管理系統;安裝zookeeper(版本為zookeeper-3.4.5.tar.gz),zookeeper 是一個分布式協調服務的開源框架,主要用來解決分布式集群中應用系統的一致性問題,例如怎樣避免同時操作同一數據造成臟讀的問題;安裝hadoop(版本為hadoop-2.6.0.tar.gz),hadoop 是Apache 軟件基金會旗下的一個開源分布式計算平臺,以HDFS 和MapReduce 為核心用戶提供了系統底層細節透明的分布式基礎架構;安裝HBase(版本為HBase-1.2.0.tar.gz),HBase 是bigtable 的開源java 版本,是建立在hdfs 之上,提供高可靠性、高性能、列存儲、可伸縮、實時讀寫nosql 的數據庫系統;安裝hive(版本為hive-1.1.0.tar.gz), hive是基于Hadoop 的一個數據倉庫工具,可以將結構化的數據文件映射為一張數據庫表,并提供類SQL 查詢功能[4]。
HBase 中的表數據最終都存儲在HDFS 上面,故HBase 天生就支持MapReduce 的操作,提供了TableInputFormat、TableOutput Format、TableMapper 和TableReducer 類來支持使用MapReduce 框架處理HBase 上的數據,并提供了TableMapReduceUtil 類來初始化一個HBase-MapReduce 任務。
TableInputFormat 繼承自TableInputFormatBase 類,負責將HBase 數據按Region 進行切片。
TableOutputFormat 類繼承自FileOutputFormat 類,負責將MapReduce 任務輸出的數據寫入HBase 表中。
TableMapper 是一個抽象類,繼承自Mapper 類,負責將輸入的Key 轉為RowKey 的字節碼數據,輸入的Value 轉為Result 類型,表示一行數據。開發者需要重載TableMapper 類的map 方法來實現自己的Map 任務。
TableReducer 同樣是一個抽象類,繼承自Reducer 類,與普通的Reducer 類沒有區別,開發者需要重載TableReducer 類的reduce方法來實現自己的Map 任務。
TableMapReduceUtil 是一個輔助類,用來簡化一個HBase-MapReduce 作業的配置過程。該類提供了initTableMapperJob()方法初始化map 任務、提供了initTableReducerJob()方法初始化reduce任務。
建立數據來源表person 表,包含一個列族f1,向表中添加數據,在列族中放入name 列,將"姓名測試數據"放入該列中,在列族中放入age 列,將"年齡測試數據"放入該列中,如此插入多行,行鍵為不同的數據即可; 建立輸出表person2,包含一個列族f1; 通過MapReduce 操作Hbase 的person 表,提取出f1 列族下name 列數據,并寫入person2 表的'f1:name'中。
步驟如下:
(1)自定義Mapper 繼承TableMapper; 該自定義Mapper 會遍歷person 表中每行數據的單元格,如是name 列,則提取出來放入到put 中。
(2)自定義Reducer 繼承TableReducer; 該自定義Reducer 不進行任何處理,只是把map 輸出的put,再直接輸出。
(3)組裝八大步,讀取Hbase 表中數據和初始化map 用TableMapReduceUtil.initTableMapperJob(),向HBase 表中寫入數據和初始化reduce 用TableMapReduceUtil.initTableReducerJob()
算法1:


hdfs 建立數據源person.txt 文件,向該文件中添加 數據,包含rowkey、name、age 三列; 建立輸出表person3,包含一個列族f1;通過MapReduce 讀取該hdfs 文件,寫入到hbase 的person3 表中。
步驟如下:
(1)自定義Mapper 繼承Mapper; 該自定義Mapper 會讀取person.txt 文件中數據,放到put 中。
(2)自定義Reducer 繼承TableReducer; 該自定義Reducer 不進行任何處理,只是把map 輸出的put,再直接輸出。
(3)組裝八大步, 通過TableMapReduceUtil.initTableReducer Job()向HBase 表中寫入數據和初始化reduce。
HBase 豐富的存儲能力與超強的并發訪問能力,使得HBase 應用非常廣泛。目前已經在金融、交通、醫療、車聯網、IoT 等眾多領域有了最佳實踐,涉及到訂單/賬單存儲、用戶畫像、時空/時序數據、對象存儲、Cube 分析等各個使用場景。對HBase 表中數據的處理,也用到方方面面,通過MapReduce 操作HBase,可以大大提高處理HBase 表中數據的速度,同時,因為HBase 天生就支持MapReduce 的操作,故會簡化MapReduce 程序的編寫。