張春風, 申 飛, 張 俊, 陳 杰, 劉 靜
1(中國科學院 強磁場科學中心,合肥 230031)
2(中國科學技術大學,合肥 230026)
車聯網云數據中心與綜合服務平臺匯聚了車輛位置、狀態、速度、加速度、路網等非結構化的車聯網數據,其規模己經達到TP甚至BP級別. 傳統的數據分析技術已經無法滿足該級別數據處理的需求,因此,引進分布式計算技術和數據存儲技術,構建流式計算處理框架,對車輛進行實時監控和調度管理迫在眉睫.目前,不少流式大數據處理的方案被提出. 其中,Spark Streaming是Spark核心API的一個擴展,不同于Storm一次一個地處理數據流,Spark Streaming在處理前按時間間隔預先將數據流切分為一段一段的批處理作業.因此,Spark Streaming不是真正意義上的流式計算,而是批處理,相比于Storm,Spark Streaming存在延遲高,吞吐量較小等缺點. 另外,Samza是由LinkedIn開源的一個分布式流處理系統,它依賴于Hadoop的資源調度和Apache Kafka[1,2]. Samza的流單位既不是元組,也不是Dstream,而是一條條消息,在數據傳遞過程中,消息可能會多次發送,造成數據冗余. 針對車聯網數據處理分析的問題,以及其低延遲,增量計算的需求,本文設計了一種基于Storm技術的流式計算系統,系統具有低延遲,高吞吐,分層且可擴展的特性. 利用Kafka消息隊列將各層之間解耦,Storm進行數據實時分析,Hbase和Redis對分析結果存儲,從而實現對車輛狀態進行實時監控.
車聯網實時分析系統主要由Boost.Asio、Kafka、Storm、Redis、Hbase組成. 其中,Boost.Asio負責與車載終端建立連接,采集數據. Kafka負責連接采集層和Storm. Redis和Hbase負責分析結果的存儲. 系統的整個核心實時分析模塊由Storm擔當,對采集來的數據分析過濾,實時處理. 下文將介紹Storm流式計算框架.
Storm是一個分布式的、可靠的、容錯的數據流處理系統[3]. 同Hadoop一樣,Storm可以處理大批量的數據,并且Storm在保證高可靠性的前提下可以讓處理進行的更加實時; Storm同樣還具備容錯和分布計算的特性,即可以擴展到不同的機器上進行大批量的數據處理. 除此之外,Storm同時還有以下的這些特性:
(1) 簡單的編程模型. 類似于MapReduce降低了并行批處理復雜性,降低了進行實時處理的復雜性.
(2) 容錯性. Storm會管理工作進程和節點的故障.
(3) 水平擴展. 計算是在多個線程、進程和服務器之間并行進行的. Storm使用Zookeeper進行集群協調,這樣可以充分的保證大型集群的良好運行[3-5].
(4) 可靠的消息處理. Storm保證每個消息至少能得到一次完整處理. 任務失敗時,它會負責從消息源重試消息.
(5) 快速. 系統的設計保證了消息能得到快速的處理,使用ZeroMQ作為其底層消息隊列[6-10].
為了更好的體現Storm在流式計算方面獨特的優越性,對比Spark計算框架. 如表1所示二者的主要區別表現在,Storm是純實時的,來一條數據,處理一條數據,后者是準實時,對一個時間段內的數據收集起來,作為一個RDD,再處理. 而且,Storm的事務機制、健壯性/容錯性、動態調整并行度等方面的表現,都要比Spark Streaming更加優秀.

表1 Spark和Storm流數據計算框架比較
系統架構如圖1所示,主要包含數據采集、數據轉發、實時分析、數據存儲、可視化展示.

