顏 冰,王鐘雷
(中國人民財產保險股份有限公司 大數據中心,北京100022)
大數據時代,社會的各行各業,人類的衣、食、住、行、醫、娛等,時時都在產生數據,這些數據正呈指數級、爆炸式增長,而且隨著信息科技的不斷進步,從海量異構大數據中迅速且高效地挖掘出有效價值,并將其轉化為可靠的決策依據,已經成為各個行業所面臨的重大的挑戰,極大地考驗著數據統計分析的能力[1].面對規模大、種類多、變化快等大數據問題,許多企業通過大規模的硬件資源投入來保障數據的基本處理能力,從數據采集到生成報表,仍然采用T+1日的數據處理機制,規模較大的企業甚至需要8 個小時左右才能完成,傳統技術已不能全面滿足業務和管理決策的數據時效要求,企業指標數據的實時供給能力亟待全面提升,傳統統計工作到了必須重視、研究和應用大數據技術的發展階段.
為提高數據的統計分析處理能力,很多企業采用了數據一體機方式作為解決方案,如Teradata 大數據一體機.傳統架構是主機、存儲、網絡、管理軟件、數據倉庫(數據庫或者中間件或者虛擬化軟件)等進行分散管理,而一體機則是把這些進行集成,打包形成一體化的解決方案,來消除傳統解決方案中存在的性能瓶頸問題,比如數據管理,I/O 讀寫等方面存在的性能瓶頸,有針對性地提升系統的整體處理能力.但是隨著數據的急劇增長和大數據需求的爆發,尤其是單一化場景需求逐步向多元化場景需求的轉變,一體機方式從總體成本、擴展能力、配套軟件等方面看,已經逐漸失去競爭優勢,尤其是對于大型(數據密集型)企業明顯不是最優解決方案.當前,越來越多的企業開始探索并通過應用Hadoop 等大數據技術來提升大數據治理和實時供給能力[2,3].
大數據的處理系統大概可以分為兩類,也就是批處理與流處理系統[4].批處理大數據系統(以Hadoop為代表)需先將數據“匯聚成批”,通過批量的預處理之后,加載到分析型數據倉庫之中,可以用來進行高性能離線“實時查詢”.這種批處理系統雖然可以對完整地大數據集合實現高效的即時查詢,但卻沒有辦法查詢到增量的實時在線數據,存在數據延遲的問題.相較于批處理大數據系統,流處理是一種大數據實時處理技術的典型應用,它是一個無限增長、沒有邊界的動態數據集合,以Spark Streaming、Storm、Flink 為代表的流處理系統無需存儲大數據,可對數據進行實時高效地在線預處理,全面、逐條地加載到高性能的內存數據庫中供查詢.本文研究的是一種使用數據庫日志分析、流處理、內存計算和分布式等技術的指標實時計算模式,即基于流處理技術的指標實時計算方法.
流處理系統能滿足對進入系統的數據進行即時計算的需要,相比Hadoop、Spark 等批處理系統,在處理方式有非常大的不同.流處理更加像一個MapReduce計算的通用模型,只不過它的響應時間可以達到秒級甚至是毫秒級.流處理不需要對完整的數據樣本進行計算,只針對通過系統的每一個數據項進行操作.流處理系統理論上能夠處理無限多、無限大的數據,但在同一個時間點,卻只能夠處理一條(真正的流處理)或者少量(微批處理)的數據,在不同記錄之間只保持最少量的狀態.流處理屬于一般意義上的數據富集、持續處理,以及對于無界數據的分析過程的組合.流處理模式適合有近實時處理需求的任務,如基于網站用戶行為實時產品推薦、經營指標實時計算、客戶信用審核、業務審核、反欺詐等.
目前,主要有兩種不同的方法來構建流處理系統,一種屬于真正的流處理(Native Streaming),所有被輸入的記錄或者事件都將按照它們進入的先后順序被逐個處理,如Storm、Flink、Samza;另一種方式是微批處理(Micro-Batching),小的“批”由多條輸入的記錄組成,它們按照預設好的時間常量創建,通常是每隔幾秒生成一個,如Spark Streaming、Trident-Storm.五種主流的流處理技術對比情況如表1所示.
其中,Storm、Spark Streaming 和Flink 是三種較常用的流處理技術[5].

