關(guān) 沫,魏碧晴
(沈陽工業(yè)大學 信息科學與工程學院,遼寧 沈陽 110870)
隨著計算機技術(shù)和信息科技的快速發(fā)展,全球的數(shù)據(jù)量急劇增長,2015年全球的數(shù)據(jù)總量達到8.61 ZB,預估2020年全球的數(shù)據(jù)總量會超過40 ZB。通過移動互聯(lián)網(wǎng)、社交媒體等服務(wù)模式,大數(shù)據(jù)產(chǎn)業(yè)已滲透到人們生活的各個方面,并且數(shù)據(jù)價值的時效性越來越重要,集群必須以毫秒級的延遲從大規(guī)模的數(shù)據(jù)中提煉有價值的信息[1]。
TopN問題就是從許多的數(shù)值選出前N個最大或者最小的數(shù)值有序排好,最常見的應(yīng)用于微博熱搜榜、歌曲人氣榜、投票選舉等。由此可見利用大數(shù)據(jù)技術(shù)和計算機技術(shù)能輕松解決傳統(tǒng)排序問題。如微博熱搜榜,需要實時更新點擊量并按其從大到小的順序排列。而使用流計算框架Flink來解決TopN問題可以滿足其實時性和低延遲的要求。
Flink是一個開源的分布式流式處理框架,它針對數(shù)據(jù)流的分布式計算提供數(shù)據(jù)分布、數(shù)據(jù)通信和容錯機制等功能[2]。
與Storm框架相比,F(xiàn)link能提供Exactly-Once精確性保證、吞吐量高、有背壓處理機制[3]。與Spark框架相比,F(xiàn)link的延遲更短,是毫秒級別的,在流處理方面,F(xiàn)link的性能比Spark更好。
此外Flink框架使用Chandy-Lamport算法來做異步分布式快照,其本質(zhì)是checkpoint。checkpoint操作是異步的,不會打斷數(shù)據(jù)的處理,并且是非常輕量的,容錯的代價相對很低。Flink可以做到每條數(shù)據(jù)同步更新watermark,watermark的計算實時性高,輸出延遲低[4]。
由于Flink是以固定的緩存塊為單位來進行的數(shù)據(jù)傳輸,緩存塊的超時值可以人為規(guī)定,如果緩存塊的超時值為無限大,則Flink的數(shù)據(jù)傳輸方式就是批處理,此時系統(tǒng)可以獲得最大吞吐量[5]。如果緩存塊的超時值為0,F(xiàn)link的數(shù)據(jù)傳輸方式是流處理,此時雖然系統(tǒng)可以獲得最低的處理延遲,但是吞吐量也會降低[6]。
本文就基于Flink框架的TopN排序問題提高其吞吐量提出解決方案。
先將待排序的數(shù)字構(gòu)建成一個堆(堆是一棵完全二叉樹),如果是要按從小到大的順序排列,就先將這個堆調(diào)整成大頂堆,堆頂?shù)脑鼐褪亲畲笾担缓髮⒍秧斣嘏c末尾的元素交換位置,最大值“沉”到數(shù)組的末端。再將剩余的元素重新調(diào)整成大頂堆,把堆頂元素與倒數(shù)第二個元素交換位置,重復以上操作,最后就能得到一個從小到大排序的數(shù)組了。
堆排序的難點在于將無序堆調(diào)整成大頂堆,調(diào)整步驟如下:定義一個數(shù)組arr,設(shè)父節(jié)點是arr[i],那么這個父節(jié)點的左孩子節(jié)點就是arr[2i+1],它的右孩子結(jié)點就是arr[2i+2]。首先比較左孩子節(jié)點和右孩子節(jié)點的大小,然后將其中較大的孩子節(jié)點與父節(jié)點進行比較,如果父節(jié)點小于孩子節(jié)點,就需要交換它們的位置,否則就不用交換。從最后一個非葉子節(jié)點開始,從左至右,從下至上進行調(diào)整。
而TopN問題只需要選出前N個最大數(shù)即可,所以不需要把后面那些很小的數(shù)值也排出來。為了降低程序時間復雜度和空間復雜度,可以在原有的堆排序基礎(chǔ)上加以改進。定義初始化一個長度為N,元素都為0的數(shù)組arr。然后用迭代器的it.hasNext()方法判斷是否有待排序的元素,如果有就用it.next()方法接收待排序的元素x。把接收的元素x與arr數(shù)組的第一個元素進行比較,如果接收的元素x大于arr[0],就用x替換arr[0],然后把arr數(shù)組里的數(shù)字用大頂堆按從小到大的順序排列。利用迭代器依次接收待排序的元素,如此循環(huán)往復,最終就能選出最大的N個數(shù)值。
上文所描述的TopN問題的解決方法只能一個一個地接收數(shù)據(jù),數(shù)據(jù)的吞吐量較小,不具備處理大數(shù)據(jù)的能力,所以還需要繼續(xù)改進這個算法。把圖1中的TopN的解決步驟取名為Process,改進后的算法取名為HeapOptimize,該算法的步驟如圖1所示。

