王 勇,張 躍
(北京工業(yè)大學信息學部,北京 100124)
大數據時代帶給社會的不僅僅是數據變大、資源增多,更有思維模式改變以及隨之而來的數據處理技術不斷創(chuàng)新、數據利用能力飛速發(fā)展。健康大數據技術應用和發(fā)展已作為國家重大戰(zhàn)略付諸實施[1]。相關研究有:Abderrazak 等[2]將hadoop 框架以及開源相關組件應用于倉儲問題,提高醫(yī)療數據的倉儲性能,對健康數據平臺建設具有一定的借鑒意義,缺點是不太符合健康監(jiān)測數據特點;文獻[3]利用HBase 和Phoenix 構建高性能的健康監(jiān)測大數據平臺,并對平臺讀寫性能進行優(yōu)化,然而其未對數據采集傳輸和發(fā)布共享進行研究;文獻[4-5]分別研究了適合健康監(jiān)測大數據的接入協議和發(fā)布協議,為健康監(jiān)測數據采集和發(fā)布共享提供了思路,但仍需要在具體實施中進一步驗證。
大規(guī)模健康監(jiān)測數據的采集存儲和共享利用仍然存在很多問題,本文詳細研究了Kafka[6]、HBase[7]等大數據相關技術,實現一款面向用戶健康服務的、可擴展的健康監(jiān)測大數據處理平臺,有效解決健康監(jiān)測數據生態(tài)系統(tǒng)中大規(guī)模數據的采集傳輸、存儲以及發(fā)布共享問題,填補了研究空白。通過研究HBase 組織與存儲模式,設計出適合存儲健康監(jiān)測大數據的HBase 存儲模型。對Kafka 分布式消息中間件的發(fā)布訂閱模式進行研究,實現健康監(jiān)測大數據的采集傳輸與發(fā)布共享架構。引入Kafka 作為架構中樞,不僅能屏蔽數據源的異構型,保證各個服務模塊之間高內聚、低耦合,還能使數據通道變得簡單,減輕下游數據庫系統(tǒng)的壓力,提高系統(tǒng)擴展性。
健康監(jiān)測大數據理想化狀態(tài)是:由健康監(jiān)測設備產生的數據,通過數據采集接口傳入數據中心進行集中存儲,利用健康監(jiān)測大數據平臺提供的數據發(fā)布接口獲取平臺數據,實現數據共享,經過分析與處理后的數據也可通過數據發(fā)布接口發(fā)送給用戶。
如圖1 所示,健康監(jiān)測大數據平臺系統(tǒng)架構包括應用平臺和支撐平臺,應用平臺主要實現數據應用,如數據分析和挖掘,數據發(fā)布是將最終的分析結果以及相關數據共享給用戶;在支撐平臺中,有分布式數據采集傳輸模塊與存儲模塊,分布式數據采集傳輸模塊對不同來源的健康監(jiān)測數據進行采集和傳輸,存儲模塊主要實現數據持久化,負責將數據高效存儲在大數據集群上,為數據應用提供支持。

Fig.1 Health monitoring big data platform architecture model圖1 健康監(jiān)測大數據平臺架構模型
Apache Kafka 是Hadoop 生態(tài)系統(tǒng)中的一個工具,用于處理事務日志和其它實時數據。Kafka 是一個流媒體平臺,能夠以發(fā)布/訂閱的形式傳遞流數據[8]。在發(fā)布—訂閱消息系統(tǒng)中,消息的生產者稱為發(fā)布者,消費者稱為訂閱者,消息被持久化到一個topic 中,消費者可以訂閱一個或多個topic 并消費該topic 中所有的數據,其體系結構如圖2 所示。
健康監(jiān)測大數據平臺需要實時將采集到的健康監(jiān)測數據信息存入數據中心進行持久化存儲,當信息采集平臺將這些變化的數據信息寫入或更新到數據庫時,數據庫產生很大的壓力,對數據采集系統(tǒng)性能提出了很高要求。利用kafka 分布式、高吞吐、基于發(fā)布/訂閱的特性,可在廉價的PC Server 上搭建大規(guī)模的消息系統(tǒng)[9]。
Kafka Connect 是一種用于在Kafka 和其它系統(tǒng)之間可擴展、可靠的流式傳輸數據工具,使用它能快速將大量數據集移入和移出Kafka 連接器。Kafka Connect 可獲取整個數據庫,或從所有應用程序服務器收集指標數據到Kafka 主題,使數據用于流處理。導出作業(yè)可將數據從Kafka topic 傳輸到二次存儲或查詢系統(tǒng),或傳遞到批處理系統(tǒng)進行離線分析。Kafka 數據采集與傳輸模型如圖3 所示。