圖1 系統架構圖
系統采用層次化結構的設計原理,每個部分的主要功能如下:
(1) 數據采集: 負責與智能終端(OBD)建立TCP連接,驗證校驗,獲取報文數據.
(2) 數據轉發: 對數據類型進行劃分,放在Kafka消息隊列中,實現數據的分類管理和高并發接入.
(3) 實時分析: 創建 KafkaSpout,從 Kafka 中獲取數據,并以數據流的形式發送給bolt,bolt負責轉化這些數據流,在bolt中完成過濾,分析計算.
(4) 數據存儲: 將實時分析結果存儲至Redis和Hbase,利用分布式文件系統的優勢可以實現高并發的讀寫速度.
(5) 可視化展示: 使用Dubbo分布式服務提供實時定位,軌跡查詢和速度報警等服務,同時利用百度地圖動態顯示.
數據采集主要負責接收車載終端發送過來的實時車輛信息數據,車載終端通過無線網絡與數據采集層建立通信連接. 數據采集層會維護一個連接請求隊列,面對高并發連接的需求,模塊在開發過程中使用Boost.Asio基礎網絡庫作為通信基礎,使用Boost.Asio庫的異步接口函數來實現全異步的事件處理,包括TCP鏈接監聽、TCP數據發送、TCP數據接收. 數據采集層提供車載終端統一的信號接收服務,避免了數據的重復,缺失,從而保證數據采集的質量和可靠性,.
數據轉發作為平臺各層之間的通信層,將系統各層之間進行有效地解耦,提高平臺的健壯性. 目前用于消息傳遞的方案主要包括RabbitMQ和Kafka. 其中RabbitMQ是流行的開源消息隊列系統,開發語言為erlang. Kafka則是一個分布式的高吞吐量的消息系統[11-13]. 與kafka相比,RabbitMQ協議復雜,參數較多,因此其僅適用于數據量較小的場景. 而Kafka具有透明、易擴展和吞吐量較高的優點,更適合處理海量的車聯網數據. 基于此,本系統采用Kafka消息隊列實現數據緩存與轉發,利用其能夠提供消息持久化能力和具有容錯性保障的優勢,達到系統數據緩沖的目的.
作為基于log File的消息系統,Kafka更加可靠,減少了數據丟失的現象. 另外,Kafka可以記錄數據的消費位置,同時可以自定義消息消費的起始位置,從而保證了重復消費消息的實現. 而且,Kafka同時具有隊列和發布訂閱兩種消息消費模式,與Storm(實時分析層)的契合度很高,充分利用Linux系統的I/O提高讀寫速度等. 轉發層的架構如圖2所示.

圖2 緩存轉發架構
采集層作為Producer(生產者),將采集到的車載終端數據以終端標識為區分標準,建立多個topic,用來管理不同種類的消息,不同類別的消息會記錄在到其對應的topic池中,而這些進入到topic中的消息會被Kafka寫入磁盤的log文件中進行持久化處理. 實時分析層作為Consumer(消費者),Storm集群從Kakfa中獲取實時流進行處理分析. 數據處理分析的速度可以慢于數據采集的速度,Storm集群有空余時再拉取那些沒拉取到的數據,從而保證數據不丟失.
數據實時分析層是系統的核心層. 車載終端所采集的數據是沒有被解析的原始數據,使用單字節、雙字節或四字節來進行物理量的表示. 所采集到的數據格式為:

因此,實時分析層需將采集到的車輛實時信息進行過濾、解析、坐標轉換. 解析海量數據存在延遲阻塞、高并發等問題. 為了解決這些問題,本文拋棄了Java線程池、無限隊列等傳統的方法,突破集中式單節點運算的限制,采用分布式,高容錯的實時計算系統Storm. 實時分析拓撲如圖3所示.
首先,建立實時分析拓撲圖(Topology)并提交給Storm集群,由集群中主節點Master的守護進程“Nimbus”分發代碼,將任務分配給工作節點(Worker)執行,同時監控任務和工作節點的運行情況等; Worker節點上運行的守護進程“Supervisor”,負責接收Nimbus分發的任務并運行,每一個Worker上都會運行著Topology程序的一部分. 因此,Topology程序的運行就是由集群上多個Worker一起協同工作的.Topology的部分代碼如下所示:


圖3 實時分析拓撲
拓撲中包含Spout和Bolt兩種角色,系統中KafkaSpout從Kafka消息隊列中獲取數據,通過nextTuple()方法以數據流Tuple元組的形式發送給下游的MsgPreDealBolt,Spout的ack和fail方法分別在元組被成功處理和處理失敗時調用,保證數據處理的完整性. MsgPreDealBolt完成過濾工作,根據指令校驗碼進行篩選出符合要求的軌跡數據,按照FieldsGrouping的分組策略,通過execute()方法發送到解析模塊. 解析模塊GpsDealbolt主要完成分析處理邏輯,包括2進制的轉化,終端識別,分析終端發送數據的類型,并做出相應的處理,最后按照FieldsGrouping的分組策略發送至下一處理模塊. 轉換模塊主要是北斗坐標轉換為百度坐標的處理(方便使用百度地圖功能),從而以次完成數據的解析處理,轉換等工作. Storm通過實現不同的Bolt來完成計算結果的多樣化存儲. 本系統中對分析結果處理有HbaseBolt和RedisBolt. HbaseBolt將結果保存到HDFS分布式文件系統中,RedisBolt將計算結果保存到緩存中,便于查詢檢索.
基于傳統關系型數據庫存儲的車輛信息表日漸增大,接近單表存儲的上限,且數據的查詢和寫入性能會呈現指數級別地下降. 為了實現高性能的并發讀寫操作,數據存儲層采用硬盤存儲和內存存儲兩種模式. 硬盤存儲使用分布式的、面向列的開源數據庫Hbase,存儲離線數據以及將處理后的流數據進行落地. 目前主流的內存數據庫有Memcached和Redis. Memcached是一個高性能的,具有分布式內存對象的緩存系統;Redis是一個基于內存的高性能Key/Value數據庫.
二者主要的區別為: Redis會周期性地把更新的數據寫入磁盤或者把修改操作寫入追加的記錄文件,并且在此基礎上實現了master-slave(主從)同步. 與Memcached相比,Redis的優勢在于其具有高效的讀寫效率以及豐富的數據類型所帶來的快速開發. 另外,Redis作為緩存具有更高的安全性. 因此,本文選用Redis數據庫作為系統的緩存,用于保存整個系統的分析結果,實現緩存數據的持久化. 在節點宕機或者斷電的情況下,系統仍能夠從硬盤中獲取備份的數據,從而保證了系統的健壯性. 以下分別介紹兩種模式的具體實現:
(1) 將Storm中分析的實時數據存儲到Hbase中,為后期的查詢和離線分析做數據支持. 采用HBase大數據存儲框架,在保證足夠的存儲空間的前提下,利用HBase的分布式特點來提高數據的存取速度,解決數據的單點存儲隱患,保障數據的高可用性. 系統中實時車輛信息表如表2所示.

