趙 全,湯小春,朱紫鈺,毛安琪,李戰懷
(西北工業大學計算機學院,西安 710129)
近年來,數據中心上的數據分析作業常常是一些運行時間短的流處理作業,而且要求這些作業共享一套集群資源以便減少基礎設施的成本,例如,既存在一些結構化的數據查詢作業,也存在一些實時數據監控以及數據分析作業[1-5]。面對這些規模龐大并且延遲要求較低的流處理作業,一些流處理計算框架,如Flink[1]、Dremel[6]、Spark[7]等開始被大型互聯網企業使用。這類作業運行時,任務往往被分散到上千臺機器上運行,任務要么從磁盤獲得數據進行計算,要么使用存儲在內存的數據進行計算,作業的響應時間常常在秒級。例如,使用Spark計算一個數據大小為1 GB的查詢作業,運行時間僅僅1 s左右。當這樣的作業大量出現在一個數據中心并共享集群計算資源時,作業之間往往會出現相互的干涉或者對計算資源的競爭,導致部分作業被延遲執行或者出現執行失敗的情況[7]。本文希望建立一套新的集群資源調度框架,提供一種高效、共享的資源調度環境,提高整個集群的資源使用率,為各種大規模并行作業及時分配計算資源,保證每個作業能夠快速得到響應。
由短時間(毫秒級)任務組成的作業,其作業的調度過程具有較大的挑戰。在這些作業中,不但包括一些低延遲的流處理任務,也包括一些為了得到資源的公平分配和減少長時間滯留的任務,將長時間運行的批處理作業分解成大量的短時間運行的任務。例如,一些大規模的數據處理中,一個作業的執行流程通常被分解成一個有向無環圖(Directed Acyclic Graph,DAG)(https://tez.apache.org/)[8-10],DAG中的一個頂點代表一個獨立運行的操作,邊代表數據流向。在執行中,通過對數據進行分片,將操作和數據結合來形成一個一個可以獨立運行的計算任務。因此,大數據處理作業就變成了大量的短時間運行的并行任務。當作業中的每個任務的運行時間在幾百毫秒內,為了滿足低延遲的要求,集群資源調度的決策能力必須具有較大的吞吐量和較低的延遲。例如,假如一個集群系統通常包含上萬臺機器,每臺機器如果包含16核處理器的話,那么每一秒鐘調度100 ms的任務數量就可能達到百萬次的級別。另外,調度過程的延遲一定要低,對于一個100 ms的任務,如果調度延遲以及任務的等待時間超過執行時間的1/10,即數十毫秒,這些都是用戶所不能忍受的情況。最后,在客戶對于實際系統的使用中,面對大量的短時間交互任務,應用程序對于集群系統的可擴展性以及可靠性也是重點關注的問題。這些挑戰導致現有的批處理集群資源管理系統不再能夠滿足用戶的需求,而一些專用的流處理計算框架雖然滿足實時性要求,卻無法滿足集群計算資源在不同作業之間的共享。
針對大規模且高度并行的秒級計算任務,改進現有的集群資源調度框架,實現不同作業對集群資源的共享,滿足其低延遲和高吞吐量調度要求,存在一定的挑戰。現有的集群調度框架,如Mesos[11]、Yarn[12]、Slurm[13]、Mercury[14]、Omega[15]、Sparrow[16]、低延遲框架[17]、Quincy[18]等,都無法滿足大規模秒級任務的調度。對于這種種類繁多且任務并行度較大的計算作業,如果只使用單個調度器實例來分散任務的執行,這將是一件非常困難的事情。另外,為了滿足高可擴展性以及高可靠性,大規模任務的復制和恢復也要求在秒級內完成,這也使得單個調度實例成為系統的瓶頸。
本文考慮了現有集群資源管理系統的特點,在調度框架方面,取消了集中調度器或者邏輯全局資源狀態,讓每個計算節點具有自主資源管理和控制能力,實現調度器的多個實例,將調度器實例部署在不同的計算節點上。這種調度器實例分散的方式以及計算節點的自治特性,使得可擴展性以及可靠性得到大大的提高。當系統中在某個時間段出現大量的作業時,可以通過增加調度器實例來滿足高峰時段任務的調度需求;當某個調度器實例故障,可以快速啟動新的調度器實例來替代故障調度器實例,滿足用戶作業的調度要求。這種分散的多調度實例帶來了低延遲的好處的同時,也帶來了一些矛盾。與集中式的單個調度器比較,多個分散的調度器實例可能導致資源分配的矛盾[15-16,19-21],即不同的調度器實例將各自的任務分配到同一個CPU核上,造成多個任務對同一個資源進行競爭,導致任務的阻塞,從而延遲了任務的完成。
本文設計和實現了一個分布式集群資源調度框架DHRM(Distributed Heterogeneous Resource Management),它是一種無全局資源狀態、支持兩階段任務調度策略的集群資源調度框架。所謂兩階段任務調度,是指一個任務在調度過程中:首先由調度器向各個計算節點發送一個保持狀態的請求,一旦請求到達計算節點上的資源管理器,計算節點就發送自身的負載狀態信息到調度器;然后調度器進行決策后,向被選中的計算節點發送釋放消息,啟動任務在計算節點的執行,對于沒有被選中的計算節點,發送取消消息,從這些計算節點刪除保留狀態的任務。DHRM提供了以下三種主要的策略來實現不同類型作業對于集群計算資源的共享:
1)兩階段多路調度策略。對于并行的作業,直接使用兩階段任務調度可能會導致作業完成時間的延長。作業的完成時間與作業中最后一個任務結束時間直接相關,如果一個作業的最后一個任務長時間得不到執行,那么作業的完成時間就延長,必須等待最后一個任務完成,整個作業才能認為結束。而最后一個任務的長時間等待使得兩階段調度策略的效果不能滿足預期要求。兩階段多路調度可以同時為每個任務提供多個隨機選擇選項,使得作業中的每個任務可以同時選擇較好的資源來解決此問題。不像兩階段調度每次僅僅為作業中的一個任務提供候選資源,兩階段多路調度為一個作業中的n個任務同時至少提供d·n個計算節點(d>1)。經過論證分析,不像兩階段調度策略,當作業的并行度增加時,兩階段多路調度策略的調度延遲并不會增加。
2)執行隊列策略。對于集群節點上的CPU資源,可以劃分為多個執行隊列,每個隊列包含一定的CPU資源、內存資源、帶寬資源等。當集群計算節點上只存在CPU資源,如果采用傳統的調度算法,可能導致隊列頭的一個長時間運行的任務阻塞其他短時間運行的任務,造成短時間運行的任務延遲增大。一些調度方案采用強力搜索策略,遍歷每個可用資源和每個任務來匹配資源,缺點是調度的復雜度上升,調度開銷增大。通過設置多個執行隊列,可以保證CPU資源有空閑時,短時間運行的任務的快速調度。通過論證,這種方式并不會增加調度的開銷,不同種類任務混合時,調度延遲也并沒有明顯的增加。
3)資源協調策略。兩階段多路調度方法存在兩個性能瓶頸:第一,計算節點上的可以同時運行的任務數量不能很好地表示新到達的任務需要等待的時間;第二,由于資源狀態的變化以及網絡通信的延遲,多個并行的調度器實例可能會遇到資源競爭的情況。通過資源協調策略,某些等待時間過長的任務可能會被分散到其他等待時間較短的計算節點的隊列上。通過資源協調來解決資源分配沖突,實現計算資源的負載平衡。相比較單純的兩階段多路調度來說,可以大幅度減少作業的平均完成時間。
共享集群資源時,需要對每用戶資源使用量進行限制。DHRM通過計算節點上的多個隊列,來保證全局計算資源的分配目標,為不同的用戶設置不同的資源使用量限制。通過資源使用限制,來確保各個作業能夠共享集群計算資源,不會導致某些用戶由于缺乏資源而放棄對資源的共享使用。
本文將DHRM部署在包含16臺服務器的集群上。通過對事務處理性能委員會(Transaction Processing performance Council,TPC)給出的基準測試程序以及大規模的模擬作業進行調度,與理想狀態對比,DHRM的響應時間在10%以內,任務在隊列中的等待延遲平均在12 ms以內。對于大規模的集群,DHRM可為任務較短的任務提供較短的響應時間。通過設計調度模擬器,仿真結果表明,隨著集群規模增加到成千上萬CPU內核,DHRM將繼續表現良好。實驗結果表明,DHRM分布式調度是集中調度的一種可行的替代方案,可以實現大規模并行任務的低延遲調度。
對于集群資源管理系統,存在大量的文獻研究集中式調度框架,也存在一些系統使用分布式調度框架和混合式調度框架。例如Mesos及Yarn等系統,它們是集中式調度框架,無法滿足大規模并發短時間任務的低延遲調度。DHRM是一個分布式集群資源調度框架,能夠滿足大規模并發的短時間任務的低延遲調度要求。
Sparrow的研究目的是減少任務的尾部延遲,它建議使用對沖請求,其中客戶端將每個請求發送給兩個服務器,并在收到第一個結果時取消剩余的未完成請求。它還描述了綁定請求,客戶端將每個請求發送到兩個不同的服務器,但是服務器直接就請求狀態進行通信。當一臺服務器開始執行請求時,它將取消另一臺服務器。但是它無法估計不同的計算節點的負載狀態,也無法滿足集群計算節點上的負載平衡,造成資源饑餓,即一部分節點負載較重而另外一部分計算節點沒有計算任務的運行。
針對分布式系統中的負載共享機制,Sparrow使用隨機技術,DHRM采用仲裁。在負載共享系統中,對于每個任務的產生和任務的處理,缺省情況下,計算任務在產生的地方處理。當某個處理器上的負載超過其閾值限制,負載需要分散到其他處理器。分散過程要么采用接收者方式,即輕量級的計算節點隨機選擇一些處理任務,請求任務的轉送;要么采用發送者方式,重量級的計算節點隨機選擇一些計算節點,向這些計算節點發送請求,其完成的過程通過隨機采樣的方式實現。DHRM采用協調仲裁方式,輕量級的計算節點向協調器發送資源邀約,重量級的計算節點發送任務轉送請求,協調器匹配后通知雙方仲裁結果。
Quincy的目標是計算機集群上的任務調度,類似于Sparrow。Quincy將調度問題變成圖的最大流最小代價優化,目標是在集群資源在作業之間的公平共享、數據本地化。Quincy的圖調度支持更高等級的調度,但是,對于一個2 500臺機器的集群,它至少需要1 s的時間來計算任務的調度分配結果,因此,大規模集群環境下,Quincy無法滿足低延遲的調度要求。
其他許多調度框架旨在以粗粒度分配資源[22-23],這是因為任務往往需要長時間的運行,或者因為集群支持許多應用程序,每個應用程序都獲取一定數量的資源并執行自己的任務級調度,例如Mesos、Yarn、Omega等。這些調度為了實現復雜資源公平調度策略而犧牲了請求的粒度。作為結果,它們存在反饋延遲[24],在調度秒級任務時無法提供低延遲和高吞吐量。對于高性能計算環境下的調度,它們針對具有復雜約束的大型作業進行了優化,其調度目標是最大吞吐量,它以每秒能夠實現數十到數百個作業為調度目標,例如,Slurm。類似的系統還包括Condor[25],它支持一些復雜特征的作業流調度,使用豐富的約束語言、作業的檢查點以及群調度等,其匹配算法能夠達到的最大調度量是每秒數十到數百個作業。
本文的設計目標是為短時間任務提供低延遲調度框架,支持大規模并行任務的運行,滿足不同作業對集群計算資源的共享。
1)資源調度的低延遲。相對于批處理工作負載,低延遲的工作負載通常具有更復雜的技術要求。因為批處理負載會長時間使用資源,所以調度的頻率不高。而低延遲的負載,其任務的執行時間非常短,而且數量大,為了支持毫秒級別的任務調度,那么調度器就必須具有更快的調度頻率,支持每秒鐘百萬級別的任務調度規模。此外,調度框架向應用程序提供低延遲服務后,調度程序就必須承受任務的失效恢復。
2)資源的細粒度共享。支持多個不同的作業對集群計算資源的共享,但是相對于粗粒度的資源共享方式,細粒度資源共享允許不同作業的多個任務并發共享資源。DHRM提供細粒度的資源共享,允許多個任務對同一資源的并發訪問,它是對現有的粗粒度集群資源管理的一種補充。
3)降低調度反饋延遲。現有的批處理資源調度框架中,任務完成后,釋放資源,下一個任務才有可能獲得該資源,這樣會出現調度的反饋延遲,降低了資源的使用率。DHRM通過計算節點資源的細粒度共享,減少反饋延遲。
4)DHRM支持可擴展性和高可用性。通過可擴展性[26],提供更多的計算資源,降低任務的執行延遲;當存在大量的應用程序的低延遲流處理請求并超過可使用的資源總量時,DHRM提供嚴格的優先級別和帶權重的資源共享。DHRM還提供每作業的資源使用約束,來保證各個不同的作業都能及時獲得資源。當某個執行器發生故障時,可以通過快速啟動新的執行器的方式來保證任務的失效恢復功能。
1)計算節點。集群中的一個機器,其主要任務是運行各種計算任務。一個集群中包含大量的計算節點。如果一個計算節點上安排了大量的任務,超過其可以并行執行的最大值,那么一些任務就會在計算節點的隊列上排隊。
2)作業。一個作業中包含n個任務,每個任務都會被安排到計算節點上。
3)調度器實例。一個進程,運行在某個計算節點上,負責將作業的任務映射到某個計算節點上。在該分布式資源調度框架中,調度器實例是有多個的,其數量最多不超過集群中機器的數量,作業可以被任何一個調度實例處理。在某一時刻下,可能會有多個包含不同數量任務的作業向該分布式資源調度框架請求注冊。若是集中式調度器,則只有一個調度實例來處理大量作業的注冊請求并進行資源的調度,這樣會導致集中式調度器的調度壓力過大。但在該分布式調度框架中,有多個分布式調度器實例供作業選擇,作業可平均分配到各個調度器上進行注冊,或者尋找較為“閑”的調度器實例進行注冊。通過多個分布式調度實例可以并行地對大量作業進行調度,這大大降低了作業的調度延遲。但是多個分布式調度器實例對資源進行調度管理,有可能會出現多個調度器實例將同一塊資源分給不同作業的任務的情況,造成較多的任務爭搶相同的資源的情況,從而延長了作業的整體執行時間。該分布式調度框架采用兩階段的多路調度策略對不同作業的任務進行調度,關于兩階段的多路調度的詳細內容將在后續的章節詳細介紹。
4)作業響應時間。當一個作業的任務提交到調度器后,在其得到資源之前,這個時間被稱為調度時間;任務獲得資源后,在計算節點的隊列上排隊,當計算節點的并發執行數低于最大值時,任務實際開始執行,這段時間稱為等待時間;當任務實際開始執行到任務結束期間,被稱為執行時間。作業被提交到調度器,一直到最后一個任務完成,這個期間稱為作業響應時間。一個作業的延遲時間包括調度時間和等待時間的累計。如果采用零等待時間(相當于該作業中所有任務的最長執行時間)來調度作業的所有任務,可以得到一個理想作業響應時間。使用給定調度技術來調度作業的所有任務,可以得到一個具體的響應時間。理想作業響應時間和指定調度下的作業響應時間的差值叫延遲。
對于分布式調度框架來說,各個計算節點上需要有隊列。分布式調度器將任務提交到計算節點的隊列后,任務在隊列中處于等待狀態。當計算節點上正在運行的任務數量低于設置的運行數量限制或者說存在可用計算資源的話,隊列中等待狀態的任務就被調度執行。最簡單的調度方式是先進先出(First Input First Output,FIFO),也可以實施優先級別調度、短任務優先等策略。
對于CPU有關的資源分配,操作系統本身就支持按照時間片共享調度、或者按照優先級別搶占式調度等;因此,在各個計算節點上,DHRM允許用戶建立自己的隊列,隊列設置不同的優先級別。每個隊列上用戶可以設置自己的資源限制屬性。
執行隊列為CPU任務的專用隊列。執行隊列具有三類屬性:
1)資源量。CPU資源數量包括CPU核數、內存數量、IO帶寬以及網絡帶寬值。該限制值用來與要提交到隊列的任務的資源使用量限制作比較。要提交到隊列的任務中所設置的資源限制值若超過該值,任務的提交將被拒絕。
2)隊列優先級。表示隊列間的優先級。其決定調度最先運行哪個隊列中的任務。該值較大的隊列中的任務將被優先運行。若隊列間的優先級相同,則按照請求被提交到隊列的時間順序運行請求。
3)同時可運行的任務數。隊列內同時可運行的任務數。當前正在運行的任務數若達到該數值,下一個應該運行的任務將一直處于等待狀態,直到當前正在運行的某個任務運行結束后該任務才會被啟動。
DHRM的候選資源選擇策略是借鑒文獻[16]的sparrow方法,即兩種隨機選擇策略來實現獲得候選的計算節點,該策略使用無狀態隨機方法提供了較低的預期任務等待時間。但是,DHRM對兩種隨機選擇策略進行了改進:不同于sparrow的依據隊列長度來決策計算節點方式,DHRM會將任務放置在兩個隨機選擇的工作計算機中負載最小的一個上。與使用隨機選擇相比,以這種方式調度任務可顯著縮短預期的等待時間。
3.3.1 調度過程
本文將兩種隨機選擇策略直接應用于并行作業調度,其整個過程分為兩個階段:第一階段,調度程序為作業中的每個任務隨機選擇兩個計算節點的隊列,并向每個計算節點的隊列發送一個任務請求,任務請求中僅包含用戶的任務資源描述。每個計算節點上的隊列都會用當前隊列狀態回復調度程序。第二階段,調度程序收到全部的返回的消息后,根據隊列的狀態信息,選擇一個負載最輕的計算節點的隊列,即等待時間最短的隊列,將任務放在該隊列上。調度程序重復此過程,直到作業中的所有任務全部獲得計算資源。
其實現過程如圖1所示。調度器為任務1選擇兩個計算節點的隊列,然后發送hold(保留)消息(圖1中1a:hold及1b:hold)。根據返回消息,選擇其中一個最優隊列(圖1中的第二個隊列),然后調度器向選中的計算節點發送run(運行)消息(圖1中1b:run),使得任務在計算節點上變成等待狀態,對于另外一個計算節點的隊列,則發送del(刪除)消息(圖1中1a:del),取消任務的保留。一旦任務1調度完成,調度器開始任務2的調度,任務2的調度過程與任務1相同。該過程被稱為兩階段調度。

