張瑞聰,任鵬程,房 凱,張衛山
(中國石油大學(華東)計算機與通信工程學院,青島 266580)
隨著工業化、信息化的飛速發展,設備工藝越來越復雜,監控、傳感器的廣泛部署使得設備的運行狀態得以有效監控[1].基于工業行業特性,工業物聯網大數據顯現出規模大、類型雜和價值密度不一致等特征[2],因此,相比傳統的大數據,對具有以上特征的大數據分析處理更加困難.而且目前設備故障以發生故障后進行報警的方式呈現,個別故障預測只能基于專家經驗[3]判斷,過度依賴于專家知識.
本文結合當前工業領域[4]發展面臨的問題,設計了基于Hadoop環境的分布式物聯網設備狀態分析處理系統,主要提供數據服務、數據分析和數據存儲的功能.通過分布式系統的構建,有力地解決了當前工業物聯網設備數據規模大、設備數據類型雜、設備數據價值密度不一等問題,實現了工業物聯網設備運行狀態的精準診斷分析,有效提高生產效率、降低資源能源消耗.
近年來,國內外對物聯網設備狀態的分析和處理方面的研究已經獲得了廣泛的應用,尤其是在設備運行狀態監控和故障診斷方面.2009年,曾凡琳等雙層監控系統框架下的帶有時延和丟包的子系統故障檢測方法[5],減小了延時與丟包對故障檢測性能的影響[6].2019年,焦亞軍等利用PLC和傳感器設計了一套帶式運送機故障檢測和預防系統,隨時檢測設備故障點[7],若出現故障信號立刻發出報警,提示維修.遺憾的是以上系統均未考慮對海量數據處理時性能方面的問題.
Hadoop是一個能夠讓用戶輕松架構和使用的分布式計算平臺.用戶可以輕松地在Hadoop上開發和運行處理海量數據的應用程序[8].它主要有高可靠性、高擴展性、高效性、高容錯性的優點.
根據Hadoop平臺的特點,2013年,劉樹仁等為了處理海量數據,設計并實現了基于Hadoop技術的數據存儲系統[9],驗證了該系統具有分布式海量存儲及高效查詢的優勢,適合智能電網環境下設備狀態監測數據的存儲;2014年,Duan等通過智能化分析能提出新的云計算數據存儲管理模式,讓管理的效率更高、安全性更好、維護性更強[10].
盡管Hadoop在離線復雜大數據處理方面表現良好,但支持的機器學習算法卻相當有限,為了更好地利用Python數據處理包做大數據處理,本文引入了Dask計算工具.Dask是一個由Python語言支撐的靈活的分布式計算[11]工具,主要具有以下兩個特點:第一為動態任務分配,第二具有大數據處理能力.Dask分布式計算提供并行化Numpy array和Pandas DataFrame類的接口,同時提供任務分配接口,更加便捷地將分布式計算整合到項目之中.Dask采用原生態訪問pydata堆棧,在完全Python環境中啟動分布式計算,能夠保證以最小的性能開銷和較低的延遲情況下快速計算[12].此外Dask既能運行于千個節點上做分布式計算,也能運行于單臺筆記本上,且在響應方面基于交互式設計理念,為開發者和使用者提供快速的反饋和精準的診斷.
本文結合Hadoop和Dask分布式計算框架,設計了一種基于Hadoop環境的分布式物聯網狀態分析和處理系統,系統能夠高效地處理大批量數據并實時準確預測設備的運行狀態.
根據相關需求分析,本系統采用數據層、服務層、通訊層和表現層四層架構,具體形式如圖1所示.

