朱錚雄 黃宇青
1(海際金控有限公司 上海 200120)2(上海市計算技術研究所 上海 200040)
巴塞爾協議Ⅲ[1]根據銀行交易數據與風險資產計算一級資本的充足率。銀行的各種交易數據的集成和整合是整個巴塞爾系統的核心。傳統方式采用腳本和數據庫存儲過程[2]進行數據整合,串聯起上游數據源系統和下游商業報告系統。隨著金融全球化的步伐加快,數據量呈幾何級增長,從原先面臨億級數據處理到面對每天萬億級以上的處理數據,系統面臨極大的瓶頸。原先采用的全部是存儲過程,導致數據庫瓶頸大,單個節點擴展性能難,數據處理速度慢,存儲過程難以維護。同時下游報表系統也嚴重依賴上游數據庫[2],數據庫經常卡頓,嚴重制約整個巴塞爾系統運行的準確性、時效性。提升數據處理性能,提高自動化程度,降低系統之間耦合度是擺在整個銀行運維和開發面前的重要問題。
本文采用Spring Batch的系統構架方式,重新架構整個數據整合系統,并結合目前業界最為流行的Web Service對數據采集、數據處理、數據發布、數據持久化等各個應用模塊按照面向服務模式進行解耦合。本文將這些應用模塊按照面向服務的設計理念重新定義接口,使整個模塊有機地聯系起來,每個服務相互獨立并且可以以一種統一和通用的方式進行交互。當某個服務無法適應數據壓力時,可以進行水平擴展,在保持各個模塊獨立的擴展性的同時進行創新,確保了數據整合的可靠、穩定、高效,且滿足巴塞爾系統的運行要求,成為海量數據整合的核心樞紐。采用這種整合方式,數據處理效率高,系統易于擴展以及被下游系統接入。
Spring Batch[3]是一款優秀的、開源的大數據并行處理框架。通過Spring Batch可以構建出輕量級的健壯的并行處理應用,支持事務、并發、流程、監控、縱向和橫向擴展,提供統一的接口管理和任務管理。它是一款基于 Spring 的企業批處理框架,通過它可以完成大數據并發批量處理。使用Spring Batch可以實現下列目標[4]:
(1) Batch Data:能夠處理大批量數據的導入、導出和業務邏輯計算;
(2) Automation:無須人工干預,能夠自動化執行批量任務;
(3) Robustness:不會因為無效數據或錯誤數據導致程序崩潰;
(4) Reliability:通過跟蹤、監控、日志及相關的處理策略(retry,skip,restart);
(5) Scaling:通過并發和并行技術實現應用的縱向和橫向擴展,滿足數據處理的性能需求。
Spring Batch良好的大數據批處理的性能和高可擴展性,使其被廣泛應用于各類自動化的數據遷徙系統中,包括超大數據的氣象系統[3]。
VMware vFabric Gemfire[5]是一個彈性可擴展分布式內存數據管理平臺,可用于構建需要超高速數據交互的、具有高度可擴展能力的應用系統。它能夠跨越多臺虛擬機、多個 JVM 和多個Gemfire服務器來管理應用對象。使用動態備份和分區,使它能提供多種平臺特性,例如:數據持久性、可靠的事件通報、連續查詢、通用的并行處理、高吞吐、低延遲、高擴展性、持續有效性和WAN分布。
基于以上特性,可以看出VMware vFabric Gemfire非常適合于巴塞爾風險業務管理系統的數據緩存層,它可以滿足本文對整合交易數據進行實時訪問的需求,其性能可以隨著需要彈性擴展,并且可以在多臺服務器上實現部署,實現海量內存緩存池的要求。
Apache CXF[6]是一個開源的Services框架,CXF支持使用Frontend編程API來構建和開發Services,如JAX-WS。這些Services可以支持多種協議,例如:SOAP、XML/HTTP、RESTful HTTP、CORBA,并且可以在多種傳輸協議上運行,例如:HTTP、JMS、JBI。CXF大大簡化了Services的創建,同時繼承了XFire傳統,可以天然地與Spring進行無縫集成。
系統按照功能大致可以分成三個主要階段:大數據讀取,數據整合與映射以及數據計算與存儲。
(1) 大數據讀取:批量高速讀取交易流程上所有的數據,包括交易對手的合同、實際交易、賬戶信息、證券產品信息以及價格,存入中間高速緩存中。
(2) 數據整合與映射:將高速緩存中的交易對手信息、交易系統、合約信息做一一映射,存入數據庫系統中,同時將證券產品中的固收產品單獨提取并寫入數據庫,根據緩存中的合約信息計算凈合約,將凈合約信息更新進系統中。
(3) 數據計算與存儲:先根據已經映射好的數據做風險敞口計算,再根據抵押品現狀抵減風險敞口值,最后計算VaR、預期信用風險、客戶的風險評級和產品的敏感度值。
圖1為系統主要功能模塊。