Fig.2 Kafka architecture圖2 Kafka 體系結構

Fig.3 Kafka data acquisition and transmission model圖3 Kafka 數據采集與傳輸模型
系統(tǒng)使用Hadoop 體系中的HBase 組件對數據進行持久化存儲。HBase 是一個使用key/value 鍵值對的基于列存儲的數據庫,支持海量數據的高效存儲,存儲的數據具有稀疏性[10]。
HBase 表的索引稱為RowKey 行關鍵字,RowKey 必須具備唯一性,一般為標志性信息和時間戳組合。Rowkey 長度不宜過長,還應盡量保證散列[11]。本文將健康檔案編號或身份證號加入rowkey,健康檔案編號或身份證號具有一定的隨機性,能夠保證rowkey 設計均勻分布在各個Region中。與此同時還要考慮集群查詢性能,查詢都是基于某個用戶的時間序列,本文設計rowkey 的id+時間戳timestamp作為rowkey,用戶的信息就會連續(xù)存儲在一起,查詢效率自然提高。
Hbase 的列族也是越少越好,因為Hbase 的列族在內存結構中是一個cf 對應一個store 區(qū)域,數據量大的storefile 自然會多,在查詢多列族數據時需要跨文件訪問數據內容,合并任務自然增多,會降低性能。
基于以上原則,根據中華人民共和國衛(wèi)生部批準的《城鄉(xiāng)居民健康檔案基本數據集》[12]建立Hbase 健康監(jiān)測數據模型,如表1 所示。
健康監(jiān)測數據包括用戶的基礎數據、生理數據、運動數據、睡眠數據、環(huán)境數據等[13]。HBase 存儲模型將這些數據分成基礎數據(baseInfo)和健康數據(healthData)兩個列族進行存儲。基礎數據包括身份證號、姓名、性別、年齡、出生日期,健康數據包括身高、體重、體溫、血糖、血氧、血壓、心率、計步、睡眠質量等數據。

Table 1 HBase storage model of health monitoring data表1 健康監(jiān)測數據HBase 存儲模型
為有效實現健康監(jiān)測數據利用與共享,健康監(jiān)測大數據平臺可以提供兩種數據發(fā)布與共享服務:①健康監(jiān)測數據查詢服務;②健康狀態(tài)監(jiān)測服務[5]。
1.3.1 健康監(jiān)測數據查詢服務
健康監(jiān)測大數據平臺提供健康監(jiān)測數據查詢服務,其它基于本平臺的應用通過客戶端主動向健康監(jiān)測大數據平臺服務器發(fā)送查詢請求消息。健康監(jiān)測大數據平臺使用Kafka 作為健康監(jiān)測數據采集與發(fā)布的媒介,實現健康監(jiān)測數據查詢接口,其交互模型如圖4 所示。

Fig.4 Interaction model of health monitoring data query service圖4 健康監(jiān)測數據查詢服務交互模型
在Kafka 中,創(chuàng)建專門用于發(fā)送和接收查詢消息的主題Topic1,第三方數據應用平臺通過<table,query-filter,topic>組成的元組向Topic1 發(fā)送查詢消息,其中table 為想要查詢的HBase 表,query-filter 為查詢過濾器,topic 為查詢結果返回的目標主題。當與Topic1 相對應的消息到達時,查詢處理器處理這些消息,然后到指定的table 按照query-filter 過濾出想要的數據,將數據封裝成消息返回到指定的topic,第三方數據應用平臺獲取這些消息,得到想要的查詢結果。
HBase 查詢實現方式:①按指定RowKey 獲取唯一一條記錄的get 方法;②按指定條件獲取一批記錄的scan 方法。對于個人基本信息數據等全量數據表,使用get 方法,而對于基于時間序列采樣的健康監(jiān)測數據則采用scan 方法查詢較為方便。
一般基于時間序列采樣的健康監(jiān)測數據,本文的Rowkey 設計為身份證號或健康檔案編號+時間戳形式,這樣可將查詢接口中的Key 和startTime、endTime 值拼接起來形成Rowkey 的startRow 和stopRow,便于在HBase 表中查詢相應結果。而對于全量的數據信息表,如個人信息數據表,Rowkey 直接設計為身份證號,這樣查詢條件中的time可以為空,Key 可直接作為RowKey 進行查詢,查詢接口設計如表2 所示。