圖1 HeapOptimize結(jié)構(gòu)圖
箭頭表示數(shù)據(jù)流Stream的走向,最開始的Process算子接收并處理數(shù)據(jù)流,每一個Process從接收到的數(shù)據(jù)挑選出前N個最大的數(shù),然后把這N個數(shù)傳給下一層的Process去處理。第二層的Process把第一層所有Process選出的較大值再進行挑選排序,把最終的結(jié)果傳給Sink。圖中以兩層Process為例展示其算法過程,也可以擴展為更多層的Process來處理數(shù)據(jù),增加數(shù)據(jù)的吞吐量。可以根據(jù)需求,選擇合適的Process層數(shù),快速處理大量的數(shù)據(jù)。
本文目前實現(xiàn)兩層的Process解決TopN問題,TopN作業(yè)在Flink框架上運行成功的算子拓撲圖如圖2所示。先將Source數(shù)據(jù)源里的數(shù)據(jù)通過FlatMap把String類型格式轉(zhuǎn)化成int類型,便于后面的計算。
下一步要做窗口計算,在定義窗口之前,要指定流是否需要keyed,使用keyBy(“word”)將無界流分成邏輯的keyed stream。擁有keyed stream將允許窗口計算由多個任務(wù)并行執(zhí)行,因為每個邏輯key流都可以獨立于其余任務(wù)進行處理,key相同的元素將被發(fā)送到同一個任務(wù)。這里采用的是滾動窗口,沒有時間重疊,窗口周期是1 s。滾動窗口分配器將每個元素分配給固定窗口大小的窗口。例如,如果指定大小為1 s的滾動窗口,則將執(zhí)行當前窗口,并且每1 s鐘將啟動一個新窗口,如圖3所示。

圖2 算子拓撲圖

圖3 滾動窗口示意圖
在這里可以設(shè)置Process的并行度(即一層Process的個數(shù))。如果需要多層Process計算,可以重復從keyBy分流到Process這些步驟。最后需要把數(shù)據(jù)都集中到一個Process算子作業(yè)來排序,再把計算結(jié)果輸出到Sink里打印最終結(jié)果。算子拓撲圖的第三個算子是窗口計算和第一次Process排序合在一起的。
假如一分鐘需要處理v條數(shù)據(jù),一個Process算子一分鐘最多能處理m條數(shù)據(jù),數(shù)據(jù)通過一個Process算子處理后得到N條數(shù)據(jù)傳輸?shù)较乱粚樱绻麅?nèi)存和CPU足夠大,設(shè)x層需要執(zhí)行Process算子任務(wù)的個數(shù)(Process的并行度)是px,最后一層的并行度一定是1(因為最后需要統(tǒng)計出N條最大的數(shù)據(jù)),前幾層的Process的并行度不固定,但有一個上限,假設(shè)前n-1層的Process的并行度取最大值,則可以得出層數(shù)與數(shù)據(jù)量的關(guān)系。
如果只有一層Process,該Process的并行度也為1,需要滿足的條件是v≤m;

