郝志峰,黃澤林,蔡瑞初,傅正佳,溫 雯,唐凱麟
(1.廣東工業(yè)大學計算機學院,廣州 510006;2.佛山科學技術學院數(shù)學與大數(shù)據(jù)學院,廣東佛山 528000)
物聯(lián)網(wǎng)、社交網(wǎng)絡、云計算等技術的出現(xiàn),同時伴隨計算能力、空間存儲和網(wǎng)絡帶寬的快速發(fā)展,人類生活中的各類數(shù)據(jù)在互聯(lián)網(wǎng)、通信、金融、商業(yè)、醫(yī)療等諸多領域不斷增長和累積。由于數(shù)據(jù)量日益增多,并且在各種應用場景下業(yè)務需求和競爭壓力對數(shù)據(jù)處理的實時性和準確性提出了更高的要求,因此傳統(tǒng)的單機器數(shù)據(jù)處理技術已經(jīng)無法應對時代的變化,而大數(shù)據(jù)的發(fā)展面臨著現(xiàn)實應用的難題。因此,各種創(chuàng)新性的技術得到了充分發(fā)展,如云計算[1]、量子計算、分布式計算等技術。其中,分布式計算以其強大的計算能力和低廉的成本得到了廣泛應用和深入研究。分布式系統(tǒng)[2]通常分為批數(shù)據(jù)處理系統(tǒng)和流數(shù)據(jù)處理[3]系統(tǒng)兩類,批處理應用廣泛的有Hadoop[4]和Spark[5]等,流處理應用廣泛的有Storm[6]、Flink[7]、Spark streaming[5]等。相對于批處理系統(tǒng),流處理更關注數(shù)據(jù)的實時性。實時數(shù)據(jù)流具有高速、不穩(wěn)定的特征,這也導致使用分布式流處理系統(tǒng)時對于計算資源必須有合理而準確的調(diào)度。
分布式集群資源的動態(tài)調(diào)度是當前的一個研究熱點,如Hadoop自帶的YARN[8]以及Apache開源的Mesos[9]框架,它們都可以在集群中對資源進行管理。而在國內(nèi),也有類似于基于分簇的小基站用戶資源分配方案[10],但不足之處在于,這種管理更多地需要人工干預,無法實時監(jiān)測集群資源的利用情況并進行合理的資源分配。在流數(shù)據(jù)處理任務中,集群需要一直處于準備狀態(tài),以應對流數(shù)據(jù)處理負載的變化。因此,需要根據(jù)負載情況,通過負載均衡對任務進行合理分配,如文獻[11]提出基于負載平衡和經(jīng)驗值的工作流任務分配策略。而對于流數(shù)據(jù)處理任務,資源負載是實時變化的,因此,如何在不停變化的實時負載過程中自動地對集群資源進行整理,降低當前暫不需要的資源的消耗,是亟需解決的一個難題。
資源動態(tài)調(diào)度及協(xié)同分配的實現(xiàn),難點在于如何根據(jù)當前系統(tǒng)的資源使用情況對后續(xù)資源需求做出準確的預測,以及對預測的資源計算出當前對系統(tǒng)加權延遲最低的資源分配比例,同時動態(tài)關閉未使用的資源,減少資源消耗。本文通過對流數(shù)據(jù)處理任務負載進行實時監(jiān)測,構建一個雙層資源調(diào)度模型,利用ZooKeeper[12]和YARN進行動態(tài)資源管理,從而實時改變集群資源分布情況,減少系統(tǒng)延遲。
在資源量化的方式上,本文系統(tǒng)是在YARN的基礎上構建的,而在YARN中,資源被虛擬化為容器這個概念,每個容器資源中包含了相對應的存儲空間和CPU大小等資源。
在資源動態(tài)調(diào)度方面,本文將資源調(diào)度的處理分為任務層和系統(tǒng)層兩個層面。在任務層,文獻[13]提出基于排隊論的動態(tài)資源調(diào)度模型,該模型需要在集群大小固定的情況下進行調(diào)度。以此為基礎,本文提出在資源調(diào)度時根據(jù)實時負載而改變集群大小減小延遲的模型。在系統(tǒng)層,已知的資源調(diào)度框架有YARN和Mesos等,主要解決了物理資源調(diào)度、將物理資源映射為虛擬化資源以及將虛擬化資源分配給分布式流處理系統(tǒng)進行使用這3個問題。但關于分布式流數(shù)據(jù)系統(tǒng)如何將不同的資源分配給不同的任務則沒有涉及。
在Hadoop自帶的YARN資源管理框架中有3種資源調(diào)度算法,即先來先服務調(diào)度算法、公平調(diào)度算法和Capacity調(diào)度算法。這3種算法在資源調(diào)度上基本可以實現(xiàn)對資源的合理分配以應對各種情況。但是YARN的調(diào)度算法是基于人為改變資源需求的前提,無法對資源的需求進行預測和動態(tài)調(diào)整。針對上述問題,本文提出一種雙層資源調(diào)度模型,通過中間件溝通系統(tǒng)層和YARN資源調(diào)度框架對集群資源進行動態(tài)調(diào)度和協(xié)同分配處理。
在集群管理方面,已知的如Storm on YARN[14]、Flink on YARN[15]、Storm on Mesos[16]、Flink on Mesos[17]等開源框架,都只是實現(xiàn)了對固定大小集群在多任務處理時的集群管理,但框架仍然需要更多人力進行管理,在資源負載變化時人為改變集群的大小。
綜上所述,本文系統(tǒng)以DRS動態(tài)資源調(diào)度模型[13]作為任務層,用以監(jiān)測實時延遲和負載變化等情況,并通過系統(tǒng)層溝通集群資源,通過增減資源實現(xiàn)對分布式流數(shù)據(jù)處理任務的動態(tài)資源調(diào)度管理和協(xié)同分配。
Storm on YARN是Yahoo開源的一個基于Storm的分布式集群資源管理框架[18],為基于Storm的分布式流數(shù)據(jù)處理任務提供了較完善的資源管理機制,其主要由ResourceManager、ApplicationManager、NodeManager、Container、Nimbus、Supervisor等部分組成。Storm on YARN系統(tǒng)結(jié)構如圖1所示。