圖1 系統架構
結合四層架構的設計方式,系統由3個模塊組成:數據服務模塊、數據分析模塊和數據存儲模塊.
數據服務模塊采用Web-Server,避免一臺主機多環境相互影響而導致的程序故障問題[13].以雙向服務的方式實現數據的推拉結合,在保證高速數據傳輸的同時實現數據異常處理、消息丟失重發、消息“斷點續傳”,保證了雙向數據傳輸的可靠性和穩定性.該模塊不僅從云平臺拉取數據,還可以將數據預測的結果反饋回云平臺.此外數據服務模塊和數據處理層是完成隔離,兩層中間采用Socket通信.
數據分析模塊在服務層實現,其中數據處理和特征提取選擇基于Dask分布式計算框架,預測基于Keras框架[14].數據處理的主要功能有:數據缺失填充、數據異常檢測和替換、數據歸一化和數據降維處理等.特征提取和選擇模塊主要功能為數據特征提取和選擇.數據預測主要功能為預測算法的訓練、測試與運行.此外該模塊還支持模型更換(支持數據歸一化算法替換、數據降維算法替換和異常檢測算法替換)和數據持久化等功能.
數據存儲模塊負責模型保存、預測結果保存和提取特征保存等數據存儲服務.由于數據量會隨著時間而不斷增加為此我們必須考慮到數據量增加導致得硬盤不足問題,結合服務實時訪問需求最終采用HBase作為數據存儲,另外采用HDFS[15]作為存儲基礎.
本節將講解數據服務模塊、數據分析模塊和數據存儲模塊的詳細設計.系統詳細設計圖如圖2所示.

圖2 系統詳細設計圖
2.2.1 數據服務模塊設計
數據服務模塊定義了數據檢查機制、數據重傳協議和數據傳輸格式.數據發送方會主動將每一臺設備所有傳感器數據實時發送給Web-server服務器[16],數據格式如下:


上述數據傳送格式為json,數據中包含設備名稱和數據(分別是狀態參數和設備數據)以及校驗碼Hash_MD5,且MD5計算值是SensorData元素的字符串計算的MD5值.當SensorData數據發送缺失或者數據“污染”時MD5計算結果將會發生變化,因此可以確定數據穩定性和可靠性.當數據計算MD5匹配/不匹配都會向數據發送方發送數據狀態,發送Json如下所示:

如上所示,當數據傳輸中數據缺少或被干擾時返回為1,如果數據接收且數據完整性和準確性良好則Megs會返回0,否則應表示數據已接收但數據校驗出錯,請求數據重發.同時當數據發送方在數據發送2秒后沒有收到數據確認時,數據發送方會立刻向數據接收端再次發送數據,同時再次等待數據確認消息.
當網絡傳輸出現問題時內存數據會進行暫時數據存儲,其中數據持久化為雙端持久化,以便網絡恢復時能迅速重建程序,同時確保內存中數據不會丟失,從而避免網絡故障帶來的數據部分丟失問題[17].具體實現為數據發送端會實時保存數據傳輸時間點并持久化,數據傳輸時間點是記錄當前數據成功發送并且返回接受成功的數據時間點(數據發送后且收到接收端數據Megs為0的確認);數據接收端會實時持久化數據組,具體流程為在接收端接收到數據時并不會立刻返回數據確認,首先將數據添加到內存數據數組中(添加之前會進行數據是否存在判斷,排除數據接收但確認消息丟失情況)并將其持久化到數據庫中,然后再返回數據確認消息[18].
將預測結果反饋回云平臺時,由于Web_server和預測模塊不是運行于同一臺服務器,為降低由數據寫入和讀取帶來的延遲,引入模塊之間通信.本論文采用Socket通信方法[19],主要實現兩個方法Socket_server和Socket_client.Socket_server為Socketserver端負責將驗證正確的數據實時發送,而預測模塊為Socket_client負責接收數據,以及將預測結果返回給Web_server.Socket通信JSON格式設計如下所示,傳輸數據只有設備名稱和設備數據(設備狀態數據未使用).預測數據返回的JSON格式傳輸數據只有設備名稱、數據時間和預測狀態.