表2 實時車輛信息表
Hbase以表Table形式存儲數據,每行包括一個RowKey和多個Column Family,且每行通過RowKey唯一標記,行按照RowKey的字典序排列. 實時車輛信息表根據Hbase表的設計要求,RowKey為車牌號的反轉+“_”+時間戳,要盡量縮小 RowKey的長度,提高檢索效率,columnfamily要盡量的少,原因是過多的columnfamily之間會互相影響,所以設計了一個列簇CarInfo,在CarInfo下分為很多列,如車牌號,定位時間,經緯度等車輛軌跡信息.
(2) 將Storm中分析的最新實時數據緩存到內存數據庫Redis中,利用Redis高性能操作和運算上的優勢,為數據展示層提供既方便又快捷的數據檢索. 由于Redis數據庫容量受到物理內存的限制,不能用作海量數據的高性能讀寫,只能將最新的數據緩存到內存中,數據展示層首先查詢Redis,如果數據存在,直接從Redis獲取數據,否則從Hbase中獲取,如此以來提高數據的查詢檢索速度,優化系統性能.
可視化展示為數據存儲層中所有車輛實時信息提供統一的查詢入口,將車輛軌跡分析結果以可視化的形式展現. 本系統使用Dubbo分布式服務框架,將核心業務抽取出來,作為獨立的服務,使前端應用能更快速和穩定的響應,解決了服務器單點故障,方便后期的拓展和維護. 可視化展示主要包括以下幾個方面:
(1) 前端借助百度地圖,將查詢車輛的軌跡信息,包括已經行駛的時間,行駛過程中的停留點,速度等在整個地圖上動態的顯示.
(2) 根據車輛行使的路段,利用百度地圖查詢該地段的限速大小,并與車輛當前速度進行比較,檢測是否超速,如果超速,給予駕駛員警告.
(3) 電子圍欄,將車輛的位置信息與規定行使區域實時進行比較,檢測是否超出預定的行駛路線.
(4) 根據車牌號實時定位當前車輛的位置信息,行駛速度,急加速等信息,有效地監控當前車輛狀態.
實驗主要驗證系統的功能和Storm實時分析的效率. 通過部署局域網的10臺PC機,搭建集群進行測試. 實驗環境配置如下: Storm版本: 0.10.2,系統版本:centos6.7,JDK1.8.0_45-b14,Kafka2.11-0.10.0.1,Zookerper3.4.9,Hbase1.0.3,,Redis-3.2.3,Dubbo2.8.4,單機計算機節點配置: 內存大小: 8 G,CPU型號: intel Core i5,磁盤500 G. 本次實驗系統部署架構,集群各個節點的配置和功能描述如表3所示. 數據源是網約車平臺共享的數據,將數據源以日志的形式存儲在本地硬盤中,通過讀取文件來模擬車載終端發送的大量數據流.

表3 集群節點配置表
首先進行功能的測試,測試系統能否從終端獲取數據,并利用Storm實時解析并可視化的展視. 在web端根據車牌號對車輛進行定位查詢,后臺從緩存或數據庫中獲取當前車輛最新的數據,利用百度地圖實時定位,然后進行可視化展現. 圖4為實時定位效果的實例展示.個人軌跡的查詢,根據車牌號可以查詢某一時間段的車輛行駛軌跡. 后臺根據時間段和車牌號,從緩存或數據庫拿到車輛軌跡信息,并在地圖上繪制出來,軌跡查詢的效果圖如5所示.

圖4 實時定位圖

圖5 軌跡查詢圖
從效果圖可以看出實時分析層已經實現了將采集層采集的數據,過濾,解析,經緯度轉換,存儲到數據庫中,并通過展示層可視化的展現出來. 從而說明基于Storm的車聯網數據實時分析系統的功能基本實現,對數據的采集,轉發,解析,落地存儲功能均無問題.
測試系統實時處理的性能,主要指標是數據處理的吞吐量,數據處理延遲. 吞吐量反映系統單位時間內處理數據的規模. 對比分析Storm集群和Java線程池的吞吐能力. 不斷增加任務執行的數據量,記錄處理完成所需的時間,為了提高實驗結果的準確性,測試的數據保持一致,每項結果是經過5次測試取平均值,對比結果如圖6.

圖6 Java線程池與Storm運行時間對比
當數據規模較小時,利用Storm集群計算需要更長的時間,這是因為在集群中任務的分發,數據的傳輸都要經過網絡,需要消耗部分系統資源和時間. 隨著數據規模的增大,集群的處理能力明顯提升,這是因為Storm中計算任務被劃分為不同的組件,在多個Worker節點上的Executor執行. 因此,隨著數據規模進一步擴大,單機版的Java多線程處理的耗時將更加難以接受,甚至出現卡頓死機的情況,而Storm集群支持水平擴展,添加了Worker節點,能夠滿足更大規模的數據處理要求. 因此,Storm在流式計算方面的性能遠遠超過傳統的Java多線程平臺.
Storm集群和傳統Java多線程平臺在延遲性方面沒有可比性. 數據處理延遲與數據處理模塊的并行任務數有關. 一般來說,并行任務數越多,Tuple等待被處理的時間就越短,處理延遲越小.
通過實驗對比分析,將KafkaSpout的組件數目設置為2,分析處理模塊的Bolt分別設置為2,8. 測試結果如圖7所示,隨著處理的數據量越大,當處理模塊Bolt數目為2時,處理延遲越來越大,這是因為Spout不斷產生新的數據,分析處理模塊不能及時處理,導致數據積累,處理延遲呈上升趨勢. 當處理模塊的Bolt數目為8時,處理延遲都在毫秒級. 因此合理的設置各組件的任務數是優化Storm性能的有效途徑,提高Storm并行處理能力.

