馬晟 劉雅倫 陳曉男 沈漪


摘? 要:城市的發展使得運營車輛日益增長,車輛調度愈發困難,傳統系統無法滿足現有眾多車輛的監控調度與運營。該程序基于大數據流處理系統,實現了大批量的車輛信息監測和實時處理以及車輛的精細監控與軌跡回放。可用于網約車、公交車以及貨運集團的調度中心進行實時監控和訂單把控,以提高車輛調度的靈活性,達到最優調度、減少成本的效果。
關鍵詞:SparkStreaming? 大數據? 軌跡回放? 交通
中圖分類號:TP31? ? ? ?文獻標識碼:A
Abstract: With the development of the city, the number of operating vehicles is increasing, and the vehicle scheduling is becoming more and more difficult. The traditional system can not meet the monitoring, scheduling and operation of many existing vehicles. Based on the large data stream processing system, the program realizes a large number of vehicle information monitoring and real-time processing, as well as vehicle fine monitoring and track playback. It can be used for real-time monitoring and order control in the dispatching center of online car hailing, buses and freight groups, so as to improve the flexibility of vehicle scheduling, achieve optimal scheduling and reduce costs.
Key Words:SparkStreaming;Bigdata;Track playback;Traffic
隨著城市的發展,運營車輛日益增長,車輛調度愈發困難,傳統系統無法滿足現有眾多車輛的監控調度與運營。基于大數據系統的車輛實時監控與調度需求隨著大數據技術的日趨發展有了實現的可能。
1? 數據處理系統的設計
該系統實現對海量車輛軌跡數據的采集、存儲、實時處理、軌跡回放功能。軌跡數據在蓋亞數據平臺申請達到,編程模擬產生實時數據流,經大數據平臺采集處理存入數據庫,然后在前臺顯示實時的車輛軌跡[1]。
1.1系統整體架構
基于系功能需求,該系統的總體設計為:先由車輛端上傳坐標數據(編程模擬產生),flume多源采集,然后寫入kafka的topic,接著通過SparkStreming實時消費kafka,再根據訂單存入redis,最后實現訂單數據列表生成以及訂單車輛軌跡回放[2]。它異于傳統數據系統的地方是:采用大數據流處理框架,具有高吞吐率、高負載、高可用性、實時性高的優點[3]。整個系統的邏輯實現如圖1所示。
1.2數據回放模塊設計
為了模擬真實業務場景,該程序基于蓋亞平臺坐標數據通過數據回放模塊模擬數據流產生[4]。使用python讀出坐標數據,用多線程并行輸出,從而模擬實際場景中車輛移動匯報的坐標打點數據,達到采集流數據的需求[5]。
核心代碼邏輯如下所示。
#坐標數據文件寫入
def consumer(queue, writer, csv_file):
while True:
line = queue.get()
deal_line(line, writer, csv_file)
queue.task_done()
#流數據文件生成
def producer(queue):
with open(‘test.txt’, ‘r’) as f:
for line in f:
queue.put(line)
queue = JoinableQueue(8)
pc = Process(target=producer, args=(queue,))
for _ in range(cpu_count()):
c1 = Process(target=consumer, args=(queue, writer, csv_file))
#等待生產者進程全部生成完畢
pc.join()
#等待所有數據全部處理完畢
queue.join()
1.3數據采集消費模塊設計
該模塊實現了通過flume采集車輛軌跡流數據,進而推送到消息隊列kafka中。
首先進行flume數據采集,在采集過程中通過集群形式達到大數據量及多源數據采集情況下的負載均衡及并行采集。設置flume靜態攔截器實現在采集到的數據的頭數據中插入自定義的key-value鍵值對以區分不同數據源,主要配置如下:
a1.sources.r1.interceptors.i1.type = static? ? ? ? ? #設置靜態攔截器
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = test_gps_topic#不同的數據源取不同的名稱
接著通過kafka集群接收flume采集的大量數據,以實現數據高吞吐率、高可用數據傳遞以及數據的實時處理,同時通過不同的topic保證不同數據流的分區。
flume監聽的文件數據發送到此kafka的主題當中,主要配置如下:
a1.sinks.k1.topic = test_gps_topic? ? ? ? ? ?#與前面的靜態攔截器value值配置相一致
1.4數據實時處理模塊設計
該模塊通過sparkStreaming程序實現消費kafka中的數據存到HBase中,其中的GPS位置經緯度信息保存到redis中,存為后續實時監控以及軌跡回放的數據源[6]。核心邏輯的Scala代碼如下:
//從kafka里消費數據,把經緯度信息存到redis
val result: InputDStream[ConsumerRecord[String, String]] = Tools.getStreamingContextFromHBase
(streamingContext,kafkaParams,topics,group,"(.*)gps_topic")
result.foreachRDD(eachRdd =>{
eachRdd.foreachPartition(eachPartition =>{
val connection: Connection = HBaseUtil.getConnection
val jedis: Jedis = JedisUtil.getJedis
eachPartition.foreach(record =>{
Tools.saveToRedis(connection,jedis,record)
})
1.5軌跡回放模塊設計
得益于redis內存數據庫高性能以及可持久化的穩定性,該模塊實現回放每個訂單車輛軌跡同時并發實時讀取到前端,通過高德地圖提供的地圖api接口,訂單號為同一個key的value坐標數據軌跡點按時間順序呈現在地圖上,從而監控每條車輛訂單的車輛軌跡情況[7]。
2? 實驗驗證
2.1 實驗環境
該次實驗采用了一主二從的CDH集群,機器配置如圖2所示,集群角色配置如圖3所示。
2.2數據集
此次實驗數據集來自滴滴蓋亞數據平臺的開放數據,形如表1所示。
首先是數據回放模塊的驗證,通過多線程輸出,flume采集源目錄,數據如期以多訂單并發每秒三條的流數據形式生成。其次是數據實時處理模塊,經檢查redis數據庫,回放的流數據以秒級單位處理寫入到數據庫。最后是數據回放模塊的驗證,經前端程序的讀取,車輛軌跡坐標成功呈現在了高德地圖上。
3 結語
該系統實現了大規模軌跡數據的處理,數據的吞吐量、延遲性、精準度已達到預期。程序通過Python模擬車輛軌跡數據流的產生,然后通過flume和kafka采集消費數據,sparkStreaming處理數據流,完成了模擬現實生活多車輛多數據流場景的數據產生、處理與軌跡回放。目前程序還停留在雛形階段,未來將在耦合度、靈活度上做出提高。
參考文獻
[1]? 楊小潤.基于深度學習的車輛軌跡特征識別與分析[D].南京:南京郵電大學,2020.
[2]? 陸鍵,王可,蔣愚明.基于車輛行駛軌跡的道路不良駕駛行為實時辨識方法[J].交通運輸工程學報,2020,20(6):227-235.
[3] 潘偉博,汪海濤,姜瑛,等.Hadoop集群異常節點實時檢測與診斷算法[J].陜西理工大學學報:自然科學版,2021,37(4):24-31.
[4]? 鮑裕麟.深度學習應用場景下的HDFS性能優化[D].合肥:中國科學技術大學,2021.
[5] 謝楓,婁靜濤,趙凱,等.基于行為識別和曲率約束的車輛軌跡預測方法研究[J].汽車工程,2019,41(9):1036-1042.
[6] 柯杰.基于SparkStreaming日志實時監測系統的設計與實現[D].南京:東南大學,2017.
[7] 苗莉.大數據云計算環境下的數據安全[J].科技資訊,2021,19(2):31-33.