2.2.2 數據分析模塊設計
數據分析模塊主要基于Dask分布式計算框架,該分布式計算框架能有效支持大量機器學習算法并通過高效的并行計算縮短計算時間.同時引入了計算資源調度DRB方法,保證數據預處理算法和預測算法的高效運行和分布式計算的穩定性.下面主要介紹分布式計算框架的搭建和節點調度方案的實現.
本分布式計算平臺基于Dask技術來搭建,通過搭建一臺主節點多臺從節點和多個備用節點的組成最終的分布式計算平臺,每個節點之間采用TCP通信,高速并行化計算[20]每次主節點分發的計算請求.主節點名稱、從節點名稱和備用節點名稱分別為Scheduler、Worker和Temp_Worker.
本分布式計算將由5臺機器組成,其中每臺機器包含4個節點.因為Dask分布式計算能在運行時刻加入節點,且當節點加入后可以快速的從Dask主節點獲得計算任務,平臺的設備信息都可以從設備的主機點獲取,信息讀取地址為:http://192.168.1.101:端口/workers,端口號根據實際信息修改訪問端口號.
節點調度方案如圖3所示,當設備計算資源過于匱乏時Temp_Worker將持續啟動子線程并加入分布式計算系統中,其中計算資源匱乏判斷條件為整體資源使用率情況[21].

圖3 DRB節點調度方案
計算資源匱乏具體的判斷條件如式(1)和式(2)所示,C表示分布式計算平臺整體CPU計算資源大小,ci表示其中一臺機器某個節點的CPU使用率,M表示分布式計算平臺整體內存大小,mi表示其中一臺子節點的內存使用率.mt表示在t時刻分布式計算平臺整體內存使用均值即如下公式所示:

式中,n表示設備總的節點數量不包含主節點.ct在時間點t時刻分布式計算平臺的CPU整體使用率[22].當且僅當任何一個整體使用率c超過0.8(計算機超頻,不同機器可能不一樣)或者m超過0.8時且該值維持10秒以上,待加入節點將啟動Worker并加入Scheduler.此外當設備資源過剩時我們將關閉部分啟動節點,具體資源判斷當ct和mt同時在持續5秒內資源利用率都低于在0.3時備用節點進程將終止.
此外在多次實驗過程中發現部分節點可能會出現死機或者內存使用率抵達使用上限,導致節點癱瘓或節點進程阻塞問題.為此根據分布式計算的任務備份的基本原理(Dask的Scheduler在管理節點時,如果節點丟失時,主節點會將丟失節點任務分發給其它節點執行),節點進程阻塞判斷方案將根據mi單個節點內存使用率和ci單個節點CPU使用率.判斷條件為當且當ci一直大于1.2時且維持利用率不變20 s以上時節點線程將會被終止,并立刻再次啟動該節點進程.
2.2.3 數據存儲模塊設計
數據存儲主要包括設備數據存儲、預測數據存儲和中間模式存儲.為保證計算的高效性和穩定性,存儲服務器采用基于Hadoop分布式數據存儲的HBase數據庫[23],在盡可能保證數據安全的同時提供便捷的存儲擴展.
數據存儲在HBase中有兩種表結構設計模式,寬表設計模式和高表設計模式[24].寬表設計模式和高表設計模式根據數據量大小的時間消耗如表1所示,其中高表設計模式的時間消耗平均低于寬表設計模式百分之五十以上.因此本文選擇高表設計模式,通過減少列簇,將查詢信息插入到Rowkey中來降低查詢時間.
根據某著名物聯網公司的數據分析,最終數據建表通過構建三個數據列簇和一個Rowkey.Rowkey為數據識別的唯一標識符,且數據查詢時間相對較快,因此Rowkey將采用設備名稱和時間組成Rowkey設計,在數據查詢時只需設備名稱+(開始時間-結束時間)即可查詢某一設備的多個時間段數據.預測數據將存儲于第三個數據列簇中,預測數據列將采用yc作為標識.中間模式數據存儲構建方式和預測數據存儲類似,只需將列標識修改為ts存儲于第四列簇中.表列簇設計如表2所示.

