曾 樂,孫 超,張來恩,陳文琴
(國家氣象信息中心,北京100081)
氣象綜合業務監視數據是能反映全國氣象業務實時運行狀態的各種數據的集合[1-2]。為實現“全流程、全業務、一體化”的綜合業務監視目標,與一般的信息系統監視數據相比,氣象綜合業務監視數據涵蓋的內容更廣泛,包含很多業務運行的細節數據,數據量非常大,處理邏輯較為復雜,時效性要求高。
氣象綜合業務監視數據可分為資源類監視數據和業務類監視數據。其中資源類監視數據是從信息系統層面體現氣象業務軟硬件資源可用性,包括主機、存儲、網絡、安全、動力環境等基礎設施以及數據庫、消息中間件等通用系統軟件、氣象業務應用進程運行狀態數據;而業務類監視數據是從業務層面體現氣象業務運行狀態的監視數據,在這類監視數據中,一般包含具有氣象業務涵義的數據內容,如收集的觀測資料到報率、可用率等[3],與氣象業務流程和業務邏輯息息相關,能反映業務可用性等情況。
根據監視數據的形態,氣象綜合業務監視數據可細分為資源性能指標類數據、原始業務日志數據、業務指標類數據、告警事件類數據等。
1)資源性能指標類數據是以指標數值來反映資源的運行能力狀態的數據,如存儲可用率、網絡流量等。
2)原始業務日志數據是業務各環節產生的日志數據,是主要的業務監視數據來源,其中最核心的是氣象行業內各種觀探測資料從采集、傳輸、預處理、存儲到產品加工、服務應用的氣象資料全生命周期各環節的運行狀態日志數據。除此之外,還包括各個氣象業務系統生成的業務日志數據,如定時任務的執行情況、用戶對具體業務服務的訪問日志等。目前行業內對關鍵的原始業務日志數據的內容和格式提出了規范性要求,以便于數據的處理和管理。
3)業務指標類數據是原始業務日志經處理、統計等計算邏輯生成的能精細化反映業務運行的狀態和可用性的指標數據,如地面逐小時觀測資料的到報完整率、及時率、可用率等。這些業務指標數據,不僅能監視保障業務的正常穩定運行,還能提供業務管理決策支持。
4)告警事件類數據是基于前三者數據,經過閾值和狀態判斷等計算分析發現的異常告警事件數據,包括告警事件的來源、種類、異常詳細情況等內容,生成的告警類數據還需要進行進一步的告警規則處理及業務關聯分析。
目前,每小時采集和處理的氣象綜合業務監視數據已達到1億條以上,要實現對海量監視數據的高效采集和處理分析,本文提出了基于大數據技術的監視數據采集和處理框架,能實現對海量原始監視數據的高效采集、實時處理分析,系統框架同時具備穩定性、容錯性和可擴展性等特征。
系統框架分為三層,包括原始監視數據采集層、緩沖層、處理層。處理后的資源性能指標類數據、原始業務日志數據、業務指標類數據、告警事件類數據等將存儲在不同的數據庫中,如圖1。

圖1 系統框架
1)采集層。實現氣象綜合業務監視原始日志的高效接入,并對接入數據進行清洗,保證后端指標計算環節數據的準確性。采用REST接口及Flume日志收集系統兩種方式實現監視數據采集。
2)緩沖層。對采集的監視數據進行緩存,實現監視數據采集與處理環節的松耦合,為處理環節提供有效數據源。采用Kafka消息隊列實現對監視日志數據和指標數據的緩存。
3)處理層。根據氣象綜合業務監視需求,對原始監視數據進行預處理、指標計算并生成告警事件。采用Spark Streaming實時流數據處理框架實現高并發的監視數據處理。
對監視信息采集和處理之后,根據不同特點的監視數據類型,針對日志類、指標類、告警類監視數據分別設計了不同的存儲模型:業務日志數據存儲在基于索引技術的Elasticsearch數據庫[4],方便日志詳情查詢;資源性能指標和業務指標類數據采用時序值方式存儲在Cassandra數據庫[5];告警事件類數據存儲在MongoDB數據庫,熱點數據存儲在Redis內存數據庫。由于存儲的設計不是本文重點,故存儲模型設計不在此詳述。
氣象綜合業務監視數據的采集主要通過“拉取”和“推送”兩種方式,拉取方式主要通過Flume技術,部署采集Agent到客戶端,采集監視數據;推送方式主要由業務系統在運行過程中,主動調用REST接口實時推送監視數據。不同類型監視數據的采集方式和技術見表1。

