董 斌,楊 迪,王 錚,周文紅
(1.中國電信股份有限公司上海研究院 上海200122;2.中國電信集團公司 北京100032)
自從Google發布了基于云計算的分布式MapReduce[1]大數據處理編程模型,大數據技術和應用得到了廣泛的應用,開源的Hadoop分布式計算軟件框架[2]更是將大數據應用推向了極限,網頁搜索、RTB(real time bidding)廣告、精準營銷等典型應用的成功使Hadoop、MapReduce成為大數據的象征。MapReduce是一種離線的批處理方式,可以成功處理TB、PB級海量數據,但無法應對實時數據分析需求和對消息事件的實時響應,大數據處理需要支持實時處理和迭代計算技術作為補充,因此流式計算成為大數據技術研究的新熱點。流式計算來自于一個信念:數據的價值隨著時間的流逝而降低,所以事件出現后必須盡快地對它們進行處理,發生一個事件進行一次處理,而不是緩存起來成批處理。
電信運營商是天然的大數據擁有者,豐富的數據資源已成為電信運營商的重要戰略資產,而運營商數據也有鮮明特點,其在大數據產業鏈中處于數據傳遞和交換中心的地位,擁有的數據一般不是最終落地數據,主要由實時信令和流式數據構成,因此在運營商大數據應用實踐過程中,對實時信令數據處理、分析的實時性需求越來越迫切。流式大數據技術的應用為運營商數據信息化提供了新的機遇和工具,并有助于提升數據價值和推進業務創新。
在運營商動態數據信息開放 (open information of dynamic data,OIDD)平臺的大數據集約運營應用中,實時信令采集、清理匯聚需要在省層面進行,全國層面再進行匯聚和加工處理開放。目前最快獲取信令中實時信息(如用戶狀態、位置信息等)的時延能達到小時量級,這無法滿足對數據實時性敏感的數據增值服務要求。以解決OIDD所面臨的問題為出發點,本文提出了基于流式計算的全國OIDD平臺的實時信令處理大數據技術解決方案。
用戶信息能力是電信能力的重要組成部分,具有豐富的潛在價值。動態數據信息開放平臺是對用戶動態信息進行實時采集、匯聚、挖掘和開放的平臺,為網絡優化和第三方應用提供“快數據”的增值服務。
通過網絡信令接口可以實時獲取到用戶狀態信息,包括終端能力信息、用戶狀態信息、用戶位置信息、網絡狀態及IP地址和電話號碼臨時綁定關系等,OIDD對從網絡中實時采集的數據集約匯聚,并經過脫敏處理后,根據數據不同類型和特性封裝成核心數據能力,面向第三方應用提供“快數據”能力服務和對網絡狀態進行實時的監控和優化。
目前,OIDD平臺基于Hadoop大數據平臺建設,平臺技術架構如圖1所示。
·ETL(extraction transformation loading):負責將分散的、從異構數據源中收集的數據(如信令數據、位置數據、DPI數據等)抽取到臨時中間層后進行清洗、轉換、集成,最后通過數據總線存入Hadoop分布式數據庫。

圖1 OIDD平臺架構
·運營管理平臺(OP):系統數據開放接口的管理平臺,提供數據路由、接入鑒權、數據訪問權限、數據脫敏等功能。
·數據總線:系統的數據總線,整合了系統Hadoop、數據庫、緩存等資源池訪問、系統消息隊列,并提供高速網絡訪問接口,使各模塊可以分開部署,靈活擴容。
·緩存(cache):OIDD的高速數據緩存主體為內存數據庫,在海量數據環境下它提供了數十倍于物理數據庫的數據查詢速度。
·數據庫(DB):保存數據挖掘和數據分析的結果性數據、被經常訪問的數據以及其他的臨時性數據等。
電信網絡按地域部署,信令信息、DPI信息都需要在省層面采集,并在集團層面進一步匯聚。OIDD平臺組網分為集團和省份2個層面,集團平臺完成數據統一匯聚,省份部署采集系統,并可根據自身發展,決定是否建設省級OIDD平臺。
圖2是數據源采集和匯聚的過程,數據源可以分為3種類型:集團集中建設的平臺系統(如業務平臺、集團IT平臺等)、已經具備數據集約的省份OIDD平臺、按省建設但是集團具備統一數據集約的系統(如綜合網管、集團信令系統、日志采集系統等)。
基于已建成的OIDD平臺,封裝成API和獨立的服務地址,提供數據的查詢、狀態訂閱和通知的功能。現已開展了用戶漫游服務、企業名片掛機推送、行車路線定位監測等服務,并取得良好的經濟效益。在平臺運營實踐中,雖然通過高速數據緩存能提升系統效率,顯著降低外部應用對分析結果的查詢時延,但由于需要經過層層匯聚和批處理,數據時效性最快只能達到小時量級,限制了新業務創新,提升“快數據”實時性價值成為OIDD平臺的未來發展重點。

