何貞貞,于 炯,, 李梓楊,國冰磊
(1.新疆大學 軟件學院,新疆 烏魯木齊 830008;2.新疆大學 信息科學與工程學院,新疆 烏魯木齊 830046)
隨著云計算、大數據、物聯網、人工智能等信息技術的快速發展和傳統產業的數字化轉型,預計到2020年我國數據總量將達到8060 EB,占據全球數據總量的18%[1-4]。在這種爆炸式數據量增長情況下,其規模可以達到PB級別,產生速度可以達到GB/s級別[5],且數據的時效性很強。對于這些連續不斷的數據,目前的大多數解決方案卻不是把實時流數據當作流來處理,忽略了其數據產生的連續性和及時性。為了滿足這種實時性要求,流式計算[6]環境應具備較高的響應能力和較低的計算延遲,同時要求計算結果的準確性和可靠性。Apache Flink[7-13]是目前產業界應用最廣泛的新興流式計算平臺之一,在美團、淘寶的實時業務中已有一定應用。
在Apache Flink中,任務是執行算子的并行化實例的基本單元,每個任務在一個進程中執行,且Flink中的任務執行圖具有層次分明的拓撲結構。因此,為解決Flink計算平臺拓撲中因各關鍵節點上任務間不同類型通信所導致的通信開銷較大問題,本文提出基于一種Flink環境下的任務調度策略(task scheduling strategy in Flink,TSS-Flink),該策略是通過動態的調整關鍵路徑上各節點實例的任務分配,在保證關鍵路徑節點負載差異較小的同時降低通信開銷,從而降低關鍵路徑的響應時間,提高系統性能。同時,當數據流壓力發生變化后,只需要調整關鍵路徑上部分節點任務,不會引入更多的任務調度開銷。經實驗驗證得出,該策略對不同類型的benchmark作業都有較為明顯的優化效果,在保證系統穩定性的同時使計算延遲平均降低了13.09%。
在流式計算中,通常使用有向無環圖(directed acyclic graph,DAG)來描述大數據流的計算過程。在拓撲中,任何情況下都會存在一條關鍵路徑,且該關鍵路徑的計算延遲決定了整個任務拓撲的時延。為了保證資源負載差異較小的同時提高計算的實時性,必須提出一種一方面要有效適應數據流、資源等動態變化時所帶來的負載差異較大問題,另一方面也要避免因任務與任務之間不同類型的通信帶來巨大的開銷所造成的實時性問題。在保證各節點計算資源充分利用的前提下,最大程度降低計算延遲,提高計算實時性。
針對負載不均和任務間通信開銷較大問題,已有大量的學者開展相關研究。文獻[14]提出一種Storm環境下基于權重的任務調度算法,針對各個任務的CPU負載占用情況以及任務間的數據流大小,分別確定點權和邊權,并利用最大化邊權增益的思想,降低網絡傳輸開銷。但該算法只考慮了CPU負載對系統性能的影響,并未考慮內存資源和網絡帶寬資源對節點負載均衡的影響。文獻[15]提出一種實時和高效的資源調度模型Re-Stream,在大數據流式計算環境下實現高能效和低延遲,并結合拓撲執行關鍵路徑,提出對工作節點的內存電壓調控節能策略,該策略主要針對關鍵路徑和非關鍵路徑上的內存電壓節能,并未考慮到內存資源和網絡帶寬對系統的整體性能影響。文獻[16,17]提出高效的資源調度算法和優化框架,雖然解決了流式計算框架下的任務調度問題,但無法直接移植到Flink平臺。文獻[18]提出一種Storm框架下資源感知的任務調度策略,通過最大化資源利用率的同時最小化網絡延遲提高吞吐量。文獻[19]提出用計算延遲作為評估節點間負載的指標,通過降低任務的計算延遲達到負載均衡的效果,但該策略同時也帶來了較大的遷移開銷,且資源評估不全面。文獻[20]提出一種基于流網絡的流式計算動態任務調度策略,通過定義有向無環圖中每條邊的容量和流量將其轉化為流網絡模型,計算對應的增進網絡和優化路徑來提升集群吞吐量從而提升性能。
現有的研究多關注于節點內的計算開銷,不僅忽略了節點間的不同類型通信方式帶來的傳輸開銷,而且忽略了拓撲關鍵路徑對集群性能的重要影響。且大多數的已有研究并不適用于Apache Flink平臺。針對上述問題,本文的主要工作有:
(1)通過定義流式計算中的有向無環圖(DAG),將數據流大小作為邊權將拓撲轉化為AOE-網(activity on edge network),確定拓撲中關鍵路徑;
(2)提出負載均衡模型,主要針對關鍵路徑上負載較高的節點,降低節點間負載差異;
(3)提出一種任務調度策略,在降低關鍵路徑上節點過高負載的同時,最小化關鍵任務的節點間通信開銷,即降低關鍵邊的通信開銷,實現計算資源的最大化利用。
本節從Flink拓撲邏輯模型和物理模型考慮,確定任務執行拓撲關系圖中關鍵路徑,在關鍵路徑的基礎上,建立關鍵節點負載均衡模型和關鍵邊最優通信開銷模型,在關鍵路徑上降低部分關鍵節點過高負載的同時減少關鍵邊的通信開銷,從而降低整個任務拓撲執行的響應時間,為任務調度策略的設計與實現提供理論依據。
定義1 有向無環圖(DAG)。定義有向無環圖G=(V(G),E(G)), 其中V(G)={v1,v2,…,vn} 是拓撲中的節點集合,E(G)={e

圖1 拓撲有向無環圖
定義2 關聯度(association degree)。對于任意節點vk,若存在以節點vk為弧尾的n條有向邊e
節點vk的入度(in-degree)為指向vk的有向邊條數,記為ID(vi)。出度(out-degree)為從vk指向其它節點的有向邊條數,記為OD(vi)。例如圖1中的拓撲有向無環圖中,存在頂點集合V={va,vb,vc,vd,ve,vf,vg},有向邊集E={e
定義3 帶權路徑(path of weight,PW)。定義路徑p(vi,vj),且
根據流式計算的拓撲結構可知,在DAG拓撲中從源點va到匯點vg存在m條路徑,既P={p1(va,vg),p2(va,vg),…,pm(va,vg)}。 如果將節點vi流向節點vk的數據流大小作為弧vi→vk的權重,那么可以將流式計算拓撲圖轉為帶權值的AoE-網。
AoE-網拓撲模型如圖2所示。

圖2 AoE-網拓撲模型
由上可知,每條路徑上的計算延遲是由節點的計算開銷和節點間的通信開銷共同決定的,因此
(1)
其中,cvi表示某條路徑上節點的計算延遲,cej表示節點間的通信延遲。
那么關鍵路徑可以表示為
Dpj=max{dp1(vs,vt),dp2(vs,vt),…,dpm(vs,vt)}
(2)
根據定義1、定義2、定義3可知,拓撲中頂點的最早發生時間ve(i),頂點的最晚發生時間vl(j);在拓撲有向邊e
因此,按照拓撲順序,拓撲中頂點的最早發生時間ve(i)為

(3)
其中,s為源點,源點的最早發生時間為零;E(k)是從節點i到達節點j的所有有向邊的集合。
當按照逆拓撲順序時,拓撲中頂點的最晚發生時間vl(j)為
(4)
其中,t為匯點,匯點的最晚發生時間和最早發生時間相等;E(k)是從節點i發出的所有有向邊的集合。
拓撲中每條有向邊e
e(k)=ve(j)
(5)
拓撲中有向邊e
l(k)=vl(j)-w(e
(6)
算法1:關鍵路徑檢測算法(CP-Algorithm)。
輸入:有向無環圖G=
有向邊權值集合W←{w1,w2,…,wn};
輸出:關鍵節點集合Vcp,關鍵邊集合Ecp;
(1) if ID(vi)=0;/*從源點s出發進行遍歷*/
(2) for i=1;i≤n-1;i++;
(3) ve[i+1]←ve[i]+wi;/*計算vertex的最早發生時間*/
(4) vl[n-1]←ve[n-1];
(5) if OD(vi)=0;/*從匯點t出發進行遍歷*/
(6) for j=n-2;j>1;j--;
(7) vl[i-1]←vl[i]-wi;/*計算vertex的最晚發生時間*/
(8)通過式(3)和式(4)計算最早開始時間e
(9)when e



(11) end;
在算法1中,對于Flink環境中的DAG將數據流大小作為有向邊的權重構建AoE-網,然后CP-Algorithm依次對數據拓撲AoE網進行正向和反向遍歷,通過步驟(2)~步驟 (9)確定數據拓撲中關鍵節點和關鍵邊,因此,該算法的時間復雜度為T(n)=O(n+e),且在空間復雜度上,DAG拓撲結構并未發生改變,因此,該算法是可行的。
在本章節中通過算法1中檢測到的關鍵節點集合和關鍵邊集合對問題進行定義和建模。

(7)
(8)

(9)

由上可知,式(7)表示理想狀態下關鍵節點的負載情況,式(9)表示關鍵節點的實際權重與理想權重的偏離程度,并且標準差越小表示各個工作節點的負載偏離度越低,負載越趨于均衡。


圖3 任務分配模型

如上所述,在關鍵路徑上存在節點間通信和節點內通信,且節點間通信開銷遠大于節點內通信開銷,通過將節點間的通信開銷盡可能地轉化為節點內通信開銷,能夠降低關鍵路徑響應時間,從而降低整個任務拓撲的響應時間。基于以上思想,提出定理1。
定理1 最優通信開銷定理。當關鍵路徑上不存在或節點間通信開銷最少時,最小化的節點間通信開銷等價于最大化的節點內通信開銷。即

(10)
證明:由Flink拓撲模型可知,當提交拓撲給節點后,拓撲實例便不會發生改變,其包含的任務總數和數據流總數不可改變。因此,設總數據流大小為定值R,即
(11)
證畢。
對于節點內線程間通信開銷,Flink提供SlotSharingGroup類,會盡可能地讓更多的子任務共享一個任務槽;提供ColocationGroup類可將子任務強制放入一個任務槽內,SlotSharingGroup類和ColocationGroup類這兩種方法為我們減少關鍵節點內線程間通信開銷提供了幫助。對于節點間通信,通過Flink提供的operator chains,會盡可能地將operator的子任務chain在一起形成一個任務,每個任務在一個線程中執行,通過設置operator chains能夠減少進程之間的切換,減少進程之間通信開銷。因此,對于節點間通信,為達到定理1關鍵邊最優通信開銷模型要求,盡可能地將節點間通信轉為節點內通信方式,并且在降低關鍵節點計算開銷的同時降低關鍵邊的通信開銷,即在保證關鍵節點負載差異較小的同時降低關鍵節點間的通信開銷,盡可能地將負載過高關鍵節點上的任務調度到負載較低的計算節點上。
基于關鍵路徑的任務調度算法主要是在保證系統性能不發生改變的情況下,盡可能使得各關鍵節點負載差異較小的同時減少關鍵邊的通信開銷,從而降低整個任務拓撲執行響應時間,實現資源最大化利用。在上一節中通過關鍵路徑檢測算法確定拓撲關鍵路徑,并獲取關鍵路徑上權重集合Wcp、節點集合Vcp、邊集合Ecp。并且通過負載模型判斷出負載較高的關鍵節點,對此負載較高節點上存在節點間通信的任務執行任務遷移策略,將該任務的節點間通信開銷轉為節點內通信開銷,在保證關鍵節點負載差異較小的同時降低任務的通信開銷。
為了達到上述關鍵節點負載均衡模型和關鍵邊最優通信開銷模型的要求,提出了一種在Flink環境下的任務調度策略(TSS-Flink)。其算法具體過程如下:
算法2:拓撲關鍵路徑上任務調度算法。

(1)quicksort(Wcp,DESC);
/* 對輸入的關鍵邊權重集合元素降序排序 */

(3)calculate theδby (9);
/* 判斷關鍵路徑上是否存在負載不均衡的節點 */

/* 確定不均衡節點以及該節點上任務和前驅任務的集合 */

/* 確定關鍵節點上任務和它的前驅任務 */
(8) if np≠nq


(12) reschedule CP-Algorithm;

(14)end while;

Apache Flink 作為開源免費的分布式數據流處理平臺之一,在實時業務中得到廣泛應用。對于本章節的實驗,通過在Flink平臺上實現TSS-Flink策略,對該策略的有效性進行驗證。
實驗環境是由7個相同配置的普通物理PC機組成的Flink集群,其中包含1個JobManager節點,該節點負責Flink集群的作業調度和資源管理;6個TaskManager節點,負責執行具體任務計劃。此外,配置1個Zookeeper節點負責在任務執行過程中監控和記錄數據節點;1個Kafka節點和1個HDFS節點作為數據流的源點和匯點。各節點的具體的分布情況見表1。

表1 Flink集群節點分布
在Flink集群環境中,為保證實驗的順利進行,集群均采用相同的配置,具體配置參數見表2。
為了使TSS-Flink算法達到最優的執行效果,通過多次對原系統反復實驗,最終確定實驗相關參數。配置情況見表3。
在本章節的實驗測試中,執行了Streaming Benchmark 中的WordCount、TwitterSentiment基準測試進行驗證。在WordCount基準測試中,以英文小說《Harry Potter》作為

表2 節點配置信息

表3 TSS-Flink算法參數設置
輸入數據源,統計單詞頻次,其計算復雜度相對較低但對CPU資源的占用率較高。TwitterSentiment是一個針對Twitter用戶所發的推文內容進行情感分析的作業,該作業以160 000條文本作為輸入數據源,其對內存資源和CPU資源占用相對都較高。通過以上兩個基準測試,能夠對TSS-Flink算法的有效性進行驗證。
本章節中通過執行WordCount和TwitterSentiment這兩組資源敏感型基準測試,從計算延遲、CPU負載和RAM占用率3個方面對Flink集群中各個工作節點進行性能監測和評估,以驗證TSS-Flink的優化效果。
本節討論基準測試WordCount作業在Apache Flink默認調度算法和TSS-Flink下分別運行時集群各工作節點的負載情況。由于Flink默認調度算法采用隨機的方式分配任務,當從Source operator發送數據流到Sink operator時,極易導致各工作節點資源分配不均、負載差異較大情況,且TSS-Flink算法在執行過程中應該考慮到任務分配所導致的負載差異性。從圖4所示的實驗結果中可以得出:在Flink默認調度算法下,各個節點的CPU負載不均衡且差異較大,其中負載最高的節點是node5,負載最低的節點是node6,節點之間CPU負載最大相差28%。當節點node2和節點node5的CPU負載超過表3中設置的閾值0.7時觸發TSS-Flink算法,該算法執行后集群中各工作節點的CPU負載差異明顯縮小且均低于用戶設置閾值0.7,且其執行后的CPU負載標準差比Flink默認調度算法降低了8.28%。通過對集群各工作節點的負載均衡測試驗證了TSS-Flink算法的有效性。

圖4 WordCount CPU負載對比
為了進一步驗證TSS-Flink策略的優化效果,在本章節中繼續對benchmark作業執行過程實時監控節點的內存占用率。在Flink中,通過Monitor模塊進行內存實時監控,定義OperatorScopeFormat.java類獲取System Metrics。在實時監控過程中,通過定點采樣得到如圖5所示的實驗結果:當單位時間內數據元組數量不斷增加時,原系統部分節點由于負載過高導致內存占用率急劇上升,并且這些負載較高的節點無法及時處理數據從而導致拓撲處理時延變長,而另外一部分節點的資源也無法得到充分利用。通過使用TSS-Flink策略,對負載較高的節點上的任務重調度,使得拓撲非關鍵路徑上的節點分擔拓撲關鍵路徑上負載過高節點的資源使用壓力,最終被采樣節點的內存利用率都有一定程度的下降且逐步趨于平穩狀態。

圖5 WordCount內存占用對比
圖6表示benchmark在Flink默認調度算法和TSS-Flink下的工作節點間通信開銷,不管是在默認調度算法還是TSS-Flink下,節點間數據流大小均經歷一個從0快速上升到正常狀態的過程。TSS-Flink算法在執行中將關鍵節點上的線程遷移至前驅非關鍵節點上,從而減少線程節點間通信開銷。Flink默認調度算法運行且趨于穩定后(90 s-300 s),節點間數據流大小的平均值約為16 572 tuples/s;當執行TSS-Flink算法且系統趨于穩定后(125 s-250 s),節點間數據流大小約為12 410 tuples/s,相比Flink默認調度算法降低了25.1%。可見,TSS-Flink在降低節點間通信開銷方面具有更為明顯的效果且符合最優通信開銷模型思想,也進一步驗證了算法的有效性。

圖6 節點間數據流大小對比
圖7表示任務拓撲中匯點接收從source發出的每 10 000 條tuples時記錄一個延遲時間并持續15 min得到的實驗結果,實驗在WordCount作業上執行TSS算法與原系統算法比較得出:因為WordCount作業復雜度低于Twitter作業復雜度,所以WordCount作業的計算延遲相對較低,當經過TSS-Flink算法進行優化后,系統的計算延遲明顯下降。在原系統中,隨著數據流的連續不斷輸入,計算延遲也隨著慢慢升高,當某些節點的計算資源達到瓶頸無法及時處理數據時,導致計算延遲過長,系統執行任務拓撲的實時性較差。通過TSS-Flink算法,對計算資源相對緊張的關鍵節點上的任務執行調度策略,對比原系統該策略使集群的計算延遲降低最多達到388 ms,至少40 ms,平均降低了248 ms。

圖7 WordCount計算延遲對比
對于圖8,執行的Twitter作業本身復雜度比WordCount作業較高,因此計算延遲也較高。通過在連續15 min內記錄匯點每接受10 000條tuples數據時的延遲時間,可以得出:原系統在數據流不斷增加和快速變化下,導致數據流無法及時處理,造成數據堆積,數據堆積節點延遲過高,進而影響系統的實時計算能力。通過執行TSS-Flink算法將任務拓撲中每10 000條tuples數據的計算延遲最多降低了210 ms,至少降低了8 ms,平均降低了130 ms,該調度策略有效地降低了節點間通信開銷和關鍵路徑上計算延遲,提高了計算的實時性,使計算資源達到最大化利用。

圖8 Twitter計算延遲對比
綜上所述,實驗驗證TSS-Flink算法對夠通過降低節點間通信開銷從而降低響應時間,提高集群的性能。通過圖7、圖8可知,不同的作業類型下該策略對系統的計算延遲優化效果并不相同,但其平均優化比提高了13.09%,有效地降低了計算延遲,提高了系統性能。
通過對比現有的任務調度算法,發現多是對負載較重的節點執行任務調度策略,雖然這些調度策略能有效降低任務拓撲響應時間,提高系統性能。但也并未考慮到各任務的計算開銷和任務之間的通信開銷,且并未在Apache Flink平臺上實現該任務調度策略,所以在節點間負載均衡和通信開銷方面仍然存在很大的優化空間。本文通過找到直接影響整個任務拓撲響應時間的關鍵路徑,確定負載較高的關鍵節點和該節點上通信開銷較大的關鍵邊,建立關鍵節點負載均衡模型和關鍵邊最優通信開銷模型,提出一種Flink環境下的任務調度策略(TSS-Flink)。通過WordCount和Twitter兩個benchmark的實驗驗證,結果表明算法能夠實現對Flink集群的性能優化,盡可能地更好地利用計算資源。
下一步的研究工作將重點關注由于輸入數據流的急劇變化造成的資源分配不均問題,針對關鍵路徑上的負載傾斜較為嚴重的關鍵節點,如何判斷出節點內的任務通過橫向遷移和縱向遷移實現資源的最大化利用且保證遷移后的拓撲結構不發生改變。