黃文輝,馮 瑞
(1.復旦大學計算機科學技術學院,上海201203;2.上海視頻技術與系統工程研究中心,上海201203)
很多“大數據”是實時接收得到的,這些數據往往在剛得到時具有最大的價值[1],并且這些不斷產生的“大數據”無論是數據存儲還是數據快速處理問題都已經不是單臺物理機器能夠解決的。因此,許多分布式流式處理平臺不斷被開發出來,如Twitter公司開發的Storm[2]、Yahoo!公司開發的S4[3]、微軟的Timestream[4]以及UC Berkeley AMPLab 開 發 的Spark Streaming[5]等。其 中,Spark Streaming是基于D-Steam[1]模型并構建在Spark計算引擎上[6]的分布式流式計算框架,其特點是結合了流式處理和批處理,相比于其他分布式流式處理的框架,Spark Streaming比較突出的優勢就是能夠高效并行恢復失敗的節點和對執行任務比較慢的任務重新分配。
智能視頻監控很多情況下是要對采集的圖像進行實時處理。數量眾多的監控攝像頭,龐大的監控網絡,很短時間之內就會產生海量的圖像視頻數據,如何從這些海量數據中高效地提取出有用的信息,就成為智能視頻監控技術要解決的問題[7]。本文介紹了利用Spark Streaming 框架構建分布式視頻/圖像流處理的測試平臺,實現了從數據的傳輸、處理和存儲的整個流程,并為幾個重要的參數對性能的影響進行深入研究,提出了CPU 時間占用率作為評估指標,對于評估與改進集群的性能有著重要的參考意義,并為其他類似的分布式計算的性能評估提供了借鑒。目前,單純從總的處理時間來看,無法得到集群資源(CPU 等)的利用率情況,CPU 時間占用率結合總的處理時間,一方面可以從宏觀角度來衡量集群處理能力,另一方面可以從微觀角度判斷集群的資源利用情況,可以用來調整集群的參數以達到更高的性能或者是否選擇擴展集群等。
平臺組成如圖1所示,分為三大部分:數據傳輸、數據處理和數據存儲。
數據傳輸由Socket服務器、客戶端和Spark Streaming接收器(Receiver)三部分組成。數據源按照一定的通信協議發送數據包到Socket服務器,Socket服務器負責接收客戶端發送的數據并轉發到Spark Streaming 接收器。Spark Streaming接收器本質是一個Socket客戶端,接收器角色是由spark 集群中一個Worker扮演的(由Spark框架決定,該Worker依然會參與數據處理)。客戶端發送的信息包括數據源ID、圖像屬性、圖像數據等信息。
服務器充當中轉站的作用,Spark Streaming接收器、數據源和服務器三者可以得到分離,各自功能明確。若Server有著公網的IP地址,則數據源和Receiver可以運行在可以連接到公網的局域網中。

Figure 1 Composition of the testing platforms圖1 測試平臺的組成
接收器接收到數據后按照一定的解碼方式對數據進行解碼,得到的客戶端ID、圖像屬性和圖像數據等信息進行分布式處理,如果需要用到OpenCV 庫,圖像信息需要進一步解碼和轉換得到OpenCV 對應的Mat類型再進行處理。整個數據的序列化以及傳輸采用的是byte[]基本類型數組。
本文采用圖像的HOG(Histogram of Oriented Gradient)[8]特征檢測作為測試,HOG 特征已經被廣泛運用在圖像識別中,特別是在行人檢測方面獲得了很大的成功。如圖2所示為HOG 特征行人檢測的效果圖,方框為檢測的結果。

Figure 2 Result of the HOG detection圖2 HOG 特征檢測效果圖
數據存儲采用多種存儲結合的方式,針對不同的應用采取不同的存儲方式,主要有本地存儲、Hadoop 分布式存儲HDFS 和分布式數據庫HBase等。如網頁應用可以將結果返回集群的Driver的本地存儲,這樣可以快速獲取結果。
數據包的組織形式如圖3所示。

Figure 3 Format of data packets圖3 數據包的格式
(1)Block(0):數據包頭部,整型,四個字節,記錄后面一個數據的總長度;
(2)Block(2k+1):整型,四個字節,記錄Block(2k+2)的字節長度(k=0,1,2,…,n);
(3)Block(2k+2):任意長的字節數組,是真正的數據(k=0,1,2,…,n);
(4)Block(2k+3):CRC校驗,一個字節。
數據 包 在RDD(Resilient Distributed Datasets)中的抽象如圖4所示。在Receiver接收到數據包之后會保存到ArrayBuffer中,ArrayBuffer最終會被包裝成ReceivedBlock 并保存在Block-Manager中。在提交Job 的時候這些Received-Block會抽象成RDD,每個ReceivedBlock對應一個partition 并由一個任務Task 處理,每個任務(Task)所要處理的就是ReceivedBlock 中的數據包。

