譚 亮, 周 靜
(四川省交通運輸發展戰略和規劃科學研究院, 成都 611130)
隨著社會經濟的發展, 人民生活水平日益提高, 城市交通工具的保有量不斷加大, 道路供需矛盾日漸突出, 各種城市交通問題不斷涌現.為改善城市交通現狀,僅僅依靠增加道路基礎設施是遠不夠的, 還需要不斷提高交通管理的信息化和智能化水平.隨著智能交通系統的逐步實施, 各類交通檢測設備提供了大量的數據, 這些數據為解決城市交通問題打開了新思路, 是解決城市交通問題的最基本條件, 是制定宏觀城市交通發展戰略規劃和進行微觀道路交通管理與控制的重要保障[1].
因此, 針對智能交通系統中數據產生快、實時性強、數據量大等特點, 研究大數據技術在交通信息化領域的數據處理方法具有非常重要的意義.
通過文獻研究發現, 交通大數據在車輛軌跡統計、伴隨車輛發現、車流量預測和假套牌車輛甄別等方面的應用日趨增多, 但是在對數據分析之前都需要對數據進行相關的處理.隨著分布式計算技術的發展,大數據技術在交通數據處理和分析領域得到廣泛應用.
文獻[2]提出了基于MapReduce與K近鄰搜索算法對海量歷史數據進行短時交通流預測, 能在短時間內準確預測下一個時間段內的交通流.文獻[3]中曹波等人利用分布式并行數據處理框架Spark對交通攝像頭捕捉到的道路交通數據處理生成的車牌識別數據進行處理分析, 基于并行FPGrowth算法能快速高效的發現伴隨車輛.文獻[4]基于交通攝像頭采集的實時監控數據, 利用云基礎設施的并行計算能力, 能有效處理大規模的流式車牌識別數據, 即時發現疑似伴隨車輛并實時地輸出發現結果.經過對大量文獻的研究發現, 目前大數據技術在交通領域的應用主要有兩個問題:
(1) 大部分針對離線數據作處理.目前利用大數據技術對交通數據進行的處理大部分都是針對的離線數據, 采用MapReduce編程模型, 將數據處理過程抽象成Map和Reduce兩個操作, 利用“分而治之”思想進行分布式處理[5].這種處理方式針對的是靜態歷史數據,面對低延時的實時數據流處理, 其計算效率比較低下,無法滿足對流數據進行實時處理的需求.
(2) 針對單一數據源作處理.目前的文獻中利用大數據技術在城市交通領域主要是對單一的數據源進行處理分析, 比如視頻監控數據或RFID數據, 不能對多數據源的數據進行有效處理, 無法綜合有效利用智能交通系統采集的數據, 從而彌補單一數據的限制和缺陷.
針對以上問題, 本文重點研究如何利用大數據技術對智能交通中通過雙基基站獲取的RFID交通數據和視頻抓拍數據進行相關的實時計算處理.
雙基基站, 是指每個交通數據采集基站包含RFID交通數據采集系統和視頻抓拍系統2套數據采集設備, 即當車輛通過基站時, 能夠同時得到RFID過車記錄和視頻抓拍過車記錄[6].RFID交通數據采集系統主要由貼在車輛擋風玻璃上的RFID電子卡、安裝在路上的RFID讀寫器、通信傳輸網絡和信息中心組成.RFID讀寫器可采集到車輛所攜帶RFID標簽中的信息, 主要包括記錄序號、基站編號、車道編號、開始讀卡時間、結束讀卡時間、標簽編號、標簽類型、車牌號、車牌顏色、車型、車身顏色等.視頻抓拍系統采集到的數據屬性, 主要包括記錄序號、基站編號、車道編號、過車時間、車牌號、車牌顏色、車身顏色、地點車速、違章類型、圖像編號、是否超速等.
通過對以上2種設備采集的數據進行研究分析,這兩種數據都具有數據持續到達、數據實時到達和數據規模宏大等特點, 使得系統在單位時間內需要處理的數據量大幅增加, 傳統的數據處理架構已不能滿足實際需求.為了解決這個問題, 本文基于 Spark Streaming和Apache Kafka相組合設計了一種的分布式交通數據處理平臺.
Kafka最初是由LinkedIn 公司開發的一套高吞吐量的分布式消息訂閱和發布系統[7], 其吞吐量可隨集群的擴展而線性增加, 在海量數據處理領域被廣泛使用.圖1為Kafka的整體架構, 由生產者(Producer)、代理(Broker)和消費者(Consumer)三大部分構成;其中Producer負責收集消息并推送到Broker, 而Broker則負責接收消息, 并將消息本地持久化, Consumer則是消息的真正使用者, 從Broker拉取消息并進行處理[8].