表1 監視信息采集方式
REST接口采集通過調用REST(Representational StateTransfer,表述性狀態轉移)接口[6]實現客戶端通過HTTP層向服務器端推送監視數據。在氣象業務系統中,實時收集、解析、處理生成監視所需的運行狀態信息,并通過調用REST接口,實時發送監視數據,其發送的RESTful接口,以國家級地面自動站收集日志采集為例,數據格式如下:
{
"type":"RT.CTS.STATION.DI",
"system":"CTS",
"message":"國內氣象通信系統",
"occur_time":1518256800000,
"receive_time":1518256922000,
"fields":{
"FILE_NAME_N":"Z_SURF_C_BESZ_20180210100000_O_AWS_FTM_PQC.BIN",
"LENGTH":1286,
"IIiii":"54511",
"STATION_NAME":"北京南郊站",
"DATA_TYPE":"A.0001.0044.R001",
"PROCESS_LINK":"2",
"DATA_TIME":"2018-02-10 10:00"
"PROCESS_TIME":"2018-02-10 10:01:10.000",
"TRAN_TIME":"2018-02-10 10:01:15.000"
}
}
收集日志包含“臺站編號”、“資料類型”、“資料時次”、“業務環節”等內容,其中臺站編號、資料類型、資料時次用于計算數據到報情況,其它信息用于日志詳情查詢。
Flume是一個高可用的,高可靠的,分布式的海量日志采集、聚合和傳輸的系統,本系統采用Flume-ng,其核心是Agent,主要由source、channel、sink三個組件組成[7-8]。
Flume采集框架,包括采集層和匯聚層。采集層Agent負責采集氣象業務系統的運行日志,并進行格式轉換及封裝,匯聚層Agent負責匯聚所有業務系統的監視數據。匯聚層相當于業務總線的作用,起到隔離、緩沖的作用,如圖2。

圖2 Flume采集框架
1)隔離。屏蔽采集層單個Agent調整,而不影響其它監視數據采集。Agent調整包括客戶端IP變動,業務日志格式變化等。
2)緩沖。緩存采集層傳輸過來的數據到File Channel,一方面可以保證采集層采集Agent的不間斷執行,另一方面可減緩大量監視數據傳輸的峰值壓力,并保證服務端監視數據收集處理程序異常時,不丟失采集到的業務監視數據。
另外,在Agent設計中,主要有以下關鍵點:
1)雙Channel設計。當堆積在Channel中的Events數小于閾值時,所有的Events被保存在MemoryChannel中,Sink從MemoryChannel中讀取數據;當堆積在Channel中的Events數大于閾值時,多余的部分自動存放在FileChannel中,Sink從FileChannel中讀取數據。雙Channel可以充分使用MemoryChannel的高吞吐特性保證數據處理時效性,同時,利用FileChannel的緩存特性保證數據完整性。
2)故障切換設計。在采集層Agent,Failover Sink Processor維護一個由3個Sink組成的優先級Sink組件列表,只要有一個Sink組件接收或發送Event不可用,Event就被自動切換到下一個組件。
3)負載均衡設計。在匯聚層Agent,使用負載均衡機制LoadBalance。部署2個Agent,均衡處理采集層Agent發送的監控數據,在每個Agent中,設計4個Sink,可以實現處理的高并發。采集數據轉換復雜的環節通過增加Sink組件提高Agent的吞吐量。
監視數據預處理主要針對氣象原始業務日志數據,資源性能指標類監視數據比較規整,不需要進行預處理。氣象原始業務日志數據包括14大類498個子類的氣象觀測資料及產品從臺站到省級、國家級的收集、分發、處理、存儲、歸檔、服務的全業務流程運行信息[9]。原始業務日志數據存在信息不完整、數據重復等問題,需要在監視指標計算前對數據進行標準化構建和數據清洗。
4.1.1 數據標準化構建
數據標準化構建包括數據合法性檢查、時間標準化處理、業務信息關聯和日志信息豐富等,處理流程如圖3。