圖7 并行數Bolt為2,8處理時間對比
本系統還具備快速部署,易拓展的優點. 隨著業務的發展,數據量和計算量越來越大,僅需要增加Worker節點便可提高任務的計算能力. 具體地,新增節點首先解壓Zookeeper和Storm安裝包,修改配置文件,然后運行Zookeeper和Storm集群. 無需修改程序,在集群啟動后,重新提交topology即可完成部署. 隨著部署節點的數量不斷增加,系統易拓展的優勢將更加明顯.
為了對海量車聯網數據進行實時分析及可視化展示,本文設計了基于Storm的車聯網數據實時分析系統,系統融合了Kafka消息隊列、Storm流式計算框架、Hbase分布式數據庫、Redis內存持久化數據庫、Dubbo分布式框架等技術. 通過測試驗證,與傳統的多線程處理平臺相比,系統有高吞吐和低延遲的特性,
實現車輛狀態實時監控,從而提高車輛監管效率.系統的分布式負載均衡,調度優化等問題將是我們下一步重點關注的問題.
1周國亮,朱永利,王桂蘭,等. 實時大數據處理技術在狀態監測領域中的應用. 電工技術學報,2014,29(S1): 432-437.
2戴菲. 基于Storm的實時計算系統的研究與實現[碩士學位論文]. 西安: 西安電子科技大學,2014.
3李勁松. 一種基于Storm的分布式實時增量計算框架的研究與實現[碩士學位論文]. 成都: 電子科技大學,2015.
4孫朝華. 基于Storm的數據分析系統設計與實現[碩士學位論文]. 北京: 北京郵電大學,2014.
5王銘坤,袁少光,朱永利,等. 基于Storm的海量數據實時聚類. 計算機應用,2014,34(11): 3078-3081.
6李慶華,陳球霞,蔣盛益. 基于數據流的實時處理框架模型. 計算機工程,2005,31(16): 59-60,63. [doi: 10.3969/j.issn.1000-3428.2005.16.023]
7屈國慶. 基于Storm的實時日志分析系統的設計與實現[碩士學位論文]. 南京: 南京大學,2016.
8楊素素. 基于Storm的城市消防聯網遠程監控系統的實時數據處理應用. 計算機測量與控制,2017,25(3): 55-59.
9楊婷婷. 基于出租車GPS軌跡數據的實時交通狀態獲取和現有實時路況系統評估[碩士學位論文]. 上海: 華東師范大學,2016.
10McCreadie R,Macdonald C,Ounis I,et al. Scalable distributed event detection for twitter. 2013 IEEE International Conference on Big Data. Silicon Valley,CA,USA.2013. 543-549.
11Namiot D. On big data stream processing. International Journal of Open Information Technologies,2015,3(8):48-51.
12Maarala AI,Rautiainen M,Salmi M,et al. Low latency analytics for streaming traffic data with Apache Spark. 2015 IEEE International Conference on Big Data (Big Data). Santa Clara,CA,USA. 2015. 2855-2858.
13Nair LR,Shetty SD,Shetty SD. Applying spark based machine learning model on streaming big data for health status prediction. Computers & Electrical Engineering,2018,65(1): 393-399. [doi: 10.1016/j.compeleceng.2017.03.009]