武志學(xué)
1.成都五舟漢云科技有限公司,成都 611731; 2.成都信息工程大學(xué) 信息安全工程學(xué)院,成都 610225)(*通信作者電子郵箱zhixue.wu@gmail.com)
基于Spark Streaming的實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)
武志學(xué)1,2*
1.成都五舟漢云科技有限公司,成都 611731; 2.成都信息工程大學(xué) 信息安全工程學(xué)院,成都 610225)(*通信作者電子郵箱zhixue.wu@gmail.com)
能耗分項(xiàng)計(jì)量能夠準(zhǔn)確、及時(shí)、有效地發(fā)現(xiàn)能源使用問(wèn)題,形成和實(shí)現(xiàn)最有效的節(jié)能措施。能耗分項(xiàng)計(jì)量系統(tǒng)需要對(duì)各項(xiàng)能源使用量在不同粒度上進(jìn)行統(tǒng)計(jì),既有實(shí)時(shí)性的需求,又需要涉及到聚合、去重、連接等較為復(fù)雜的統(tǒng)計(jì)需求。由于數(shù)據(jù)產(chǎn)生快、實(shí)時(shí)性強(qiáng)、數(shù)據(jù)量大,所以很難統(tǒng)一采集并入庫(kù)存儲(chǔ)后再作處理,這便導(dǎo)致傳統(tǒng)的數(shù)據(jù)處理架構(gòu)不能滿足需求。為此,提出基于Spark Streaming大數(shù)據(jù)流式技術(shù)構(gòu)建一個(gè)實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng),對(duì)實(shí)時(shí)能耗分項(xiàng)計(jì)量的系統(tǒng)架構(gòu)和內(nèi)部結(jié)構(gòu)進(jìn)行了詳細(xì)介紹,并通過(guò)實(shí)驗(yàn)數(shù)據(jù)分析了系統(tǒng)的實(shí)時(shí)數(shù)據(jù)處理能力。與傳統(tǒng)架構(gòu)不同,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)在數(shù)據(jù)流動(dòng)的過(guò)程中實(shí)時(shí)地進(jìn)行捕捉和處理,一方面把捕捉到的異常信息及時(shí)報(bào)警到前端,同時(shí)把分類分項(xiàng)統(tǒng)計(jì)處理的結(jié)果保存到數(shù)據(jù)庫(kù),以便進(jìn)行離線分析和數(shù)據(jù)挖掘,能有效地解決上述數(shù)據(jù)處理過(guò)程中遇到的問(wèn)題。
流式計(jì)算;能耗分項(xiàng)計(jì)量;Spark Streaming;Apache Kafka;大數(shù)據(jù)
伴隨著我國(guó)城市化進(jìn)程的加快,大型公共建筑節(jié)能工作勢(shì)在必行。如何達(dá)到既滿足使用及舒適度的需求,又能科學(xué)、合理地節(jié)能降耗已經(jīng)是全社會(huì)所要思考的問(wèn)題。在大力推廣節(jié)能減排的階段,要達(dá)到最快、最明顯的節(jié)能效果,不單是采用設(shè)備節(jié)能手段,更需要使用分項(xiàng)計(jì)量準(zhǔn)確、及時(shí)、有效地發(fā)現(xiàn)能源使用問(wèn)題,形成和實(shí)現(xiàn)最有效的節(jié)能措施。能耗分項(xiàng)計(jì)量是指對(duì)建筑的水、電、燃?xì)狻⒓泄帷⒓泄├涞雀鞣N能耗進(jìn)行監(jiān)測(cè),從而得出建筑物的總能耗量和不同能源種類、不同功能系統(tǒng)的能耗量[1]。要實(shí)現(xiàn)分項(xiàng)計(jì)量,必須進(jìn)行數(shù)據(jù)采集、數(shù)據(jù)傳輸、數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)分析等。所以,能耗分項(xiàng)計(jì)量是一個(gè)典型的流式大數(shù)據(jù)系統(tǒng),具有數(shù)據(jù)量大、數(shù)據(jù)產(chǎn)生速度快、數(shù)據(jù)結(jié)構(gòu)復(fù)雜等特點(diǎn)。
一般情況下,能耗分項(xiàng)計(jì)量包括空調(diào)系統(tǒng)、電梯系統(tǒng)、給排水系統(tǒng)、通風(fēng)系統(tǒng)、照明系統(tǒng)及辦公設(shè)備系統(tǒng)等。對(duì)于用能密度高、單體設(shè)備耗能大的集中空調(diào)系統(tǒng),應(yīng)進(jìn)行更細(xì)致的計(jì)量,包括:冷凍主機(jī)用電量、冷凍水泵用電量、冷卻水泵用電量、冷卻塔風(fēng)機(jī)用電量、空調(diào)箱和新風(fēng)機(jī)用電量等。所以進(jìn)行能耗分項(xiàng)計(jì)量時(shí),需要對(duì)各項(xiàng)能源使用量在不同粒度上對(duì)不同數(shù)據(jù)進(jìn)行統(tǒng)計(jì),既有實(shí)時(shí)性的需求,又需要涉及到聚合、去