圖2 動態信息開放平臺的多級組網
根據現有OIDD平臺架構和組網提高數據時效性,重點要解決以下2個方面問題。
·原有平臺基于Hadoop大數據處理技術,本質是批處理方式,實時信令數據采集需要積累到一定量或時間后再統一處理。平臺要進一步支持對實時、單獨的消息和事件進行處理,并且這個過程是消息/事件驅動和不間斷的。
·全國OIDD平臺數據需要通過省、集團兩個層面的匯聚,也是導致數據實時性無法保證的重要原因,有的在省公司可以開展的業務,由于數據時效性已過,集團層面難以開展。全國OIDD平臺需要及時獲得實時信令信息,盡量消除批量存儲再轉發的中間環節。
流式數據處理系統和批量數據處理系統有著本質的差別,流式數據處理系統需要維護消息隊列并進行實時消息的及時處理。分布式流式大數據處理技術雖然處于起步發展階段,但由于市場廣泛需求的驅動,成為關注和研究熱點[3]。當前具有代表性的流式處理系統有Storm[4]、S4[5]以及Spark Streaming[6]。
流式計算組件是實時大數據技術平臺的核心,但是幾種技術的比較不是本文重點,本文目標是選擇一款便于引入OIDD平臺的實時大數據處理技術。Storm主從方式比S4去中心方式更適合消息處理(保證消息順序性)[2],因此Storm越來越得到廣泛關注和應用,如阿里巴巴、百度實時數據處理都采用了Storm架構。Spark Streaming也是當前熱點,其原理是將數據流分成小的時間片斷(秒級),以類似批處理的方式處理數據,對于目前版本的Spark Streaming而言,其最小的批處理時間間隔選取為0.5~2 s[6],所以Spark Streaming能夠滿足除了對實時性要求非常高之外的所有流式準實時計算場景。但相比Storm系統,Spark Streaming計算時延大,Storm目前最小的時延是毫秒級[3]。基于上述分析,本文有針對性地對如何將Strom引入OIDD平臺的解決方案進行了分析。
大數據處理架構包括數據采集、數據接入、數據處理和數據輸出,如圖3所示。流式大數據處理也要選擇和配置滿足實時信令處理要求的數據采集、匯聚、處理和分析相關組件。

圖3 大數據處理流程
通過Flume-ng+Kafka+Storm可以搭建實現支持實時信令處理的大數據分析平臺。Flume-ng負責從各節點上實時采集數據;由于數據采集的速度和數據處理的速度不一定同步,因此需要一個消息中間件作為緩沖,Kafka作為一種高吞吐量的分布式消息系統非常適合承擔此項工作;流式數據處理作為關鍵組件,根據前述分析選擇了Storm;數據 輸 出 可 以 用MySQL和 自 定 義API(application programming interface,應用程序編程接口)。
Flume目前是一個分布式、高可用、可擴展的海量信息收集工具,可以實時進行數據的收集和傳輸,Apache Flume項目將Flume1.0以后的版本統稱為Flume-ng[7]。為了簡潔,后續涉及的Flume組件都是指Flume-ng版本。
Flume架構如圖4所示,以代理(agent)為最小的獨立運行單位,由數據源采集(source)、數據臨時存儲(sink)和數據流通道(channel)3層組成,一個agent就是一個JVM(Java virtual machine,Java虛擬機)。數據流的采集由事件(event)貫穿始終,事件是Flume的基本數據單位,運營商網絡中信令消息和日志記錄都可以看成一個個事件。
·source負責接收事件,并進行簡單處理后,寫到定制的各種數據接收方。通過編程,可針對不同數據源或數據類別對source進行定制。
·channel位于source和sink之間,用于緩存接收進來的事件。
·sink負責取出channel中事件,傳輸到下一跳或最終目的,sink也是可以通過編程自定義的。當sink成功地將事件發送到下一跳或最終目的地,事件從channel移除。