圖1 任務的兩階段調度過程Fig.1 Processof two-stage scheduling for tasks
3.3.2 隊列狀態的表示
在圖1所示的第一個階段中,計算節點的隊列向調度器返回隊列的狀態。最簡單的方式是返回隊列中執行任務的數量和處于等待中的任務的數量,調度器根據隊列的長度選擇一個最短的隊列作為任務提交的目標隊列。但是,在高負載集群環境下,這種方式的調度性能是比較差的。主要原因是因為隊列長度只能提供一種粗粒度的等待時間標準。例如,假如在兩個不同的計算節點上存在兩個隊列:第一個計算節點的隊列上存在兩個任務,執行時間之和為200 ms;而第二個計算節點的隊列上只有一個任務,執行時間為300 ms。如果任務按照隊列長度來決策調度節點,那么任務就會提交到后者,這將導致任務不必要的延遲。與選擇前者相比,延遲時間增加100 ms。因此,依據隊列長度來調度,效果較差,通過估計每個隊列中任務的執行時間來分配資源,即確定隊列的狀態,然后決策任務的分散節點,這將是一個較好的方案。
對于DHRM,將任務提交到某個節點的隊列后,需要經過等待和執行過程。首先給出各個隊列狀況的表示方式、任務的參數形式、等待時間的計算方法和決定一個任務投入到哪個節點的調度依據。
對于執行隊列,考慮CPU資源以及內存資源,暫時不考慮其他IO、帶寬等計算資源。調度器中作業jk的任務可表示為:

其中:k表示來自第k個作業,m表示內存需求量,c表示CPU使用時間,et表示作業估計執行時間,P表示優先級(由上面給出的算法得出)。
任務的等待時間,是指一個任務從提交到隊列上進入排隊狀態到其開始正式運行所經歷的時間。
現設任務投入第l個節點的第i個隊列,并設該任務需要的等待時間為wi,有如下三種情況:
當隊列中無排隊的任務時且隊列尚可以繼續提交任務,等待時間wi為:

當隊列中無排隊的任務時且不能繼續提交任務時,wi等于隊列中剩余時間最小的任務的剩余時間pti。

當隊列中運行著任務,并且存在n個排隊的任務,則等待時間等于當前隊列中剩余時間最小的任務的剩余時間pti與該隊列中來自n個作業中的所有排隊的任務的估計執行時間之和,即:

假如隊列i中的可用CPU核數為qi,內存大小為mi,新的任務提交到該隊列后,內存以及CPU核數滿足隊列限制條件時,作業需要等待的時間為wi/qi。對于每個任務中的CPU使用時間、內存需求量等參數,使用修正系數來進行調整,修正系數記為εi(該系數是通過對不同類型的作業進行測試并進行歸一化的結果)。
調度器收到各個隊列發出的hold消息后,選擇等待時間最短的隊列作為目標隊列。則最終任務的最小等待時間可以通過如下公式計算:

通過上述方式,可以獲取候選計算節點中最小的等待的時間。每次分布式調度器對各個作業的任務進行調度時,通過對候選計算節點計算任務的最小等待時間,以此為依據進行調度。
3.3.3 調度性能分析
與隨機調度相比,兩階段調度通過選擇等待時間最短的計算節點,提高了性能,但是相對于理想調度(即根據整個集群資源最新狀態,選擇一個空閑的CPU核,將任務提交到該資源,任務就可以立即執行,不需要等待計算資源)相比,其執行性能卻差得較多。直觀地講,對每個任務進行一次兩階段調度的問題在于,作業的響應時間取決于作業中任何一個任務的最長等待時間,這使得平均作業響應時間比平均任務響應時間長得多,特別是某些任務出現長時間等待的情況,即尾任務出現滯后的場合。本文模擬了由10 000個4核計算節點組成的集群中,每個任務使用兩階段調度和隨機分配,網絡通信代價為1 ms。如果作業按照均勻分布到達,并且每個作業包含100個任務。作業中任務的運行時間在指定范圍(平均100 ms)內隨機產生。在整個作業中,響應時間隨著負載的增加而增加,這是因為調度器串行為作業中的每個任務找到空閑計算資源的概率降低的結果。
兩階段調度過程中,為作業中的每個任務串行提供調度對性能有影響。例如,對于任務1,任意選擇兩個隊列,可能會出現兩個隊列都忙的情況,但是任務1必須從兩者之間選擇一個,所以無論怎么選擇,任務都必須等待計算資源。對于任務2,任意選擇兩個隊列后,可能兩個隊列都處于空閑狀態,但是任務2只能選擇其中一個,這就造成另外一個計算資源的饑餓。如果兩個作業同時選擇隊列,調度器可以根據返回的結構,將任務1和任務2都分散到任務2選擇的空閑隊列上,那么兩個任務都能夠獲得空閑資源,減少了等待,性能就會得到提高。為了解決該問題,DHRM使用兩階段多路調度。使用兩階段多路調度,調度器采用批量方式,一次對調度器上可以并發調度的所有任務進行一次資源分配。