圖1 基于傳統(tǒng)數(shù)據(jù)處理模式的分項(xiàng)計(jì)量系統(tǒng)
重、連接等較為復(fù)雜的統(tǒng)計(jì)需求。由于數(shù)據(jù)產(chǎn)生快、實(shí)時(shí)性強(qiáng)、數(shù)據(jù)量大,如果采取傳統(tǒng)的數(shù)據(jù)處理架構(gòu),首先對(duì)采集到的數(shù)據(jù)入庫(kù)存儲(chǔ),然后再作處理,很難滿足分項(xiàng)計(jì)量的需求。特別是為了找到能耗使用規(guī)律提出有效節(jié)能措施,不但需要部署大量能耗采集儀表,還需要進(jìn)行更為復(fù)雜的數(shù)據(jù)處理,從而引起在單位時(shí)間內(nèi)要處理的實(shí)時(shí)數(shù)據(jù)量和計(jì)算工作量同時(shí)大幅上升,這便導(dǎo)致傳統(tǒng)的數(shù)據(jù)處理架構(gòu)不能滿足需要。為了解決這個(gè)問(wèn)題,本文通過(guò)使用Apache Kafka和Spark Streaming模塊構(gòu)建了一個(gè)實(shí)時(shí)流式數(shù)據(jù)處理系統(tǒng)來(lái)進(jìn)行能耗分項(xiàng)計(jì)量。與傳統(tǒng)架構(gòu)不同,實(shí)時(shí)流式數(shù)據(jù)處理系統(tǒng)在數(shù)據(jù)流動(dòng)的過(guò)程中實(shí)時(shí)地進(jìn)行捕捉和處理,并根據(jù)業(yè)務(wù)需求對(duì)數(shù)據(jù)進(jìn)行計(jì)算分析,一方面把捕捉到的異常信息及時(shí)報(bào)警到前端,同時(shí)把分類分項(xiàng)統(tǒng)計(jì)處理的結(jié)果保存到數(shù)據(jù)庫(kù),以便進(jìn)行離線分析和數(shù)據(jù)挖掘。本文將詳細(xì)描述實(shí)時(shí)能耗分項(xiàng)計(jì)量的系統(tǒng)架構(gòu)和內(nèi)部結(jié)構(gòu),并對(duì)架構(gòu)中所使用的大數(shù)據(jù)技術(shù)和系統(tǒng)進(jìn)行介紹和分析,最后,通過(guò)實(shí)際測(cè)試結(jié)果對(duì)實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的實(shí)時(shí)數(shù)據(jù)處理能力進(jìn)行驗(yàn)證和分析。
清華大學(xué)節(jié)能研究中心研制開發(fā)了能耗分項(xiàng)計(jì)量實(shí)時(shí)分析系統(tǒng)EM-II[2],包括數(shù)據(jù)采集子系統(tǒng)、數(shù)據(jù)處理子系統(tǒng)、數(shù)據(jù)分析展示子系統(tǒng)三大核心部分,另外還有信息維護(hù)、數(shù)據(jù)上報(bào)、系統(tǒng)監(jiān)測(cè)等幾個(gè)子系統(tǒng)。數(shù)據(jù)采集子系統(tǒng)利用安裝在現(xiàn)場(chǎng)的具有數(shù)字通信接口的電計(jì)量表和超聲波冷熱量表采集數(shù)據(jù),并由數(shù)據(jù)采集器匯總接收通過(guò)網(wǎng)關(guān)由路由器連接到互聯(lián)網(wǎng),將數(shù)據(jù)遠(yuǎn)程傳輸回?cái)?shù)據(jù)中心服務(wù)器。數(shù)據(jù)處理子系統(tǒng)負(fù)責(zé)校驗(yàn)解析接收到的原始數(shù)據(jù), 并根據(jù)能耗模型拆分計(jì)算得到分類分項(xiàng)數(shù)據(jù)。數(shù)據(jù)分析展示子系統(tǒng)將經(jīng)過(guò)數(shù)據(jù)處理后的分類分項(xiàng)能耗數(shù)據(jù)進(jìn)行分析、匯總和整合,一方面通過(guò)靜態(tài)或者動(dòng)態(tài)的圖表方式將能耗數(shù)據(jù)展示出來(lái),另一方面能夠提供針對(duì)第三方的數(shù)據(jù)接入服務(wù)和數(shù)據(jù)發(fā)布服務(wù)。
Hysine與多個(gè)高等院校及科研機(jī)構(gòu)合作研制開發(fā)的EMC-2000建筑設(shè)備節(jié)能控制與管理系統(tǒng)[3],適用于新建、改建、擴(kuò)建項(xiàng)目中建筑機(jī)電設(shè)備能效跟蹤控制節(jié)能管理。整個(gè)能源管理系統(tǒng)由管理中心、主干通信網(wǎng)絡(luò)、數(shù)據(jù)采集器、智能電表等組成,同時(shí)為與上一級(jí)能耗監(jiān)測(cè)和管理系統(tǒng)連接預(yù)留系統(tǒng)接口。能源管理中心通過(guò)對(duì)現(xiàn)場(chǎng)數(shù)據(jù)采集器上傳的數(shù)據(jù)進(jìn)行存儲(chǔ)、統(tǒng)計(jì)和分析,為業(yè)主提供有效的能源使用和持續(xù)的能源節(jié)約提供實(shí)施依據(jù)。
安科瑞開發(fā)的Acrel-5000建筑能耗分析管理系統(tǒng)[4]以計(jì)算機(jī)、通信設(shè)備、測(cè)控單元為基本工具,根據(jù)現(xiàn)場(chǎng)實(shí)際情況采用現(xiàn)場(chǎng)總線、光纖環(huán)網(wǎng)或無(wú)線通信中的一種或多種結(jié)合的最優(yōu)化的組網(wǎng)方式,為大型公共建筑的實(shí)時(shí)數(shù)據(jù)采集及遠(yuǎn)程管理與控制提供了基礎(chǔ)平臺(tái),它可以和檢測(cè)設(shè)備構(gòu)成任意復(fù)雜的監(jiān)控系統(tǒng)。
這些能耗分項(xiàng)計(jì)量系統(tǒng)都是參照國(guó)家住建部《國(guó)家機(jī)關(guān)辦公建筑和大型公共建筑能耗監(jiān)測(cè)系統(tǒng)》[5]和《國(guó)家機(jī)關(guān)辦公建筑和大型公共建筑能耗監(jiān)測(cè)系統(tǒng)省、市級(jí)數(shù)據(jù)中心數(shù)據(jù)庫(kù)結(jié)構(gòu)文檔》[6],采用了傳統(tǒng)的數(shù)據(jù)處理模式,如圖1所示。當(dāng)數(shù)據(jù)采集程序接收到數(shù)據(jù)采集器發(fā)送的數(shù)據(jù)以后,首先把數(shù)據(jù)寫入計(jì)量表原始數(shù)值數(shù)據(jù)庫(kù)(D);然后再由拆分程序按照各個(gè)儀表和能耗數(shù)據(jù)各級(jí)分項(xiàng)進(jìn)行拆分和統(tǒng)計(jì),并把結(jié)果寫入分類分項(xiàng)能耗數(shù)據(jù)庫(kù)(B);最后再由分析展示程序基于建筑基本情況數(shù)據(jù)庫(kù)(A)、分類分項(xiàng)能耗數(shù)據(jù)庫(kù)(B)進(jìn)行數(shù)據(jù)分析并展示給用戶。
這種基于傳統(tǒng)數(shù)據(jù)處理模式的分項(xiàng)計(jì)量系統(tǒng)只能適用于采集點(diǎn)數(shù)量比較少、統(tǒng)計(jì)分析比較簡(jiǎn)單的環(huán)境。在采集點(diǎn)數(shù)量達(dá)到上千時(shí),隨著時(shí)間的推移,分類分項(xiàng)能耗數(shù)據(jù)庫(kù)的數(shù)據(jù)會(huì)不斷累計(jì)快速增加,從而可能導(dǎo)致拆分程序無(wú)法及時(shí)完成對(duì)數(shù)據(jù)的拆分和統(tǒng)計(jì)。
為了解決基于傳統(tǒng)數(shù)據(jù)處理模式的能耗分項(xiàng)計(jì)量系統(tǒng)存在的問(wèn)題,本文設(shè)計(jì)并實(shí)現(xiàn)了一個(gè)基于Spark Streaming和Apache Kafka等大數(shù)據(jù)技術(shù)的實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)。在本章首先對(duì)Spark Streaming和Apache Kafka大數(shù)據(jù)技術(shù)進(jìn)行簡(jiǎn)單介紹,然后描述如何使用Spark Streaming和Apache Kafka模塊構(gòu)建基于實(shí)時(shí)流式數(shù)據(jù)處理架構(gòu)的實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)。
2.1 Spark Streaming
Apache Spark是一個(gè)基于內(nèi)存的、可以支持各種大數(shù)據(jù)應(yīng)用場(chǎng)景的、高性能和高容錯(cuò)的開源大數(shù)據(jù)平臺(tái)[7]。Spark Streaming是Apache Spark的一個(gè)子項(xiàng)目,是一個(gè)運(yùn)行在Spark引擎之上的實(shí)時(shí)處理工具[8]。
與Hadoop[9]大數(shù)據(jù)處理平臺(tái)不同,Spark建立在統(tǒng)一抽象的RDD(Resilient Distributed Datasets)之上,使得它可以以基本一致的方式應(yīng)對(duì)各種大數(shù)據(jù)處理場(chǎng)景,包括MapReduce、Streaming、SQL、Machine Learning以及Graph等。
Spark的另一個(gè)特點(diǎn)就是其高性能和容錯(cuò)性。Spark是一種粗粒度數(shù)據(jù)并行的計(jì)算范式,計(jì)算的主體是數(shù)據(jù)集合RDD,而非個(gè)別數(shù)據(jù)。RDD集合內(nèi)的所有數(shù)據(jù)都經(jīng)過(guò)同樣的算子序列,數(shù)據(jù)并行可編程性好,易于獲得高并行性(與數(shù)據(jù)規(guī)模相關(guān),而非與程序邏輯的并行性相關(guān)),也易于高效地映射到底層的并行或分布式硬件上[10]。RDD是一個(gè)容錯(cuò)的、并行的數(shù)據(jù)結(jié)構(gòu),在保證容錯(cuò)的前提下,用內(nèi)存來(lái)承載工作集。內(nèi)存的存取速度快于磁盤多個(gè)數(shù)量級(jí),從而可以極大提升性能[11]。Spark的容錯(cuò)是通過(guò)重放日志更新而取得的。因?yàn)镾park的函數(shù)式語(yǔ)義和冪等特性,重放日志更新RDD不會(huì)有副作用。另外,Spark記錄的是粗粒度的RDD更新,所以容錯(cuò)的開銷可以忽略不計(jì)。
Spark的實(shí)時(shí)性特點(diǎn)是通過(guò)Spark Streaming實(shí)現(xiàn)的。Spark Streaming將流式計(jì)算分解成一系列短小的批處理作業(yè),也就是把輸入數(shù)據(jù)流按照批次大小(如1 s)分成一段一段的數(shù)據(jù)形成DStream(Discretized Stream),而每一段數(shù)據(jù)都轉(zhuǎn)換成Spark中的RDD,如圖2所示。