圖1 系統功能模塊圖
各個模塊的功能如下:
協議/交易/賬戶(Agreement/Trade/Account)的BCP Loading:使用Sybase的BCP導入數據。BCP[7]基于DB-Library以并行的方式導入批量的數據,目的是快速導入Agreement/Trades/Account等基礎信息進入數據庫中待用。
協議數據(Agreement):與交易對手簽訂的結算合約。
交易數據(Trades):實際交易數據。
賬戶數據(Account):交易對手等賬戶信息。
證券數據(Security):證券化的產品信息。
價格數據(Pricing):標價信息。
交易對手/機構映射(Counterparty/Legal Mapping):使用存儲過程處理,將交易對手信息(Legal指公司交易機構)與交易信息對應,存入系統表中。
主要數據映射(Master Data Mapping):使用存儲過程處理,將交易對手、交易信息以及合約信息數據一一匹配映射,存入系統表中。
固收數據處理(Fix Income Process):將固定收益證券(包括中長期國債、公司債券、市政債券和抵押債券等債務類證券)提取出來,存入數據庫中。
價格和數據處理(Pricing & Mapping Process):將標價信息與固定收益證券映射,對固定收益債券進行標價,用于計算市場風險,結果存入數據庫中。
凈合約映射(Netting Data Mapping):使用存儲過程對雙邊凈合約(Netting Agreement)的信用風險抵減計算,并且將得出的數據全部存儲更新在系統表中。
風險敞口計算(Analytical Calculation Processor):使用存儲過程對數據進行風險敞口的初步計算,并且將得出的數據全部存儲更新在系統表中。
抵押品計算(Collateral Mapping):使用存儲過程對抵押品進行的計算,并且抵減對應的風險敞口值,將得出的數據全部存儲更新在系統表中。
VaR計算(VaR Calculation):使用存儲過程計算在一定概率水平(置信度)下,交易數據價值在未來特定時期內的最大可能損失,并將結果存入數據庫中。
CEF計算(CEF Calculate):使用存儲過程計算信用風險敞口預期值。
機構PDLC計算(Facility PDLC Calculation):根據交易數據更新調整機構客戶信用值,同時可以授予信用值。
敏感度映射(Sensitivity):使用存儲過程針對固定收益證券按照利率、利差和在投資收益率等數據敏感度對標的資產做數據映射,完成數據的敏感性值預設。
按照上述的系統主要功能需求,數據整合核心服務層主要分為Spring Batch大數據加載和通用服務數據處理兩大部分。Spring Batch+Gemfire+CXF 輕量級架構系統結構如圖2所示。