圖1 Storm on YARN系統(tǒng)結(jié)構Fig.1 Structure of Storm on YARN system
Storm on YARN系統(tǒng)主要有以下優(yōu)點:
1)彈性計算資源。在將Storm運行在分布式集群[19]中時,Storm可以和其他的應用程序(例如flink流數(shù)據(jù)處理任務或Spark批處理[20]任務)共享集群資源,當Storm流數(shù)據(jù)處理任務負載較大時,可以動態(tài)地為其增加資源;當Strom流數(shù)據(jù)處理任務負載較小時,可以為其減少資源,并將資源給予其他需要的應用程序。
2)共享底層存儲。YARN可以為集群中運行的分布式框架提供共享的HDFS[21]存儲,減少管理多個集群帶來的不便以及傳輸數(shù)據(jù)帶來的時間延遲。
3)多版本共存。不同版本的Storm都可以運行在一個YARN上,降低了版本維護的成本。
當然,Storm on YARN本身還可以做出一些改進,目前的Storm on YARN依然需要人為地通過對分布式流數(shù)據(jù)處理任務負載的監(jiān)測,再借由人為操作對集群資源進行管理,這樣當集群較大時,無疑對管理集群來說是一個十分巨大的挑戰(zhàn),如何在多個流數(shù)據(jù)處理任務情況下根據(jù)當前任務的重要程度協(xié)同分配資源,是亟待解決的問題。本文立足于此設計解決方案。
DRS是一個分布式動態(tài)資源調(diào)度系統(tǒng),該系統(tǒng)基于Storm分布式流數(shù)據(jù)處理框架,通過排隊論對流數(shù)據(jù)處理的時間延遲進行處理和預測,從而對集群資源的分配做出合理分配的一個集群資源管理系統(tǒng)。DRS系統(tǒng)工作流程如圖2所示。