圖2 DStream的組成
Spark Streaming提供了兩種操作類型,分別為Transformations和Output操作。對(duì)DStream的Transformation操作變?yōu)镾park中對(duì)RDD的Transformation操作,從一個(gè)已知的DStream經(jīng)過(guò)轉(zhuǎn)換得到一個(gè)新的DStream;而且Spark Streaming還額外增加了一類針對(duì)“窗口”(Window)的Transformation操作,可以更靈活地控制DStream的大小(時(shí)間間隔大小、數(shù)據(jù)元素個(gè)數(shù)等)。整個(gè)流式計(jì)算根據(jù)業(yè)務(wù)的需求可以對(duì)中間的結(jié)果進(jìn)行疊加,或者使用Output操作將DStream數(shù)據(jù)輸出到一個(gè)外部的存儲(chǔ)系統(tǒng),如數(shù)據(jù)庫(kù)或文件系統(tǒng)等。
Spark具有極高的擴(kuò)展性與吞吐量。根據(jù)Spark官方網(wǎng)站FAQ,最大的已知Spark集群有8 000個(gè)節(jié)點(diǎn)[12];并且隨著大數(shù)據(jù)增多,預(yù)計(jì)集群規(guī)模也會(huì)隨之變大,以便繼續(xù)滿足吞吐量方面的預(yù)期。另外,使用Spark的EC2 啟動(dòng)腳本可以輕松地在Amazon EC2上啟動(dòng)一個(gè)獨(dú)立集群。Spark目前在EC2上已能夠線性擴(kuò)展到100個(gè)節(jié)點(diǎn)(每個(gè)節(jié)點(diǎn)4核),可以以數(shù)秒的延遲每秒處理6 GB的數(shù)據(jù)量[11]。
一個(gè)Spark集群由多個(gè)工作節(jié)點(diǎn)(Worker Node)組成,每個(gè)工作節(jié)點(diǎn)可以運(yùn)行一個(gè)或多個(gè)Executor,如圖3所示。Executor是一個(gè)用于應(yīng)用程序或者工作節(jié)點(diǎn)的進(jìn)程,負(fù)責(zé)處理Tasks,并將數(shù)據(jù)保存到內(nèi)存或者磁盤中。每個(gè)應(yīng)用程序都有屬于自己的Executor,一個(gè)Executor則包含了一定數(shù)量的Slots來(lái)運(yùn)行分配給它的任務(wù)。在Spark中,每個(gè)作業(yè)(Job)都被分隔成多個(gè)彼此依賴稱之為Stage的Task。 一個(gè)Task就是一個(gè)工作單元, 可以發(fā)送給一個(gè)Executor執(zhí)行。 Task是用來(lái)執(zhí)行應(yīng)用的實(shí)際計(jì)算工作。 每個(gè)Task占用Executor的一個(gè)Slot。

圖3 Spark Streaming集群架構(gòu)
Spark Streaming流計(jì)算可以在數(shù)據(jù)產(chǎn)生并流入系統(tǒng)時(shí)就進(jìn)行處理并馬上得出結(jié)果,非常適合能耗分項(xiàng)計(jì)量中能耗數(shù)據(jù)不斷產(chǎn)生的場(chǎng)景和對(duì)實(shí)時(shí)性的需求。
選擇Spark Streaming的另一個(gè)原因是因?yàn)镾park可以在支持實(shí)時(shí)流式處理的同時(shí),進(jìn)行高效的批處理、交互式SQL查詢和數(shù)據(jù)挖掘,從而可以使能耗分項(xiàng)計(jì)量系統(tǒng)不但可以實(shí)時(shí)地為用戶捕捉能耗異常情況進(jìn)行報(bào)警,還可以提供離線統(tǒng)計(jì)分析和數(shù)據(jù)挖掘的服務(wù)。
2.2 Apache Kafka
Apache Kafka是一個(gè)分布式的、高吞吐量的、易于擴(kuò)展的基于主題發(fā)布/訂閱的消息系統(tǒng),最早是由 LinkedIn 開發(fā),并于2011年開源并貢獻(xiàn)給 Apache 軟件基金會(huì)[13]。
Kafka的邏輯架構(gòu)如圖4所示。Kafka對(duì)消息保存時(shí)根據(jù)話題(Topic)進(jìn)行歸類,發(fā)送消息者成為生產(chǎn)者(Producer),消息接受者成為消費(fèi)者(Consumer)。此外Kafka集群由多個(gè)服務(wù)器組成,每個(gè)服務(wù)器成為代理(Broker)。無(wú)論是Kafka集群,還是Producer和Consumer都依賴于Zookeeper來(lái)保證系統(tǒng)可用性。

圖4 Kafka的邏輯架構(gòu)
一個(gè)話題可以認(rèn)為是一類消息,每個(gè)話題將被分成多個(gè)分區(qū)(Partition)。設(shè)計(jì)分區(qū)的最根本原因是Kafka基于文件存儲(chǔ),通過(guò)分區(qū)可以將日志內(nèi)容分散到多個(gè)服務(wù)器上,來(lái)均衡負(fù)載,保證消息保存/消費(fèi)的效率。如果一個(gè)話題對(duì)應(yīng)一個(gè)文件,那這個(gè)文件所在的機(jī)器IO將會(huì)成為這個(gè)話題的性能瓶頸,而有了分區(qū)后,不同的消息可以并行寫入不同代理的不同分區(qū)里,屬于順序?qū)懘疟P,因此效率非常高,極大地提高了Kafka的吞吐率。所以,消息分區(qū)是Kafka高吞吐率的一個(gè)很重要的保證,即使在非常廉價(jià)的商用機(jī)器上也能做到單機(jī)支持每秒104條以上消息的傳輸[14]。此外,越多的分區(qū)意味著可以容納更多的消費(fèi)者,可以有效提升并發(fā)消費(fèi)的能力。Kafka的消息分區(qū)結(jié)構(gòu)如圖5所示。

圖5 Kafka消息分區(qū)結(jié)構(gòu)
與傳統(tǒng)的消息系統(tǒng)不同,Kafka系統(tǒng)中存儲(chǔ)的消息沒(méi)有明確的消息ID,消息通過(guò)日志中的位置稱為偏移量來(lái)唯一標(biāo)記一條消息,這樣就避免了維護(hù)密集尋址,用于映射消息ID到實(shí)際消息地址的隨機(jī)存取索引結(jié)構(gòu)的開銷。這種設(shè)計(jì)大大提高了Kafka的性能。
Kafka的另外一個(gè)創(chuàng)新是即使消息被消費(fèi),消息仍然不會(huì)被立即刪除。日志文件將會(huì)根據(jù)代理中的配置,保留一定的時(shí)間之后刪除;比如日志文件保留2 d,那么之后文件會(huì)被清除,無(wú)論其中的消息是否被消費(fèi)。Kafka通過(guò)這種簡(jiǎn)單的手段來(lái)釋放磁盤空間,從而可以減少消息消費(fèi)之后對(duì)文件內(nèi)容改動(dòng)的磁盤IO開支。
Kafka還有一個(gè)創(chuàng)新點(diǎn)就是Kafka代理是無(wú)狀態(tài)的,由消費(fèi)者維護(hù)已消費(fèi)的狀態(tài)信息。這種設(shè)計(jì)的一個(gè)好處就是消費(fèi)者可以退回到老的偏移量再次消費(fèi)數(shù)據(jù)。因?yàn)榇硎菬o(wú)狀態(tài)的,它不需要標(biāo)記哪些消息被哪些消費(fèi)者消費(fèi)過(guò),也不需要代理去保證同一個(gè)消費(fèi)者組里只有一個(gè)消費(fèi)者能消費(fèi)某一條消息,因此也就不需要鎖機(jī)制,這也為Kafka的高吞吐率提供了有力保障。
為了提高可用性,Kafka可以配置分區(qū)需要備份的個(gè)數(shù),每個(gè)分區(qū)將會(huì)被備份到多臺(tái)Kafka服務(wù)器上,以提高可用性。每個(gè)分區(qū)都有一個(gè)Kafka服務(wù)器為領(lǐng)導(dǎo)者(Leader),負(fù)責(zé)所有的讀寫操作。如果領(lǐng)導(dǎo)者失效,那么將會(huì)有其他跟隨者(Follower)來(lái)接管成為新的領(lǐng)導(dǎo)者。跟隨者只是單調(diào)地和領(lǐng)導(dǎo)者跟進(jìn),同步消息即可。從集群的整體考慮,Kafka會(huì)將領(lǐng)導(dǎo)者均衡地分散到每個(gè)Kafka服務(wù)器上,來(lái)確保整體的性能穩(wěn)定。
Kafka可以同時(shí)支持離線數(shù)據(jù)處理和實(shí)時(shí)數(shù)據(jù)處理。如圖6所示,Kafka同時(shí)支持點(diǎn)到點(diǎn)分發(fā)模型,即多個(gè)消費(fèi)者共同消費(fèi)隊(duì)列中某個(gè)消息的單個(gè)副本,以及發(fā)布-訂閱模型,即多個(gè)消費(fèi)者接收自己的消息副本。根據(jù)這一特性,可以使用Spark實(shí)時(shí)流處理系統(tǒng)對(duì)消息進(jìn)行實(shí)時(shí)在線處理,同時(shí)使用Hadoop這種批處理系統(tǒng)進(jìn)行離線處理,還可以同時(shí)將數(shù)據(jù)進(jìn)行實(shí)時(shí)備份,只需要保證這三個(gè)操作所使用的消費(fèi)者屬于不同的消費(fèi)者即可。