假如每個任務可以選擇兩個隊列,即d=2,如果系統中所有隊列的個數q≤2*k,那么調度器向全部的隊列發送任務的hold請求。反之,調度器選擇2*k個隊列,向這些隊列發送hold請求。當所有hold請求的返回消息全部到達調度器后,調度器按照作業的截止時間為基準,采用貪心算法為作業的每個任務分配執行隊列。假設每個作業的提交時間為dt,作業中每個任務在隊列中等待時間為w,依據dt按照遞增順序排序,對于w也按照遞增順序排序,貪心調度(Greedy Schedulering,GS)算法如下所示。
算法1 貪心調度(GS)。
輸入:Q={q1,q2,…,qn},n=2*k,J={J1,J2,…,Jn};
輸出:任務J到Q上的一個映射。
1)sort(Q),sort(J) //遞增排序
2)for(JjinJ){
3) for(tiinJi){
4) for(qkinQ){
5) if(qk.m≥ti.m){
6)Q=Qqk
7)Ji=Ji i
8) }
9) }
10)}
11)}
GS算法中的第1)行,分別按照作業提交時間為作業排序,按照執行隊列上的等待時間為執行隊列排序。對于作業中的每個任務,從隊列中選擇滿足條件的執行隊列,將任務分配給執行隊列,如GS算法中的第3)~10)行。GS算法為每個作業分配完成后,如果還存在有未分配的任務,則繼續調用兩階段多路分配算法,為剩余作業中的任務繼續分配資源。
GS算法的目的在于盡量降低每個作業的完成時間,所以采用貪心算法保證每個作業的滯后任務盡量早一點分配到內存,能夠盡快完成作業的執行。但是,算法執行過程中,可能每次都有任務不能夠獲得資源,算法將這些任務與新的任務一起,在下一次兩階段多路調度過程中為這些任務重新分配執行隊列。
使用兩階段多路調度策略后,雖然在一定程度上提高了性能,但是還存在一個問題。例如,假如隨機選擇的執行隊列都是負載較重的隊列,而一部分沒有選擇的執行隊列卻處于空閑狀態,造成負載的不平衡。可以觀察到,從一個任務被分散到某個隊列開始,在任務實際開始執行的這個期間,整個集群節點上會出現一個更好的執行隊列。原因可能是預計開始時間的估計錯誤、資源的競爭或者計算節點失效等情況時,造成任務選擇的執行隊列不是最優的。因此需要考慮一些策略來減少這種情況的發生,例如動態負載平衡技術。在DHRM中,通過采用負載平衡技術來實現任務在不同隊列上的計算資源的平衡。
為實現負載平衡,需要指定一個計算節點為協調器,負載較輕的計算節點向協調器發送空閑隊列的資源狀態,負載較重的計算節點的隊列向協調器發送任務的資源請求。協調器收到資源狀態和資源請求后,通過匹配方式,為負載較重的計算節點上的任務匹配到需要的資源,實現負載在計算節點之間的平衡。其實現過程圖2所示。圖2中包含資源協調器(Negotiator)、計算節點1(machine1)和計算節點2(machine2)3個組成部分。計算節點1用來檢測需要轉送的任務,計算節點2用來確定負載較輕的隊列,協調器用來將計算節點1上的任務匹配到計算節點2上。

