廖婷婷,王 彪,肖衛青,李從英,郭 茜
(1.貴州省氣象信息中心,貴州 貴陽 550002;2.國家氣象信息中心,北京 100081)
氣象觀測數據采集于各類氣象儀器,通過各級別的氣象業務工作人員通過氣象業務標準觀測得來[1]。地面自動氣象站經過多年的建設,目前已經建成2 000多個國家級自動站,5萬多個省級的區域自動站,用來傳輸全國的氣溫、氣壓、雨量、蒸發、風速風向等基本氣象要素資料[2-3]。其中國家級自動站傳輸地面分鐘和小時數據,且不同于過去的TXT傳輸方式,未來趨向于使用BUFR消息傳輸,分鐘資料從現在的5 min發展到1 min采集,區域站的數量也會有翻倍式的增長。在這樣的發展背景下,資料在解碼入庫處理過程中的穩定性、及時性、拓展性方面有了更多的要求。采用“云計算”方式進行分布式實時大數據處理是解決的方法之一,其中Apache Storm是一個開源的分布式實時計算系統,可以簡單可靠地處理大量數據流,具有高容錯性和處理速度,在一個小的 Storm 集群中,每個節點可以達到每秒數以百萬計來處理消息的速度[4]。本文使用Storm分布式框架實現了地面氣象數據實時解碼入庫處理程序,相對于傳統的簡約流程各方面性能有了顯著提高。
本文系統部署于國家氣象信息中心實時的地面氣象站觀測數據,其中以rabbit MQ消息格式傳輸的國家站小時數據1 600站/h,國家站分鐘數據1 600條/5 min,以文件格式傳輸的國家站2 400站/h,區域站和雨量站58 500站/h。本文立足于信息中心原有的業務解碼入庫軟件基礎上,制定標準的數據解碼規范,建立統一的可擴展的數據解碼集和分布式快速入庫框架,將解碼功能和入庫功能解耦,通過對不同格式資料的解碼進行API封裝以及不同數據庫類型入庫接口的封裝,實現對資料類型和數據庫類型的獨立擴展。
本文采用Storm分布式實時計算系統框架下進行數據解碼入庫,并實時將監控消息發送至氣象綜合業務實時監控系統(天境)[5],Storm部署采用了多臺服務器,設置了主節點(Master)和工作節點(Worker)。主節點運行了Nimbus程序,負責發送代碼到Storm集群、分配工作任務給節點,并使用Zookeeper程序記錄分配情況。工作節點運行了狀態監控程序(Supervisior程序),負責監聽Nimbus分配的任務[6]。當一個任務被提交給主節點,Nimbus對其進行校驗和工作量計算(計算Task數量),進而給工作節點的處理過程程序(Spout/Bolt)設定相應的Task數量,記錄到Zookeeper當中。
將Storm技術結合到解碼入庫系統,由主節點負責任務分配,工作節點負責消息監聽與傳遞、解碼入庫處理、進程狀態通知這幾項重要功能。主節點收到MQ消息之后,將消息發給消息傳遞程序(Spout程序)傳遞給某個工作節點,工作節點的Supervisior監聽到主節點發來的Spout消息內容之后,獲取氣象解碼消息(包含資料名稱、四級編碼、氣象數據等),然后傳輸給不同的處理程序(Bolt程序)進行處理。如圖1所示,在工作節點監聽到一個個Spout任務后,將任務交給某個Bolt-解碼進行處理;Bolt-解碼的LIST實體類得到該MQ消息體內各個要素的值,或者該文件內各個要素的值,然后再將這些要素值重新進行組合輸出給Bolt-入庫進行入庫操作;在整個過程中,Bolt-DIEI負責發送EIDI信息給綜合監控接口,為天鏡報告解碼入庫的實時狀態。可通過配置調整服務器上Spout和bolt的數量及分布,按需分配資源執行工作。