Table 2 Health monitoring data query interface表2 健康監(jiān)測數據查詢接口
1.3.2 健康狀態(tài)監(jiān)測服務
健康監(jiān)測大數據平臺還主動提供健康狀態(tài)監(jiān)測服務,健康監(jiān)測大數據平臺可整合平臺采集存儲的數據,將血壓、體溫、血糖等健康狀況異常情況及時發(fā)送到健康監(jiān)測類設備,以供用戶了解異常狀況,供決策時參考。使用Kafka 作為健康狀態(tài)監(jiān)測服務發(fā)布媒介,其交互模型如圖5 所示。

Fig.5 Interaction model of health monitoring service圖5 健康狀態(tài)監(jiān)測服務交互模型
當健康狀況監(jiān)測模塊發(fā)現健康狀態(tài)異常時,健康狀況監(jiān)測模塊生成一個告警命令報文,并將監(jiān)測結果封裝成告警消息發(fā)送到Kafka 對應的Topic。用戶事先訂閱該Topic,當告警消息到達時可以實時獲取該消息。
Kafka Connect 是一種傳輸數據工具,主要用于Kafka分布式消息系統(tǒng)與其它系統(tǒng)進行數據傳輸,分為Source-Connector 與SinkConnector。其中SourceConnector 用于將整個數據庫或從應用程序服務器收集的指標導入到Kafka主題,而SinkConnector 與之相反,是從Kafka 主題導出數據到其它系統(tǒng)[14]。
開發(fā)Connector 主要是實現兩個接口Connector 和Task,若是開發(fā) Source,只要實現 SourceConnector 和SourceTask 兩個接口。比如把文件的數據讀取到kafka 中,SourceTask 會讀取文件的每一行并把它們封裝為List<SourceRecord>發(fā)送出去。實現SourceConnector 開發(fā)的時序如圖6 所示。

Fig.6 Timing diagram of SourceConnector development圖6 SourceConnector 開發(fā)時序圖
Sink Connector 就是把Kafka 中的數據導入到第三方系統(tǒng)中,比如讀取到HDFS、hbase 等,本文設計并實現的SinkConnector 主要是HBase。SinkConnector 的開發(fā)與SourceConnector 類似,不同點在于SourceTask 使用poll 接口,而SinkTask 使用put 接口。SinkTask 的put()方法接收集合Collection<SinkRecord>存儲到HBase 中。
HBase 中的數據表通過劃分成一個個Region 實現數據分片,每一個Region 關聯一個RowKey 的范圍區(qū)間,數據按RowKey 的字典順序進行組織。正是基于這種設計使得HBase 能夠輕松應對這類查詢:“指定一個RowKey 范圍區(qū)間,獲取該區(qū)間的所有記錄”。如查詢健康檔案號為116755244009,日期從20171001 到20191001 的健康監(jiān)測數據表,healthData 列族中的Blood_pressure 列示例代碼如下:

HBase 非鍵列查詢效率非常低,因為在查詢操作中要掃描整個表。為提高檢索效率,引入二級索引機制[15]。實驗結果表明,經過優(yōu)化后的查詢性能能夠充分滿足數據發(fā)布服務需要。二級索引原理如圖7 所示。
圖7 中,二級索引的本質就是建立各列值與行鍵之間的映射關系[16]。要對F:C1 列建立索引時,只需建立F:C1各列值到其對應的RowKey 映射關系。查詢符合F:C1=C11,對應的F:C2 列值步驟如下:①根據C1=C11 得到索引數據查找對應的RK1;②得到RK1 后再根據RK1 在主表中查詢C2 的值。

Fig.7 Design idea of HBase secondary index圖7 HBase 二級索引設計思路
二級索引表建立和探測數據主表過程如表3 所示。

Table 3 Health monitoring data表3 健康監(jiān)測數據
從表3 數據查詢Id_number 列,構建的二級索引表如表4 所示。

Table 4 Secondary index表4 二級索引
客戶端發(fā)出請求,首先查詢二級索引表,從表4 獲取相應的Rowkey,然后根據主表中的Rowkey 查詢相應的數據記錄,詳細流程如圖8 所示。
3.1.1 硬件環(huán)境
本文利用兩臺服務器劃分為4 個虛擬機節(jié)點搭建系統(tǒng)運行環(huán)境。每個虛擬節(jié)點配置為:CPU:2.40GHz;內存:4.0G;硬盤:200GB。具體分布如表5 所示。