圖6 Kafka消息分發(fā)模型
總之,Kafka是一種處理大量數(shù)據(jù)的新型消息系統(tǒng),其高吞吐量、高可靠、高可用、易擴(kuò)展的特性完全適應(yīng)于能耗分項(xiàng)計(jì)量系統(tǒng)。此外,通過(guò)利用Kafka同時(shí)支持多種處理模型的特點(diǎn),能耗分項(xiàng)計(jì)量系統(tǒng)可以在進(jìn)行能耗數(shù)據(jù)在線處理的同時(shí),對(duì)能耗數(shù)據(jù)進(jìn)行備份和離線處理。
2.3 實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)架構(gòu)
本文設(shè)計(jì)的實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的整體架構(gòu)如圖7所示,主要包括后端能耗數(shù)據(jù)采集部分、Kafka消息系統(tǒng)、Spark Streaming集群、Hadoop集群、前端實(shí)時(shí)展示應(yīng)用和前端分析展示應(yīng)用,以及分類分項(xiàng)能耗數(shù)據(jù)庫(kù)和計(jì)量表原始數(shù)值數(shù)據(jù)庫(kù)。
流式處理系統(tǒng)主要通過(guò)網(wǎng)絡(luò)Socket通信來(lái)實(shí)現(xiàn)與外部IO系統(tǒng)的數(shù)據(jù)交互。由于網(wǎng)絡(luò)通信的不可靠特點(diǎn),發(fā)送端與接收端需要通過(guò)一定的協(xié)議來(lái)保證數(shù)據(jù)包的接收確認(rèn)和失敗重發(fā)機(jī)制。不是所有的IO系統(tǒng)都支持重發(fā),這至少需要實(shí)現(xiàn)數(shù)據(jù)流的持久化,同時(shí)還要實(shí)現(xiàn)高吞吐和低時(shí)延。通過(guò)前面的介紹,可以確定Kafka具備這些特點(diǎn),完全能夠作為實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的外部數(shù)據(jù)源。
除了把Kafka當(dāng)成輸入數(shù)據(jù)源之外,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)還將其作為信息輸出數(shù)據(jù)源向前端實(shí)時(shí)展示應(yīng)用推送相關(guān)報(bào)警和實(shí)時(shí)流信息。

圖7 實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)整體架構(gòu)
能耗數(shù)據(jù)采集部分包括能耗采集儀表和數(shù)據(jù)采集器。一般來(lái)講,每個(gè)數(shù)據(jù)采集器可以連接16到32塊采集儀表。數(shù)據(jù)采集器負(fù)責(zé)接收所連接采集儀表發(fā)來(lái)的能耗數(shù)據(jù),并把數(shù)據(jù)整理為住建部所制定的能耗數(shù)據(jù)通信協(xié)議格式[15],然后按照設(shè)置的時(shí)間間隔發(fā)送到設(shè)定的數(shù)據(jù)接收器。每個(gè)數(shù)據(jù)接收器就是Kafka消息系統(tǒng)的消息生產(chǎn)者,負(fù)責(zé)把從數(shù)據(jù)采集器發(fā)來(lái)的數(shù)據(jù)寫入Kafka消息系統(tǒng),從而保證了數(shù)據(jù)的可靠性。
按照住建部要求,計(jì)量表采集到的能耗數(shù)據(jù)一方面必須寫入計(jì)量表原始數(shù)值數(shù)據(jù)庫(kù),同時(shí)還需要按儀表、按分項(xiàng)進(jìn)行拆分并把結(jié)果寫入分類分項(xiàng)能耗數(shù)據(jù)庫(kù)。在滿足住建部基本要求的同時(shí),實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)還對(duì)能耗數(shù)據(jù)進(jìn)行實(shí)時(shí)分析以便能夠及時(shí)捕捉能耗異常情況,并報(bào)警給用戶。
為了能夠保證能耗數(shù)據(jù)處理的實(shí)時(shí)性,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)充分利用Kafka消息系統(tǒng)可以同時(shí)支持多個(gè)消費(fèi)者組的能力,為能耗數(shù)據(jù)消息設(shè)置兩個(gè)消費(fèi)者組。一個(gè)是運(yùn)行在Spark Streaming流式計(jì)算集群上的能耗數(shù)據(jù)實(shí)時(shí)數(shù)據(jù)拆分程序;另一個(gè)則是運(yùn)行在Spark批處理集群上的計(jì)量表原始數(shù)值寫入程序。
運(yùn)行在Spark Streaming集群上的能耗數(shù)據(jù)拆分程序是實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的核心模塊。數(shù)據(jù)拆分程序以Kafka消息系統(tǒng)作為能耗數(shù)據(jù)輸入流進(jìn)行實(shí)時(shí)在線處理。首先,數(shù)據(jù)拆分程序?qū)δ芎臄?shù)據(jù)進(jìn)行分類分項(xiàng)拆分,并形成多個(gè)數(shù)據(jù)流供其他業(yè)務(wù)處理模塊使用。第二,數(shù)據(jù)拆分程序把分類分項(xiàng)拆分結(jié)果按照不同時(shí)間粒度進(jìn)行統(tǒng)計(jì),并把統(tǒng)計(jì)結(jié)果寫入分類分項(xiàng)能耗數(shù)據(jù)庫(kù)。時(shí)間粒度分為15 min、小時(shí)、天和月。
除了能耗數(shù)據(jù)拆分程序之外,Spark Streaming集群還可以進(jìn)行多種實(shí)時(shí)在線數(shù)據(jù)處理,比如能耗熱點(diǎn)分析和能耗異常分析。這些能耗數(shù)據(jù)處理程序并不直接從Kafka消息系統(tǒng)中獲取數(shù)據(jù),而是使用能耗數(shù)據(jù)拆分程序生成的數(shù)據(jù)流進(jìn)行數(shù)據(jù)處理,并把分析結(jié)果通過(guò)Kafka消息系統(tǒng)提供給前端實(shí)時(shí)展示應(yīng)用。
實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的另一部分功能是進(jìn)行離線數(shù)據(jù)統(tǒng)計(jì)、分析以及數(shù)據(jù)挖掘。前端能耗分析程序基于能耗數(shù)據(jù)拆分程序?qū)懭氲椒诸惙猪?xiàng)能耗數(shù)據(jù)庫(kù)的數(shù)據(jù),以及計(jì)量表原始數(shù)值數(shù)據(jù)進(jìn)行各類能耗數(shù)據(jù)統(tǒng)計(jì)、分析以及數(shù)據(jù)挖掘,從而使用能單位可以掌握詳細(xì)能耗使用情況,為制定節(jié)能策略提供科學(xué)依據(jù)。
因?yàn)槟芎臄?shù)據(jù)記錄的數(shù)量遠(yuǎn)遠(yuǎn)超過(guò)了傳統(tǒng)關(guān)系型數(shù)據(jù)庫(kù)可以支持的容量,計(jì)量表原始數(shù)值數(shù)據(jù)庫(kù)和分類分項(xiàng)能耗數(shù)據(jù)庫(kù)均使用了HBase數(shù)據(jù)庫(kù)。HBase是運(yùn)行在Hadoop上的NoSQL數(shù)據(jù)庫(kù),它是一個(gè)分布式的和可擴(kuò)展的面向列的數(shù)據(jù)庫(kù),可以在一組通用硬件上存儲(chǔ)許多具有數(shù)十億行和上百萬(wàn)列的大表[16]。HBase能夠融合Key-Value數(shù)據(jù)模式帶來(lái)實(shí)時(shí)查詢的能力,以及通過(guò)MapReduce或Spark進(jìn)行離線處理或者批處理的能力。總之,HBase能夠存儲(chǔ)大量的數(shù)據(jù),讓用戶在大量的數(shù)據(jù)中查詢記錄,并從中獲得綜合分析報(bào)告。所以,HBase非常適合于存放計(jì)量表原始數(shù)據(jù)和分類分項(xiàng)能耗數(shù)據(jù)。HBase不但可以滿足能耗分項(xiàng)計(jì)量系統(tǒng)每天幾十萬(wàn)條記錄的大數(shù)據(jù)量需求,還可以與Hadoop的MapReduce以及Spark SQL和Spark MLlib結(jié)合為用戶提供高效能耗數(shù)據(jù)分析和數(shù)據(jù)挖掘工作。
2.4 實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)內(nèi)部結(jié)構(gòu)
實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的內(nèi)部結(jié)構(gòu)如圖8所示。Spark Streaming集群由多個(gè)工作者節(jié)點(diǎn)(Worker Node)組成,每個(gè)工作者節(jié)點(diǎn)包含一個(gè)或多個(gè)Spark Executor。同時(shí),在每個(gè)工作節(jié)點(diǎn)還安裝了用來(lái)存儲(chǔ)能耗數(shù)據(jù)的數(shù)據(jù)庫(kù)系統(tǒng)HBase和MySQL,以及數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng)Hive。HBase用來(lái)存儲(chǔ)能耗分類分項(xiàng)數(shù)據(jù)和計(jì)量表原始數(shù)值數(shù)據(jù);MySQL用來(lái)存儲(chǔ)與用能單位和分項(xiàng)計(jì)量系統(tǒng)各種設(shè)備部署情況的結(jié)構(gòu)化數(shù)據(jù);Hive用來(lái)按主題、多維度、多粒度對(duì)分類分項(xiàng)能耗數(shù)據(jù)進(jìn)行存儲(chǔ)和管理,為后期進(jìn)行離線分析和數(shù)據(jù)挖掘提供良好的基礎(chǔ)。