圖3 數據標準化構建處理流程
1)數據合法性檢查。根據業務特征和資料特征等合法性規則檢查監視數據是否有效,無效數據存入Kafka錯誤數據隊列,入庫備查。
2)時間格式化處理。統一各業務環節監視數據的時間格式,所有時間統一為世界時,并精確到毫秒。
3)業務信息關聯。關聯業務配置信息,例如,某一種氣象資料是否業務考核、具體考核的比例、閾值、及時時間等是該資料的業務配置信息,則在此步驟中將通過關聯業務配置信息來判斷該氣象觀測資料是否業務考核資料,如果是業務考核,則獲取相關的考核信息。
4)日志信息豐富。根據原始業務日志中的臺站編號,從臺站信息庫中補充臺站所在的國家、區域/省、市、地縣等信息。
4.1.2 數據去重
為保證業務質量,氣象資料在各業務環節存在數據訂正、數據傳輸續傳等現象,導致原始業務日志數據也存在重復問題。為保證后續監視指標計算的準確性,預處理過程需要對數據進行清洗去重操作,避免因重復計算造成誤差。
去重過程針對每一條原始業務日志,提取“資料類型”、“資料時次”、“業務環節”、“臺站號/文件”等要素作為唯一標識,根據唯一標識中的信息,計算其MD5值存入Redis緩存,后接入數據時檢查MD5值是否出現,若是則打上重復標記。根據氣象資料業務時次周期處理的特點,在Redis中緩存4個數據周期,能夠覆蓋大部分數據,對于少數超過4個周期的延遲數據,系統采用離線計算方法糾正。
首先對資料類型、資料時次、資料臺站和業務環節全排列編碼,計算方法如式(1)。
編碼值i=MD5(typei×timei×stationi×processi)
(1)
其中:type為資料小類編號;time為業務時次編號;station為臺站號;process為業務環節編號。
以500種資料小類,20種不同業務時次,10萬個臺站,10個處理環節為平均數估算最大占位數,如式(2)。
500×20×105×10=1010
(2)
在Redis中設置1010的Bitmap,占用內存1010÷8=1.25×109字節,約為1.16GB。將每條資料日志記錄的編碼值i存入Redis中緩存,后續的日志記錄均與Redis緩存的編碼值進行比較,如果相同,則打上重復標記。
原始業務日志數據預處理后經過指標計算處理,生成能精細化反映業務運行的狀態和可用性的指標數據,如完整率、到報率、及時率等。監視指標計算環節采用Spark Streaming實時處理框架,對實時數據流進行統計計算。
為實現監視指標的實時更新,指標計算以15秒為窗口從上游獲取日志數據,統計分鐘、小時、日尺度的業務監視指標,以及全國和分省指標,并存入指標庫。
4.2.1 業務指標計算
氣象業務系統對于考核數據每天會生成應收數據清單(節目表),包含數據類型、應收站號、應到時次、環節標識等信息,當有數據到達某個環節時該環節會形成數據達到日志提交綜合業務監視,通過與節目表的算法匹配計算出業務指標,具體流程如下,以區域站數據小時到報率計算為例:
1)通過采集層獲取每日節目表,解析后將類型、站號、時次等信息寫入存儲層中。
2)從實時數據隊列中每15秒獲取一次日志數據,通過數據類型和環節標識區分出區域站數據,并從存儲層中獲取區域站數據的節目表信息。
3)將收集環節的區域站日志中的站號、時次信息與進行匹配節目表,計算15秒內日志中區域站數據的收集情況。
4)從存儲層中獲取區域站到報指標,根據步驟3)中的計算結果進行累積匯總,計算出實時指標回寫至存儲層,供綜合業務監視前端展示調用。如圖4所示。