圖1 Kafka 整體結構圖
一個主題Topic可以認為是一個隊列, 每條消息都必須指定Topic.為了使Kafka的吞吐率線性提高,每個Topic可分成一個或多個partition, 每個partition在物理存儲層面對應一個文件夾, 文件夾下包括這個partition的所有消息和索引文件.一個Topic的所有partitions被分布在Kafka集群中的多個server上, 每個server上的Kafka實例負責該server上的partition中消息的讀寫操作.當消息到來時, 被直接追加到該分區中, 屬于順序寫磁盤操作, 因此效率非常高;當消費者消費消息時, Kafka會為每個consumer保留偏移量信息 offset, 該 offset由 consumer控制, 正常情況下consumer在消費完一條消息后順序遞增該offset, 不需要Kafka使用鎖機制標記哪些消息被哪些消費者消費過, 從而為Kafka的高吞吐率提供了有力保障.通過這種分區機制, 保證了消息的保存/消費的效率, 有效提升了Kafka的吞吐率.
為了高可用性Kafka引入了replication機制, 每個partition被備份到多臺服務器上, 基于該方案, 就意味著需要對partition的多個備份進行調度;每個partition都有一個server為主導者, 負責所有的讀寫操作;其它 server 作為跟隨者, 只是進行消息同步.如果主導者server失效, 其他跟隨者server來將會接管成為新的主導者, 這樣始終只有一個server負責讀寫操作, 使得系統更加簡單高效.但是, 作為主導者的server會承載全部的請求壓力, 為了更好的負載均衡,Kafka會盡量將所有的partition均勻分配到整個集群上, 從而確保整體的穩定高效.
Kafka以時間復雜度為O(1)的方式提供消息持久化能力, 具有高吞吐率、高可靠性和易擴展的特點, 可同時支持離線數據處理和實時數據處理.基于此,kafka完全適用于分布式交通大數據處理系統.
流式計算作為大數據計算領域的一種主要模型,當前主流的流計算框架有Twitter公司開發Storm、Yahoo公司開發S4、微軟的Timestream以及UC Berkeley AMPLab 開發的 Spark Streaming 等[9].其中,Spark Streaming是基于DStream模型并構建在Spark計算引擎上的分布式流式計算框架, 可以實現高吞吐量的、具備容錯機制的實時流數據的處理.
DStream (Discretized Stream) 是 Spark Streaming中一個關鍵的程序抽象, 表示一個持續不斷輸入的數據流, 可以基于Kafka等輸入數據流創建[10].在內部, 一個DStream是通過一組時間上連續的彈性分布式數據集RDD序列組成;每個RDD包含了一定時間間隔內的數據流, 是一個不可變的分布式可重算的數據集, 如圖 2 所示.由于 Spark Streaming 是基于Spark處理引擎的, 其計算流程實際上就是將輸入數據分成一段一段的DStream, 每一段DStream都轉換成針對Spark的RDD, 然后通過Spark引擎將RDD經過操作變成中間結果保存在內存中, 最后再根據業務的需求對中間結果進行疊加或存儲到外部設備.