圖2 負載平衡過程Fig.2 Process of load balancing
在正常情況下,任務在各自的隊列中等待計算資源,一旦獲得計算資源就開始運行任務。當某個節點的隊列中任務的等待時間超過某個閾值hw,計算節點就為隊列中的任務啟動負載平衡分散過程。另外,當某個計算節點的隊列等待時間低于閾值lw時,計算節點向協調器發送資源狀態信息,協調器根據資源狀態和資源請求,從負載高的節點上轉移部分任務到負載低的節點上,實現計算節點的負載平衡。
負載平衡主要包含三個階段:第一階段,任務向資源協調器提交分散請求;第二階段,協調器經過調度而得到最終的目標節點,然后協調器將目標節點信息返回給原節點;第三階段,原節點和目標節點進行確認后,將任務分散到目標節點。
如圖2所示,計算節點1的一個任務發出分散請求(圖2中的過程①)。協調器接收到任務的請求后,將其放入隊列中,一旦計算節點2上的負載較輕,即等待執行的任務低于閾值,計算節點2向協調器發送資源狀態(圖2中的過程②),協調器根據最新的隊列等待時間矩陣,選擇一個最佳的目標節點,例如本例子中的計算節點2,然后將分配結果返回給計算節點1(圖2中的過程③)。計算節點1收到消息后,開始向計算節點2發送確認信息(圖2中的過程④),計算節點2收到信息后,根據本節點的狀態對任務請求進行檢查,如資源檢查、用戶權限檢查、隊列負載狀態檢查等,檢查結果通常會有如下兩種情況:
1)如果請求不被目標節點許可的原因不是由于節點忙,那么該請求就會被終止。
2)如果請求不被目標節點許可的原因是該節點忙,比如當前狀態下隊列上已經被運行著的任務占滿了,并且隊列上存在一些排隊的任務。那么該請求的標識符將被保存到計算節點2上(圖2中的過程⑤),請求在計算節點1上處于等待狀態。
當計算節點2可以執行該任務,而要求轉送保存在該節點上的備份任務時,就會發送轉送請求給計算節點1(圖2中的過程⑥)。此時,計算節點1就會將該任務由等待狀態變成排隊狀態,并設置較高的優先級別,同時運送任務請求的內容到目標節點(圖2中的過程⑥和⑦)。
任務請求轉送完成,計算節點1正式向計算節點2提交任務(圖2中的過程⑧)。此時新的任務的加入,如果導致計算節點的等待時間超過閾值,計算節點向協調器發送取消資源狀態消息,不再接受負載平衡任務。
在調度策略方面,DHRM只支持兩種調度策略。本章介紹對兩種流行的調度程序策略的支持:第一,任務執行時的數據本地化訪問,DHRM在進行任務調度的時候,為了提高任務的響應速度,通常需要盡量保證執行任務的機器就是數據所在機器。調度器可以對兩階段多路調度的貪心調度算法添加本地化約束,從而最大限度地保證數據本地化;第二,不同用戶資源共享時的隔離問題。DHRM旨在實現不同類型的作業共享同一套集群資源,因此,在集群中大部分的時間內,會有不同類型的作業以及任務運行在集群中,DHRM的調度器需要保證作業之間運行的時候不能互相干擾,這點調度器通過為每個作業啟動若干個容器,作業的任務都會在容器中運行,從邏輯上隔離開了資源的使用,確保作業之間不會互相影響。
DHRM支持作業級別以及任務級別的約束。這些約束在大數據處理中經常出現,當任務運行時,數據的訪問位置與數據計算位置最好在同一個計算節點上。輸入數據與任務計算在同一個計算節點時,因為不需要通過網絡傳輸輸入數據,通常會減少任務的響應時間。對于每個任務的本地化訪問約束,每個任務都可能具有一組計算節點,任務按照本地化要求進行計算,DHRM無法使用兩階段多路調度來匯總每個作業中所有任務的本地化信息。相反,DHRM使用兩階段調度時,可以從該任務的本地化約束中找到待選的目標計算節點,這樣通過限制每個任務發送hold消息的目標計算節點數量,使得每個任務在執行時才會為其選擇執行的計算節點,從而實現任務與數據的后期綁定。雖然DHRM無法對作業中的每個任務實現數據本地化約束,但是在貪心算法中添加本地化約束策略,可以最大限度地支持任務的數據本地化約束。為了在貪心算法中添加本地化約束,可以將GS算法第5)行的條件判斷略作豐富,即除了判斷隊列資源是否滿足任務需求的同時,需要判斷該隊列所在計算節點是否存在于任務本地化訪問約束集合中。若存在,則可以將其添加到任務到隊列的映射中;若不存在,可以使用原有的方法為其找到一個合適的隊列。
當總的資源需求超過集群計算節點的容量時,集群調度程序將根據特定策略分配資源。DHRM支持嚴格優先級資源分配。許多集群共享策略簡化為使用嚴格的優先級,DHRM通過在工作節點上維護多個隊列來支持所有此類策略,依據任務的優先級別和隊列的優先級別,實現作業的優先調度,減少作業中尾任務的滯留。從常用的FIFO、最早截止日期優先和最短作業優先轉變到為每個作業分配優先級,然后優先運行優先級最高的作業。例如,以最早的截止時間為最高優先級別時,將截止時間較早的工作分配給更高的優先級。集群資源管理系統也可能希望直接分配優先級,例如,將生產作業設置高優先級別,批處理作業設置較低的優先級別。為支持這些策略,DHRM為計算節點的每個隊列維護一個優先級別。當資源變為空閑時,DHRM從優先級最高的非空隊列中取出任務并執行。當高優先級任務到達那些正在運行低優先級任務的計算機時,DHRM也支持搶占。DHRM的設計思想是提高每個作業的完成時間,不出現某個作業中一個或者幾個任務的嚴重滯后的任務,因此DHRM暫時不支持計算資源在各個作業之間的公平訪問,這也是將來得探索的問題。
本文中的DHRM采用了網絡隊列系統(Network Queue System,NQS)[27-28]和Mesos資源調度框架。NQS在各個計算節點安裝,本文為NQS添加了一個ResourceTracker服務,用來獲得各個執行隊列中任務的最新狀態。“Qsub-hold”命令作為第一階段的消息,將原來NQS中返回結果進行了變更,即由單純的請求編號改變為編號和任務等待時間兩個部分;分布式調度器由mesos改造而來,借鑒了mesos中“推送”的資源管理模式,作業注冊到mesos調度框架后,mesos根據作業中的任務描述,向各個集群節點請求資源;獲得資源后,將資源打包成ResourceOffer,然后推送給Spark計算框架,由Spark的調度器經過決策后,選擇需要的資源,啟動執行任務需要的Executor。最后由Executor完成任務的執行。
主要的改進有:1)對于作業框架,重新封裝了Spark的調度器,以便適應細粒度的任務調度模式。2)資源管理方面,改進了Mesos集群資源管理框架,計算資源的獲得不再通過主導資源分配公平(Dominant Resource Fairness,DRF)[29]方式分配。3)在計算節點改進NQS系統后,DHRM支持兩階段多路調度中執行隊列的等待時間的計算以及負載平衡時的空閑隊列的資源狀態的收集。
圖3給出了DHRM任務調度以及負載平衡的過程。任務調度是一個循環過程;負載平衡是動態觸發的循環的過程,通常是在集群負載較高的時候才會觸發。任務調度中,一旦計算框架的調度器(Spark Scheduler)向集群資源管理系統的一個 調 度 器 實 例(Distributed Scheduler)注 冊,Distributed Scheduler就選擇2倍的任務數量的候選隊列,啟動qsub-hold命令,等待全部返回執行隊列狀態(queue status)后,調度器從返回的結果中,選擇最優的執行隊列,然后向Spark Scheduler發送資源邀約(Resource Offer)命令。當Distributed Scheduler收到Spark Scheduler的返回的任務集合(taskset)后,使用Qrls命令啟動不同計算節點上的執行器(Executor),最后Executor從計算框架獲得任務并運行。負載平衡中,當一個隊列系統中的任務等待時間超過閾值,就向協調器(Negotiator)發送資源請求(request),協調器選擇負載較輕的目標隊列返回給資源請求者(resource),然后資源請求者向目標隊列轉送任務(Task Migration)。

