任 剛,李 鑫,趙開新,劉小杰,張 陽,郜廣蘭,4
(1.河南工學院 計算機科學與技術學院,河南 新鄉 453003;2.中國礦業大學 計算機科學與技術學院,江蘇 徐州 221116;3.河南省生產制造物聯大數據工程技術研究中心,河南 新鄉 453003;4.新鄉市虛擬現實與系統重點實驗室,河南 新鄉 453003;5.河南工學院 數據挖掘與智能計算研究所,河南 新鄉 453003)
路段行車時間(Road Link Travel Time, RLTT)是智能交通應用系統的重要基礎數據之一,路段行車時間計算(Calculation of RLTT, CoRLTT)是指從各類交通數據中計算出路段行車時間信息的計算過程,是智能交通應用關鍵支撐技術之一,也是智能交通領域的熱點研究內容之一[1]。近年來,高清攝像頭(High Definition Camera, HDC)開始在城市交通中普遍應用。該設備被廣泛部署于各交通路口,通過視頻識別技術記錄通過該路口車輛的車牌、車輛通過時間等信息,產生大規模交通卡口過車數據集。該類數據集為路段行車時間計算提供了良好的數據源。交通卡口過車數據的路段行車時間計算方法成為智能交通領域研究熱點之一。
Liu等[2]于2011年較早提出了基于交通卡口過車數據的路段行車時間基本計算方法。但該算法是串行算法,當數據規模較大時,其性能往往不佳。隨后出現的大數據并行計算模型MapReduce[3-5]為該問題解決提供了可能。
MapReduce是一種基于Hadoop磁盤文件系統(Hadoop Distributed File System, HDFS)的大數據并行計算模型,該模型以作業為基本計算單位,采用2種操作計算模式。該模型1個作業由map和reduce 2個操作構成,每次作業完成,計算結果會存入HDFS。張帥等[6]利用MapReduce實現了交通卡口過車數據集上的路段行車時間并行計算,該方法利用2次作業迭代完成了計算過程,實驗結果表明,基于MapReduce的路段行車時間方法大幅提高了計算效率。但是,由于該算法采用MapReduce模型2次作業迭代完成計算過程,在2次作業迭代之間,中間結果數據需要置于磁盤文件系統HDFS存儲,消耗了一定時間,當數據規模較大時,性能具有改善空間。
Spark[7-8]是一種新興的大數據并行編程模型,與MapReduce模型相比,Spark改善了作業的執行方式,1個作業可以包括多個數據轉換操作,避免了MapReduce模型1個作業僅有map和reduce 2個數據轉換操作的限制,更擅長處理多迭代型問題[9]。因此,利用Spark模型來實現一個能夠處理交通卡口過車數據的路段行車時間計算方法成為一個迫切需求。
為此,本文研究路段行車時間計算串行算法與Spark模型數據轉換的內在關系,提出基于Spark模型的路段行車時間計算方法(Spark-based CoRLTT, Spark-CoRLTT )。最后,通過實驗驗證算法的有效性。
Spark是一種基于彈性分布式數據集(Resilient Distributed Dataset, RDD)的大數據并行計算模型。RDD是一種存儲于內存的只讀數據集合,通過map、groupByKey和reduceByKey等轉換(Transformation)操作完成RDD轉換,其計算架構如圖1所示。該模型從HDFS讀入數據,形成RDD,通過一系列轉換操作,結果重新寫回HDFS。在利用RDD進行迭代操作時,通常采用鍵值對RDD這種數據集形式,該數據集元素以(key,value)鍵值對形式出現。

圖1 Spark計算架構
map輸入為(key1,value1),該操作根據具體業務邏輯將其轉換為新的鍵值對(key2,value2),map操作鍵值對轉換形式定義如下:
map::(key1,value1)→(key2,value2)
(1)
map生成(key2,value2)后,發送至groupByKey或reduceByKey進一步處理。由于groupByKey和reduceByKey是寬依賴轉換,Spark系統會啟動shuffle操作,將具有相同key2的value2鏈接起來,形成list(value2),shuffle操作形式定義如下:
shuffle::(key2,value2)→(key2,list(value2))
(2)
reduceByKey操作負責將合并具有相同鍵的值,并對該值進行聚合操作,其形式如下:
reduceByKey::(key2,list(value2))→(key3,value3)
(3)
groupByKey操作和reduceByKey類似,接收來自map將輸入鍵值對按鍵值分組,相同的鍵值會被分到同一組,不同的是group不進行聚合操作。
過車數據一般包含4列數據,表1列出一組樣本數據,一條記錄可形式化表示為:

表1 交通卡口過車樣本
r=(id,vehicle,hdc,timei)
(4)
其中,id為記錄關鍵字;vehicle為車牌;time為過車時間;hdc為高清攝像頭。1個具有n條記錄的過車數據集可表示為:
R={r0,r1, …,rn-1}
(5)
其中,ri=(idi,vehiclei,hdci,timei)
定義1:車輛出行軌跡
一條車輛出行軌跡由多個hdc和通過時間序列構成,形式化表示如下:
{(hdc0,time0), …, (hdcn-1,timen-1)}
(6)
Spark計算架構如圖1所示。
定義2:路段
一條路段是位于兩個hdc之間的路徑,可形式化定義為:
link=
(7)
定義3:路段行車時間
路段行車時間是在一段時間內通過該路段所有車輛行車時間平均值。假設在計算周期內有n輛通過路段linki,則路段行車時間可形式化定義為:
(8)
其中,timevehiclej為車輛vehiclej在路段linki上的行車時間。基于上述定義,路段行車時間串行計算方法可分為如下4個階段。
階段1:按車牌分組
具有相同車牌的車輛被分為相同組,1個數據集R就被分為多個子數據集,形式表示如下:
R=∪Rvehiclei
(9)
其中,
Rvehiclei= {(rj∈rj∈R,vehiclej=vehiclei)}
(10)
階段2:按過車時間排序
集合Rvehiclei按過車時間排序,形成vehiclei的1次出行軌跡,形式如下:
Rvehiclei={(hdc0,time0),…, (hdcn-1,timen-1)}
(11)
階段3: 按路段切割車輛軌跡
將車輛軌跡Rvehiclei按照相鄰卡口進行切割,形成路段行車時間集合TT,形式如下:
TT={tt0,tt1,…,ttn-1}
(12)
其中,
tti=(
(13)
階段4:按路段分組并計算RLTT
將路段行車時間集合TT按照路段分組,可形成子集TTlinki,每個子集包含該路段的所有行車時間,形式如下:
TT=∪TTlinki
(14)
其中,
TTlinki={(ttj|ttj∈TT,linkj=linki)}
(15)
計算TTlinki均值,該值即是linki的RLTT,形式表示如下:
(16)
其中,j為TTlinki中元素的個數。
根據前述串行計算方法,并結合Spark計算模型數據轉換操作特點,研究組提出路段行車時間串行計算過程到Spark數據轉換映射關系。如圖2所示,階段1按車牌分組由一個map實現,稱為First map;階段2按過車時間排序由groupByKey實現;階段3按路段分割車輛軌跡由另一個map實現,稱為Second map;階段4按路段分組并計算路段行車時間由reduceByKey實現。

圖2 Spark-RLTT算法映射關系
根據上述映射關系,研究組提出基于Spark-CoRLTT并行架構,如圖3所示。

圖3 Spark-RLTT并行架構
First map讀取原始數據集R,每行標記為(key1,value1),然后將其轉化(key2,value2)健值對。(key1,value1)鍵值對形式化表示為:
key=i
(17)
value1= (idi,vehiclei,hdci,timei)
(18)
其中,key1為行號;value1為數據集R的1行。生成(key1,value1)后,取出value1中的vehiclei賦予key2,取出hdci和timei賦予value2,形式化表示如下:
key2=vehiclei
(19)
value2=
(20)
主要實現代碼如下:

算法1: First map of Spark-CoRLTTInput:(key1,value1)Output:(key2,value2)01key2=value1.getVehicle();02hdc=value1.getHDC();03time=value1.getTime();04value2=
生成(key2,value2)后,Spark模型自動啟動shuffle過程,相同key2的value2被連接為list(value2),這樣,groupByKey操作實際輸入為(key2, list(value2))。groupByKey將key2直接賦予key3,根據time排序,形成value3。具體表示如下:
key3=key2
(21)
value3=(
(22)
主要實現代碼實現如下所示:

算法2: groupByKey of Spark-CoRLTTInput: (key2, list(value2))Output: (key3, value3)01key3 = key2;02value3 = sortByTime(list(value2));03Emit(key3, value3);
該操作按路段切割車輛的出行軌跡,形成路段和行車時間的鍵值對。輸入是(key3,value3),首先取出value3中相鄰元素的hdc,形成key4,取出相鄰元素的time求差值,將該值賦予value4,形式表示如下:
key4=
(23)
value4=timej-timej+1
(24)
主要實現代碼如下:

算法3: Second map of Spark-CoRLTTInput: (key3, value3)Output: (key4, value4)01for (i=0; i++; i < value3.getSize - 2)02 firstElement=value3.getElement(i);03 secondElement=value3.getElement(i +1);04 hdc1=firstElement.getHDC();05 time1=firstElement.getTime();06 hdc2=secondElement.getHDC();07 time2=secondElement.getTime();08 key4 =
生成(key4,value4)后,Spark模型啟動shuffle過程,相同的key4的value4被連接為新的鍵值對(key4, list(value4))。
該操作用于將相同路段的行車時間規約為一組數據集,并求平均數,計算出路段行車時間。reduceByKey輸入為(key4, list(value4)),鍵值保持不變,遍歷value4中所有元素,求均值賦給value5,形式表示如下:
key5=
(25)
(26)
主要實現代碼如下:
實驗用Spark集群包含8個計算節點,每個計算節點由12核2.1 GHz處理器、16 GB內存和1 T硬盤構成。經典串行算法記為Serial-CoRLTT,基于MapReduce的RLTT計算方法記為MR-CoRLTT,提出的基于Spark模型的RLTT計算方法記為Spark-CoRLTT。實驗數據來自我國西南某省會城市157個交通卡口,該城市高峰時期每小時可產生600萬條過車記錄,規模約9 GB,每天可產生2 000萬條過車記錄,規模約30GB。
分析該實驗數據規模對計算時間的影響。Serial-CoRLTT計算節點設置為1,MR-CoRLTT和Spark-CoRLTT計算節點設置為4。過車記錄從600萬條逐步增加到2 000萬條。實驗結果如圖4所示,可以看出,隨著數據集規模增加,3種算法計算時間都隨之增長,Serial-CoRLTT計算時間總是最多,說明不管是基于MapReduce模型的并行計算方法還是基于Spark模型的并行計算方法,計算性能都明顯優于傳統串行計算方法。
其次,提出的Spark-CoRLTT算法計算時間總是小于MR-CoRLTT算法計算時間,這說明提出的Spark-CoRLTT算法性能優于現有MR-CoRLTT算法。其原因在于提出的Spark-CoRLTT算法由1次作業就能完成整個計算過程,而傳統的MR-CoRLTT算法由2次作業完成整個計算過程,2次作業之間大量的數據存儲時間消耗了計算時間,這說明了本文算法的有效性。還能看出,隨著數據規模的增大,提出的Spark-CoRLTT效率優勢更加明顯,其原因在于隨著數據規模的增加,傳統MR-CoRLTT算法所消耗的磁盤存儲越多,造成計算時間越長,效率也越低。
該實驗分析計算時間和計算節點數的關系。計算節點數從1增加到8。數據規模設置為2千萬條,保持不變。實驗結果如圖5所示,可以看出:首先,隨著計算節點的增加,提出的Spark-CoRLTT和MR-CoRLTT計算時間都隨之減少,這說明2種算法都具有較好的計算節點擴展性。其次,提出的Spark-CoRLTT算法計算時間總是小于MR-CoRLTT計算時間,這說明本文提出算法計算性能優于原算法,其原因在于提出的算法1次作業即可完成整個計算過程,較之原算法少了1次數據存儲時間。最后,還應該注意到,隨著節點數的增加,下降幅度逐步減少,這是因為每個并行算法都有計算下限。

圖5 計算節點擴展性
本文提出了一種基于Spark模型的路段行車時間計算方法Spark-CoRLTT,實驗表明,較之傳統基于MapReduce模型的計算方法,文中提出的Spark-CoRLTT計算效率明顯提升。