圖2 Spark Streaming 處理過程圖
Spark Streaming 支持從多種數據源獲取數據, 比如 TCP Socket、Flume、Kafka、Twitter、ZeroMQ 等.針對于Kafka消息系統, 在系統開發過程中直接調用Kafka的簡單消費者API CreateDirectStream讀取數據,并創建和Kafka分區一樣的RDD分區個數進行一一對應, 從而提高系統性能.
選擇Spark Streaming一方面是因為數據流入系統就可以進行實時處理并得出結果, 框架具有很好的擴展性、容錯性和吞吐量, 適合數據連續產生和需要實時處理的應用場景;另一方面是因為它基于Spark批處理引擎構建, 能和批處理、即時查詢放在同一個軟件棧中, 實現數據的無縫共享, 從而降低開發和維護成本, 也為系統后期提供離線數據處理和挖掘服務提供基礎.
本文利用Kafka和Spark Streaming框架設計的分布式交通流數據處理平臺的系統架構如圖3所示, 包括數據接收部分、Kafka消息訂閱系統、數據處理程序、以及數據存儲部分.
數據接收部分, 用于接收從前端數據采集器發送的數據, 而且要保證與客戶端數據傳輸的高效性和穩定性.Netty作為一個高性能、異步事件驅動的網絡應用程序框架[11], 利用多線程或IO多路復用技術可以同時并發處理成千上萬個客戶端的接入請求, 在大數據分布式計算領域被廣泛應用, 本文設計的數據處理平臺采用Netty作為通信服務器.在本系統中前端數據采集器作為客戶端, Netty服務器作為服務端收集從客戶端發送過來的數據請求消息.Netty數據接收服務器,被視作消息生產者, 負責將接收到的數據通過創建的生產者接口寫入Kafka消息系統, 不同類型的交通數據發送到Kafka的不同Topic中.

圖3 系統整體架構圖
Kafka消息系統, 將來自RFID采集設備和視頻抓拍設備的數據匯集成數據流.Kafka消息系統將不同類型的數據分散在不同的Topic中;并根據不同的業務處理需求, 將數據轉發到Spark Streaming集群中不同的處理模塊中進行處理.
數據處理程序, 運行在 Spark Streaming 集群上, 是系統的核心模塊;以Kafka消息系統作為數據輸入流進行實時處理.Spark Streaming從 Kafka消息隊列中按照時間窗口不斷提取數據, 然后進行業務處理, 其中包括直接業務處理程序和間接業務處理程序兩種.直接業務處理程序在本文中主要指過車流量統計;間接業務處理程序首先進行數據拼接處理, 然后在按照業務規則進行處理, 本文主要包括軌跡數據統計和異常數據提取.隨著業務的不斷擴展, 可以根據需要開發其他的業務處理程序方便快捷的部署在集群上.
數據存儲部分, 對處理的結果數據進行存儲.當數據經過數據處理程序完成數據拼接、統計和異常處理后, 需要針對不同的處理結果采用不同的存儲方式, 主要有關系型數據庫、內存數據庫和分布式數據庫等.車輛軌跡處理結果存儲在HBase中, 將HBase劃分為多個不同的域, 以車牌號和時間字符串的逆序字符串為鍵進行存儲;對各個采集點的過車流量統計結果存儲在內存數據庫Redis中;將異常數據存儲到關系數據庫MySQL中.
圖4表示交通數據處理平臺的內部結構圖.Spark Streaming集群由多個工作節點組成, 每個工作節點上運行多個 Spark Executor, 在 Spark Executor上運行相關的業務處理程序.同時, 在每個工作節點上安裝分布式數據庫系統HBase、內存數據庫Redis和關系型數據庫 MySQL;HBase用來存儲車輛軌跡數據;MySQL用來存儲異常數據記錄;Redis用存儲采集點的過車流量數據.為了保證系統的可擴展性, 本文選擇的三種存儲方式HBase、Redis和MySQL都可以采用集群方案進行部署.
來自視頻抓拍設備的數據使用話題Topic1寫入Kafka消息系統, RFID數據采集器的數據使用Topic2寫入Kafka消息系統.為了保證系統的吞吐量,需要對Kafka主題進行分區, 通過分區從而提高系統的并發讀寫能力.在系統的內部結構中, 每個Spark Executor運行一個直接業務處理程序或者間接業務處理程序.同時, Topic1的每個消息分區有一個數據消費者(間接業務處理程序);Topic2的每個消息分區有兩個數據消費者:一個是直接業務處理程序, 指的是基于Spark Streaming流式集群的過程流量統計處理程序;另一個是間接業務處理程序, 首先與Topic1的視頻數據按照規則進行拼接程處理, 然后按照業務規則進行實時數據處理, 包括車輛軌跡和異常數據的處理.一個或多個業務處理程序組成一個消息消費者小組, 同時保證每個分區的數據至少會被每個消費者小組中的一個消費者接收.

