成兆金,莊立偉,張媛媛,譚 奎,伍湘秦
(1.日照市氣象局,山東 日照 276826; 2.國家氣象中心,北京 100081; 3.華云信息技術工程有限公司,北京 100081;4.重慶市豐都縣氣象局,重慶 408200; 5.湖南省氣象局,湖南 長沙 410000)
中國氣象局觀測司2009年下發了農業氣象測報業務系統(AgMODOS V1.0),實現了觀測資料的信息化,系統產生的數據文件以FTP方式傳輸[1]。隨著自動觀測技術的迅速發展,FTP文件傳輸方式難以滿足大容量實時氣象資料的高時效傳輸。
目前,氣象通信傳輸多采用消息隊列技術,有研究者利用Java語言,采用RabbitMQ消息隊列技術,設計開發消息傳輸服務模塊,能支持多種客戶端,臺站通過消息傳輸客戶端將生產的觀測數據進行消息封裝后發布到消息隊列(Message Queue)中,省級消息服務器從隊列中獲取消息后,以秒級的時間推送到國家級消息服務器,進而完成高時效的數據傳輸,滿足農業氣象觀測數據不斷增加和變化的需求[2-5]。
本文研究的重點是在AgMODOS的基礎上,引入基于Java語言的消息服務機制,研發基層臺站客戶端的消息服務軟件,改進新XML標準數據格式文件傳輸模塊,進行有效的文件傳輸監控與管理。
消息服務客戶端軟件采用面向對象技術與模塊化結構技術框架,利用Java語言,采用RabbitMQ消息隊列等技術,運用XML可擴展標識語言開發完成[6-9]。
消息(Message)是消息隊列中的最小單元,實質上是一段數據,一般作為多個應用程序之間的信息傳輸載體,消息可非常簡單,比如只包含臺站要傳輸的XML[10]。消息隊列(Message queue),也就是消息中間件,可通過其高可靠的傳遞機制進行和平臺無關的數據交換[11]。消息隊列技術正逐漸成為各行業通信系統中廣泛使用的核心技術。由于RabbitMQ采用AMQP傳輸協議,服務端支持消息隊列的持久性,可以有效保證各個環節流轉過程的觀測數據完整性,同時可提供靈活的路由控制轉發功能,符合氣象數據傳輸的業務需求。
2.2.1 消息服務模塊架構
消息傳輸服務模塊,采用面向對象思維與模塊化技術,兩層架構設計,由交換微代理接口層和交換微代理服務層組成[12]。
交換微代理接口層為消息傳輸服務系統提供管理配置接口,接口可直接嵌入到AgMODOS中,與交換微代理服務進行信息交換。
交換微代理服務層,也就是業務邏輯層(BLL),采用RabbitMQ消息中間件框架及common.io文件監聽技術,以消息流發送的方式實現農業氣象觀測XML標準格式數據發送、數據接收、數據過濾/封裝、數據緩存和數據補傳,保證了數據內容安全完整[13]。
2.2.2 功能設計
消息收發采用消息中間件RabbitMQ,將數據按照規范的數據封裝和文件名規則輸出。消息服務參數采用conf.properties文件配置,完成數據接收服務器IP、發送者身份、編報中心、監視路徑等消息參數配置。通過數據過濾,對數據文件的格式進行檢查及重復判斷,然后進行消息封裝、數據發送、移盤,對發送成功的數據文件轉移到Sending及sendback文件夾。發送失敗,重新調用回調接口,進行數據補發,對發送成功的數據設置緩存存儲,交換服務微代理功能結構如圖1所示。

圖1 交換服務微代理功能結構Fig.1 Exchange service micro-agent function structure
在數據接入的臺站類型方面,增加區分主站、備份站的臺站信息。對同一類資料,主測站和備份測站,采用不同的4級編碼來區分。省級根據4級編碼來區分測站類型,對主站、備份測站運行狀況進行監視。
在傳輸服務層面,資料的格式檢查增加時效性檢查,對臺站傳輸客戶端的接收目錄中的資料進行時效判斷,對超過規定時間范圍的資料數據,不再進行傳輸發送。
2.2.3 傳輸流程
臺站通過消息傳輸客戶端,對AgMODOS系統生產的XML標準格式數據Z文件進行監聽、校驗、封裝,最后以消息的方式傳給省級消息服務端,具體流程如圖2所示。圖中各部分運行過程:AgMODOS軟件生成XML文件后寫入接口目錄,然后由消息傳輸客戶端經common.io文件初始化啟動監聽,掃描獲得文件后,進行集合唯一性重復判斷,如果文件重復移盤至重復目錄,通過文件名格檢、fileMatch匹配,則將消息屬性封裝后進行消息發送至省級CTS消息傳輸服務器,省級不落地直接轉發國家級,不通過格檢的文件移盤錯誤目錄。
(1)程序啟動后監聽資料入口目錄,對新增實時文件或年度文件實時處理。
common.io文件監聽,采用common.io.FileAlterationMonitor文件目錄動態監控技術,對監控目錄進行間隔1 s的實時動態監聽,獲取目標文件以供檢查。