圖3 DHRM順序圖Fig.3 Sequence diagram of DHRM
系統在一個集群上進行實驗,集群中包含16臺服務器,其中15臺服務器(浪潮NF5468M5服務器)作為計算節點,1臺中科曙光服務器620/420,作為協調器。每個服務器節點包含2顆Xeon2.1處理器,每個處理器包含8個核,32 GBDDR4內存。集群上包含1臺AS2150G2磁盤陣列。服務器操作系統為Ubuntu 7.5.0,采用C++11作為編程語言,Mesos的基礎版本為1.8,Spark的基礎版本為2.4.3。這些計算機被組織在4個機架內,每個機架包含4臺計算機。機架內的計算機通過機架連接,各個機架交換機通過級聯方式與匯聚交換機連接。
測試時使用了5個分布式調度器,首先,本文使用DHRM為TPC-H工作負載調度任務,其負載具有分析查詢功能。首先在理想的調度程序的基礎上比較作業的響應時間。本文提供了DHRM細粒度資源共享時的開銷并量化了其性能。同時,為了比較本文中所述任務調度算法與已有的調度算法的優劣,于是在同等條件下測試了sparrow的任務調度算法包括批采樣以及每任務采樣等技術所造成的延遲,并與DHRM進行了比較,分析兩者之間的差距。其次,本文展示了DHRM在調度延遲以及等待時間方面的對比。最后,為了體現大規模集群環境下,大量任務的調度延遲,本文設計了一個基于Spark調度器的模擬程序和一個模擬的大規模集群環境,通過模擬程序在模擬集群環境的調度,分析了調度延遲的特點。
本文使用運行在Spark的TPC-H(www.tpc.org/tpch/)查詢基準來測試DHRM的調度性能。TPC-H基準代表對事務數據的查詢,這是低延遲數據并行框架的常見用例。每個TPCH查詢在Spark下運行,Spark將查詢轉換成DAG,按照不同的階段來調度執行。查詢的響應時間是各個階段響應時間之和。5個不同的用戶啟動多個TPC-H查詢負載,查詢負載隨機排列,并且在大約15 min的時間內維持集群負載在80%左右。本文針對實驗中200 s的數據進行了分析,在200 s中,DHRM調度了超過4 000個作業,這些作業構成了1 200個TPC-H查詢。每個用戶都在TPC-H數據集的副本上運行查詢,數據集的大小大約為2 GB,并分為30個分區,每個分區的副本數設置為3。
為了比較DHRM的性能,本文假設任務在計算節點上沒有等待時間,就像粗粒度資源分配方式一樣,這個響應時間稱為理想時間。在計算某個查詢的理想響應時間時,本文分析了一次查詢的DAG過程中的每個階段,去掉隊列等待時間后,將其他階段的時間相加,最后得到查詢的理想響應時間。由于DHRM在計算理想響應時間時,采用了任務的執行時間,所以理想響應時間也包括每個任務的本地化約束。理想的響應時間不包括將任務發送到計算節點所需的時間,也不包括在利用率突發期間不可避免的排隊等待時間,所以,理想響應時間是調度器可以實現的響應時間的下限。
圖4給出的數據可以看出,與隨機調度、兩階段調度相比,DHRM的兩階段多路調度最優,基本上不超過理想調度時間的13%。與隨機調度相比,兩階段多路調度只是隨機調度平均響應時間的25%~33%,95%分位數的響應時間最多只是隨機調度時間的20%。與兩階段調度相比,兩階段多路調度的響應時間是兩階段調度響應時間的80%,95%分位數的響應時間也只達到兩階段調度的60%。與兩階段多路調度相比,負載平衡策略的使用,平均響應時間減少了14%。DHRM還提供了良好的絕對性能:其中值響應時間僅比理想調度程序所提供的響應時間高12%。

