金石聲 李玨
[摘要]本省自動氣象觀測系統根據觀測要素的不同分為單雨量系統、兩要素系統和六要素系統。三類觀測系統分別接收各自站點采集的報文進行解碼入庫,同時通過FTP將數據報文發送到綜合處理系統再次進行解碼入庫。這樣的處理流程不僅增加網絡負擔,還進行了重復的解碼工作,資源利用率低。為此本文采用基于Spring Batch的框架,定時對三個不同觀測系統的后臺數據庫的實時數據向綜合處理系統的數據庫進行遷移。該流程采用數據分區技術,多線程并行處理,在減少了對網絡帶寬的占用量的同時,降低了各個系統CPU使用量,提高了工作效率。
[關鍵詞]Spring Batch 數據遷移 數據分區 多線程
引言
本省的氣象自動站綜合系統需要對已經進行過解碼入庫的單雨量、兩要素和六要素觀測報文進行集中,再次進行解碼后插入綜合數據庫,并以此對外提供服務。這樣的處理流程不僅在資源利用、網絡帶寬的占用都產生了很大的浪費。
通過分別讀取單雨量、兩要素和六要素的數據庫,對已經入庫的數據進行準實時的遷移,在降低網絡帶寬占用的同時,也能降低綜合處理系統的資源占用。但是基于大量的數據讀寫,且重復性高工作,傳統的編程方式處理不僅繁瑣,且維護性低。已經Spring Batch的出現無疑是解決這種問題的一種有效工具。由于目前已經有比較成熟的開源框架支持批處理的需求,所以本方案擬選用開源框架Spring Batch。這樣可以借助開源框架比較成熟的代碼,減少研究的難度,加強框架擴展性,減少研發周期,加快實際應用進度,并且保證程序的穩定性。
一、Spring Batch簡介
Spring batch是Spring的一個子項目,由Spring Source與Accenture(埃森哲)合作開發的批處理框架。Springhatch對編寫批處理程序本身的特性進行了抽象。將批處理程序分為Job和Job Step兩個部分,將處理環節定義為數據讀、數據處理和數據寫三個步驟。提供Job Repository來存儲Job執行期間的元數據,可以在處理大量的數據時,提供日志記錄/跟蹤,事務管理,處理統計,資源管理等特性。此外,還提供了分區技術采用多線程方式并行處理作業。
二、系統總體架構設計
考慮到系統可能出現的單店故障,為減少處理這類故障的開發難度。開發三個獨立的程序分別對單雨量、兩要素和六要素三個數據庫中的數據進行遷移。本文以遷移兩要素數據為例進行介紹。
Spring bath的核心思想是將讀取到的數據轉化為Java對象,然后對對象進行操作。首先需要根據表中的字段建立相應的實體類,然后Spring Batch把從源數據庫中讀取的每條數據映射為對應的Java對象,由于本文只是對數據進行遷移,不需要對對象進行處理,所以將Java對象的值通過寫步驟寫入目標數據庫。實際應用中只關注兩張表中存儲的數據:小時數據表(tabHourData)和分鐘數據表(tabMinuteData),由于遷移這兩張表沒有具體的先后順序,將這兩部分工作并行處理。
Spring batch提供了XML方式進行業務流程配置,通過spht元素來提供并行作業流的定義,通過task-execution屬性來定義執行的線程池,從而提高Job的執行效率。其中要定義兩個不同的作業步(transferHottrTab_step和transferMinuteTab_step),每個作業步下定義了兩個具體的子Job分別來完成對兩張數據表的數據遷移。子Job中又分別定義了讀、寫兩個過程來完成數據的遷移。其中為了保證執行效率定義了commit-interval,指定了從數據庫讀入1000條數據后進行一次寫操作,這樣既減少了10的訪問,也提高了寫入效率。關鍵配置如下(以transferHottrTab_Job為例):
數據遷移中讀取的數據量較大,為了高時效的完成讀取作業,讀取數據的任務進行分區,每個分區交給不同的線程處理。該模式的優點在于分區中每一個元素的處理都能像一個普通Spring Batch任務的單步一樣運行。具體關系圖如3-1,將需要讀取的目標數據分為了3個分區,每一個分區都有一個執行上下文Execution Context,StepExecutionSplitter根據不同的上下文生成作業步執行器,然后交給PartitionHandler來處理。應為Spring Batch默認實現了StepExecutionSplitter以及PartitionHandler。開發過程中原則上只需要實現自己分區邏輯partitioner即可。
小時表和分鐘表都含有對數據觀測時間的字段,因此對該字段進行分區,可以實現分區策略的共享。具體配置如下:
class=”com.xxzx.partition.DBpartition”>
作業中用于對數據庫進行分區的DBpartition了實現了Spring Btach的Partitioner接口,定義具體的分區策略,將數據查詢的時間進行切片,然后寫入Step的執行上下文,關鍵代碼如下:
while(start<=max){
ExecutionContext context=new ExecutionContext();
if(end>=max){
end=max;
}
context.putInt(_STARTTIME,start);
context.putInt(_ENDTIME,end);
start+=targetSize;
end+=targetSize;
resultMap.put(“partition”+(number++),context);
通過把每次任務中需要查詢的時間段根據targetSize的值進行切片,意味著數據片段分配到不同的作業步中。并將_ENDTIME和_STARTYIME寫入Step的執行上下文(ExecutionContext)。然后在讀取數據的階段通過讀取Step執行上下文獲取每個片段的統計時段。
3.2定時調度
由于Spring Batch本身并不是一個定時的調度框架。本文采用Spring本身提供的一個輕量級的調度框架SpringScheduler來實現定時調度任務。關鍵配置如下:
其中采用cron表達式實現每5分鐘調度一次作業,并且在schedulerLauncher中完成對啟動具體作業的配置。這樣便將Spring Scheduler和Spring Batch結合起來完成數據遷移任務。
四、結論
本文基于Spring Batch框架采用數據分區技術、多線程并行處理的方法開發了一個數據庫遷移系統完成批量數據的遷移工作,并結合Spring Scheduler實現了批處理任務的定時調度。在實際工作中優化了本省自動站數據處理系統中帶寬占用率高、系統資源浪費的現狀。實際工作中對本省自動站數量兩要素自動站進行了測試,完成3260條數據的遷移的時間不超過30秒。完全滿足實際業務中的需求。