王 寧,王 勝
(1. 北京經緯信息技術有限公司,北京 100081;2. 中國鐵道科學研究院集團有限公司 電子計算技術研究所,北京 100081)
鐵路電子支付平臺自2010年開始建設,解決了鐵路客戶現金支付的各種安全隱患,顯著提升了鐵路旅客服務水平。提升了鐵路運輸收入資金周轉周期,實現了中國國家鐵路集團有限公司(簡稱:國鐵集團)鐵路運輸收入集中收繳。
鐵路電子支付平臺支撐12306互聯網售票系統、車站窗口、自助售票機和鐵路貨運站支付等場景。隨著交易量的增加,系統中的交易日志和交易快照數據量,從設計之初的每秒百萬級上升到TB級。原有基于關系型數據庫的記錄儲存方式,已無法滿足性能要求。特別是春運、國慶等高峰期,高并發的數據寫入對數據庫主機CPU、內存、磁盤I/O等帶來較大沖擊,出現長時間寫入延遲和堆積,對系統可用性、穩定性帶來較大風險。
為解決系統處理瓶頸,提升系統處理能力。本文通過相關研究,基于高并發消息隊列中間件Kafka和Hadoop相關大數據處理技術[1],實現對現有交易日志和快照數據的處理和存儲方式進行改造,將該部分數據從關系型數據庫移至大數據平臺。同時,基于鐵路電子支付平臺雙活數據中心整體架構基礎,設計了滿足雙中心雙活處理方式。實現了海量數據的高效采集、存儲、查詢,有效地支撐了鐵路電子支付平臺高效業務處理。
基于高峰期數據處理量的要求,考慮系統的彈性伸縮能力,滿足高吞吐量需求,主要采用高并發消息隊列中間件Kafka完成數據接入[2],Kafka可支持每秒數百萬級別和TB級別的消息處理,且支持Hadoop并行加載。
參考鐵路大數據應用頂層設計研究,各種系統產生的數據是一組順序、大量、快速、連續到達的數據序列并且要求實時進行處理,此類數據可采用流式計算方法[3]。SparkStreaming[4]可以實現高吞吐量且具備容錯機制的實時流數據處理。Spark可以接收Kafka的實時輸入數據,進行實時統計和計算,數據處理完成后,Spark可以和Hadoop進行集成,將結果保存在HDFS,利用YARN服務進行資源調度。
分布式文件系統具有可移植、高容錯和可水平擴展的特點,一般采用HDFS作為存儲海量數據的底層平臺[5-6],基于HDFS之上采用HBase滿足快速查詢檢索[7]。
常見的大數據處理平臺以整合、集成成熟的Hadoop 生態圈開源技術為主,采用分布式存儲HDFS、HBase、分布式計算框架 Spark,以及 ZooKeeper、Redis等組合實現。
基于鐵路電子支付平臺現有交易處理已實現雙中心雙活處理,基于Hadoop技術的交易日志改造也需設計實現雙活處理。考慮到Kafka的高性能處理能力,采用數據雙寫方式,每個中心產生的數據均調用Kafka的接口寫入兩個中心進行處理。保證每個中心均存儲兩中心全量數據,實現數據同步和一致性,滿足雙中心的故障轉移及數據查詢、統計等需求。
結合系統現狀和技術研究,主要改造目標包括:
(1)整體架構支持大并發的數據量高效處理和存儲需求,滿足鐵路電子支付平臺現有雙中心雙活運行架構及峰值交易處理要求。
(2)提供基于業務處理量的實時數據收集、統計,提供基于交易流水號等的條件關聯快速查詢能力(秒級)。數據在線存儲6個月,歷史數據轉入歷史庫,可快速進行數據上、下線切換和查詢。
(3)對Hadoop整體運行環境配置、運行狀態、數據處理量、存儲、系統資源消耗等進行管理、監控和預警。
根據改造目標,鐵路電子支付平臺大數據處理邏輯架構,如圖1所示,主要包括數據采集模塊、數據存儲模塊、數據統計查詢模塊、組件運行監控4個部分。