圖4 Apache Flume架構[7]
Flume是分布式的,每一層均可以水平擴展,具有端到端的數據傳送保障,非常適合實時性高、協議需要定制分析的網絡信令的實時采集。
Kafka也是Apache下的開源消息系統項目[8],是一種高吞吐量的分布式消息發布訂閱系統,在普通的服務器上每秒也能處理幾十萬條消息,可用于低時延的收集和發送大量的事件和日志數據。
Kafka架構如圖5所示,它維護按照類別進行區分的消息[8]。一個典型的Kafka集群包含若干生產者(producer)、處理服務器(broker)、消費者(consumer)以及一個ZooKeeper集群[9]。producer就是向Kafka發消息的客戶端,consumer則是從Kafka取消息的客戶端,一臺Kafka服務器就是一個broker,負 責 消 息 的 處 理 分 發,ZooKeeper管 理broker與consumer的動態加入與離開,各組件都可以水平擴展。
Kafka具有消息緩存能力,Kafka集群可以在一個可配置的時間內保存所有發布上來的消息,不管這些消息有沒有被消費。
在網絡實時信令處理中,需要一個消息隊列系統來緩沖實時信令采集客戶端發送過來的消息,并且要求這個消息隊列系統支持良好的擴展性和大規模的數據流,同時為了和下個環節的數據處理速度匹配。Kafka組件非常適合承擔這項工作,根據配置的訂閱規則將緩存的消息轉發到消息使用者,從而降低實時信令數據處理系統的復雜性。Flume-ng+Kafka+Storm架構中,Flume Sink作為Kafka的生產者,將消息事件傳送到Kafka集群中,按照消息類別(例如按消息發布者區分)進行緩存,Kafka根據配置的訂閱規則轉發到消費者客戶端Storm spout(參見第3.3節的Storm介紹)。

圖5 Apache Kafka架構[8]
Hadoop是基于批量的數據處理,需要等待數據的收集和任務調度,因此是離線的處理方式,通常的時間跨度在數十分鐘到數小時。與Hadoop不同,Storm是基于流式的數據處理,可以持續處理到達的數據,并且是基于內存級的計算,從而讓處理進行得更加實時,處理時延在幾十毫秒到幾百毫秒量級。
Storm也是一個分布式、高容錯的實時計算系統,計算在多個線程、進程和服務器之間并行進行,節點可以方便地水平擴展[4]。根據測試,單個節點服務器大約每秒可處理幾萬條消息或日志。
如圖6所示,Storm集群主要由一個主節點和一群工作節點組成,并通過ZooKeeper進行協調。主節點運行nimbus進程,負責任務調度和資源分配。每個工作節點都運行supervisor進程,supervisor負責接受nimbus分配的任務,啟動和停止屬于自己管理的worker進程,nimbus和supervisor不存在直接通信,兩者的協調都是由ZooKeeper來完成的,任務狀態和心跳信息等也都保存在ZooKeeper上。

圖6 Apache Storm架構[4]
拓撲是Storm中最關鍵的一個抽象概念,相當于一個實時數據流的處理邏輯,可以被提交到Storm集群任務中執行,圖6的worker中執行的任務就是一個個拓撲。Storm中的拓撲如圖7所示[4]。拓撲的基本元素是spout和bolt。spout是一個拓撲的數據源,通常情況下spout會從外部數據源中讀取數據,然后轉換為拓撲內部的源數據;bolt是實時流數據處理邏輯的執行進程,用戶所希望的對數據流的分析和處理操作在這里執行,復雜的消息流處理可能需要經過多個步驟,即經過多步bolt。