圖4 業務指標計算流程
每次計算當前時次、前兩個時次和后一個時次的到報情況,既能保證數據計算時效性,前端能取到實時的到報情況,又能保證計算資源的合理利用。由于氣象業務數據存在延遲到達的現象,指標計算每天定時從存儲層中將近三天的數據取出進行一次指標重算,保證指標庫中非實時數據的正確性。
4.2.2 計算任務容錯處理
由于指標計算時偶爾會出現計算失敗導致最終指標計算結果存在誤差,系統設計了數據處理容錯機制。采用批序號的增量計算方式,通過Redis緩存記錄當前處理的批號,在指標累計過程中比較最近的有效計算結果實現增量累計,只有在所有數據執行計算完成之后才提交Kafka進行消費量偏移,若是計算失敗則納入下一批次重新計算。數據處理容錯流程如圖5。

圖5 指標計算容錯處理流程
根據業務規則,對監視指標進行判斷和分析將生成告警事件,告警事件包括來源、種類、異常詳細情況等內容。由于氣象綜合業務監視數據包括業務全流程環節監視指標、以及支撐每個業務環節的基礎設施資源運行狀態信息,一個環節出現告警,將導致后續業務流程均產生告警,而大批量的告警將影響運維人員對關鍵告警的判斷。
減少告警數量、分析告警源頭,生成面向運維人員的告警事件,將提高故障處理效率。告警歸因分析需要比較復雜的分析處理過程,還需關聯業務配置信息進行綜合判斷。本系統主要實現了以下基礎的告警處理:
1)告警歸并。對同一種資料、同一個業務時次、不同業務環節的告警事件進行歸并,只保留最前業務環節的告警事件。告警歸并方式包括有根據同一告警源頭進行歸并處理,根據同一時間窗口內的相似告警的歸并處理等,根據不同的業務需求采用不同的歸并處理算法,并對算法進行優化處理。
2)告警壓縮。設置壓縮規則,將同一種告警源頭、同一個業務時次、同一業務環節超時未處理的重復告警事件設置壓縮標識,對重復事件進行壓縮,并標識該告警事件對應的第一次和最后一次發生時間,壓縮所采用的算法包括有:基于滑動窗口的壓縮方法,基于告警事件文本相似度的壓縮法,基于告警發生事件關聯關系的壓縮法等。對于某個具體業務應用,可以配置壓縮時間窗口的大小,壓縮文本相似度大小,壓縮所需要的特定字段等
為了計算告警事件之間的關聯關系,首先需要對同一個時間窗口T內的告警進行去重處理,得到唯一個告警事件ID,這一個時間窗口T內的所有獨立告警可以被認為是一個獨立的數據集,即
AT={a0,a1,a2, …an}
其中AT表示時間窗口T內所有告警的集合,而a0表示某一條獨立的告警事件。
計算告警事件之間的關聯規則就是形成如X→Y的映射關系,其中X和Y是不相交的項集,在關聯關系挖掘中,有兩個非常重要的概念:支持度(s)和置信度(c),對于告警事件的支持度和置信度的計算公式分別為
其中:N是所有AT的總數。σ(X)表示X項集的個數。
3)告警升級。定義告警級別升級策略,對告警事件持續時間較長而未處理關閉的告警進行級別自動升級處理,如從一般告警升級為嚴重告警;也可以依據告警事件發生頻次做升級處理,如對于發生頻次較高的告警做升級處理;或對指定告警類型進行升級處理,如對硬件類資源監控指標得到的告警事件進行升級處理等。這些升級處理的觸發時間和觸發條件等可以通過配置來實現。
為提高對海量氣象業務監視數據的處理時效,系統采用Spark Streaming處理框架實現對監視數據預處理、指標計算。
Spark Streaming是將流式計算分解成一系列短小的批處理作業。這里的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照時間窗口分成一段一段的數據DStream(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),然后將Spark Streaming中對DStream的Transformation操作變為針對Spark中對RDD的Transformation操作,將RDD經過操作變成中間結果保存在內存中。整個流式計算根據業務的需求可以對中間的結果進行疊加或者存儲到外部設備。Spark Streaming處理流程如圖6[10]。