圖4 DHRM不同調度算法的響應時間對比Fig.4 Comparison of response time of different scheduling algorithms in DHRM
除了在DHRM內測試不同調度算法的性能,本文還比較了DHRM與sparrow的批采樣調度算法以及每任務采樣調度算法。結果如圖5所示。從圖5中的結果可以看出,DHRM兩階段多路調度算法加負載均衡技術和sparrow的批采樣算法最優,其中sparrow的批采樣調度算法中值響應時間僅比理想的影響時間高出13%左右,和DHRM的兩階段多路調度算法性能大致相當,這表明本文的兩階段多路調度算法具有較為良好的性能。但是兩階段多路調度若是不使用負載平衡技術,則性能要低于sparrow的批采樣技術。雖然sparrow的批采樣技術性能較佳,但是sparrow的每任務采樣技術性能比其他調度技術要差,其比理想調度延遲高出大約40%。兩階段多路技術即使不使用負載平衡技術在性能上也略高于每任務采樣技術。從圖5的結果可以看出,DHRM的調度技術在性能上不輸于sparrow調度框架,而sparrow也是應用于大規模低延遲任務的調度框架。

圖5 DHRM和sparrow不同調度算法的響應時間對比Fig.5 Comparison of response time of different scheduling algorithms in DHRM and sparrow
為了理解DHRM相對于理想調度程序增加的延遲的組成部分,本文將作業的響應時間分解為調度時間、隊列中等待時間以及任務執行時間3個單獨的時間。圖6中給出了不同時間中任務的累計概率函數圖。圖中的每一根曲線代表一個任務數量與時間的變化曲線。
從圖6的曲線看出,60%的任務其調度延遲時間為5 ms左右,40%的作業其調度延遲為12 ms;而任務在隊列中的等待時間較長,75%的任務在隊列中的等待時間為50 ms,剩余25%的任務的等待時間大約為100 ms。任務的執行時間在350 ms以內,其中40%的任務執行時間在150 ms內,超過250 ms的任務只占有15%左右。從圖6可以看出,分布式集群調度框架確實能夠減少調度延遲。

圖6 任務延遲統計Fig.6 Task delay statistics
為了驗證DHRM的資源調度框架在大規模集群環境下的調度延遲,本文設計了一套資源調度框架性能模擬器工具,DHRM_Simulator,用于對DHRM的資源調度功能進行測試。它可以在多臺機器上模擬大規模集群,并根據歷史日志分析提取出應用程序負載信息,模擬完整的資源分配、任務調度和資源回收過程。通過使用8臺服務器來模擬大規模的集群環境。實驗過程中每臺機器均為NF5468M5服務器,包含2顆Xeon2.1處理器,每個處理器包含8個核,32 GBDDR4內存。
6.3.1 模擬機器數量的設置
為了測試大規模集群計算資源的調度性能,使用1臺機器模擬協調器,其余7臺機器模擬計算節點。每臺工作節點機器設置50個隊列,每個隊列模擬一臺物理機器,稱為邏輯機器。隊列的同時運行的槽數(slot)設置為8,代表一個邏輯機器上包含8個模擬的CPU核。這樣的話,1臺物理機器就可以模擬50臺邏輯機器,7臺物理機器模擬350臺邏輯機器,每臺模擬的邏輯機器上模擬8個CPU核,那么整個模擬系統可以使用的CPU計算資源為2 800個模擬的CPU核。
6.3.2 作業的定義
采用類似Spark調度器,本文實現了一個模擬作業框架。每個作業框架上包含數量不等的任務。任務的執行時間由sleep代替。作業提交后,按照task任務數量,隨機選擇2倍的task任務數量的隊列(一個隊列代表一個模擬機器),獲得隊列中的剩余的可以使用的slot數,選擇剩余slot最大的隊列作為選擇的資源,使用qsub啟動一個Spark Executor類似的進程,進程內的作業執行類似sleep執行,不設置具體的數據執行任務。任務的執行時間由sleep長短來決定,即等待時間長短,如200,代表任務執行時間是200 ms。
向模擬器連續啟動7×20個作業(即7個分布式調度器實例),即每個分布式調度器實例啟動20個作業,每個作業按照160個任務設置。作業按照一定的時間間隔向各自的分布式調度器注冊并請求資源執行。集中式調度器的測試則是將7×20個作業同時向該集中式調度器注冊啟動。
6.3.3 響應時間
為了測試分布式調度框架的低延遲,使用集中方式與分布方式進行對比。集中式調度器采用類似mesos的方式實現。測試中,分別設置任務的sleep時間為100 ms、200 ms、1 000 ms三種不同的任務執行時間,用來驗證作業響應時間與調度器的調度延遲之間的關系。圖7給出了三種情況下的模擬測試結果。

圖7 模擬環境下集中式和分布式調度的響應時間對比Fig.7 Response time comparison of centralized and distributed scheduling in simulated environment
從圖7的結果看,任務運行時間在1 000 ms時,分布式調度和集中調度方式下,作業的響應時間基本相同。但是當任務的執行時間是200 ms時,分布式調度的平均響應時間是集中式調度方式下的一半左右。而任務執行更短,即100 ms時,分布式調度器和集中式調度器的差別就更大,幾乎是10倍左右的差別。因為,分布式調度器能夠減少調度的延遲時間,為大規模并行任務的執行提高了效率。因此,相較于mesos等集中式資源調度器,DHRM的分布式調度器能夠對大批量的短任務有更好的調度效率,使大批量的短任務具有較低的響應時間。
大規模運行時間較短的任務越來越多,呈現出并行度越來越高的趨勢,這種作業的調度普遍受到重視。數據中心上運行的這類任務對延遲非常敏感。另外,隨著集群計算節點規模的擴大,調度延遲是影響作業吞吐量和性能的主要瓶頸。但是,傳統的集群資源調度框架在低延遲方面存在一定的缺陷,本文通過兩階段多路調度以及負載平衡技術,解決了現有調度框架中延遲較高和負載不均衡的問題,通過TPC-H基準測試以及大規模集群下的模擬測試,調度框架能夠保證短時間任務的低延遲要求。但是,本文算法在數據本地化訪問以及資源分配的公平性方面有待進一步的提高。