吳海濤 廖 兵 王 丹 李張泰 周文進
(1.四川明星電力股份有限公司,四川 遂寧 629000 2.北京國網信通埃森哲信息技術有限公司,北京 100031)
目前數據中心的數據抽取方式大致為全量數據接入方式或基于全量對比的數據抽取技術。該方式處理效率低下,經常會造成數據積壓嚴重,通常無法達到數據的實時性抽取任務要求。針對該問題,本系統對現今的增量抽取技術進行了研究,提出一種Change Data Capture (CDC)結合GPkafka的實時數據接入抽取技術,極大地提升了目前明星電力公司中臺數據抽取實時性,達到了零數據積壓的數據入庫要求。
CDC又稱變更數據捕獲(Change Data Capture)。CDC有兩個模式:同步和異步。同步CDC主要是采用觸發器記錄新增數據,基本能夠做到實時增量抽取。而異步CDC則是通過分析已經commit的日志記錄來得到增量數據信息,有一定的時間延遲,并且提供了到Oracle Streams的接口。同步相對比較簡單,通過觸發器捕獲增量數據。而異步CDC根據實現的內部機制區別,又可以分為異步HotLog模式,異步分布式HotLog模式和異步AutoLog模式[1]。
2.1 Kafka。Kafka是一個分布式消息隊列。Kafka對消息保存時根據Topic進行歸類,發送消息者稱為Producer,消息接受者稱為Consumer。Producer即生產者,向Kafka集群發送消息,在發送消息之前,會對消息進行分類。Topic即主題,通過對消息指定主題可以將消息分類,消費者可以只關注自己需要的Topic中的消息。Consumer即消費者,消費者通過與kafka集群建立長連接的方式,不斷地從集群中拉取消息,然后可以對這些消息進行處理。
2.2 GPSS。Greenplum Stream Server(GPSS)是 一 個ETL(提取、轉換、加載)工具。GPSS服務器的一個實例從一個或多個客戶機接收流數據,使用Greenplum數據庫可讀的外部表將數據轉換并插入到目標Greenplum表中。數據源和數據格式是特定于客戶機的。數據源和數據格式由客戶端指定[2]。
整體技術流程圖如圖1所示

圖1 基于異步聯機日志結合流處理的實時數據抽取實現
系統采用可視化數據抽取流程建模技術實現了以上的數據抽取過程。可視化數據抽取技術通過組件連接的方式形成數據處理管道。
整體數據實時抽取流程由兩個管道組成,分別是數據生產者和數據消費者。
3.1 數據生產者管道。通過解析異步聯機日志,CDC感知數據源庫Oracle中表的數據更新、增加和刪除操作,并將操作流轉換成數據流發送到Kafka中。通過CDC讀取到的日志進行SQL解析與轉換,將數據解析為對應數據的JSON對象,并標記該數據是增加、修改還是刪除數據,然后將數據存入到Kafka中。需要注意的是,存儲到Kafka中的數據,不同來源表的數據需要存儲到不同的Topic中,相當于每個Topic中僅存儲固定的一張源表的增量數據信息,一般情況下,使用Oracle的schema名和table名聯合起來作為Topic的名稱[3]。
3.2 數據消費者管道。GPKafka通過Job方式將導數的配置提交到GPSS,GPSS讀取Kafka對應的Topic數據,并寫入對應的Grennplum數據庫,然后將Job的調度的信息存入到本地文件中。GPKafka消費者根據配置的目標倉庫的schema和table信息,自動生成對應的Job配置文件,然后提交Job至GPSS中進行執行。GPSS接收到Job信息后,開始根據Job配置讀取Kafka中對應Topic中的數據,然后寫入到GreenPlum數據庫中。當同時運行的Job數量過多時,GPKafka消費者會根據實際情況,暫停和調度不同的Job運行情況,以滿足當前數據抽取業務的需要。最后將流程運行階段性情況和調度情況寫入到本地文件系統中[4]。
數據來源:明星電力公司中臺數據,大小為從100M-5G單表。
測試環境:1臺Oracle數據庫服務器,1臺大數據中臺服務器,1個GreenPlum實例(6臺服務器,1個Master,24個Segment),Kafka集群(3臺服務器節點),服務器物理內存64G,內網帶寬1Gbps。測試所用源庫表以完全入庫完畢。
測試方法:對比方法:
第一,CDC讀取數據庫數據變化日志,然后通過傳統JDBC直接寫入Grennplum數據庫,后面稱為(CDC+JDBC);第二,我們系統采用的方法:通過CDC讀取數據庫數據變化日志,然后通過GPKafka直接寫入Grennplum數據庫,后面稱為(CDC+GPKAFKA);第三,通過JDBC實現全量抽取,并通過Greenplum存儲組件將數據寫入Grennplum數據庫,后面稱為(JDBC全量抽取)[5]。
實現對比結果如表3、表4所示。

表3 各個方法測試對比結果(累計讀取源Oracle數據庫效率)

表4 各個方法測試對比結果(累計寫入目標Greenplum數據庫效率)
通過觀察計算可以發現JDBC全量抽取和CDC+JDBC這兩種方式再讀取和寫入效率上都是一致的,因為他們都在同一個任務中做處理。而CDC+GPKAFKA這種方式,由于讀取出數據后,暫存在Kafka中,有個消費的過程,它的讀取和寫入的效率是有區別的,寫入效率遠高于讀取的效率。從事實數據抽取的效率上面來看,CDC+GPKAFKA這種方式是最優選。CDC+GPKAFKA由于同時采用了CDC異步日志分析技術和kafak流式并行寫入技術,不經過JDBC接口因此在小表和大表,小數據改變量到大數據改變量情況下都保持了非常好的數據抽取寫入性能[6]。
我們對多個業務數據源系統進行數據實時抽取,包含的數據源表總共約5000余張,歷史存量數據約150GB,月增量約10GB。同時,以上所述業務系統內的原始數據源表,包含部分未做分區、分表等設計的超大表,無唯一鍵和主鍵等現象,其中最大的表記錄數量達到2億條,約5GB,為數據中心對于該類表的數據實時性抽取增加了較大的困難和障礙。原始的數據抽取方式為全量數據接入方式和基于全量對比的數據抽取技術。該方式處理效率低下,通常造成數據積壓嚴重,無法達到數據的實時性抽取任務要求[7]。
針對該問題,提出的一種CDC結合GPKafka的實時數據接入抽取技術,提升了中臺數據抽取實時性。通過與CDC+JDBC方式、JDBC全量抽取方式對比,CDC+GPKAFKA有明顯的優勢,在對1億條數據量的讀寫測試中,CDC+GPKafka的方式讀寫數據效率分別261000條/分鐘和509000條/分鐘。達到了零數據積壓的數據入庫要求。