圖2 DRS系統(tǒng)工作流程Fig.2 Workflow of DRS system
DRS系統(tǒng)工作具體步驟如下:
步驟1系統(tǒng)收集預設時間窗口內(nèi)的負載信息,包括λ(source,m)、λ(i,in)、λ(i,out)、μi、si、ki、pji等。
步驟2數(shù)據(jù)擬合還原,將收集到的時間窗口內(nèi)的數(shù)據(jù)通過預設的數(shù)據(jù)擬合函數(shù)進行擬合,使之后對資源調(diào)度的決策更接近真實數(shù)據(jù)結(jié)果。
步驟3將擬合后的數(shù)據(jù)提交到資源調(diào)度器以供其做出正確的決策。
在DRS系統(tǒng)中,資源調(diào)度器對分布式流數(shù)據(jù)處理任務的負載情況界定包括3種狀態(tài),即合適(feasible)、過載(overproviding)和資源不足(shortage)。DRS根據(jù)資源調(diào)度器對負載情況做出的負載情況界定,并通過時間窗口內(nèi)通過數(shù)據(jù)擬合后得到的數(shù)據(jù),對分布式流數(shù)據(jù)處理任務后續(xù)需要分配的資源情況做出正確的決策,包括刪減資源或增加資源等。但DRS的不足之處在于,它只對固定集群大小的資源進行分配,無法控制集群本身做出關閉節(jié)點減少資源或打開節(jié)點提供資源等操作。
由于DRS資源調(diào)度器只能在固定給定物理資源的條件下對資源進行合理分配和調(diào)度,而在分布式流處理任務中,有時根據(jù)任務的負載情況需要減少集群資源或者增加集群資源,而這需要通過人為幫助才能做到,即系統(tǒng)層資源調(diào)度器無法通過任務層資源調(diào)度器的分配策略動態(tài)調(diào)整集群資源情況。
本文提出雙層資源調(diào)度模型,該模型結(jié)合了任務層資源調(diào)度器和系統(tǒng)層資源調(diào)度器,并在系統(tǒng)層通過Zookeeper與YARN建立連接,通過YARN對集群資源進行調(diào)整。
如圖3所示,雙層調(diào)度模型分為任務層資源調(diào)度器和系統(tǒng)層資源調(diào)度器,任務層資源調(diào)度器的作用是管理集群的所有工作任務,根據(jù)當前各個任務的負載情況,以最有效降低整體任務延遲為目的,制定出符合當前情境的資源分配策略。例如,在當前集群中,假設各節(jié)點的資源占用為(5,5,5),此時節(jié)點1的延遲高于預設值,而節(jié)點3的延遲低于預設值,則任務層資源調(diào)度器可能會將資源占用比設定為(6,5,4)。任務層資源調(diào)度器使用文獻[9]基于排隊論的動態(tài)資源調(diào)度模型,用于制定合理的資源分配策略。系統(tǒng)層資源調(diào)度器的作用為依賴Storm on YARN框架對集群資源的高級管理能力,根據(jù)中間件調(diào)度器給出的資源信息增加或減少資源。

圖3 雙層資源調(diào)度模型架構Fig.3 Architecture of two-tier resource scheduling model
此過程相對目前的Storm on YARN框架而言,能夠起到動態(tài)分配資源的作用,而不需要人為增減資源,并且由于根據(jù)中間件調(diào)度器的能力來控制資源總數(shù),對于當然資源冗余的節(jié)點,可以實時地關閉節(jié)點,不會占用集群資源,方便其他集群使用。
本文提出的雙層資源調(diào)度模型主要服務于分布式流數(shù)據(jù)處理任務過程,其對集群資源進行增減管理,并在多個任務中考慮到不同任務的重要性不同,根據(jù)任務權重協(xié)同地調(diào)配資源比例。該模型工作的具體步驟如下:
步驟1任務層資源調(diào)度器對分布式流數(shù)據(jù)處理任務,根據(jù)當前負載做出決策。
步驟2任務層資源調(diào)度器發(fā)出資源調(diào)度請求,由中間件調(diào)度器接收。
步驟3對于集群目前的資源情況,中間件調(diào)度器做出決策,當資源冗余時,跳至步驟4,當資源不足時,跳至步驟5,否則跳至步驟6。
步驟4當出現(xiàn)資源冗余時,中間件調(diào)度器與系統(tǒng)層資源調(diào)度器發(fā)起聯(lián)系,并按照資源使用情況,優(yōu)先關閉資源使用率較低的節(jié)點,然后通知中間件調(diào)度器,完成流程。
步驟5當出現(xiàn)資源不足時,如果通過打開集群中關閉節(jié)點可滿足需求時,則中間件資源調(diào)度器直接與系統(tǒng)層資源調(diào)度器溝通;否則,中間件調(diào)度器根據(jù)預設的任務權重,由計算出的多個任務的權重延遲比,對資源進行協(xié)同分配,再通過與系統(tǒng)層資源調(diào)度器溝通,分配資源。
步驟6中間件資源調(diào)度器檢測到目前資源為最優(yōu)狀態(tài),觸發(fā)流數(shù)據(jù)任務的資源調(diào)整。
基于雙層資源調(diào)度模型的工作流程,本文主要關注點在于中間件調(diào)度器在不需要協(xié)同分配資源時對集群節(jié)點的減增策略。