圖2 消息傳輸模塊流程Fig.2 Message transfer module flow chart
FileMonitor m=new FileMonitor(1 000);//設置監控的間隔時間1 s輪詢1次
//指定文件夾,添加監聽
String monitorPath=PROP_MAP.get(“monitorPath”);
m.monitor(monitorPath,new FlumeListener());
//開啟監聽
m.start();
(2)通過檢驗規則完成對新增文件的實時檢驗,匹配成功的數據進行后續流程,未通過校驗的文件按照相應規則進行移盤操作,并記錄對應日志信息。
對于校驗成功的XML標準格式數據Z文件,消息傳輸客戶端會把客戶端的配置信息和XML的屬性信息(表1)封裝成RabbitMQ消息屬性和文件內容,形成實時消息文件或年度消息文件,封裝的字節流消息體構成一個完整的消息發送給省級CTS消息服務器的RabbitMQ中,實現臺站與省級、國家級CTS消息服務器的對接。
(3)對匹配成功的數據動態獲取文件內容長度,并讀取文件,接著將讀取的XML標準格式數據內容轉碼為base64后存放至byte數組中封裝為消息體,與消息頭一起進行封裝成RabbitMQ消息發送給省級CTS的RabbitMQ進行接收。

表1 農業氣象觀測數據消息屬性Tab.1 Message properties of agricultural meteorological observation data
(4)如發送出現異常或失敗時,將文件的處理信息,封裝到map中,同時另開一個線程,開啟定時輪詢功能,以時間間隔為60 s持續從map中獲取文件信息,嘗試重連并發送消息,直到消息發送成功為止。文件發送成功后將文件信息從該map中移除。對發送成功的數據文件轉移到Sending及sendback文件夾,如果發送失敗,重新調用回調接口,進行數據補發。
2.2.4 消息服務傳輸操作流程
消息傳輸是指通過Java消息服務協議進行網絡內或遠程文件傳輸。AgMODOS系統提供簡便、快捷的文件上傳工具。從程序組AgMODOS中選擇AgMOService(或AgMOEditor)程序項,在其主菜單上選擇“XML消息傳輸”,調用Java程序“MsgTransferClientV1.1.jar”,啟動消息服務,點擊“推送”按鈕,將“未傳”XML文件發送至省級CTS消息服務器,立即不落地上傳國家級CTS消息服務器,然后查看文件發生信息,確保消息服務正常,完成臺站發報任務。如果消息服務未啟動,點擊“檢測消息服務”按鈕,檢查消息服務狀態,再點擊“啟動消息服務”按鈕,啟動Java消息服務,重新“推送”(圖3)。

圖3 消息服務傳輸操作流程Fig.3 Message service transfer operation flow chart
(1)基于原農業氣象測報業務系統(AgMODOS1.0)的架構技術,引入Java消息服務傳輸技術,彌補了FTP傳輸的不足。2020年12月1日的全國業務單軌運行以來,表明臺站生成的XML標準文件上傳及時、入庫解譯正確、無丟失,達到了業務運行要求,對進一步實現氣象數據標準化建設具有重要意義。
(2)消息傳輸模塊基于Java語言的jdk1.8.0版本開發,對臺站計算機的操作系統要求較高,必須Win7及以上版本方可安裝,對環境變量的配置必須手工操作,且Java的更新升級,必須將計算機操作系統環境變量“Java_Home”的變量值進行同步更新,更換成更新升級后的版本信息,給臺站業務人員帶來諸多不便,將來需進一步完善消息服務客戶端軟件,以適應專業版、企業版、家庭版、旗艦版等各版本的操作系統,并做到自動配置Java的系統環境變量。
(4)消息傳輸系統是中國氣象局推行業務標準化建設新開發的業務系統,是一個新概念,要求臺站業務人員認知消息服務的概念和有關知識。業務準入運行以來:系統運行穩定,Z文件數據99%在1 s內完成從臺站到國家局的傳輸,達到了業務運行要求。