圖8 實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)內(nèi)部結(jié)構(gòu)
來(lái)自各個(gè)數(shù)據(jù)采集器的能耗數(shù)據(jù)使用同一個(gè)話題(Topic)寫入Kafka消息系統(tǒng)。為了保證實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的吞吐量,以便支持更多的計(jì)量表,我們對(duì)能耗數(shù)據(jù)消息話題進(jìn)行了分區(qū)。通過(guò)消息分區(qū)可以提高消息生產(chǎn)者和消息消費(fèi)者的并發(fā)能力。
在實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)中,每個(gè)消息分區(qū)將有兩個(gè)消息消費(fèi)者小組:一個(gè)是Spark Streaming流式集群的能耗數(shù)據(jù)拆分程序小組;另一個(gè)是Spark批處理集群的計(jì)量表原始數(shù)值數(shù)據(jù)寫入程序小組。每個(gè)小組都由多個(gè)消費(fèi)者組成,每個(gè)消息分區(qū)的數(shù)據(jù)都會(huì)被每個(gè)小組中的一個(gè)消費(fèi)者接收。
圖8描述了一個(gè)包含兩個(gè)Worker Node的Spark 集群,每個(gè)Worker Node運(yùn)行了三個(gè)Spark Executor。每個(gè)Worker Node上有兩個(gè)Executor屬于能耗數(shù)據(jù)拆分程序小組,另外一個(gè)屬于計(jì)量表原始數(shù)值寫入程序小組。
能耗數(shù)據(jù)消息話題分成了四個(gè)分區(qū),能耗數(shù)據(jù)拆分程序小組中的每個(gè)Executor消費(fèi)一個(gè)消息分區(qū)的數(shù)據(jù);計(jì)量表原始數(shù)據(jù)值寫入程序小組中的每個(gè)Executor負(fù)責(zé)消費(fèi)兩個(gè)消息分區(qū)的數(shù)據(jù)。一般來(lái)講,分區(qū)的個(gè)數(shù)最好是消費(fèi)者小組中消費(fèi)者的倍數(shù),也就是說(shuō),同小組中的每個(gè)消費(fèi)者負(fù)責(zé)處理的消息分區(qū)個(gè)數(shù)是等同的。在實(shí)際環(huán)境中消息話題分區(qū)的個(gè)數(shù)需要按照整個(gè)系統(tǒng)連接的計(jì)量?jī)x表的個(gè)數(shù)來(lái)確定。
與Receiver方式相比,Direct方式雖然使用較為復(fù)雜,但是它能提供更好的靈活性和可靠性,所以本文選用Direct方式。Direct方式使用Kafka的基本API,由Spark Streaming負(fù)責(zé)記錄在每個(gè)消息分區(qū)中的消費(fèi)位移,也就是已經(jīng)消費(fèi)過(guò)的消息位置,并保存在Spark系統(tǒng)的檢測(cè)點(diǎn)(Check Point)記錄中。使用Direct方式,Spark Streaming會(huì)周期性地查詢Kafka,來(lái)獲得每個(gè)消息分區(qū)的最新的位移,從而定義每個(gè)數(shù)據(jù)塊的數(shù)據(jù)范圍。當(dāng)處理消息的作業(yè)啟動(dòng)時(shí),就會(huì)使用Kafka的簡(jiǎn)單消費(fèi)API 來(lái)獲取Kafka指定范圍的數(shù)據(jù)。Spark會(huì)創(chuàng)建跟Kafka 分區(qū)一樣多的RDD 分區(qū),并且會(huì)并行從Kafka中讀取數(shù)據(jù)。所以在Kafka 分區(qū)和RDD 分區(qū)之間,有一個(gè)一對(duì)一的映射關(guān)系。采用Direct方式的另外一個(gè)優(yōu)勢(shì)就是可以利用Kafka保證數(shù)據(jù)的可靠性,并且可以保證數(shù)據(jù)是消費(fèi)一次且僅消費(fèi)一次。
在每個(gè)Spark Executor中,運(yùn)行著能耗數(shù)據(jù)拆分程序和多個(gè)能耗數(shù)據(jù)實(shí)時(shí)處理程序。能耗拆分程序的功能在前面已經(jīng)介紹。每個(gè)能耗數(shù)據(jù)處理程序基于拆分程序生成的實(shí)時(shí)數(shù)據(jù)流完成一定的數(shù)據(jù)處理工作,并把部分?jǐn)?shù)據(jù)處理的結(jié)果寫入Kafka消息系統(tǒng)供前端實(shí)時(shí)展示應(yīng)用使用,同時(shí)還會(huì)把一些數(shù)據(jù)處理結(jié)果寫入MySQL數(shù)據(jù)庫(kù)供前端分析系統(tǒng)使用。
能耗用量異常分析程序是我們提供的一個(gè)能耗實(shí)時(shí)數(shù)據(jù)處理程序,它基于數(shù)據(jù)拆分程序提供的能耗使用數(shù)據(jù)流,根據(jù)用能單位設(shè)置的各類閾值以及正常能耗使用量發(fā)現(xiàn)用能異常情況,并通過(guò)Kafka消息系統(tǒng)及時(shí)報(bào)警給前端實(shí)時(shí)展示應(yīng)用。比如,單位給某辦公樓層設(shè)置的空調(diào)用電的閾值為每小時(shí)20度,能耗異常分析程序在對(duì)能耗使用數(shù)據(jù)流進(jìn)行處理時(shí)就會(huì)檢測(cè)該樓層的空調(diào)用電量,當(dāng)用電量超過(guò)每小時(shí)20度時(shí),就會(huì)產(chǎn)生報(bào)警消息通過(guò)Kafka提交給前端實(shí)時(shí)展示應(yīng)用。同樣的,假定某小區(qū)正常煤氣流量為10 m3/min左右,如果能耗用量異常分析程序發(fā)現(xiàn)該小區(qū)煤氣流量遠(yuǎn)遠(yuǎn)超過(guò)了10 m3/min,那么就有可能是發(fā)生了煤氣管道漏氣。這時(shí),能耗用量異常分析程序就會(huì)產(chǎn)生報(bào)警消息。
能耗用量熱點(diǎn)分析程序是我們提供的另一個(gè)能耗實(shí)時(shí)數(shù)據(jù)處理程序,它會(huì)實(shí)時(shí)統(tǒng)計(jì)每個(gè)計(jì)量點(diǎn)的每刻的能耗使用量并通過(guò)Kafka消息系統(tǒng)發(fā)布。前端實(shí)時(shí)展示應(yīng)用可以獲取感興趣的計(jì)量點(diǎn)的流量統(tǒng)計(jì)來(lái)繪制能耗用量熱點(diǎn)圖,從而可以一目了然地及時(shí)了解所關(guān)心計(jì)量點(diǎn)的能耗使用狀況。
2.5 實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的優(yōu)勢(shì)
本文提出的實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)充分利用了最先進(jìn)的大數(shù)據(jù)技術(shù),特別是流計(jì)算技術(shù),并針對(duì)能耗分項(xiàng)計(jì)量的特點(diǎn)對(duì)整體系統(tǒng)架構(gòu)和內(nèi)部結(jié)構(gòu)進(jìn)行了認(rèn)真的研究與設(shè)計(jì)。與傳統(tǒng)的能耗分項(xiàng)計(jì)量系統(tǒng)比,本文提出的實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)具有如下優(yōu)勢(shì):
首先,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)可以同時(shí)支持實(shí)時(shí)在線數(shù)據(jù)處理和離線數(shù)據(jù)統(tǒng)計(jì)分析,而傳統(tǒng)的能耗分項(xiàng)計(jì)量系統(tǒng)只支持對(duì)能耗使用情況的離線統(tǒng)計(jì)和分析。實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的異常情況實(shí)時(shí)報(bào)警功能和能耗使用熱點(diǎn)實(shí)時(shí)分析功能,不僅可以使用能單位在發(fā)生能耗異常情況時(shí)可以及時(shí)采取相應(yīng)措施,防止異常情況蔓延,還可以讓用能單位隨時(shí)掌握整體能耗情況的實(shí)時(shí)現(xiàn)狀。
其次,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)具有很強(qiáng)的數(shù)據(jù)處理能力。整體系統(tǒng)架構(gòu)使用了當(dāng)前最先進(jìn)的快速流式處理系統(tǒng)Spark Streaming和具有高可靠、高吞吐量的Kafka消息系統(tǒng)作為實(shí)時(shí)數(shù)據(jù)流處理的核心架構(gòu)。整個(gè)數(shù)據(jù)處理過(guò)程是基于內(nèi)存,而不像傳統(tǒng)能耗分項(xiàng)計(jì)量系統(tǒng)需要把數(shù)據(jù)首先寫入文件系統(tǒng),然后再讀入到內(nèi)存進(jìn)行處理,所以,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的處理效率會(huì)比傳統(tǒng)能耗分項(xiàng)計(jì)量系統(tǒng)提高百倍以上。這意味著,在同樣的硬件配置情況下,實(shí)時(shí)能耗分析計(jì)量系統(tǒng)可以支持的能耗采集點(diǎn)數(shù)可以提高上百倍。
第三,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)具有很強(qiáng)的可擴(kuò)展性。實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)架構(gòu)中的Kafka消息系統(tǒng)、Spark系統(tǒng)、HBase系統(tǒng)和Hadoop系統(tǒng)都是分布式集群結(jié)構(gòu),并具有很強(qiáng)的擴(kuò)展能力。所以,在使用實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的每個(gè)階段,用戶只需要部署能夠滿足當(dāng)時(shí)能耗監(jiān)控需求的設(shè)備即可,而不需要考慮后期可能的需求。這一方面可以節(jié)省用戶的投資成本,還減少了用戶初期部署的設(shè)計(jì)負(fù)擔(dān)。
第四,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)提供快速數(shù)據(jù)挖掘能力。除了強(qiáng)大的實(shí)時(shí)數(shù)據(jù)處理能力以外,借助于Spark 平臺(tái),實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)還可以利用Spark MLlib進(jìn)行深度數(shù)據(jù)挖掘,發(fā)現(xiàn)復(fù)雜的能耗數(shù)據(jù)之間的關(guān)聯(lián)關(guān)系,從而為制定有效的節(jié)能措施提供科學(xué)依據(jù)。基于Spark MLlib的數(shù)據(jù)挖掘效率會(huì)遠(yuǎn)遠(yuǎn)高于基于MapReduce模式的Mahout數(shù)據(jù)挖掘系統(tǒng)的效率。
第五,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)可以很容易增加新的業(yè)務(wù)處理功能。在當(dāng)前系統(tǒng)中,提供了能耗異常分析和能耗用量熱點(diǎn)分析兩個(gè)實(shí)時(shí)處理功能,但是今后可以根據(jù)用戶需求很方便地添加新的業(yè)務(wù)處理能力。新添加的業(yè)務(wù)處理功能將會(huì)與原有的處理并行進(jìn)行,并不會(huì)影響現(xiàn)有的實(shí)時(shí)業(yè)務(wù)處理能力。
為了檢驗(yàn)實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)進(jìn)行分項(xiàng)計(jì)量和實(shí)時(shí)數(shù)據(jù)處理的能力,實(shí)際部署了一套實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng),在對(duì)各種參數(shù)進(jìn)行優(yōu)化之后,進(jìn)行了一系列的測(cè)試。
3.1 測(cè)試環(huán)境
測(cè)試環(huán)境是運(yùn)行在云平臺(tái)上的7臺(tái)虛擬機(jī)組成。每臺(tái)虛擬機(jī)的配置為8核CPU,25 GB內(nèi)存,1 TB HDD磁盤。圖9描述了實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)各個(gè)模塊的部署情況。