圖4 系統內部結構圖
圖4描述的系統內部結構包含兩個工作節點組成的Spark Streaming集群, 每個工作節點運行三個Spark Executor, 一個 Spark Executor負責一個業務處理程序;其中, 有兩個Spark Executor負責間接業務處理程序,一個Spark Executor負責直接業務處理程序.圖中每個消息話題分了四個分區, 間接業務處理程序消費一個Topic1的消息分區和一個Topic2的消息分區的數據;直接業務處理程序負責消費Topic2的兩個消息分區的數據.為保證系統性能, Topic的分區個數最好設置為消費者的倍數, 同時最好保證每個消費者負責處理的消息分區個數是相等的.
數據拼接程序, 屬于間接業務處理程序中的首要模塊.數據拼接程序根據Kafka消息系統從Kafka中按照時間窗口不斷提取數據, 按照設定的時間間隔從持續的Kafka分布式消息隊列中獲取RFID過車數據和抓拍數據, 每次累計獲取設定時間段以內的數據進行拼接處理, 然后將處理結果供其他處理程序使用, 具體拼接處理如圖5所示.首先將RFID數據、抓拍數據分別封裝為相應的RDD;然后對兩類數據RDD分別進行轉換, 得到鍵值對形式的RDD, 以方便比對連接操作的進行, 其中鍵為車牌號、時間、基站編號三個字段組成的字符串;最后, 將兩種數據的RDD根據鍵值進行比對并連接.
間接業務處理程序除了數據拼接程序之外, 還運行著多個實時數據處理程序, 包括車輛軌跡處理程序、異常數據處理程序.每個實時處理程序根據數據拼接程序處理的結果再按照業務規則進行處理, 并把數據處理的結果存儲起來.

圖5 數據拼接過程
(1) 車輛軌跡數據統計程序
車輛軌跡處理程序根據拼接處理的格式化RDD,利用Spark Streaming提供的比對函數對車牌號和時間字符串進行逆序處理, 使用車牌顏色和通行時間字段對數據 行過濾, 得到車輛軌跡數據;以車牌號和時間字符串的逆序字符串為鍵, 將所述車輛軌跡數據存儲在HBase中, 將HBase劃分為多個不同的域, 以車牌號和時間字符串的逆序字符串為鍵進行存儲.
(2) 異常數據提取程序
異常數據提取程序根據拼接處理的格式化RDD,利用異常數據的判定規則分別對RFID過車數據和抓拍數據進行過濾, 提取出異常數據, 并對異常數據進行統計.數據異常類型分為數據缺失、車牌顏色字段缺失、抓拍圖片鏈接確實、號牌不一致、顏色不一致.根據數據異常類型, 首先判斷RFID數據是否缺失, 如果存在RFID數據, 則判斷RFID數據中顏色字段是否存在、抓拍數據中抓拍圖片鏈接是否存在, 如果字段完整, 則判斷RFID數據和抓拍數據中號牌、顏色是否一致, 最后, 將提取出的異常數據存儲到MySQL數據庫中, 并標識異常類型.采用關系型數據庫MySQL進行存儲.
(3) 過車流量統計程序
過程流量統計程序將RFID過車數據轉換為以采集點字段和精確至小時的時間字符串為鍵的鍵值對形式;根據 Spark Streaming 分布式大數據處理的原理, 對具有相同鍵的數據記錄進行計數, 然后對每個采集點的統計結果以設定的時間間隔進行求和, 得到各個采集點在相應時間段內的過車流量記錄;使用內存數據庫對各個采集點的過車流量進行存儲.
為了保證平臺的數據處理能力, 同時考慮系統的可擴展性和負載均衡, 本文搭建了如圖所示的測試環境.測試環境使用vSphere搭建6臺虛擬機集群, 每個虛擬機的配置為 8 核 CPU, 內存 16 GB, 磁盤 80 G, 其具體部署情況如表1所示.在實驗過程中, 采用LoadRunner工具模擬RFID采集器和視頻抓拍設備采集數據;Netty作為數據通信接收器, 用于接收從LoadRunner發送過來的數據并寫入給Kafka消息訂閱系統;Zookeeper負責分布式應用程序的協調服務.