圖2 核心服務層系統結構圖
本文利用Spring Batch批量讀入大文件,使用Mapper將文件數據映射成對象數據,根據數據類型適配不同的數據處理器(Processor),并調用通用服務中與之對應的服務進行數據處理,將處理完成的數據存儲進數據庫,并發布在Gemfire cache中。
根據系統功能劃分,有如下處理器:
(1) 客戶處理器(Customer Processor):使用Spring Batch導入客戶及交易對手信息,并調用客戶服務獲取匹配交易對手信息。
(2) 協議處理器(Agreement Processor):使用Spring Batch導入合同信息,并調用協議計算服務獲取匹配合同以及凈值優惠計算的信息。
(3) 機構處理器(Facility Processor):使用Spring Batch導入客戶信用信息,并且調用機構數據服務獲取客戶信用的信息。
(4) 證券處理器(Securities Processor):使用Spring Batch導入證券信息,并且調用證券化服務獲取證券的信息。
(5) 計算處理器(Calculate Processor):使用Spring Batch導入計算信息,并且調用計算服務和投資組合服務來計算整個信息。
(6) 敏感度處理器(Sensitivity Processor):使用Spring Batch導入敏感度信息,并且調用敏感度服務來比較和計算敏感度。
2.2.1 Spring Batch大數據加載
數據處理流程如圖3所示。