表1 五種流處理技術對比情況
Storm 比較適用于實時處理數據的場景,它是一套開源、分布式、高容錯的實時計算系統,在多個方面具備很強的優勢.Storm 具有較強的容錯性,可以對工作進程及節點的故障進行管理;流計算可以在多線程、進程以及服務器之間并行展開,非常易于水平擴展;Storm 的消息處理機制非常地可靠,不會遺漏信息,能保證每個消息都能得到完整的處理,而且在任務失敗時也能夠從消息源重試這個消息;底層使用MQ 作為消息隊列,能夠保證消息能得到快速的處理;Storm具有可靠的事務機制,即數據的處理完全精準,而且可以針對高峰、低峰時間段,動態調整實時計算程序的并行度,以最大限度利用集群資源;同時,Storm 的開發和單元測試也比較方便.Storm 目前發展的已經相對比較成熟,部署和管理起來也很簡單,性能表現也十分出眾,常常被用于實時分析、持續計算、ETL、在線機器學習、分布式遠程調用等.
Spark Streaming 能夠實現對具有很高的吞吐量,需要高容錯機制的流數據的實時處理,屬于Spark 核心API 的擴展內容之一,支持從Flume、Kafka、Twitter、Kinesis、ZeroMQ,以及TCP sockets 等多種數據源獲取數據.獲取到數據源數據后,能夠利用map、reduce、join 和window 等高級函數,處理特別復雜的算法,同時也可以把處理結果持久化到數據庫、文件系統、現場儀表盤等.但是與Storm 相比,Spark Streaming 適用于不要求實時處理和完全可靠的事務機制,不需要動態調整并行度的場景.Spark Streaming 突出的優點是吞吐量(即單位時間內處理的數據量,MB/S)高,是Storm 的2-5 倍.如果除了要進行實時計算外,還包含批處理(離線)、交互式查詢等需求,那么就要先選擇Spark 生態,采用Spark Core 來實現批處理(離線)操作,使用Spark SQL 來實現交互式的查詢,再使用Spark Streaming 來實現流計算,將三者進行無縫地整合,能夠給系統帶來非常高的擴展性能.
Flink 目前還屬于新興的項目,仍處于不斷成熟的時期.它是介于Spark 和Storm 之間的一種架構,采用了原生的流處理系統,與Spark Streaming 有相似的主從結構,與Storm 相似的數據流,所以Flink 兼具了低延遲和高吞吐的特性.同時,Flink 在API 和容錯性上也有很好的表現,使用起來相對來說也比較簡單.Flink 具有許多特性:可進行帶有事件時間的窗口(Window)操作;能進行高吞吐、低延遲和高性能的流處理;窗口(Window)操作高度靈活;支持Exactlyonce(有狀態計算)語義;能進行基于time、count、session,以及data-driven 的窗口操作;對于基于輕量級的分布式快照(Snapshot)實現可以容錯;能運行具備Backpressure 功能的持續流類模型;運行時可同時支持Batch on Streaming 處理以及Streaming 處理;可進行迭代計算;Flink 可在JVM 的內部做到屬于自己的內存管理;程序可自主優化,這樣可以避免在非常特定情況下,產生Shuffle 及排序等高代價的操作,當然中間結果需要進行緩存操作.
基于流處理技術的指標實時計算,是通過實時監聽和捕獲數據庫日志,利用流處理技術對日志和數據庫操作指令進行實時解析,并實時將分析結果用于指標計算的大數據處理模式.Flink 是新興的項目,而Storm 經過多年發展已經比較成熟,相較于Spark Streaming 的處理延時更低,甚至可以到毫秒級,完全可以滿足指標實時更新的需求,因此我們選擇Storm 作為實時處理的核心技術.
Hadoop 是一個批處理系統,它由于具備數據吞吐量大、自動容錯等很多優點,所以廣泛應用在海量異構大數據的處理上.但是,Hadoop 適合大批量數據的離線處理,并不擅長實時計算,因為它本來就是為批處理而開發的,這也是大家一致的共識.不過Hadoop 卻可以作為Storm 等組件運行的基礎框架平臺.因此,整個實時計算系統基于Hadoop 平臺構建,由日志采集、消息管理(Kafka)、協調管理(Zookeeper)、實時處理(Storm)、內存數據庫(Redis)等部分構成[7].其中,由日志采集模塊(使用Shell、C、Java 等腳本開發的插件)監聽數據庫日志,并實時把日志抓取下來推送至Kafka 分布式消息管理系統;通過Storm 系統消費Kafka 中的消息,同時通過Zookeeper 管理期間的消費記錄;由Storm 根據指標計算邏輯對日志進行定制化分析和處理,并輸出到Redis 內存數據庫中;最后由應用程序讀取Redis 中的結果并展示給用戶,或轉入數據庫進行持久化存儲.從技術架構看,自上而下,首先是源端數據庫,下一層是日志采集部分,可以針對多個數據庫同時進行采集,日志采集之下是Kafka 消息管理系統,Kafka 消息管理系統層之下是Storm 實時處理層,Kafka 和Storm 之間的協調管理由Zookeeper 承擔,Storm 流處理層之下是Redis 內存數據庫,最下層是Web 或者App 應用,也可以包括用于持久化的數據庫(如Hbase 等列式數據庫),詳情如圖1所示.
日志采集程序或工具有多種可選方式.在此列舉三種,一種是編寫Shell、C 或Java 腳本程序,自編腳本輕量和完全自主可控,對服務器產生的壓力相對較小,但需要一定的自主開發量;第二種是采用第三方框架技術直接進行采集,比如采用Flume.Flume 屬于分布式的、高效的日志采集系統,可以把分布在不同服務器上的海量日志文件統一收集到一個集中的存儲資源中,但是Flume 的配置卻不怎么簡單,Source、Channel、Sink 的關系交織在配置文件中的話,非常不便于管理;還有一種方式是使用CDC(Change Data Capture)產品實現源端數據庫日志的采集,但CDC 通常需要進行單獨采購,同時需要在源端數據庫和目標端數據安裝軟件,成本較高,對服務器性能要求也高.