表1 測試環境部署結構表
衡量系統的實時處理能力通常是測試系統處理一條數據所需要的時間.系統的處理每條數據的時間越短, 說明系統的實時處理能力越強.在測試過程中, 將LoadRunner工具的并發量分別設置為2000、4000、6000、8000、10 000, 分別表示模擬 2000、4000、6000、8000、10 000 個采集設備采集數據, 采集頻率都為1條/秒.啟動系統進行計算處理, 觀察并記錄一條RFID數據和視頻抓拍設備的數據從傳輸到處理完成所需要的時間.

圖6 實驗結果
圖6中橫坐標表示數據采集設備的并發數量;縱坐標表示計算處理時間.從圖中可以看出當并發量在2000 到 10 000 的時候, 其處理時間在 0.748 ms 和0.763 ms 之間波動, 而且變化的幅度較小, 其處理時間都接近于0.7 ms.根據實驗結果, 一條數據從傳輸到處理完成所需的時間是毫秒級的, 說明本系統具備很好的實時處理能力, 完全可以在大規模高并發的數據流場景中應用.
Spark Streaming 與 Kafka 組合使用, 當數據量較小, 很多參數的默認配置便能夠滿足應用情況, 但是當數據量大的時候, 就需要從多方面入手對系統性能進行調優, 一方面是在開發過程中盡量選擇高性能的算子, 比如使用mapPartitions替代普通map、使用foreachPartitions替代foreach等;另一方面是根據實際情況通過實驗對各項參數進行調整, 比如對Spark Streaming的批處理時間batchDuration、Kafka拉取量maxRatePerPartition和分區數量partition等參數進行實驗調優.
本文提出的基于Spark Streaming和Kafka相組合的交通數據處理平臺充分利用了當前最先進的實時大數據流處理技術, 并針對交通數據流的特點對系統架構和內部結構進行了研究和設計.經過研究分析, 本文設計的實時交通數據平臺具有如下優勢:
(1) 具有高效的數據處理能力.系統采用當前先進的流式處理框架Spark Streaming和高可靠、高吞吐量的Kafka消息系統相組合作為系統核心框架.由于Spark Streaming的數據處理過程是基于內存實現的,具備很強的實時處理能力, 其吞吐率比傳統架構的處理方式提高上百倍.
本文采用 Spark Streaming 流處理技術, 按照設定的時間間隔從持續的Kafka分布式消息隊列中獲取RFID過車數據和抓拍數據, 每次累計獲取設定時間段以內的數據, 再按照規則進行拼接和處理.利用時間窗口機制, 可以有效解決數據等待問題, 提高平臺的數據處理能力.
(2) 具有很強的可擴展性.系統架構中的Kafka消息系統、Spark Streaming集群、數據存儲(HBase、Redis和MySQL)以及Netty數據通信服務器都可以采用分布式集群架構, 具有很強的擴展能力.所以, 在實際應用的時候, 只需要根據目前的處理需求設計系統部署方案即可, 不必過多考慮將來的業務擴展需求,從而減少項目初期部署的設計負擔和建設成本.
(3) 具有很好的業務功能擴展能力.在當前的設計中, 本文只提供了車輛軌跡數據統計、過車流量統計和異常數據提取這三個實時數據處理功能, 將來可以在系統中根據實際業務需求方便地增加新的處理程序.新增加的業務處理程序將會與原有的處理程序并行進行, 不會影響現有的業務處理功能.
(4) 具有雙基基站數據處理能力.本文設計的系統可以同時利用和處理從RFID采集器和視頻抓拍設備采集的數據流, 將兩種數據流分類處理后進行存儲, 有利于后期的查詢和分析.