圖3 數據處理流程圖
本文將處理的數據分解為Job,并且為其定義屬性和基礎設施,通過Reader、Processor和Writer來實現數據業務處理、基于Pojo的開發以及領域對象描述。本文分別使用了如下組件:
(1) 批量任務加載器(Batch Loader):使用Autosys定時啟動Spring Batch開始處理文件。
(2) 任務庫(Job repository):用來持久化Job的元數據,是所有Job的中心倉庫。
(3) 任務啟動器(Job launcher):從Job的中心倉庫取出一個Job,并且啟動。
(4) 任務(Job):Batch操作的基礎執行單元。
(5) 步驟(Step):Job的一個階段,一個Job由一組Step構成,其中Tasklet Step包含一個事務過程,包含重復執行、同步、異步等策略。
(6) 單元數據(Item):從數據源讀出或寫入的一條數據記錄。
(7) 單元數據讀取(Item Reader):從給定的數據源讀取Item集合。
(8) 單元數據業務處理(Item Processor):調用通用服務中對應的服務組件,對Item進行邏輯業務處理(包括數據映射和數值計算等。
(9) 單元數據寫入(Item Writer):把Item寫入數據源。
針對超級大數據,我們還可以對Spring Batch進行橫向和縱向的擴展,確保整個系統能夠應付超大的數據集合壓力。對任務進行擴展的幾種方式如表1所示。

表1 擴展方式描述
每個Step都可以并行處理,Step并行處理模式使用了在一個節點上橫向處理,但隨著作業處理量的增加,如果一臺節點服務器無法滿足Job的處理,可以采用Partitioning Step的方式將多個機器節點組合起來完成一個Job的處理。如圖4所示,主服務器對Item讀、寫的處理邏輯進行分離,通常情況下將讀操作放在一個節點進行,將寫操作分發到另外的節點執行。這樣做到了負載均衡和主從復制,理論上只要增加處理服務器,就幾乎可以無限提升Step的處理業務能力。

圖4 集群處理結構圖
定義任務舉例:
……
processor=″compositeItemProcessor″ writer=″cacheWriter″ commit-interval=″1000″ /> …… 上述任務中,定義了一個交易數據的加載,定義使用fileReader批量文件讀取組件和使用Gemfire的緩存寫入cacheWrite,同時也定義了數據處理器Composite Item Processor映射。 核心處理器Spring配置舉例: 在此配置中,使用了數據組合類Composite Item Processor處理Transaction Item對象,把Mapping Processor、Customer Processor、Securities Processor、Agreement Processor和Facility Processor、Trade Level Calculation Processor和Remove Obj Reference Processor等單個處理器“串聯”在一起生成 Transaction 對象,這樣就可以通過調整配置文件,達到各個組件復用和靈活配置的目的。 以配置中的客戶處理Customer Processor為例子,在Spring中定義bean: 實現customerProcessor類: public class CustomerProcessor implements ItemProcessor public BaseTransaction process(BaseTransaction txn) throws Exception { setAccountDetail(txn); return txn; } protected void setAccountDetail(BaseTransaction txn) { this.accountDataService.matchAccounts(txn); this.transactionService.processHouseAccounts(txn); } } 處理客戶數據需要進行兩個步驟:(1) 將客戶賬戶信息進行映射;(2) 將集團客戶內部掛消帳內部處理掉。 2.2.2 Gemfire做數據通用服務的DAO 通用服務主要提供核心業務邏輯處理,并以Web Service方式將數據發布在網絡上。其主要體系結構如圖5所示。 圖5 通用服務系統結構圖 本文主要采用面向接口的方式對通用服務進行設計。通過Java Interface。接口中對服務實現方法進行抽象定義,并對方法進行具體實現。通過面向接口編程,可以完成統一調用,應用在不同數據源上,比如對于同一個DAO接口,分別有Gemfire、jdbc、hibernate以及jms的實現,當Service調用DAO的save功能時,可以同步完成對Gemfire、數據庫以及jms的數據存儲(發布)。 本文以交易服務為例,定義了Transaction Data Service的創建交易數據(create)接口: public interface TransactionDataService { public void create(BaseTransaction tx); } GemfireDAO通過對緩存節點Region塊的存儲傳入交易數據,實現了創建交易數據(create)接口,在實現過程中,注意使用同步來確保數據的原子性: public class TransactionGemfireDAOImpl implements TransactionDAO { public BaseTransaction create(BaseTransaction tx) { if(tx != null){ this.getTransactionRegion().put(tx.getId(),tx); } return tx; } public Region String regionName = ″/transactionRegion″; if(this.transactionRegion == null){ synchronized(this){ this.transactionRegion = RegionUtils.getRegionByName(regionName); } } return transactionRegion; } … } 2.2.3 CXF完成通用服務數據接口發布 通用服務通過CXF發布標準的Web Service,不僅可以做到水平橫向擴展,同時也方便與下游系統主要系統對接,按照其要求定義對應的接口,本文以敏感性分析為例,在保持其他條件不變的前提下,研究單個市場風險要素(利率、匯率、股票價格和商品價格)的變化可能會對金融工具或資產組合的收益或經濟價值產生的影響。 在Web.xml中除了傳統加載Spring配置文件外,還需要加載CXF的Servlet,完成Web Service的映射: 在Spring中定義bean: 本文還定義了Sensitivity接口: @WebService public class SensitivityWebSerivce implements SensitivityService { @WebMethod public String getTransactionss(@Webparam(name=″asOfDate″)String asOfDate, @WebParam(name=″tradeType″)){ Criteria criteria = new Criteria(asOfDate, tradeType); return transactionDataService.getTransactions(criteria); } … } 本文通過定義一個敏感度的查詢數據接口,直接調用內部的Transaction Service查詢接口,間接地把內部交易服務標準的Web Service的方式提供給下游系統使用。同時因為采用的是分布式部署,系統擴展很方便,下游運行報表期間,直接增加CXF的節點就輕松應對了高訪問需求,同時隔離了對數據庫的壓力。 本文在使用Spring Batch+Gemfire+CXF框架重構了整個整合系統后,在銀行內部管理系統中的3臺8核CPU的虛擬機(SpringBatch 80 GB內存,Gemfire 120 GB內存,CXF80 GB內存)上進行了UAT環境測試實驗。計算處理1.4 TB數據所需時間從2小時變為20分鐘。此外,通過提升部分風險計算服務器CPU性能,進一步提升了空間。下游系統讀取系統穩定性大大增強,沒有再發生因為下游系統查詢導致核心數據庫宕機的情況,極大地節省了人力維護成本和機器運行成本。 傳統巴塞爾數據整合系統依賴存儲過程和ETL工具,系統復雜,性能較差,而Spring Batch 和Gemfire的整合應用在業界整合使用案例較少。本文提出一套全新的基于Spring Batch+Gemfire+CXF輕量級應用架構來處理海量的巴塞爾整合數據,用于替代之前大量的存儲過程的設計。整個系統通過實際測試、運行,表現良好,具有各層間低耦合、高擴展性、高可靠性的特點,性能遠超傳統依賴存儲過程的數據整合系統。基于Spring Batch+Gemfire+CXF架構是一個行之有效的輕量級、大數據、低成本應用整合解決方案,可以推廣到大數據、高可用、企業級、可伸縮的企業、銀行等應用開發中。
3 應用效果
4 結 語