圖6 Spark Streaming處理流程
對于一個Spark集群來說,執行器(Executor)數量和執行器的并行度固定后,分區數量決定了任務執行的數量,也決定了任務執行的效率。
針對執行器數量,分區過多會導致任務的排隊,分區過少會降低集群的利用率;分區數據的傾斜,會導致各個任務的執行時間不一致,快的任務需要等待慢的任務,形成木桶效應,降低利用率。另一方面時間窗口大,每一批的數據量過大,會導致數據溢寫,降低效率,處理時間變長;窗口小,批次數據過少,又會導致調度時間相對過長。所以,分區和時間窗口的設置需要充分考慮系統資源情況和處理數據量。
氣象綜合業務監視數據平均每秒處理記錄數約為3萬條,設置時間窗口為15s,每批數據量為45萬條;執行器數量為12,故選擇把數據分成10個分區,對應10個執行器,另外2個執行器進行調度處理。一方面為了提高系統資源利用率,另一方面避免在計算過程中分區之間過多的數據交換。
由于計算中經常會對同一種資料、同一個時次進行指標累加計算,如果將數據隨機分配到分區中執行計算,同一種資料會在多個執行器上計算,匯聚時需要從各個執行器上進行結果匯總,根據實驗結果,影響了整體運行效率。為了減少任務執行后產生的匯聚開銷,在數據進入分區前增加一次排序操作,按照資料類型和業務時次進行排序,盡可能把同一種資料同一個時次的日志數據放在同一個執行器上執行。
把45萬條數據根據資料類型和時次均勻分成10個分區,數據分區的偽代碼如圖7。

圖7 數據分區偽代碼
為提高分區效率,本文采用水塘采樣算法,對每一個時間窗口的m條數據,隨機抽取n條樣本數據,針對樣本數據,按照分區邏輯計算每個分區的索引范圍,再將m條數據根據索引范圍加入到每個分區中。引入該方法后,數據分區效率得到大幅度提升。
根據復雜度[11]式(3)估算45萬條數據的排序性能。
計算復雜度=O(m)+n×log(n)+O(n)
(3)
分區優化前,計算復雜度為
O(450000)+450000×log(450000)+
O(450000)≈3443946
(4)
分區優化后,計算復雜度為
O(450000)+2000×log(2000)+O(2000)≈458602
(5)
可見,利用采樣數據進行分區優化,根據復雜度計算結果,分區性能可提升7倍以上。
4.5 基于全鏈路壓測體系的故障仿真與定位
基于全鏈路壓測體系的故障仿真建立了一套標準的模擬流程(圖8),包含準備階段、執行階段、檢查階段和恢復階段。覆蓋從計劃到還原的完整過程,并通過可視化的方式清晰的呈現給使用者。故障模型訓練的目的是有針對性的制造一些故障,給故障定位系統制造數據。其流程圖如圖9所示。

圖8 全鏈路壓測體系故障仿真流程圖

