文/袁昌權 胡益群 許光 俞理超
當前,隨著信息技術的快速發展,各個行業的數據規模極速增加,推動社會進入了大數據時代。以Hadoop為代表的技術為大數據處理提供了很好的解決方案。
Hadoop的核心組件是分布式文件系統HDFS,為其提供底層存儲支持。HDFS通常由一個NameNode(NN)和多個DataNode(DN)、以及客戶端組成。NN負責管理DN并響應客戶端請求,DN管理本地數據。HDFS采用副本機制,使得數據存儲具備較高的可靠性。但當NN出現故障時,會導致整個Hadoop集群停止運轉,造成系統的單點故障,只有系統重啟才能繼續工作。此外,Hadoop采用MapReduce(以下簡稱“MR”)計算框架,在處理數據的過程中會將中間結果緩存到本地磁盤,使得系統需要增加很多額外的開銷,降低了數據處理的效率和實時性。
在實際應用中,數據采集的穩定性及高效率是必不可少的;數據存儲的安全性也極為重要。因此,本文旨在提出一種穩定、高效、安全的數據采集與存儲方案。
Zookeeper是一種分布式協同組件,具有容錯性和高可用性。在zk集群的基礎上,可以配置Hadoop的高可用模式。該模式具備雙NN節點,能夠實現容災的功能。另外,結合Flume、Kafka、Hbase等技術,通過Flume實現數據的收集,并將收集的數據緩存到Kafka模塊,最終將數據存儲到Hbase中。
該方案架構設計見圖1所示。
HDFS 2.0通過設計兩個NN構成主備結點的高可用性模式,使得當主NN出現狀況時,備用NN能夠接替它繼續工作。兩者之間可以通過Journalnode進程共享資源和數據同步,并通過zk集群實現狀態切換。
Zookeeper使用ZAB協議來實現管理和服務,主要包括兩個過程:領導者選舉和原子廣播。領導者選舉會通過選舉過程在集群中選擇一個結點作為領導者,當半數以上其他結點(跟隨者)與領導者完成同步時,選舉完成。當客戶端發起請求時,跟隨者會分別將請求發給領導者,領導者按FIFO原則取出進行廣播,半數以上結點通過則提交更新。
在Hadoop集群上集成zookeeper集群,需要選擇 2n+1個DN結點進行部署。將Zookeeper安裝在DN結點上,并在其conf目錄下的zoo.cfg文件中配置zk集群信息。主要包含結點信息(名稱、ip及端口號)、數據和日志的存儲位置等。在同級目錄下創建myid文件作為集群標識,并與zoo.cfg文件對應。依賴于Zookeeper的高可用性Hadoop集群需在原有基礎上,在core-site.xml中指定zk集群IP及端口,hdfs-site.xml中配置兩個NN結點的rpc、http通信地址和端口,設定集群出現故障時主備NN結點的切換及兩者之間的SSH無密切換。
ZK集群構成如圖2所示。
Flume是一種可靠性很高的分布式數據采集、傳輸工具,能將數據轉化成數據流進行控制。Flume的核心成員是由Source、Channel和Sink構成的Agent(代理),每一個代理都是獨立的進程。當Source收到數據之后,會將其緩存到Channel,再經過Sink輸出到其他存儲位置。Flume的數據來源具有多樣性,可通過配置實現定制。
系統通過一個數據源產生程序在NN結點上產生模擬日志數據。在對Flume進行配置時,需要將Source指定為模擬數據源,類型指定為實時日志收集(exec);Channel傳輸管道指定為內存傳輸方式(memeory);Sink指定為Kafka。生成配置文件后,將其上傳到Flume安裝目錄下的conf文件夾下。當啟動Flume時,需要指定該文件的所在位置。參考命令如下:

系統配置成功后,啟動Flume接收模擬源數據,并將該數據傳輸到Kafka,通過Kafka將數據消費。系統數據采集部分結構如圖3所示。

圖1:高可用數據采集與存儲方案

圖2:ZK集群的構成

圖3:Flume數據收集模塊
Kafka是一種發布-訂閱模式的消息系統,具有秒級百萬消息的吞吐量,同時支持在線實時和離線的數據處理。Kafka集群同樣依賴于zk集群,它由若干消息生產者(producer)、一系列服務器(Brokers)和若干消費者(consumer)構成。其中生產者將消息發送到服務器;消費者負責讀取消息;zk集群則協調消費者與服務器的加入與退出。
Kafka按主題對數據進行分類存儲。單個主題可以擴展成多個分區,并將分區部署在集群的各個服務器上,以此保證數據的安全。對于消費者而言,每個主題的一條數據能被多個消費者同時消費。同時,Kafka還支持副本模式,能夠設置分區的副本數。
通過將Flume收集的模擬數據作為生產者,將其送入已經創建的日志主題中,并指定主題分區數和副本數。Kafka消費者負責將數據永久化到Hbase集群中。在將kafka集成到Flume中時,需指定sink組件的接收類型為KafkaSink,設置kafka的主題、服務器地址和端口號。
數據緩存模塊結構如圖4所示。
Hbase是一種非關系型的數據庫,使用Hadoop集群的HDFS文件系統作為存儲介質。Hbase對數據的操作有一套類似于SQL的語句,是基于MR任務的封裝,其執行過程即是MapReduce任務的分發執行。Hbase以列為最小存儲單位,并由列組成行,指定行建與每一行對應。一張表中包含若干行,行鍵是表的主鍵。同時,由若干列又構成列族,每一列的值都有由系統指定的時間戳。Hbase也可依賴于zk集群,協助其完成工作。
本方案中,在zk集群基礎上搭建Hbase集群,并在Hbase上創建名字空間和表用于存儲數據。同時,需要對Hbase表的行鍵和列族進行設計,使表抽象為key-value存儲形式。在設計行鍵時,可以由kafka主題、內容、時間等信息確定一個行鍵,以保證數據不會相互覆蓋。通過調用kafka的API
Consumer.createJavaConsumerConnector.createMessageStreams
將Kafka中的模擬日志數據以數據流的形式消費掉,同時調用Hbase Client的API put函數實現對數據的寫入,永久化存儲數據到Hbase中。
本文提出了一種基于Flume、Kafka、Hbase的數據采集與存儲方案,具有較高的吞吐量、高效率的特點。同時依賴于ZK集群構建了高可用性的Hadoop集群,能夠協調各個模塊內部的工作,提高了集群的穩定性。得益于Hadoop生態的完整,該方案具備一定的可擴展性,可以根據實際應用需求對各個模塊實現功能擴展,具有一定的應用價值。

圖4:Kafka緩存模塊