圖9 系統(tǒng)測(cè)試環(huán)境
系統(tǒng)部署的指導(dǎo)思想是要保證整個(gè)系統(tǒng)的可靠性和可擴(kuò)充性,并且保證節(jié)點(diǎn)之間的負(fù)載均衡性。具體部署情況如下:在兩臺(tái)服務(wù)器上部署了Hadoop的Name Node和HBase的Master Server;三臺(tái)服務(wù)器上部署了Zookeeper、Spark、Hadoop的Data Node和HBase的Master Server;最后兩臺(tái)服務(wù)器上部署了Kafka、Spark、Hadoop的Data Node和HBase的Master Server。
3.2 測(cè)試結(jié)果與分析
實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的性能指標(biāo)主要考慮的是系統(tǒng)的吞吐量和處理數(shù)據(jù)的延遲時(shí)間。系統(tǒng)的吞吐量一般用兩個(gè)指標(biāo)來(lái)衡量:一是單位時(shí)間內(nèi)系統(tǒng)能夠處理的能耗數(shù)據(jù)的條數(shù);二是系統(tǒng)處理一條能耗數(shù)據(jù)所需要的時(shí)間。單位時(shí)間內(nèi)處理的數(shù)據(jù)條數(shù)越多說(shuō)明系統(tǒng)的吞吐量越高,系統(tǒng)處理數(shù)據(jù)的能力越強(qiáng)。處理能耗數(shù)據(jù)的延遲時(shí)間的指標(biāo)也有兩個(gè):一個(gè)是從接收到一條能耗數(shù)據(jù)到開始處理該條數(shù)據(jù)之間的時(shí)間間隔稱為調(diào)度延遲時(shí)間(Scheduling Delay);另一個(gè)是從接收到一條能耗數(shù)據(jù)到處理完該條數(shù)據(jù)之間的時(shí)間間隔稱為總延遲時(shí)間(Total Delay)。處理能耗數(shù)據(jù)的延遲時(shí)間越小,說(shuō)明系統(tǒng)處理數(shù)據(jù)越及時(shí),系統(tǒng)實(shí)時(shí)性越強(qiáng)。
在測(cè)試中,通過(guò)給實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的Kafka消息系統(tǒng)加載實(shí)際數(shù)據(jù)來(lái)測(cè)試系統(tǒng)的吞吐量和處理數(shù)據(jù)的延遲時(shí)間,測(cè)試結(jié)果如圖10所示。圖中展示的測(cè)試運(yùn)行了6 min 27 s,每秒加載一組能耗數(shù)據(jù),每組數(shù)據(jù)大約包含140條能耗記錄,總共處理了387組數(shù)據(jù),64 968條能耗記錄。
圖10(a)展示的是給系統(tǒng)加載能耗數(shù)據(jù)的速率(Input Rate)。可以看出給系統(tǒng)加載能耗數(shù)據(jù)的平均速率為每秒167.88條記錄,瞬間最高值達(dá)到了每秒300條以上,絕大多數(shù)數(shù)據(jù)都是按每秒140到200條數(shù)據(jù)的速率發(fā)送的。
圖10(b)展示的是數(shù)據(jù)的調(diào)度延遲時(shí)間。盡管顯示的平均調(diào)度延遲時(shí)間為10 ms,但從圖中可以看出這主要是由于在測(cè)試剛開始啟動(dòng)時(shí),第一批數(shù)據(jù)有一個(gè)2 s延遲而導(dǎo)致的。從右圖可以看出,其余批次數(shù)據(jù)的調(diào)度延遲平均值在0.2 ms以內(nèi)。
圖10(c)展示的是處理一批能耗數(shù)據(jù)所需要的時(shí)間。圖中顯示處理每批數(shù)據(jù)的平均時(shí)間為133 ms。如果考慮到除去系統(tǒng)剛啟動(dòng)運(yùn)行的第一批數(shù)據(jù),那么平均處理每一批數(shù)據(jù)的時(shí)間會(huì)在100 ms以內(nèi)。從圖的形狀來(lái)看,除了第一批數(shù)據(jù)以外,系統(tǒng)整個(gè)處理過(guò)程非常平穩(wěn)。
圖10(d)展示的是系統(tǒng)處理能耗數(shù)據(jù)的總延遲的平均時(shí)間為143 ms。類似于調(diào)度延遲時(shí)間,總延遲平均時(shí)間也因?yàn)榈谝慌鷶?shù)據(jù)的延遲而拉高。如果剔除第一批數(shù)據(jù),其余批次數(shù)據(jù)的總延遲時(shí)間均在100 ms以內(nèi)。
圖11以表格的形式展示了測(cè)試最后26批次數(shù)據(jù)的結(jié)果,包括每批數(shù)據(jù)的條數(shù)和提交時(shí)間,也就是包含多少條能耗數(shù)據(jù)、調(diào)度延遲時(shí)間、處理時(shí)間和總延遲時(shí)間。通過(guò)圖11的數(shù)據(jù),可以更進(jìn)一步佐證上面對(duì)測(cè)試數(shù)據(jù)的分析結(jié)果。從圖11可以看出每批數(shù)據(jù)平均包含146.88條能耗數(shù)據(jù);平臺(tái)調(diào)度延遲時(shí)間為0.34 ms;每批數(shù)據(jù)的平均處理時(shí)間為117.65 ms;平均總延遲時(shí)間為118 ms。所以,實(shí)時(shí)能耗分項(xiàng)計(jì)量的吞吐量為每秒處理1 248條記錄(146.88/117.65×1 000)。
圖11的數(shù)據(jù)是在系統(tǒng)度過(guò)了初始階段達(dá)到穩(wěn)定以后的數(shù)據(jù),結(jié)合圖10的整體情況,可以知道圖11的數(shù)據(jù)更能代表實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的特性。