圖1 鐵路電子支付平臺大數據處理邏輯架構
現有交易日志、交易快照模塊,通過Kafka客戶端接口將消息發送至Kafka集群,兩個中心將數據發送至本地Kafka集群和另一個中心Kafka集群,確保兩中心數據均保持全量數據。
Spark模塊將接收的日志和快照數據保存至HBase中,按天進行存儲,日志和快照分別保存在一張HBase表中,由于存儲數據量大,90天前的數據,自動進行下線處理,節省HBase region server資源,下線的數據作為歷史數據依然保留在HDFS上。需查詢時,執行上線處理,可繼續進行處理。
采用Hadoop YARN管理Spark集群。YARN在Hadoop中的功能作用有2個,負責Hadoop集群中的資源管理;對任務進行調度和監控[8]。
采用ZooKeeper 的容錯性和高可用性分布式組件協調功能構建每個中心內部Hadoop的高可用模式。該模式具備雙 NN 節點,能夠實現容災的功能[9]。
根據不同需求,數據統計包括實時統計和查詢。包括以下3類:
(1)按固定周期每分鐘一次基于默認條件的實時統計和計算,使用Spark steaming每分鐘進行一次統計任務,基于時間確定Kafka數據偏移量并處理對應數據,結果寫入Redis保存;
(2)基于指定條件的統計計算,在接收到用戶請求后,采用Spark任務,從Kafka拉取數據進行計算,進行統計并直接返回;
(3)基于交易流水號的查詢,通過一、二中心統一的查詢接口請求至HBase執行查詢,返回結果合并去重后返回給用戶。
獨立部署監控系統對整體大數據環境進行監控,主要包括:
(1)實時統計監控Kafka的數據寫入量和數據消費延遲;
(2)各Spark程序的運行時間戳(由程序記錄在Redis中),平臺各服務(HBase、HDFS、ZooKeeper等)的狀態;
(3)按周期巡檢Hadoop(HDFS)、 HBase、ZooKeeper等集群所在主機狀態(CPU、內存、網絡、磁盤等)。
以上各類監控數據分別進行采集、判別和告警,通過頁面展示和觸發報警提示。
通過數據采集、數據存儲、查詢統計、組件運行監控模塊4個部分組成數據處理完整流程,模塊調用關系,如圖2所示。

圖2 模塊調用關系
基于大數據環境包括較多的組件部署和配置,搭建完成的環境還需根據業務處理需求進行高并發、高可用穩定性測試。通過測試可以對環境參數進行調優,對處理性能指標評估,對系統穩定性進行驗證等。
鐵路電子支付平臺大數據處理功能基于支付平臺一、二中心獨立部署,各功能組件均采用集群模式搭建,確保系統環境高可用性。
根據機房物理設備情況,支付平臺一、二中心均用物理機環境搭建,ZooKeeper、Kafka、YARN node manager,Hadoop Datanode,Hadoop Namenode,監控均單獨部署在不同的機器上,以盡量避免相互之間資源競爭的影響。
數據存儲配置及規劃,按交易量每天2 000萬筆,每日志數據量1.2 KB,每快照數據量6 KB計算。一天的原始數據量為720 GB。Hbase數據存儲量90天約 51 TB。
系統測試方案按照生產真實數據編寫用例,利用20臺虛擬機模擬客戶端,每臺機器啟用多個服務,每個服務啟動多個線程,模擬多客戶端向支付平臺一、二中心發送數據,客戶端數量基本等同生產環境,達到1500個客戶端。使用驗證腳本統計Spark入庫時間,統計發送數據量,利用HBase程序校驗Kafka數據和HBase是否一致。
性能測試通過兩個系統維護天窗期進行,每次持續時間4 h模擬1500個客戶端,測試期間,交易量平均達 3000 筆/s,峰值達 5000 筆/ s,日志和快照處理平均量 15000 條/s,數據帶寬流量約 110 MB/s,Spark數據統計平均延時6 s。測試完成后,支付一中心和支付二中心均完成超過4500萬筆交易,存儲超過4.5億條數據。
為保障系統高可用,進行了各類故障場景的模擬測試。主要包括:
(1)模擬其中一個中心故障,數據寫入可以自動檢測并切換至另一中心完整寫入;
(2)模擬某一中心內部部分組件失效,包括Kafka隊列服務中斷、Spark單節點故障、HBase單節點故障、HDFS數據存儲單節點故障等,均可失效自動檢測故障,進行故障節點隔離、Server自動轉移。
根據性能和功能測試結果,整體處理能力達到峰值交易處理能力,運行平穩,能夠完成故障自動切換和恢復。滿足高并發、高可用設計目標。
本文提出了一種基于Kafka、Hadoop等的數據采集與存儲方案,設計了滿足雙中心運行的大數據處理集群架構環境,具有吞吐量大、高可用等特點,提升了支付系統處理能力。系統經實際環境運行特別是春運售票高峰,系統運行平穩、性能優良,具有一定的應用和參考價值。