, , ,,
(1.上海民用飛機健康監控工程技術研究中心,上海 200241; 2.北京航天測控技術有限公司,北京 100041; 3.北京市高速交通工具智能診斷與健康管理重點實驗室,北京 100041;4.軌道交通裝備全壽命周期狀態監測與智能管理技術與應用北京市工程實驗室,北京 100041)
隨著計算機和測控技術的發展,試飛試驗數據呈現出參數多(上萬個)、數據量大(上百TB)、參數類型多樣化等特點[1]。飛行試驗數據是完成新機定型、鑒定的主要依據,同時也是支撐航空科技發展的寶貴資料。能否將大量飛行試驗數據有效管理并使用起來,這對我國航空事業的發展具有重要的現實意義。
飛行試驗是在真實飛行條件下進行的科學研究和產品試驗。它是航空科技發展的重要手段,是航空產品研制和鑒定的必須環節,是為用戶摸索和積累經驗的有效途徑。試飛任務中的數據信息管理是一個龐大而復雜的系統工程,目前國內傳統的試飛數據處理模式是:一型飛機、一個團隊、一套數據格式及處理程序,即由專業人員使用自行開發的軟件,對測試數據進行處理,中間結果及最終報告數據以操作系統文件的形式保存;有關數據處理的文檔人工處理。這種處理模式的缺點是:1)數據和程序的繼承性差,共享困難;2)低水平重復,不利于積累以往的經驗;3)處理效率低,容易出錯;4)處理周期長,不利于快速做出下一步的試飛決策。
本文研究基于Spark的飛機試驗數據處理技術,將改變原有的“特定飛機,專用軟件”的研制模式,將復雜、繁瑣的數據管理工作抽象出來,使工程師能夠將更多的精力集中到數據的處理和分析工作中去。一方面,提煉共同要求,統一飛行試驗信息化標準,完成系統的統一規劃與部署,避免試飛信息化工作出現效率低下和資源浪費等問題。另一方面,通過建立健全和完善型號信息系統、數據與信息以及數據處理軟件的標準和規范,提高信息系統和軟件的可重復性,避免低水平的重復開發。
目前,應用最為廣泛的大數據技術是hadoop以及其分布式架構map-reduce。Hadoop MapReduce采用Master/slave 結構。只要按照其編程規范,只需要編寫少量的業務邏輯代碼即可實現一個強大的海量數據并發處理程序。其核心思想是分而治之。Mapper負責把一個復雜的業務,任務分成若干個簡單的任務分發到網絡上的每個節點并行執行,產生的結果由Reduce進行匯總,輸出到HDFS中,大大縮短了數據處理的時間開銷。MapReduce以一種可靠且容錯的方式對大規模集群海量數據進行數據處理,數據挖掘,機器學習等方面的操作[2-7]。
盡管該架構具備開源分布式的特點且應用范圍廣泛,但在大量數據離線計算過程中,map-reduce存在著大量的硬盤讀寫,這造成計算效率很低。Spark是用于大數據處理快速而通用的引擎,其采用分布式內存計算方式,將過程數據保存在內存中,減少了由于磁盤交互產生的I/O,從而提高數據計算效率[8-10]。
圖1 Hadoop和Spark的邏輯回歸
Spark擁有Hadoop MapReduce所具有的優點,但和MapReduce的最大不同之處在于Spark是基于內存的迭代式計算[11]。Spark的Job處理的中間輸出結果可以保存在內存中,從而不再需要讀寫HDFS,除此之外,Map-Reduce在計算中只有兩個階段,即map和reduce。而在Spark的計算模型中,可以分為n階段,因為它內存迭代式的,我們在處理完一個階段以后,可以繼續往下處理很多個階段,而不只是兩個階段。因此,Spark能更好地適用于數據挖掘與機器學習等需要迭代的MapReduce的算法。不僅實現了MapReduce的算子map函數和reduce函數及計算模型,還提供更為豐富的算子,如filter、join、groupByKey等,是一個用來實現快速而同用的集群計算的平臺。
此外,Spark應用程序還離不開SparkContext和Executor。Executor負責執行任務,運行Executor的機器稱為Worker節點,SparkContext由用戶程序啟動,通過資源調度模塊實現與Executor通信。圖2為Spark的操作機制。
圖2 Spark操作機制
集群管理器主要控制整個集群,監控各工作節點。工作節點負責控制計算節點,啟動執行器和驅動程序。執行器是應用程序運行在工作節點上的一個進程。
RDD(Resilient Distributed Datasets)[1],彈性分布式數據集,是分布式內存的一個抽象概念,RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而創建,然而這些限制使得實現容錯的開銷很低。對開發者而言,RDD可以看作是Spark的一個對象,它本身運行于內存中,如讀文件是一個RDD,對文件計算是一個RDD,結果集也是一個RDD,不同的分片、數據之間的依賴、key-value類型的map數據都可以看做RDD。圖3顯示Spark計算的RDD模型。
圖3 Spark計算過程不同階段的RDD模型
飛機試驗數據解析效率是影響大數據分析,挖掘額外信息資源的重要影響因素。隨著飛機集成度的提高,傳感器數據的增多,飛機試驗數據的參數種類與數據量劇增。傳統的飛機數據解析方法主要是針對源碼數據量小的情況,無法滿足當前大型飛機試驗任務的需求。而針對大規模源碼數據,目前一般都是工程師對任務進行分工,每個人負責任務的一部分,在各自的機器上運行,最后將結果進行匯總,這不僅會增加人力成本,也會增加解析的復雜度,降低解析效率,影響下一步試飛決策。因此,需要建立一種針對大規模飛行試驗數據的預處理方法。
本文一方面采用Spark處理框架,通過分布式計算方式,減少磁盤交互產生的I/O,提高計算效率;另一方面,利用飛機數據采集特點,減少不必要的解析,節省集群內存計算空間,提高解析效率。從數據解析過程優化與并行處理的角度,提出了基于Spark的海量飛行試驗數據的預處理方法。
飛機運行過程中,機載采集器根據規定的采樣率對機載數據進行采集,當次試飛結束后下載原始數據。由于數據參數格式固定,在數據采集過程,根據實際情況解析前段數據,直到獲取所有需要的參數初始信息位置,包括參數的采樣初始時間,整個數據文件采樣的初始時間以及數據文件出現的參數名全集等。
確定數據文件中存在的參數集合,預先剔除參數組中不存在于當前數據文件中的參數群,只提取存在于數據文件中的參數屬性,有效地跳過不必要的解析過程。同時減少廣播變量(Broadcast)占用的內存,為集群內存計算節省空間,顯著提高解析的效率。
進而,切割并行數據,生成若干個原始數據包。切包的規則是只能在兩個單包之間切,而不能跨包,否則會丟失數據,通過不斷計算單包的長度來用指針偏移的方法來切包。
在此基礎上,對飛行測試數據進行預處理。預處理過程分為兩個階段,配置階段和應用階段。圖4顯示了兩個階段的過程,包括控制流程和數據流程。
圖4 圖解碼階段的數據預處理步驟
在配置階段,主要是在試飛數據處理設備正式投入運行之前和運行過程中,進行系統基礎信息的配置與擴展開發,并且進行系統數據庫的管理,為系統提供原始數據的解碼、分析邏輯、算法功能等方面的配置和擴展開發能力,使系統具備持續的完善能力。
1)首先進行的是用戶配置、試驗信息配置與基礎信息配置的工作。
2)其次是完成對配置文件的解析和試驗數據的預處理。
3)再次是存儲原始二進制數據和解析后工程值數據。
4)最后針對解析后數據為用戶提供數據服務,包括數據的查詢、下載、導出、分析、二次計算等。
在準備階段,從配置文件中讀取幀格式的配置信息,包括幀參數的位置,采樣率,通道數等。然后讀取參數數據表中所有幀參數的數據類型、分辨率、數據長度等。最后,根據參數的位置信息設置參數索引表。
在解碼階段,當接收到源代碼時,根據429、232、664協議規定的幀格式對幀頭進行解析,獲取當前幀的索引表唯一標識,通過標識確定當前幀采集的參數索引表,進而根據參數信息將源碼數據解析為工程值數據。
圖5 解碼階段的數據預處理步驟
在使用SPARK技術來解決大規模數據預處理問題的過程中,如何在每個預處理步驟中構建測試數據RDD模型以及如何使用SPARK算子至關重要,直接影響數據預處理的效率。圖6為基于SPARK的數據處理流程圖。
圖6 基于SPARK的數據處理流程圖
基于SPARK的數據處理具體步驟如下:
步驟1:初始化。包括共享變量初始化,通過解析配置文件,獲取參數信息和協議解析規則,并打包成廣播變量作為集群共享信息,同時初始化累加器,作為分布式運算過程中不同線程之間數據通信的媒介;以及待解析數據初始化,將源碼數據文件的上傳到HDFS,并按照1 GB一個單元進行預先切包。最后還對spark環境進行初始化,指定基本的參數配置,保證spark的高效運行。
步驟2:分布式切包。將上傳到HDFS上的數據加載到分布式內存中,在加載的過程中,同時將數據每隔N(一般可以設定為1000)個幀切一次包,并將所有切后的包重組為K(一般可以設定為2000)份,設定為每份啟動一個線程,進行分布式處理。將加載到分布式算子中。一部分數據將打開一個線程。創建數據結構為JavaRDD>的RDD數據集,該數據集作為分布式處理的數據源,其中每個byte數組代表的是幀二進制數據。
步驟3:解析單包。根據步驟一中初始化的廣播變量信息,解析幀頭,獲取幀頭的標識信息,根據標識以及解析的配置文件獲取當前幀采集的參數以及參數屬性,進而將源碼幀數據解析成工程值數據。期間,待解析的參數組范圍之外的參數直接跳過,保證解析速度的最大化;此外,整個解析過程通過mapPartitionsToPair算子執行并行處理,各個線程共享廣播變量,盡可能的避免不同線程在運算過程中的shuffle操作,減少線程之間由于通信導致的延遲。該步驟獲取的數據格式為JavaPairRDD
步驟4: 參數值聚合。對步驟三得到的結果進行聚合操作,執行groupBykey算子,將key(參數組+時間)相同的value(參數位置+參數值)聚合在一起,得到當前參數組當前幀時間的所有參數數據集,并將數據集根據參數組規定的順序組成長字符串,以便獲取同一采樣時間點的參數值,便于進行事后試驗數據的關聯分析。最終獲取到的數據結構為JavaPairRDD
圖7 主機配置
步驟5:求解合成參數。有一些參數需要一些基本參數經過一定的運算法則轉換才能獲取,如兩個參數的加減乘除運算、位權重運算、EU轉換、多項式計算等。所以在將基礎參數工程值解析出來后,需要找到合成參數需要的基本參數相同時間點的數據集,然后經過算法運行,得到合成參數的值。其結果形式與步驟5產生的結果一致。
步驟6:過濾數據。針對不同的參數組,需要分別導出到不同的文件中,以保證不同試驗科目的獨立性,便于不同專業的專家對數據進行分析。然后以參數組+時間字符串中的參數組為過濾條件,使用filter算子,篩選出所需的參數組數據集,由于執行的是filter算子,得到的RDD數據集數據結構沒有變化,結果形式與步驟5一致。
步驟7:首先需要對整體數據進行全局排序,以保證所有數據能夠以時間順序進行排列。此時數據是分布式的散播在內存中,為了將參數組數據以規定的數據格式導出,需要將內存中所有的相關數據合并在一起,使用repartitionAndSortWithinPartitions算子,將partitioner參數設置為1,即得到合并后的數據。由于只是數據的合并,所以RDD結構仍然不變,為JavaPairRDD
步驟8:保存數據文件。 使用saveAsTextFile算子,將步驟7得到的合并結果保存到HDFS的文件系統中,每個參數組都分別形成獨立的文件系統,通過服務的形式分發給不同試驗科目的專業人員,進行進一步的數據分析。
步驟9:轉化為HBase字段。預處理的數據需要進行卸載,一方面以系統文件的形式分發給專業人員,另一方面需要持久化存儲到分布式數據庫HBase中,以便時候的隨時查詢和進一步的分析,通過flatMapToPair算子將步驟6的RDD數據集轉化為滿足HBase特定形式的JavaPairRDD
步驟10:保存HFile文件。由于實際產生的數據量過大,直接存儲到HBase容易出現內存溢出問題,并且消耗時間較長。故而采取先轉存為HFile文件的形式進行HBase存儲,使用saveAsNewAPIHadoopFile算子將數據集轉化為HFile文件。
步驟11:存儲到HBase。通過doBulkLoad方法將HFile文件轉化為HBase數據,進而完成送源碼數據到工程值數據的處理工作。
實驗環境部署在有3個物理服務器的集群中,其中一個服務器既作為主管理節點,又作為計算節點;另一個服務器作為備管理節點和計算節點;最后一個服務器作為計算節點,具體環境配置如表1所示。
表1 實驗環境配置
試驗數據來源于某型號飛機試飛過程產生的數據,數據量為30 GB。將試驗數據分成三份,分別在三臺主機上進行預處理。
基于Spark的飛機試驗數據預處理結果,如圖8所示。數據切包、單包解析、參數聚合、參數合成、數據存儲所需時間約為30分鐘。傳統數據處理平臺數據預處理時間則需要9小時。相較于傳統數據處理方法,本文提出的數據預處理方法大大提高了解析效率,克服了傳統數據處理方式效率低下,單個機器內存和CPU等硬件條件不足的問題。
圖8 各階段處理時間
主機一運行線程333個,各線程累積執行時間2.3 h,解析工程數據文件9.5 GB;主機二運行線程331個,各線程累積執行時間2.3 h,解析工程數據文件9.5 GB;主機三運行線程336個,各線程累積執行時間2.3 h,解析工程數據文件9.7 GB。各線程執行時間最小為5 s,最大為55 s。數據垃圾回收時間最小為0 ms,最大為32 s。各線程處理原始數據最小為3.6 MB,最大為11.3 MB,解析數據輸出大小最小為17.2 MB,最大為42.2 MB,如圖9所示。盡管本文提出的方法相較于傳統方法,大大提高了數據解析效率。但各線程負載不均衡,未充分利用系統資源,影響系統運行性能,對數據解析時間產生一定的負面影響,這也是后續需要進一步改進的地方。
圖9 飛行測試數據預處理記錄
本文針對海量飛行數據預處理方法處理效率低、處理周期長等問題,提出了基于Spark的飛機試驗數據預處理方法,并以某型飛機30 GB的試飛數據為例進行實驗驗證,結果顯示本文提出的數據預處理方法相較于傳統的大規模數據預處理方法可有效地縮短數據預處理時間,提高處理效率,幫助用戶快速做出下一步試飛決策。未來將重點研究數據傾斜調優與Shuffle調優的問題,優化Spark的性能,進一步提高數據解析效率。