圖9 故障模型給故障定位系統制造數據流程圖
故障注入的對象包括1)應用:外部超時/響應變慢等。接口,DB延遲/連接滿/熱點,redis緩存熱點,kafka,中間件的負載均衡/限流等。2)資源:cpu,內存,磁盤,io,網絡延時等。以及3)程序:cpu,mem,iptable,流量劫持,解析,過濾,模擬丟棄和延時等。
通過故障模擬,可以實現以下目標:1)檢驗降級預案的有效性:下游依賴出現故障時,預案能及時應對,將系統的 SLA(service-level agreement,服務等級協議)維持在相對較高的水平,不因下游故障引起當前服務可用性的故障。2)監控報警:校驗報警是否符合預期:是否報警、消息提示是否正確、報警的實效、收到報警的人是否預期。3)故障復現:故障復盤的后續todo項落地效果如何,通過一定時間后對故障的重現和驗證,完成閉環。4)架構容災測試:主備切換、負載均衡、流量調度等容災手段健壯性如何,提前發現并修復可避免的重大問題。5)參數調優:限流策略、報警閾值、超時設置的調優。
氣象綜合業務監視數據采集與處理框架的實驗運行環境由11臺虛擬機組成,硬件配置如表2。

表2 硬件配置
為測試監視數據采集性能,選取了5個業務日志文件,計算Flume從讀取文件到寫入Kafka消息隊列之間的時效和吞吐量,測試結果表明,5個日志文件的平均處理吞吐量為18MB/s,每秒采集的日志記錄條數峰值達30萬以上,如圖10。

圖10 Flume采集性能
另外,基于Spark Streaming流處理平臺,接入所有氣象綜合業務監視數據,測試24小時內,預處理、去重、指標計算環節的處理記錄條數,如圖11。

圖11 Spark Streaming處理性能
測試結果表明,Spark Streaming流處理平臺能實現平均每分鐘處理200萬條,每秒處理峰值達6萬條以上,日累計處理記錄數超過30億條。
本文選取了2019年6月15日到6月20日共5天內的告警事件數據,并采用上述告警壓縮方法,選定時間窗口為10分鐘,得到了壓縮前后的告警數量,如圖12所示。

圖12 對5天內的告警數據做壓縮得到的結果。
圖12中展示了一部分時間內的告警壓縮結果,其中紅色線表示壓縮之前的原始告警數量,而綠色線表示經過告警壓縮之后的告警數量,從圖中可以對比發現,壓縮之后告警的數量得到了極大減少,且通過計算發現,該時間段內的告警壓縮率結果為71.2%,表明經過壓縮算法處理后,呈現出來的告警數量降低到原來的30%左右,顯示出壓縮算法的效果非常顯著。
為了驗證系統的穩定性和及時發現故障,解決故障的能力。從資源、應用和程序三個方面進行了故障注入的模擬測試。下面給出了其中兩個測試的實驗結果。
1)進程掛起故障模擬
圖13顯示了編號為1841的進程掛起的故障模擬結果,故障注入之前狀態為S,故障注入之后的狀態變為T。

圖13 進程掛起的故障模擬結果
該結果顯示故障注入系統可以成功的模擬和監測進程故障。
2)系統使用故障模擬注入
在該實驗中,人為模擬了通過對系統進行冒煙壓力測試的方式,加大系統的CPU使用率,圖14顯示了從12月29日到1月6日這一周之內測試的結果。

圖14 測試結果
結果顯示,系統可以有效捕捉到CPU使用率超標的系統緩慢故障并及時發出有效預警,給系統維護人員準確判斷故障原因與及時排出故障提供了有效依據。
本文建立了針對氣象綜合業務監視數據的采集與處理框架,采用REST、Flume、Kafka、Spark Streaming等一系列大數據技術與算法,實現對監視數據的高效采集、緩沖和處理。同時采用先進的故障模擬仿真技術對系統進行了故障排查壓力測試。該框架已經業務應用于中國氣象局氣象綜合業務實時監控系統中,目前,每日采集和處理數據超過30億條,每秒處理峰值達6萬條以上,支撐監視數據30s內完成采集、處理到告警的快速發布,提高了氣象綜合業務故障的快速發現能力。未來,基于該處理框架的監視數據挖掘、告警關聯分析、故障智能預測和故障自愈將是研究的重點,以推動氣象監控運維系統的自動化、智能化水平。