圖10 系統(tǒng)實(shí)時(shí)性測(cè)試結(jié)果

圖11 系統(tǒng)實(shí)時(shí)性詳細(xì)測(cè)試結(jié)果
3.3 測(cè)試結(jié)論
從上面的實(shí)驗(yàn)結(jié)果可以看出,在實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)啟動(dòng)以后,只需要處理完第一批數(shù)據(jù)以后,就能達(dá)到穩(wěn)定的運(yùn)行狀態(tài),大約3 s。平均實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的吞吐量為每秒處理1 248條記錄,平臺(tái)調(diào)度延遲時(shí)間為0.34 ms;每批數(shù)據(jù)的平均處理時(shí)間為117.65 ms;平均總延遲時(shí)間為118 ms。所以,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)具有很高的吞吐量,實(shí)時(shí)性很強(qiáng),并且系統(tǒng)數(shù)據(jù)處理速率很平穩(wěn)。
按照國(guó)家住建部分項(xiàng)計(jì)量規(guī)則要求,每塊分項(xiàng)計(jì)量?jī)x表需要每15 min提交一次數(shù)據(jù);而在15 min時(shí)間內(nèi),實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)可以處理超過(guò)100萬(wàn)條(15×60×1 200)數(shù)據(jù)。也就是說(shuō),在現(xiàn)有的系統(tǒng)配置環(huán)境下,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)可以支持100萬(wàn)塊儀表。因?yàn)閭鹘y(tǒng)的能耗分項(xiàng)計(jì)量系統(tǒng)需要先把數(shù)據(jù)寫入磁盤文件,然后再讀入進(jìn)行數(shù)據(jù)處理,并且沒(méi)有采用大數(shù)據(jù)并發(fā)處理技術(shù),所以每套系統(tǒng)能支持的分項(xiàng)計(jì)量?jī)x表一般都在1 000塊左右,只適合于單個(gè)企事業(yè)單位的分項(xiàng)計(jì)量工作。實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)將處理能耗數(shù)據(jù)的能力提升了上千倍,完全可以滿足同一個(gè)城市的所有公共事業(yè)單位提供分項(xiàng)計(jì)量服務(wù)。
本文提出了一種基于Spark Streaming和Apache Kafka模塊構(gòu)建的用于能耗分項(xiàng)計(jì)量的實(shí)時(shí)流式處理系統(tǒng),簡(jiǎn)稱實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)。它能夠滿足能耗分項(xiàng)計(jì)量數(shù)據(jù)產(chǎn)生快、實(shí)時(shí)性強(qiáng)、數(shù)據(jù)量大的數(shù)據(jù)處理需求。與傳統(tǒng)數(shù)據(jù)處理架構(gòu)不同,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)不僅提供離線數(shù)據(jù)的統(tǒng)計(jì)與分析,并且根據(jù)業(yè)務(wù)需求對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)在線處理,在數(shù)據(jù)流動(dòng)的過(guò)程中實(shí)時(shí)地捕捉異常信息并進(jìn)行處理,最終把結(jié)果保存或者分發(fā)給需要的組件。本文詳細(xì)描述了實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的整體架構(gòu)和內(nèi)部結(jié)構(gòu),闡述了其主要特點(diǎn),并通過(guò)實(shí)際測(cè)試證明了其強(qiáng)大的數(shù)據(jù)處理能力和實(shí)時(shí)性。
從功能方面來(lái)講,與傳統(tǒng)的能耗分項(xiàng)計(jì)量系統(tǒng)相比,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)的最大優(yōu)點(diǎn)就是在支持離線能耗統(tǒng)計(jì)的同時(shí),還可以支持實(shí)時(shí)在線數(shù)據(jù)處理和深度數(shù)據(jù)挖掘。比如,可以對(duì)能耗數(shù)據(jù)流進(jìn)行實(shí)時(shí)分析,發(fā)現(xiàn)能耗用量異常情況,及時(shí)報(bào)警給用戶,以便用能單位可以及時(shí)采取相應(yīng)措施,防止異常情況蔓延。再比如,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)還可以實(shí)時(shí)統(tǒng)計(jì)各計(jì)量點(diǎn)能耗情況并實(shí)時(shí)展示給用戶,使用能單位及時(shí)掌握整體能耗的實(shí)時(shí)現(xiàn)狀。
從性能方面來(lái)講,本文提出的實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)進(jìn)行能耗數(shù)據(jù)處理的能力遠(yuǎn)遠(yuǎn)超過(guò)傳統(tǒng)的能耗分項(xiàng)計(jì)量系統(tǒng),能夠支持能耗數(shù)據(jù)采集點(diǎn)的個(gè)數(shù)高出上千倍。并且,實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)具有很強(qiáng)的擴(kuò)展能力,可以通過(guò)增加服務(wù)器和存儲(chǔ)設(shè)備來(lái)提高其總體處理能力,從而可以支持更多的能耗數(shù)據(jù)采集點(diǎn)。
總之,本文提出的實(shí)時(shí)能耗分項(xiàng)計(jì)量系統(tǒng)不論從性能方面、功能方面,還是從系統(tǒng)的可擴(kuò)展方面都遠(yuǎn)優(yōu)于傳統(tǒng)的能耗分項(xiàng)計(jì)量系統(tǒng)。本系統(tǒng)的第一版開發(fā)已經(jīng)完成,已經(jīng)在2016年開始在四川省進(jìn)行實(shí)地部署。此外,本文提出的實(shí)時(shí)流式數(shù)據(jù)處理系統(tǒng)還可以應(yīng)用于其他流式數(shù)據(jù)處理場(chǎng)合,比如股市走向分析、氣象數(shù)據(jù)測(cè)控、網(wǎng)站用戶行為分析和公路卡口過(guò)車數(shù)據(jù)分析等。
References)
[1] 清華大學(xué)建筑節(jié)能研究中心. 中國(guó)建筑節(jié)能年度發(fā)展研究報(bào)告2010[M]. 北京:中國(guó)建筑工業(yè)出版社, 2010:105-130.(Building energy conservation research center of tsinghua university. Annual Report of China Building Energy Conservation 2010[M]. Beijing:China Architecture and Building Press, 2010:105-130.)
[2] 魏慶芃. 大型公共建筑能耗分項(xiàng)計(jì)量實(shí)時(shí)監(jiān)測(cè)分析系統(tǒng)EMS-II的發(fā)展[J]. 建筑, 2009(3):34-37.(WEI Q P. Development of the detailed classification energy consumption measurement system for large public building EMS-II [J]. Construction and Architecture, 2009(3):34-37.)
[3] EMC-2000能源管理系統(tǒng)[EB/OL]. [2016- 09- 10]. http://www.hysine.cn/web/list/369/1.html.(EMC-2000 energy management system [EB/OL]. [2016- 09- 10]. http://www.hysine.cn/web/list/369/1.html.)
[4] 黃斌, 杜運(yùn)東, 曹雪華. 基于Acrel-5000的大型公共建筑能耗監(jiān)測(cè)系統(tǒng)設(shè)計(jì)與應(yīng)用[J]. 智能建筑電氣技術(shù), 2009, 3(5):47-50.(HUANG B, DU Y D, CAO X H. Design and application of large public building energy consumption monitoring system Acrel-5000[J]. Electrical Technology of Intelligent Building, 2009, 3(5):47-50.)
[5] 國(guó)家機(jī)關(guān)辦公建筑和大型公共建筑能耗監(jiān)測(cè)系統(tǒng)——分項(xiàng)計(jì)量數(shù)據(jù)采集技術(shù)導(dǎo)則[S]. 北京:中華人民共和國(guó)住房和城鄉(xiāng)建設(shè)部, 2008:1-25.(Government offices and large public buildings energy consumption monitoring system — the technical guidance for detailed classification energy data collection[S]. Beijing: Ministry of Housing and Urban-Rural Development of the People’s Republic of China, 2008:1-25.
[6] 國(guó)家機(jī)關(guān)辦公建筑和大型公共建筑能耗監(jiān)測(cè)系統(tǒng)——省、市級(jí)數(shù)據(jù)中心數(shù)據(jù)庫(kù)結(jié)構(gòu)文檔[S]. 北京:中華人民共和國(guó)住房和城鄉(xiāng)建設(shè)部, 2009:1-12.(Government offices and large public buildings energy consumption monitoring system — provincial and municipal data center database structure document[S]. Beijing: Ministry of Housing and Urban-Rural Development of the People’s Republic of China, 2009:1-12.)
[7] Spark programming guide[EB/OL]. [2016- 07- 27]. http://spark.apache. org /docs/latest/programming-guide.html.
[8] Spark streaming programming guide [EB/OL]. [2016- 07- 27]. https://sp ark.apache.org/docs/latest/programming-guide.html.
[9] 陸嘉恒. Hadoop實(shí)戰(zhàn)[M]. 北京:機(jī)械工業(yè)出版社, 2012:1-121.(LU J H. Hadoop in Action[M]. Beijing: China Machine Press, 2012:1-121.)
[10] KARAU H, KONWINSKI A, WENDELL P, et al. Spark快速大數(shù)據(jù)分析[M]. 王道遠(yuǎn), 譯. 北京: 人民郵電出版社, 2015: 161-185.(KARAU H, KONWINSKI A, WENDELL P, et al. Learning Spark: Lighting-Fast Data Analysis [M]. WANG D Y, translated. Beijing:Posts and Telecom Press, 2015: 161-185.)
[11] 夏俊鸞, 邵賽賽. Spark Streaming:大規(guī)模流式數(shù)據(jù)處理的新貴[J]. 程序員, 2014(2):44-48.(XIA J L, SHAO S S. Spark streaming: large-scale streaming data processing upstart[J]. Programmer, 2014(2):44-48.)
[12] Apache spark FAQ [EB/OL]. [2016- 08- 04]. https://spark.apache.org/faq.html.
[13] Apache Kafka: a high-throughput distributed messaging system [EB/OL]. [2016- 01- 09]. http://kafka.apache.org/documentation.html.
[14] KREPS J, NARKHED N, RAO J. Kafka: a distributed messaging system for log processing[C]// NetDB2011: Proceedings of the 6th International Workshop on Networking Meets Databases. New York: ACM, 2011: Article No. 12.
[15] 國(guó)家機(jī)關(guān)辦公建筑和大型公共建筑能耗監(jiān)測(cè)系統(tǒng)——數(shù)據(jù)上傳XML格式文檔[S]. 北京:中華人民共和國(guó)住房和城鄉(xiāng)建設(shè)部, 2009:55-59.(Government offices and large public buildings energy consumption monitoring system — XML format for data uploading [S]. Beijing: Ministry of Housing and Urban-Rural Development of the People’s Republic of China, 2009:55-59.)
[16] GEORGE L. HBase權(quán)威指南[M]. 代志遠(yuǎn), 劉佳, 蔣杰, 譯. 北京:人民郵電出版社, 2013:5-25.(GEORGE L. HBase: the Definitive Guide[M]. DAI Z Y, LIU J, JIANG J, translated. Beijing:Posts and Telecom Press, 2013:5-25.)
WU Zhixue, born in 1960, Ph. D., professor. His research interests include cloud computing, streaming data processing, data mining.
Real-time detailed classification energy consumption measurement system based on Spark Streaming
WU Zhixue1,2*
(1. Chengdu Wuzhou Handge Technology Limited, Chengdu Sichuan 611731, China;2. School of Information Security Engineering, Chengdu University of Information Technology, Chengdu Sichuan 610225, China)
Detailed classification energy consumption measurement can discover energy consuming issues more accurately, timely and effectively, which can form and implement the most effective energy-saving measures. Detailed classification energy measurement system needs to calculate energy consumption amounts at multiple time scales according to detailed classification coding. Not only does it need to complete the tasks timely, but also need to deal with data aggregating, data de-duplication and data joining operations. Due to the fast speed of the data being generated, the requirement of the data being processed in real-time, and the big size of the data volume, it is difficult to store the data to a database system first, and then to process the data afterwards. Therefore, the traditional data processing infrastructure cannot fulfil the requirements of detailed classification energy consumption measurement system. A new real-time detailed classification energy consumption measurement system based on Spark Streaming technologies was designed and implemented, the system infrastructure and the internal structure of the system were introduced in detail, and its real-time data processing capabilities were proved through experiments. Different from the traditional ways, the proposed system processes energy consumption data in real-time to capture any unusual behaviour timely; at the same time, it separates the data and calculates the consumption usages according to the detailed classification coding, and stores the results to a database system for offline analysis and data mining, which can effectively solve the previously mentioned problems encountered in the data processing process.
stream computing; detailed classification energy consumption measurement; Spark Streaming; Apache Kafka; big data
2016- 10- 10;
2016- 12- 21。
武志學(xué)(1960—),男,山西河津人,教授,博士,主要研究方向:云計(jì)算、流式數(shù)據(jù)處理、數(shù)據(jù)挖掘。
1001- 9081(2017)04- 0928- 08
10.11772/j.issn.1001- 9081.2017.04.0928
TP391
A