圖7 Storm的流計算拓撲[4]
實時信令數據的處理邏輯預先加載到Storm集群,Storm根據從Kaffka接收到的消息,按消息類別觸發任務,進行信令數據處理分析,并輸出結果。
現有動態數據信息開放平臺OIDD是Hadoop大數據處理平臺,基于批處理方式。從第2節的分析可知,OIDD平臺亟需解決數據的時效性問題,通過引入流式計算大數據技術,提高數據處理實時性能力,同時要考慮同現有平臺的融合方案。
實時信令處理大數據技術解決方案包括數據實時采集、流式的數據處理以及消息的調度和緩存,需要關注的關鍵技術包括組件的選取和部署、數據的處理時延和吞吐量以及與現有平臺的融合。
根據第3節分析,實時采集采用Flume組件,流式數據處理采用Storm,Kaffka作為消息中間件進行消息的緩存和調度。
(1)相關組件關鍵技術及部署方案
信令實時采集組件Flume:Flume負責網絡信令的采集以及DPI日志采集。通信網絡信令是實時的消息接口,不同設備的接口消息協議也有差別,DPI采集的則是一條條使用記錄。針對不同的數據源,需要通過開發不同的定制化Flume source程序,Flume source組件即嵌入網絡設備,也可以部署在云資源池,通過網絡接口連接。Flume agent的channel集中部署在云資源池,需要同時處理不同數據源數據,應該采用內存處理模式,提高處理性能。Flume部署則最好靠近數據源,因此建議在省層面云資源池部署Flume集群,通過管理平臺為不同的數據源加載對應的Flume source程序,根據數據吞吐量彈性配置計算、存儲資源。
分布式消息中間件Kafka:Kafka整體架構簡單,部署方便,有內置的分區,這讓Kafka成為了一個很好的大規模消息處理應用的解決方案。在實時信令的大數據處理中,Kafka主要是作為消息緩存中間件,保證數據采集和數據分析處理的消息同步,Flume sink作為Kafka的生產者,而Storm spout作為Kafka的消費者,Kafka將從Flume sink接收到的數據緩存到不同分區,Kafka broker對接收到的數據進行持久化處理,保存到存儲服務器,基于訂閱機制,將緩存的消息按需發送到不同的大數據處理平臺。Kafka集群也部署在省層面,同時為省OIDD平臺、集團OIDD平臺提供服務。
Storm平臺:Storm平臺是實時信令處理的大數據平臺關鍵組件,通過spout從Kafka消息隊列中讀取數據,發送到相應的bolt中進行處理,bolt則按業務需求配置對應的規則策略,可以針對不同數據源劃分不同bolt類,根據吞吐量在云資源池分配相應的資源。Storm還需要同集團/省的動態數據信息開放平臺進行整合,與Hadoop平臺實現資源共享,平臺整合的解決方案在第4.2節中描述。
(2)關鍵流程描述
圖8是實時信令大數據技術方案中的主要環節和處理流程。
·數據采集:通過Flume集群上部署的各類數據源采集接口機,采集OIDD所需的數據源信息,并進行數據格式轉化,傳送到Kafka消息隊列。