Figure 4 RDD containing data packets圖4 數據包的RDD 抽象
集群的硬件配置情況如表1所示。

Table 1 Hardware configuration of the cluster表1 集群的硬件配置
集群的軟件配置情況如表2所示。

Table 2 Software configuration of the cluster表2 集群的軟件配置
Spark Streaming集群采用Standalone模式、分辨率為1 272×767的PNG 格式的圖片,為了更好地進行性能評估,測試樣本采用1 000張相同的圖片。圖片處理方式為HOG 特征提取與檢測,提取結果返回Driver端存到本地磁盤。并分別探討幾個重要因素對于平臺性能的影響,分別是參數spark.streaming.blockInterval(下 面 用block-Interval代替)、spark.streaming.concurrentJobs(下面用concurrentJobs代替)、批處理時間batchDuration[9]和Receiver的數量。
性能評估采用處理時間和CPU 時間占有率作為指標。前者從宏觀角度橫向比較來分析平臺性能,后者從微觀角度出發,可以橫向比較也可以縱向比較。
(1)處理時間。
以流式處理1 000張相同的圖片所消耗的時間為依據。處理時間作為評估指標簡單直觀,能夠橫向對比出各個條件下集群處理數據的效率。
(2)CPU 時間占用率。
CPU 時間占有率表示某一段時間內CPU 各個核數用于數據處理的時間占總時間的比率。由于Spark Streaming每個任務Task都在單獨的線程中運行,統計Task的運行時間不難得出線程處理數據的時間。
設N表示為Spark Streaming用戶程序設定的最 大 使 用 的CPU 核 數(max-executor-core),n≤N表示第n個核t時刻(以ms為單位)的使用狀態,則可表示為:



ρ越大,也就是處理數據的時間占任務的總時間(包括處理數據的時間、任務調度時間、網絡傳輸時間等)的比重也大,說明平臺越高效。
(1)spark.streaming.concurrentJobs。
該參數表示一次最多可以同時運行Job的數量,采用線程池的方式來控制和實現Job的數量。batchTime設置為4 000ms,blockInterval設置為1 000ms,其他參數采用默認值的情況下,處理時間、CPU 時間占用率與concurrentJobs的關系圖如圖5所示。從圖5中可以看出,在一定的范圍內concurrentJobs越大,程序運行越高效,CPU 時間占用率在本文所用的測試平臺可以達到50%以上,CPU 時間占用率逐漸變得平緩,說明concurrentJobs對集群的影響逐漸達到一個最高水平。從Spark Streaming 原 理 來 講,concurrentJobs過小的情況會使得當前運行的所有Job分別對應的RDD 的Partition數量/Block數量(每個Partition對應一個Task)之和小于Spark Streaming用戶程序設定的最大使用的CPU 核數導致一些核處于空閑狀態。另外,concurrentJobs的設置要根據Spark Streaming 用戶程序的Driver所在機器的CPU 核數的情況來定。本文中Driver所在的機器CPU 核數為32。

Figure 5 Impact of conrrentJobs on performance圖5 concurrentJobs對性能的影響
(2)spark.streaming.blockInterval。
該參數設置Receiver的生成Block的間隔,直接影響Block中元素的個數(但不一定每個Block中元素的個數都相等)。在batchTime設為2 000ms以及其他參數采用默認值的情況下,處理時間、CPU 時間占用率與blockInterval的關系如圖6 所 示。從 圖6 中 可 以 看 出,blockInterval越小,處理時間越少,CPU 時間占有率越高。實際上在concurrentJobs設置為默認值1時,blockInterval越小,Job對應的RDD 的Partition越多,Partition中的元素的數量越少,這樣就可以增加Job中Task的數量而減少Task的計算量,進而Job執行的并行度更高,減少了空閑的CPU 核數,從而使得處理時間減少。
(3)批處理時間batchDuration。

Figure 6 Impact of blockInterval on performance圖6 blockInterval對性能的影響
batchDuration控制著周期生成Job 的時間,由Spark Streaming 的任務生成器JobGenerator周期性生成Job并交由任務調度器JobScheduler調度運行。在blockInterval設為1 000 ms、其他參數采用默認值的情況下,處理時間、CPU 時間占用率與batchDuration的關系如圖7 所示。從原理上來講當數據流均勻到達時,batchDuration設置越大,一次性處理的數據量越多,可以節省一部分任務生成和調度的時間,從而減少總體完成時間,特別是在單位數據的計算量較小以及對實時性要求不高的情況下可以適當地將batchDuration設置大一點。