圖1 基于Storm 的實時分析系統技術架構
本方案采用自行開發的腳本程序進行采集,直接將采集腳本部署在源端數據庫服務器,實時監聽和讀取磁盤設備中的日志文件.此方式相較于另外兩種方式有三個優勢:一是部署簡單,直接部署在源端數據庫服務器即可,無需額外地搭建同步服務器,Flume 和CDC 都需要若干臺服務器來部署;二是響應速度快,可實現實時捕獲增量日志,而CDC 的實時同步間隔通常是數秒,無法滿足要求;三是只涉及磁盤文件讀取,占用源端服務器的CPU 和內存資源少,對服務器的運行影響不大.
Kafka 是基于日志文件的消息系統,消息能夠持久化存儲到硬盤中,數據不容易丟失.Kafka 可以保存消息的進度及位置,對于用戶來說,也可以自行定義消費的起始點,可以實現消息的重復和多次消費.Kafka 同時具有隊列和發布訂閱兩種消息消費模式,可以保證消息隊列中的消息能按照順序被消費并且與Storm 的契合度很高.此外,Kafka 的Consumer 是pull-based 模型,該模型可以緩解日志產生速度快于消費速度的壓力,使消費速度合理匹配生產速度.把Kafka 消息系統放置在日志采集和Storm 模塊中間,是防止在突發的、高并發的情況之下,由于日志可能會出現井噴式的增長,如果這時候Storm 的消費速度不能快于日志的產生速度,就會導致大量消息處理滯后,進而導致丟失,所以加入了Kafka 消息系統作為數據緩沖區.
Zookeeper 是一個針對分布式系統的高可靠地協調系統,它可以讓分布式系統在大多數情況下正常運行.一是可以提供分布式的鎖服務[8].分布式集群系統中,讀取與分析等操作會分散到不同的節點之上進行,所以在數據操作的過程中就有可能發生一致性問題.Zookeeper 提供的這種鎖服務就很好地解決了此問題,保證了進行分布式數據運算時的數據操作的一致性;二是能夠為分布式的系統提供故障恢復的支持.Storm中master 節點運行的守護進程“Nimbus”和worker 節點運行的守護進程“Supervisor”之間的協調工作是通過Zookeeper 來管理的,Nimbus 和Supervisor 自身在集群上都是無狀態的,它們的狀態都保存在Zookeeper中,所以任何節點的宕機和動態擴容都不會影響整個集群的工作運行;三是Zookeeper 也可以管理Kafka 的消費記錄,即使遭遇Kafka 宕機,在進行重啟之后也能定位上次的消費記錄,從宕機點繼續進行消費,實現了“斷點續傳”.
Storm 能夠相對比較簡單地實現對復雜實時計算的編寫以及擴展.數據庫數據的實時處理會使用Storm,就像好比離線數據批處理常常使用Hadoop 一樣,而且Storm 能保證沒有遺漏,保證每一個消息都能被處理,速度也比較快.在一個相對較小的集群中,可以使用多種語言編程,如使用Java、Payson 等語言進行開發,每秒能處理百萬級別的消息.Storm 作為整個指標實時計算模式的功能核心和技術核心部分,主要完成三個方面的工作,即日志解析、指令解析、實時計算.該部分的日志處理能力主要受單一數據庫只能采用單線程處理的限制,實戰時要注意避免該問題成為整體處理能力的提升的瓶頸,但是不同的數據庫日志可以并發處理.
2.4.1 日志解析不同類型的數據庫產品的日志編碼規則和存儲邏輯各不相同,解析日志需要首先研究數據庫日志的編碼和存儲等規則,這是整個計算模式能否正常運行的前提之一,否則無法將日志解析和轉換為易于識別的信息.日志解析程序在接收到日志消息后,將根據數據庫的日志規則,自動切分日志,識別日志類型,剔除回滾等不改變數據的日志類型,僅保留增、刪、改等操作產生的日志,并將該部分日志的每一頁內容由十六進制編碼轉換為“標準和可用”數據用于下一步進行指令解析.整個日志解析過程可劃分為捕獲、切分、識別和轉換等四個部分,如圖2所示.