Fig.8 Query flow using secondary index table圖8 使用二級索引表查詢流程

Table 5 Distribution of cluster system operating environment表5 集群系統(tǒng)運行環(huán)境分布
3.1.2 軟件環(huán)境
系統(tǒng)軟件環(huán)境及版本如表6 所示。

Table 6 System software environment and version表6 系統(tǒng)軟件環(huán)境及版本
基于Kafka 和HBase 的健康監(jiān)測大數據平臺系統(tǒng)性能主要考慮健康監(jiān)測數據的采集傳輸能力和健康監(jiān)測數據的查詢能力,系統(tǒng)性能測試與優(yōu)化重點是Apache Kafka 分布式消息隊列的吞吐量與HBase 數據庫查詢效能。
3.2.1 Kafka 分布式消息隊列性能測試
將存儲在文件中的數據作為數據源,HBase 作為數據持久化存儲獲取數據。利用Kafka 提供的性能測試工具kafka-producer-perf-test.sh 和kafka-consumer-perf-test.sh腳本對Kafka 的生產者和消費者吞吐速率進行測試。為充分挖掘Kafka 系統(tǒng)性能,結合本平臺測試環(huán)境設置相關參數如表7 所示。
一般而言,增大批次有利于增加吞吐量(減少了網絡IO 次數),但過于增大批次帶來的好處無法抵消壓縮時間的增長,吞吐率就會降低。分區(qū)數決定了Kafka 的并行度,分區(qū)數一般是broker 的整數倍。

Table 7 Kafka related parameter settings表7 Kafka 相關參數設置
單線程吞吐量顯然是有限的,并沒有完全利用Kafka集群的高吞吐量,因此采用多線程進行并發(fā)讀寫對此進行優(yōu)化。對線程數與吞吐率的關系進行測試,結果如圖9 所示。

Fig.9 Relationship between thread number and throughput rate圖9 線程數與吞吐率關系
優(yōu)化以后,使用10 個線程寫,系統(tǒng)隨著線程數的增加吞吐率顯著提升到27MB/s 左右,消息數達17 萬條/s 以上,可見使用批處理或多線程對提升吞吐率效果明顯。
3.2.2 HBase 數據庫性能測試
采用HBase 統(tǒng)一的JavaAPI 接口對HBase 數據查詢性能進行測試,圖10 為采用二級索引前后的查詢響應時間對比結果。查詢條件為非RowKey,查詢數據量從2~12萬條記錄不等。實驗結果顯示,二級索引的建立能夠使非索引數據的查詢響應時間縮短近3 倍。

Fig.10 Comparison of query response time before and after optimization圖10 優(yōu)化前后查詢響應時間對比
本文基于Kafka 分布式消息系統(tǒng),結合HBase 分布式存儲數據庫,以解決健康監(jiān)測數據生態(tài)系統(tǒng)中“信息孤島”問題為出發(fā)點,通過開發(fā)Kafka Connector 初步形成一個高可靠的健康監(jiān)測大數據平臺。首先研究了Kafka 和HBase在健康監(jiān)測數據平臺建設中的應用,設計了健康監(jiān)測數據的采集傳輸、共享架構以及存儲模型。然后調整集群設置和參數配置,對查詢效率進行優(yōu)化,以達到平臺最佳性能。實驗結果表明,總的吞吐量取決于代理節(jié)點的數量、數據的主題分區(qū)數量以及生產消費消息的節(jié)點數量。通常情況下增加分區(qū)可以提高Kafka 集群的吞吐量,然而分區(qū)過多會增加無效及延遲風險,采用批處理或者多線程都有利于增加吞吐量,但是線程數一般應不大于分區(qū)數。建立二級索引是應對HBase 非RowKey 查詢的有效方式。本文針對健康監(jiān)測數據存儲特點建立二級索引,能有效提升查詢響應速度。
本文研究了大數據關鍵技術在健康監(jiān)測數據平臺中的應用。要實現生產環(huán)境大規(guī)模集群的有效配置,需要考慮核心節(jié)點數量。隨著數據量和組件數量的增加,節(jié)點之間的網絡帶寬或將成為瓶頸。由于健康監(jiān)測數據本身的復雜性以及HBase 的局限性,要提升復雜查詢效率還需進一步研究。