Figure 7 Impact of batchDurationon performance圖7 batchDuration對性能的影響
(4)Receiver的數量。
Receiver充當接收數據的角色,用戶在程序中通過StreamingContext每設定一次Receiver就會產生一個InputDStream,由Spark Streaming框架的DStreamingGraph管理,設定N次就會有N個Receiver,在默認的分配策略下這些Receiver會均勻分布在集群中。圖8展示了在blockInterval設定為1 000ms、batchDuration設定為4 000ms,其他按照默認參數的情況下處理時間和CPU 時間占用率與Receiver數量之間的關系。在數據流傳輸速度一定的情況下,Receiver數量過少會導致數據集中,集群的處理能力跟不上數據接收的速度時會造成一定的滯后;Receiver數量過多會導致數據分散,每個Partition/Block中元素的數量過少,造成任務過多,任務的調度消耗時間增多。

Figure 8 Impact of receivers on performance圖8 Receiver數量對性能的影響
Spark Streaming使用的是Spark 計算引擎,該計算引擎“壓榨”計算機的硬件資源,包括CPU資源,CPU 時間占用率正是對CPU“壓榨”程度的一種重要反映,CPU 時間占用率越高說明對CPU的利用程度越高。
在理想情況下,Spark Streaming用戶程序使用的集群中的CPU 總核數等于concurrentJobs×Receiver數量×batchDuration÷blockInterval時,CPU 能得到最大程度的利用,實際上由于框架調度運行時間,數據接收速率不均勻等因素影響需要實際調整:這里面比較好控制的是concurrent-Jobs,但該參數受限于Driver所在機器的CPU 的核數;Receiver可根據數據的傳輸速度來調整;blockInterval根據數據的計算量來設定,block-Interval過大可能會造成單個Task 計算時間過長,當集群開啟straggler監控機制時可能被當作straggler處理;batchDuration的設置根據實時性要求設定,當實時性要求不太高是可以適當設置大一點。
本文針對智能視頻監控領域對大規模視頻/圖像流高效處理的需求,提出了利用Spark Streaming分布式框架構建對視頻/圖像進行流式處理的思想。重點深入研究了幾個重要參數/因素對性能的影響,創新性地提出CPU 時間占用率來衡量性能,并結合總的處理時間,從微觀和宏觀、橫向和縱向來評價,對于參數調整和框架改進以及對于是否擴展集群有著非常重要的參考意義,并對其他分布式架構的性能的評估有著重要的借鑒意義。接下來的研究重點在于針對視頻/圖像處理領域對Spark Streaming框架進行調整與改進,并針對大規模視頻/圖像數據流式處理問題進行實際部署和應用。
[1] Matei Z,Tathagata D,Haoyuan L,et al.Discretized streams:Fault-tolerant streaming computation at scala[C]∥Proc of the 24th ACM Symposium on Operating Systems Principles,2013:423-438.
[2] Apache Software Foundation.Storm,distributed and faulttolerant real time computation[EB/OL].[2015-06-25].http://storm.apache.org/.
[3] Neumeyer L,Robbins B,Nair A,et al.S4:Distributed stream computing platform[C]∥Proc of the 10th IEEE International Conference on Data Mining Workshops(ICDMW 2010),2010:170-177.
[4] Qian Z,He Y,Su C,et al.Timestream:Reliable stream computation in the cloud[C]∥Proc of the 8th ACM European Conference on Computer Systems,2013:1-14.
[5] Apache Software Foundation.Apache Spark,lightning-fast cluster computing[EB/OL].[2015-06-28].http://spark.apache.org/.
[6] Zaharia M,Chowdhury M,Das T,et al.Resilient distributed datasets:A fault tolerant abstraction for in-memory cluster computing[C]∥Proc of the 9th USENIX Conference on Networked Systems Design and Implementation,2012:2.
[7] Huang Kai-qi,Chen Xiao-tang,Kang Yun-feng,et al.Intellegent visual surveillance:A review[J].Chinese Journal of Computers,2015,38(6):1093-1118.(in Chinese)
[8] Dalal N,Bill Triggs.Histograms of oriented gradients for human detection[C]∥Proc of the 2005IEEE Computer Society Conference on Computer Vision and Pattern Recognition,2005:886-893.
[9] Apache Software Foundation.Performance tuning [EB/OL].[2015-06-25].http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning.
附中文參考文獻:
[7] 黃凱奇,陳曉棠,康運鋒,等.智能視頻監控技術綜述[J].計算機學報,2015,38(6):1093-1118.