圖2 日志解析過程圖
2.4.2 指令解析
基于日志解析部分的結果,指令解析部分將按照擬統計的大數據指標的算法要求,從日志中篩選出指標計算涉及到的所有操作信息,解析每個增、刪、改等操作涉及的指令所影響的數據表以及字段信息,抽取出用于數據篩選和計算的信息,尤其是相應的數據增量變化信息.簡單來說,就是通過指令解析從中獲取指標計算所需要的全部信息,然后將解析結果推送至內存計算程序進行下一步處理.指令解析過程可劃分為識別、篩選、解析和推送等四個部分,如圖3所示.

圖3 指令解析過程圖
2.4.3 實時計算
以數據統計類指標計算為例.傳統的處理方式是首先同步源端數據庫和目標端數據庫的數據,待增量數據在目標端數據庫同步且入庫完成之后,再調用程序根據指標算法對全量樣本數據進行聚合計算,計算結果保存到數據庫中等待應用讀取,數據的時效性、連續性比較差.基于流處理的實時計算則是對數據庫數據進行實時、在線、同步處理,無需入庫,無需對樣本進行全量計算,直接在上一次計算結果基礎上進行處理,理論上可提供毫秒級的實時計算能力.
實時計算程序需要根據指標算法預設某指標的完整計算規則,基于日志和指令解析的結果,自動適配該指標的計算邏輯.在篩選出需要參與計算的信息后,針對Insert、Update、Delete 指令選擇相應的處理方式進行計算.如果是Insert 插入指令,可以在上一次統計結果基礎上直接進行“加法”操作;Update 更新指令在日志中會記錄該表原始數據塊和該表最新數據塊,可根據數據實際變化情況進行“加法或者減法”操作;Delete 刪除指令進行“減法”操作.對上一次計算得出的結果進行相應的“加法或者減法”計算后,將得出的最新數值寫入內存數據庫中供應用調用.當然,也可以保存所有增量變化的信息,持久化到備份庫或者寬表中滿足不同場景的需要.
有些指標的計算需要考慮復雜的篩選條件,有的可能需要進行復合或混合運算,只要獲取了“字段”的變化情況,無非是計算的復雜度得到了增加,但是復雜的判斷和計算勢必會影響實時處理的效率,這點需要在數據庫建模時統籌進行考慮,并在處理邏輯上進行優化.此外,需要注意的是,內存數據庫中的數據應當定期進行轉儲,比如可以將轉儲的數據保存至HBase 列存儲數據庫中,這樣就可以在系統宕機后對數據重新進行初始化.
上述基于流處理技術的指標實時計算方案,采用了較成熟的主流技術和工具,能夠實現對大數據的實時、在線以及持續地處理,可以滿足業務和管理決策對數據的實時性需求.該方案與傳統數據倉庫、BI、數據采集(如CDC 等)等技術相比,具有五大優勢.一是處理高效,從捕獲數據庫日志到完成指標實時計算的時耗能達到毫秒級;二是對源端數據庫服務器影響小,因直接讀取服務器磁盤日志文件,不涉及數據庫系統級的管理和交互,所以基本不占用源端服務器的CPU 和內存資源;三是可靠性高,消息隊列(kafka)和協調系統(Zookeeper)保障了日志能夠逐條被處理,并且整個集群在宕機后能夠快速恢復;四是成本低,流處理集群基于X86 架構服務器搭建,價格低,采購、維護和升級簡單;五是可擴展性強,整個系統采用Hadoop分布式集群架構,通過增加硬件設備可實現處理能力的線性提升.
基于Storm 的大數據指標實時計算模式已經在某省級單位進行了實踐.數據庫產品的型號及版本為IBM Informix 11,在局域網(千兆)內架設基于X86 架構PC 服務器的Hadoop(開源)集群作為運行平臺.日志采集使用Java 插件完成,采用Hadoop、Kafka、Zookeeper、Storm、Redis 等開源產品和組件完成消息管理、協調管理、流計算、內存計算等工作.整個實戰環境不超過10 臺PC 服務器,可配置4-6 個計算節點和2-4 個管理節點,主要用于處理本地的兩個Informix 數據庫日志.基于以上的環境和配置構建的流處理計算系統,實現了兩個大類,不少于3 個維度的統計指標的實時計算,達到了毫秒級的準實時計算效果.值得一提的是,在每個數據庫日志200-500 MB 大小的情況下,各臺服務器CPU 占用率僅為5%左右,剩余空閑資源可利用空間還非常大,還可以增加更多的指標進行處理,當然也要考慮日志解析節點的綜合處理能力,不能盲目的增加過多的計算內容.
該單位的數據發布平臺對接了流處理系統的實時指標數據.依托于流處理分布式集群的快速處理能力,流處理系統的計算結果通過消息機制實時推送至發布平臺.以該單位的保險費指標為例,實時數據支持日期、機構和產品三個維度,可滿足各級管理人員實時監控和分析業務情況的需要.下一步,該單位計劃逐步開發賠款、賠案、實收保費、應收保費等更多指標的實時展示功能,進一步增強平臺的業務和管理支撐能力.
目前,基于Storm 的大數據指標實時計算方法存在兩個相對比較大的問題.一是數據庫產品升級可能帶來日志格式的變化.數據庫產品手冊并未說明日志的編碼規則,日志的類型等信息需要自行研究,如果數據庫版本變化導致日志編碼或存儲規則發生變化,這種情況就需要在升級之前重新研究日志規則,然后對應調整日志解析算法;二是指標口徑調整可能引起系統處理邏輯的變更.如果指標口徑調整導致算法邏輯發生變化,比如統計的字段或者數據篩選條件發生改變,就需要調整指令解析和實時計算程序.這兩種情況尤其是第二種問題如果頻繁發生,可能要耗費大量的時間和人力成本完成相應改造工作.
基于Storm 的大數據指標實時計算方法,尚處于研究階段,仍需要進一步的測試和優化,穩定性有待進一步提高,處理能力也還有挖掘的空間.就現階段應用實踐效果來看,在不需要大量資金投入的情況下,滿足數據規模適中的企業的少量指標的實時計算基本沒有問題,但是數據日志規則研究和單個數據庫日志的解析效率問題,目前仍然是實現大批量指標計算的掣肘,所以大規模應用的基礎仍需要進一步的夯實.但是隨著大數據技術日新月異的不斷進步,更成熟和強大的組件或產品也會不斷涌現,該技術在將來通過持續地升級和調整,流處理能力也必將會越來越強大.