圖8 實時信令處理大數據技術方案
·建立消息隊列:Kafka將接收到的數據,按不同數據源建立多條分布式消息隊列并將數據緩存,根據不同OIDD大數據平臺對數據的訂閱規則,傳送給相應的OIDD平臺分別處理,省OIDD平臺和集團OIDD平臺的數據源可以不一致,消費者可以是OIDD的Storm平臺,也可以是原來的Hadoop平臺。
·消息接收:不同數據源具有不同的數據分析處理邏輯,對于實時信令由Storm平臺處理,Storm的spout接收消息或文件內容信息,并送到對應的bolt處理。
·規則匹配:由Storm與關聯規則篩選符合規則的數據內容,這些規則可以是對原有數據的更新,也可根據外部應用對數據的訂閱需求轉化而來,然后輸出結果。
·數據輸出:數據發送到數據庫存儲或者根據外部應用的訂閱需求觸發通知、查詢等事件。
Hadoop 2.0平臺中引入了新的資源管理系統YARN[10],MapReduce、Storm和Spark等組件均可運行在YARN之上,這就為Storm與Hadoop大數據平臺整合提供了解決方案。將Storm運行在Hadoop YARN上,Storm與原有Hadoop平臺可共享整個集群中的資源,并實現了數據的復用,避免了在多個數據平臺上保存同樣的數據副本,從而節省了存儲資源和跨群復制數據導致的網絡開銷,也避免了多個集群帶來的維護成本。
OIDD平臺匯聚運營商網絡中多種數據源,實時信令也是OIDD大數據平臺需要采集的數據源,基于YARN架構,可以將負責實時信令處理的Storm大數據平臺組件整合到原有Hadoop大數據平臺中,Storm平臺輸出的數據仍然入庫到已經部署的HDFS/HBase數據庫中,從而實現云資源、大數據組件和數據的共享。整合后的大數據平臺,原有平臺中的數據管理、安全控制、服務封裝和業務生命周期管理等模塊可以基本保持不變,但Hadoop平臺的版本需要升級到YARN,并提供對Storm模塊的數據管理、規則配置的接口。融合的大數據平臺和組網架構如圖9所示。

圖9 融合的大數據平臺和組網架構
在新的平臺組網架構中,數據采集分為ETL組件和流采集組件,實時性要求高的數據通過Flume,由Storm流計算模塊實時分析處理,滿足對數據時效性要求高的數據業務需求;數據總線基于消息中間件調度,實現基于消息發布訂閱的任務調度;大數據平臺實現了資源共享,采集的源數據整合加工后仍然保存到共享的統一數據庫,因此仍然能夠滿足原來基于Hadoop的大數據應用。
大數據流式計算和批量計算適用于不同的應用場景。批處理匯聚海量數據分析出的結果可能更精確,但對數據時效性要求嚴格而對歷史數據積累并不非常關注的場景,流式計算具有明顯的優勢。批量計算和流式計算是有優劣互補的,因此在多種應用場合下可以將兩者結合起來使用。
以Hadoop大數據技術為基礎的OIDD平臺已在現網應用,隨著業務開展,業務創新對數據的時效性提出了更高要求。結合流式計算的研究,本文提出了融合流式計算和批量計算的OIDD解決方案,對所需的組件進行了分析和選擇,從實驗室搭建Flume-ng+Kafka+Storm大數據處理環境,10萬條數據可在幾秒內完成采集、匯聚、處理端到端過程,大大提高了數據有效性。
大數據流式計算的研究和應用仍處于起步和嘗試階段,為了促進大數據流式計算平臺的成熟,還需要在實踐中進行試點和完善。
1 Condiet T,Alvaro P,Hellerstein J M,et al.MapReduce Online.UCB/EECS-2009-136,2009
2 Apache Software Foundation.Welcome to Apache Hadoop.http://hadoop.apache.org,2015
3 孫大為,張廣艷,鄭緯民.大數據流式計算:關鍵技術及系統實例.軟件學報,2014,25(4):839~862 Sun D W,Zhang G Y,Zheng W M.Big data stream computing:technologies and instances.Journal of Software,2014,25(4):839~862
4 Neumeyer L,Robbins B,Nair A,et al.S4:distributed stream computing platform.Proceedings of the IEEE International Conference on Data Mining Workshops,Sydney,Australia,2010
5 崔星燦,禹曉輝,劉洋等.分布式流處理技術綜述.計算機研究與發展,2015,52(2):318~332 Cui X C,Yu X H,Liu Y,et al.Distributed stream processing:a survey.Journal of Computer Research and Development,2015,52(2):318~332
6 Apache Software Foundation.Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.http://spark.apache.org/streaming/,2015
7 Apache Software Foundation.Apache Kafka:a high-throughput distributed messaging system.http://kafka.apache.org/,2015
8 Apache Software Foundation.Storm official website.http://storm.apache.org/,2015
9 Apache Software Foundation.Welcome to Apache ZooKeeper.http://zookeeper.apache.org/,2015
10 Vavilapali V K,Murthy A C,Douglas C,et al.Apache Hadoop YARN:yet another resource negotiator.Proceedings of the 4th ACM Symposium on Cloud Computing,Santa Clara,California,USA,2013