在算法1中,CallRebalance指Storm改變本身的資源分配平衡,之后中間件調(diào)度器察覺到分布式流處理任務所需資源發(fā)生變化,則與當前系統(tǒng)所有資源做對比,并通過與系統(tǒng)層資源調(diào)度器的會話聯(lián)系,做出相應的資源調(diào)整,增加或者刪減資源,最后中間件調(diào)度器察覺到當前資源處于可提供的最優(yōu)狀態(tài),觸發(fā)流處理任務的資源調(diào)整。具體步驟如下:
步驟1需要任務層資源調(diào)度器預先生成調(diào)度決策。生成資源調(diào)度決策包含信息監(jiān)測、診斷生成和決策生成3個部分。其中,系統(tǒng)運行時狀態(tài)收集是第一步,也是很重要的一步,只有收集到足夠多、足夠準確的系統(tǒng)狀態(tài),才能做出正確的資源調(diào)度決策。而其中對于數(shù)據(jù)堆積情況的監(jiān)測尤為重要,因為在實時系統(tǒng)中,穩(wěn)定運行狀態(tài)下一旦發(fā)生數(shù)據(jù)堆積,系統(tǒng)實時性會降低,延遲會增大。此時,必然是因為工作負載變大或者系統(tǒng)本身出現(xiàn)問題,應當迅速對此做出反饋,解決數(shù)據(jù)堆積的情況。例如,在判斷數(shù)據(jù)堆積的情況時,可以通過基于排隊論的方法[9]來實時地反映系統(tǒng)不斷變化的狀態(tài)。最后再利用決策算法[9],通過多次迭代來保證資源分配比例最優(yōu)。
步驟2在系統(tǒng)層資源調(diào)度器層面,將系統(tǒng)可利用的物理資源虛擬化為可分配、可量化的虛擬化資源,如容器節(jié)點,然后根據(jù)中間件調(diào)度器的指示進行動態(tài)地增加或者刪減資源,目的是自動對資源進行管理,減少人工介入。
在上述步驟中,當中間件調(diào)度器與系統(tǒng)層資源調(diào)度器會話時,如果當前集群可啟動的節(jié)點無法滿足所需資源,則會給中間件調(diào)度器返回false,之后中間件調(diào)度器會一直嘗試獲得資源,直到對資源需求改變,或者需求得到滿足。
在任務層資源調(diào)度器中,本文通過預設的時間窗口將分布式流處理任務的實際延遲保存到一個預設長度的隊列中,根據(jù)該隊列中的數(shù)據(jù)計算出預設的擬合函數(shù)的參數(shù),并將參數(shù)發(fā)送到中間件調(diào)度器。在中間件調(diào)度器中,利用DRS中的排隊論算法[9],根據(jù)流處理任務的目標節(jié)點數(shù)獲得改變資源后的預測延遲,再通過擬合函數(shù)擬合出更接近真實數(shù)據(jù)的值。
當多個任務在系統(tǒng)中并且集群無法滿足這些任務所需要的的資源時,需要權衡各個任務的重要性,從而協(xié)同地分配資源。通過將獲得的預測延遲(已擬合過)與預設的每個任務的權重做加權延遲和,并比較不同資源分配比例時哪個加權延遲和最小,將該結(jié)果為最佳資源協(xié)同分配方案。
在需要協(xié)調(diào)分配資源,即有多個流數(shù)據(jù)處理任務要同時管理(目前只支持兩個)時,將為不同的任務設定不同的權重來代表該任務的重要性,每個任務在任務層資源調(diào)度器中,依然通過3.2節(jié)中的調(diào)度決策策略來根據(jù)自身負載改變資源分配比例。由于此時有多個任務需要改變自身節(jié)點數(shù),因此中間件調(diào)度器會先向系統(tǒng)層查詢當前集群的資源數(shù)是否滿足任務層資源需求。例如,任務1的資源需求由(5,5,5)->(5,5,6),任務2的資源需求由(5,5,5)->(5,6,6),此時中間件調(diào)度器會向系統(tǒng)層查詢集群資源是否滿足,由于系統(tǒng)資源只能提供30個節(jié)點資源,因此任務1和任務2需要協(xié)同分配資源。中間件調(diào)度器根據(jù)任務層資源調(diào)度器的反饋,依據(jù)兩個任務的權重不同,通過窮舉的方式計算出各種分配方案下系統(tǒng)總延遲最佳的分配方案,從而將新的分配方案通知給系統(tǒng)層進行資源分配。
通過實驗驗證本文系統(tǒng)的資源動態(tài)調(diào)節(jié)功能和多任務協(xié)同分配功能是否運行正常。
在由虛擬機搭建的Hadoop集群上進行YARN資源調(diào)度相關實驗。通過對虛擬機配置的統(tǒng)一性,排除在實驗過程中由于機器配置如內(nèi)存、CPU、網(wǎng)絡等因素對實驗結(jié)果造成的不必要的影響。相關配置如表1所示。