圖1 Storm解碼入庫流程圖Fig.1 Storm decoding and warehousing flow chart
由于地面資料類型不同,他們的數據庫表結構也不同,在Bolt-入庫操作過程中需要根據資料類型來啟動相應的入庫程序。如圖2所示,Spout程序將消息傳輸給Bolt解碼程序,解碼完成后根據消息的格式和地面數據的類型(地面分鐘BUFR資料、地面小時BUFR資料、報文類型資料),判斷其為分鐘Tuple、小時Tuple、報文Tuple,并將Tuple作為數據進行發射,發射給3類分別處理不同種類的Bolt入庫函數:Bolt-分鐘數據入庫、Bolt-小時數據入庫、Bolt-報文數據入庫進行處理。
采用貴州省的地面氣象站觀測數據,分別利用Storm解碼入庫和簡約流程入庫[7]的入庫時效進行對比,采用的數據包括BUFR格式國家站小時數據、BUFR格式國家站分鐘數據,以及文本格式下的國家站、區域站、雨量站數據,采用的服務器為3臺linux集群,每臺的處理器為2.6 GHz/8cores,最多每臺支持16線程數。Storm解碼入庫的數據庫為MySQL,簡約流程為Oracle數據庫。

圖2 Bolt-入庫程序流程圖Fig.2 Bolt-warehouse program flow chart
Storm集群配置了3個Worker、6個Spout、18個Bolt,對應Bolt-解碼、Bolt-DIEI、Bolt-入庫程序。簡約流程分別由BUFR分鐘解碼入庫、BUFR小時解碼入庫、報文解碼入庫3個入庫進程進行處理。可以看出Storm時效均比簡約流程提高5倍以上。

表1 Storm程序與簡約流程時效對比Tab.1 Time Efficiency Comparison between Storm Program and Simple Process
在國家氣象信息中心的3臺Storm集群中,用Storm實現了文件格式的國家站、區域站、雨量站等近6萬自動站數據的實時解碼。實現了CTS2上傳的國家站rabbit MQ消息解碼,截止測試時間為止,Storm連續運行了138 d,相比簡約流程時效同樣提高了5倍以上。

表2 在大數據量下的Storm處理時效Tab.2 The Processing Aging of Storm Program in Big Data
在非功能性性能方面,Storm采用多項技術達到地面觀測數據入庫的要求,時效性上需要達到所有該時次站點在1 min內入庫的要求,可靠性和穩定性上要達到每條數據準確入庫、記載錯誤、及時處理的流程,可拓展性上達到方便的應對業務及數據庫的分布式拓展,可靈活調整入庫配置。在這方面,采用Storm的技術可以進行實現。Storm的實現方式及與簡約流程的實現方式見表3。

表3 Storm程序與簡約流程的非功能性對比Tab.3 Non-functional comparison between Storm Program and Simple Process
隨著社會生活的豐富,人們對氣象與環境的關注度越來越高,在氣象行業內部,海量氣象數據的存儲共享與應用顯得越來越重要,用戶對氣象數據訪問的實時性、高效性要求也越來越高。本文通過對Storm解碼入庫進行理論設計與應用,并與簡約流程進行對比,進一步驗證了Storm解碼入庫的處理性能。
①采用Storm分布式框架,使用Spout節點連接外部數據源,將數據轉化為Tuple,傳遞給解碼Bolt、入庫Bolt和DI/EI消息Bolt,分別進行實時的入庫和監控,提高了入庫性能。
②優化解碼入庫的處理流程,將解碼和入庫解耦。當某一過程出現故障的情況下,可以進行靈活切換,切換的節點可以從消息源頭(Spout)、處理進程(Bolt)來進行,且每個節點故障后能自動重啟,減少運維壓力和入庫遲鈍。
③實際應用的效果顯示,Storm解碼入庫流程比簡約流程普遍時效提高了5倍以上,穩定性大有提高,這與Storm主要采用MySQL數據庫有一定關系,探討在不同數據庫下的Storm入庫性能優化也是未來的一個方向。