表1 寬表高表對比

表2 數據庫表設計
3 個列簇分別表示為原始數據存儲、轉換數據存儲和預測設備狀態存儲.Rowkey為索引只有一列(ssdjd_20180105125000_1中最后_1是為了避免時間軸既20180105125000出錯從而無法通過加30秒定位上一個時間或下一個時間點數據),原始數據列簇包含137列,轉換數據包含8503列,預測設備狀態存儲設備的當前狀態,當預測出設備狀態時,該預測值為半小時后的設備狀態,因此為維持預測p1與當前時間軸一致性,引入內存變量和持久化方法,在預測出數據時先暫且保存在該變量中,每當數據超過30個時會進行自動數據存儲.
HBase數據庫存儲軟硬件環境如表3所示,HBase存儲服務采用5臺服務器作為數據存儲服務器,其中152作為數據存儲的主節點Master同時也作為資源管理節點,153~156作為子節點datanode為數據存儲節點.

表3 HBase數據庫軟硬件環境
為了驗證系統的有效性,本文選擇某企業商業空調的傳感器數據進行實驗,部分數據展示如圖4所示.
系統可以得到蒸發器側進水溫度、蒸發器側出水溫度、冷凝器側進水溫度、冷凝器側出水溫度等44個不同特征的商業空調傳感器數據.系統通過數據服務模塊從企業云平臺的得到維度為45的矩陣數據,其中最后一列數據為設備的運行狀態標識.

圖4 部分實驗數據
數據分析模塊數據預處理功能主要包括數據的缺失填充、異常檢測和替換、數據歸一化和降維處理等.特征提取和選擇模塊主要功能為數據特征提取和選擇.數據預測主要功能為預測算法的訓練、測試與運行.
將系統分析結果進行可視化如圖5所示,圖中橫坐標表示時間序列,縱坐標表示發生故障的概率.上方折線圖為預測結果,下方折線圖為真實設備狀態值.當預測概率值大于0.5時系統將會發出警報,提示設備將出現非正常運行狀態.從圖中可以看出,系統精確的預測出了設備的狀態,并且這一預測為實時的預測.

圖5 結果可視化展示
本次實驗數據輸入基本單位為(4×2880)×13的矩陣數據即一個數據樣本.分布式計算包括數據預處理、特征提取和特征選擇過程,本測試將采用兩種測試條件;第一種為同一時間向分布式計算平臺發送多次任務請求,且任務請求計算內容相同;第二種為同一時間向分布式計算平臺發送多次任務請求,且任務請求計算內容不同.
第一種測試條件如表4所示,相同時間提交所有任務計算內容相同.

表4 分布式計算測試結果1
從表4中可看出,同一時段提交任務數量有1、2、3、4、5、6、10,實驗結果時間均為運行100次耗時平均值.從表中可看出當提交相同任務時計算平臺能合理的優化計算任務,并且不增加計算耗時即平臺能避免重復計算.
第二種測試條件如表5所示,相同時間提交所有任務計算內容不同.

表5 分布式計算測試結果2
從表5中可看出當提交任務數量上升時耗時也開始逐漸上升,根據表中數據可看出當該平臺可以并發處理多個任務,且平均任務耗時為3 s左右.為保證任務能在有效時間內獲得計算結果(數據采集間隔為30 s),該分布式計算平臺至多同時提交10個不同任務.
本文設計了一種Hadoop環境下的分布式物聯網設備狀態分析處理系統,能夠快速準確地進行設備狀態預測.為了提高數據處理效率,數據預處理和特征提取方法完全基于Dask分布式計算框架,并根據實際分布式計算中遇到的節點阻塞問題提出了DRB節點優化方案,實現了分布式節點的智能啟動、關閉和重啟.本系統的完成,實現了工業物聯網設備運行狀態的精準診斷分析,有效提高生產效率、降低資源能源消耗.