表1 實驗配置Table 1 Configuration of experiment
本文基于YARN對Storm分布式流數(shù)據(jù)處理任務進行動態(tài)資源調(diào)度和協(xié)同分配。通過一個簡單的單詞統(tǒng)計(以下簡稱WordCount)任務來驗證系統(tǒng)功能是否正常,之所以只通過單詞統(tǒng)計來進行實驗,是為了方便在實驗過程中對任務負載通過句子發(fā)送頻率進行調(diào)控。
實驗中WordCount任務以一個包含100 000條句子的文件作為數(shù)據(jù)源,通過采用Redis做消息隊列,向Storm的spout以預設的速率發(fā)送句子。在Storm的完全延遲趨于穩(wěn)定后,通過改變消息隊列的發(fā)送速率來調(diào)整分布式流處理任務的負載情況,從而使系統(tǒng)對資源的需求發(fā)生改變。WordCount任務拓撲如圖4所示。可以看出,在WordCount中,當切分單元、統(tǒng)計單元和報告單元對當前到達的數(shù)據(jù)的速率(工作負載)λi超過各個單元本身算子對數(shù)據(jù)的處理速率μi乘以單元個數(shù)ki時,表明該單元的資源數(shù)需要調(diào)整。

圖4 WordCount任務拓撲Fig.4 WordCount task topology
在介紹關于分布式流處理任務的負載改變?nèi)绾慰刂坪螅O置如表2所示的一組實驗參數(shù)。其中,數(shù)據(jù)源的輸入速率λs=400 tuple/s,而此時設置的切分單元的處理能力為k切分單元×μ切分單元=2×150=300 tuple/s。若切分單元可以處理所有到達的句子,在本文實驗數(shù)據(jù)集中,每個句子平均可以切分為6個單詞,則下一單元到達率為6×400=2 400 tuple/s,此時統(tǒng)計單元的處理能力為k統(tǒng)計單元×μ統(tǒng)計單元=5×400=2 000 tuple/s<2 400 tuple/s。此外,報告單元的處理能力為k報告單元×μ報告單元=5×400=2 000 tuple/s,也小于報告單元數(shù)據(jù)的到達率。系統(tǒng)將根據(jù)當前的負載情況,判斷出此時資源處于shotage狀態(tài),從而重新做出新的資源分配決策,中間件調(diào)度器檢測到分布式任務的資源請求發(fā)生改變,則會在通過計算當前資源是否能滿足任務需求后,再決定是否向系統(tǒng)層資源調(diào)度器申請資源。