實驗搭建的集群是由兩臺PC組成,由分布式隊列Kafka作為數(shù)據(jù)源點生產(chǎn)數(shù)據(jù),F(xiàn)link提供FlinkKafkaConsumer接口從Kafka取數(shù)據(jù),交給后面的operator來處理。用TaskManager節(jié)點構(gòu)建整個算子拓撲,將計算結(jié)果保存在HDFS中,以Zookeeper作為集群的同步協(xié)調(diào)節(jié)點負責分布式節(jié)點間的信息同步。集群分布情況如表1所示。

表1 集群節(jié)點分布信息
由于實驗條件有限,TaskManager節(jié)點只創(chuàng)建了一個,當然TaskManager節(jié)點數(shù)越多越好。如果條件允許也可再創(chuàng)建一個JobManager節(jié)點作備用。集群節(jié)點的配置參數(shù)如表2所示。

表2 節(jié)點配置參數(shù)
在一開始執(zhí)行作業(yè)的時候,Kafka開始生產(chǎn)數(shù)據(jù),Process算子的吞吐量從0開始慢慢變大到2 000。當Kafka生產(chǎn)的數(shù)據(jù)量特別大,一個Process算子接收數(shù)據(jù)的速率高于它在一個瞬時脈沖內(nèi)處理的數(shù)據(jù),F(xiàn)link系統(tǒng)就會出現(xiàn)背壓,把數(shù)據(jù)量壓到底層算子Source上,數(shù)據(jù)流降速,并行度為1的Process算子的吞吐量也會變小,由于數(shù)據(jù)源的多樣性和輸入速率的變化以及集群的不穩(wěn)定性,使得Process算子的吞吐量在某個范圍波動,如圖4所示,該算子的吞吐量大概穩(wěn)定在1 500左右,處理數(shù)據(jù)的速度較為緩慢。

圖4 Process的吞吐量v0
此時若把該算子的并行度改為3,讓三個Process算子同時處理從數(shù)據(jù)源傳來的數(shù)據(jù)流,然后把這三個算子處理后的數(shù)據(jù)合流到下一個Process集中處理,也可以完成TopN作業(yè)。這三個Process的吞吐量如圖5、圖6、圖7所示,一個算子的吞吐量能達到2 000,v1+v2+v3>v0,把Process算子的并行度提高到3之后,整個TopN作業(yè)的吞吐量提高了約7 000條,TopN作業(yè)的吞吐量提高了。由此可見,當算子出現(xiàn)性能瓶頸時,增加Process的并行度可以增加整個排序作業(yè)的數(shù)據(jù)處理速度,解決Flink的背壓問題,同時也增加了排序作業(yè)占用的內(nèi)存和CPU。

圖5 Process的吞吐量v1

圖6 Process的吞吐量v2

圖7 Process的吞吐量v3
本文是將傳統(tǒng)的堆排序進行優(yōu)化,用以處理TopN問題,把HeapOptimize算法放在Flink框架去執(zhí)行,把待處理的海量數(shù)據(jù)分割成小塊的數(shù)據(jù)來處理,用化整為零的思想處理數(shù)據(jù),把第一層的Process算子的處理結(jié)果傳到下一層Process進行處理,這種方法比傳統(tǒng)方法的計算速度快,吞吐量增大。Process的層數(shù)根據(jù)數(shù)據(jù)量來設(shè)定,由數(shù)據(jù)量大小計算出理想的Process層數(shù)。把Flink資源合理地分配利用,最大限度地提高Flink吞吐量。