表2 實驗參數(shù)Table 2 Parameters of experiment
以上為檢測系統(tǒng)的資源動態(tài)調(diào)度功能,而對于系統(tǒng)多任務系統(tǒng)分配資源的功能,本文則通過設置2個相同的分布式流處理任務,在2個任務同時改變?nèi)蝿肇撦d的情況下,判斷當前的集群資源分配是否可協(xié)同分配,根據(jù)權重和預測延遲的改變做出更合理的分配決策。
驗證系統(tǒng)的功能是否工作正常,主要關注2個指標的變化:一個是在負載變化后,集群是否做出決策;另一個是在集群資源重新分配后,分布式流處理任務的延遲是否發(fā)生改變。實驗以關閉動態(tài)調(diào)度功能的系統(tǒng)作為對比,驗證系統(tǒng)功能。
如圖5所示,基于Storm on YARN的動態(tài)資源調(diào)度功能在最初消息隊列發(fā)送數(shù)據(jù)不穩(wěn)定,因此,本文過濾掉前1 min的latency數(shù)據(jù),之后在第3 min,任務層資源調(diào)度器做出了對資源分配的新的決策,即(2,5,5)->(3,6,6),此時集群資源足夠?qū)θ蝿辗峙渥龀鲰憫褐皇褂昧?臺共12個節(jié)點,所以,需要中間件調(diào)度器向系統(tǒng)層資源調(diào)度器申請資源,之后資源分配變?yōu)椋?,6,6)共15個節(jié)點,需要使用8臺機器,latency降到預設的范圍,并趨于穩(wěn)定。

圖5 WordCount任務資源動態(tài)調(diào)度過程Fig.5 Dynamic scheduling process of WordCount task resource
如圖6所示,在上一個實驗的基礎上,通過Strom提供的Rebalance機制,強制調(diào)整分布式流數(shù)據(jù)處理任務WordCount的節(jié)點分配為(1,4,4),可以看出,任務節(jié)點分配改變后,WordCount任務的延遲突增,這是因為任務資源無法滿足當前的數(shù)據(jù)輸入速率,此時任務層資源調(diào)度器會重新做出資源分配決策,中間件調(diào)度器察覺到任務層資源調(diào)度器做出決策后,則提供動態(tài)調(diào)度功能調(diào)整集群資源分配,之后任務延遲回到正常水平。

圖6 WorkCount任務資源調(diào)度的調(diào)整過程Fig.6 Adjustment process of WorkCount task resource scheduling
如圖7所示,設置WordCount_1的節(jié)點分配為(3,6,6),在有兩個相同分配的WordCount任務同時進入集群時,由于WordCount_1先進入集群,因此其占據(jù)了15個節(jié)點數(shù),此時另一個任務WordCount_2只能獲得節(jié)點分配為(1,2,2),設置2個任務的權重都為0.5,則在中間件調(diào)度器為任務2修改資源時,會根據(jù)2個任務的權重公平地分配資源,所以,此時2個節(jié)點的資源分配都是(2,4,4)。

圖7 多任務的資源協(xié)同分配Fig.7 Multi tasks resource collaborative allocation
以上3個實驗都驗證了本文系統(tǒng)具備動態(tài)調(diào)度系統(tǒng)資源的功能,在通過動態(tài)調(diào)度后,任務的延遲可以達到預設的延遲,提高流數(shù)據(jù)處理任務的處理效率。
相比于非動態(tài)調(diào)度,雙層資源調(diào)度模型可以分層準確地對系統(tǒng)進行信息監(jiān)測,診斷生成及生成決策,并通過中間件調(diào)度器的調(diào)度,將分配策略同步給系統(tǒng)層資源調(diào)度器,從而對系統(tǒng)可分配的虛擬化資源進行合理分配,降低系統(tǒng)延遲,這也是雙層調(diào)度框架相對于非動態(tài)調(diào)度可以提高系統(tǒng)資源利用率的原因。
本文提出一種基于YARN的分布式流數(shù)據(jù)處理任務資源的動態(tài)調(diào)度系統(tǒng)。相對于開源Storm on YARN框架,該系統(tǒng)基于雙層調(diào)度模型,可檢測流數(shù)據(jù)處理任務延遲,實現(xiàn)對集群負載的動態(tài)調(diào)度,從而實時調(diào)整集群大小,避免人工干預集群,降低集群維護成本。本文系統(tǒng)只能對2個任務進行協(xié)同分配,下一步將在協(xié)同分配多個任務資源方面對其進行改進,并設計只通過窮舉方式計算資源分配的方案。