權躍龍,廖湖聲,高紅雨
(1.北京工業大學 計算機學院,北京100124;2.北京工業大學 軟件學院,北京100124)
利用多核環境提供的并行計算能力和多線程技術提高XQuery語言處理性能是可擴展標記語言 (extensible markup language,XML)數據查詢領域的研究熱點。任務調度作為XQuery并行實現過程中的一個重要環節,調度策略的選擇是影響系統性能的關鍵因素。針對XQuery函數式語言的特點,充分利用XQuery內在并行性實現任務調度是提高系統整體處理效率的重要途徑。研究人員針對并行系統中存在的不同并行方式提出很多調度方法,如基于任務并行的多核環境平臺的調度[1];基于線性規劃思想,通過將任務并行與數據并行相結合的方式提高系統性能[2];針對多核或集群環境下的任務調度問題,采用任務并行與流水線并行相結合的方式實現任務調度[3-6]。然而,這些算法大多針對并行系統中存在單一或兩種并行方式的情況,不能適應XQuery查詢請求中3種并行方式同時存在的場景。這些調度算法也不能根據系統負載狀態動態決定任務的并行執行策略,滿足XQuery并行實現中動態調度需求。
為了更好地解決XQuery并行實現中的任務調度問題,本文從XQuery語言內在并行性入手,提出了一種以任務節點出度和任務執行代價為優先級,結合并行方式選擇策略和負載均衡機制的調度算法,達到了提高XQuery并行處理效率的目的。針對XQuery語言中的流水線并行執行方式,建立了一種流水線局部并行自動機模型,利用流水線節拍之間的空閑等待時間實現系統資源的有效利用。
作為一種函數式語言,XQuery程序描述的XML 數據查詢請求表現為多個表達式與函數調用的求值。對于相互之間沒有數據依賴關系的表達式,其計算次序的改變不會對查詢結果產生影響,可以按照任務并行方式進行求值;同時,在XQuery語言描述的查詢中,許多計算表達式將作用于數據集的每個元素,具有數據并行特點;而XQuery以序列作為數據模型,使得函數表達式之間又具有流水線方式工作的潛力[7]。
例如對于圖1 (a)中的XQuery查詢程序,該程序中前兩個let子句描述了來自不同XML 文檔的查詢,相互之間沒有數據依賴關系,因此可以作為2個獨立的任務,采用任務并行的方式處理;FLWOR 語句作為XQuery語言的核心表達式,用于對其輸入數據序列中的每一個元素進行同樣的操作處理,而通常其輸入數據序列的數據量很大,可以將輸入序列劃分為多個子序列,利用數據并行的方式處理,對于圖1 (a)中第2-4行和第5-8行所示的表達式均可采用數據并行方式處理。此外,一些不同子句之間存在著生產者和消費者關系,前一表達式的輸出序列是后一表達式的輸入序列,例如圖1 (a)中第2 行的for子句從bookstore.xml文檔獲取元素book,并將結果交給第3行的篩選表達式進行逐個處理。對于這種存在生產者-消費者關系的2個查詢任務,可以采用流水線并行的方式進行處理。圖1 (b)給出了該查詢程序的執行計劃。

圖1 XQuery語言的內在并行性
圖1中可以看出XQuery語言存在任務并行、數據并行和流水線并行的內在特性。任務并行和數據并行是兩種重要的空間并行技術,對于提高系統的吞吐率具有重要作用。而流水線并行作為一種時間并行技術,能夠有效地縮短程序的整體執行時間。充分利用XQuery函數式語言內在并行性,將空間并行技術和時間并行技術相結合提高XQuery語言的查詢效率是本文研究的主要動機。
本節介紹XQuery語言的并行實現框架,并在此基礎上給出本文調度算法的任務圖模型和算法的設計思想。
圖2 給出了XQuery 語言的并行實現框架。在該XQuery并行查詢系統中,一個XQuery查詢程序經過如下執行過程得到最終查詢結果:首先,詞法語法分析器對XQuery源程序進行詞法分析語法分析,生成XQuery語法樹。然后,XQuery語法樹經過任務生成器,生成用來描述XQuery查詢計劃的任務圖,任務圖中標示出各個任務是否適合流水線并行、數據并行等并行方式以及各個任務節點之間的數據依賴關系。最后,由并行處理引擎結合任務圖和對應的XML文檔進行計算求值,獲得查詢結果。其中,并行處理引擎包括調度模塊和執行模塊兩部分:調度模塊負責對任務圖中當前就緒任務按照調度算法進行調度處理;執行模塊負責將調度模塊所提交的候選任務進行查詢處理。

圖2 XQuery語言的并行實現框架
一個XQuery程序的任務圖由一組既有前后數據傳遞約束關系,又有并行關系的多個任務組成,可以通過有向無環圖 (directed acyclic graph,DAG)來表示,圖3 給出了圖1 (a)中XQuery查詢程序的DAG 表示形式。XQuery查詢的任務圖可以表示為一個二元組,DAG= (V,E),這里V= {v1,v2,…,vi,…}表示圖中任務節點v 的集合,一個任務節點代表一個可獨立運算的任務。vi表示第i個任務。E= {e1,e2,…,ej,…}表示圖中有向邊e 的集合,e在任務圖中表示任務之間的數據依賴關系,弧的方向代表了數據流的方向。ej表示第j 條邊。由于算法設計僅考慮共享內存環境,這里忽略任務之間的通信開銷。一個XQuery查詢程序的任務圖可以有多個葉子節點,作為初始就緒任務,但僅有一個根節點,表示查詢任務的結束。根據vi描述的查詢任務的特征,vi可以有多種并行處理方式。任務節點的定義為

其中,expr是描述該查詢任務的表達式;isPipeline和is-DataParallel分別表示能否支持流水線并行和數據并行處理;parallelType表示該任務在調度時選擇的執行方式,parallelType取值1、2、3 分別表示任務并行、數據并行和流水線并行;cost是該任務的執行代價;arcIn、arcOut都是有向弧集合,分別表示連接當前任務的前驅后繼。pipePred、pipeSucc分別指向流水線中前驅任務和后繼任務。由于表達式可能存在嵌套的情況,subGraph 用來表示當前任務包含的子任務。例如圖3中的v8節點代表的任務為Task= (child (book),true,true,2,0,{e9},{e7},null,{v7},null)。因為任務并行是并行調度的基礎,在圖3中所有的節點均適合任務并行方式。其中,v1、v2、v4、v5、v7、v8適合數據并行,v5、v7、v8和v1、v2適合流水線并行。

圖3 DAG 任務圖模型
在XQuery并行實現中,針對DAG 中各個節點之間的數據依賴關系在任務調度開始時已知和任務執行代價在任務就緒時才能計算的特點,采用任務節點出度和執行代價為調度算法中優先級的兩個參數。將DAG 中入度為0的任務作為就緒任務存入就緒任務隊列,按照依賴度從大到小排列,依賴度相同的按照執行代價從大到小排列。在調度過程中,依次對就緒隊列中的任務確定其并行方式,存入候選任務隊列。選擇任務節點出度和執行代價為優先級能夠降低代價模型[7]帶來的優先級誤差,并能很好的適用于DAG 中任務節點少的場景,保證系統的吞吐率。
在并行方式選擇過程中,如果任務適合數據并行時,根據任務執行代價與就緒任務的平均執行代價的比值確定將要處理的數據劃分份數,以保證每一份數據的處理時間基本一致,達到負載均衡的目的;如果任務適合流水線并行時,根據系統當前可用工作線程數量和任務所在流水線長度,確定該流水線任務所在流水線的任務數量。之后將當前流水線任務存入候選任務隊列,而流水線中其它數據未就緒的流水線任務存入到阻塞任務隊列,執行時通過流水線局部并行自動機模型完成任務的查詢處理;對于即適合數據并行又適合流水線并行的任務根據該任務在數據并行和流水線并行兩種方式下的可并行任務數量進行比較,選擇可并行任務數量較大的并行方式進行處理。
由于任務圖中各個任務的執行代價在任務就緒時才能獲取,而各個工作線程的負載不均衡會對整體執行時間帶來影響。在任務調度之后,對負載較小的工作線程追加適當粒度的就緒任務使得各個工作線程的負載基本一致:從未被調度的就緒任務中追加新的任務到候選任務隊列中,直到當前工作線程的負載大于所有工作線程的平均負載為止。
本節將給出本文調度算法的描述、任務并行方式選擇算法以及針對流水線并行優化問題建立的流水線局部并行自動機模型。
針對2.2節的任務圖模型,本文調度算法的處理過程如下:
(1)從DAG 中找出所有入度為0的任務,作為就緒任務存入就緒任務隊列RTL。
(2)按照任務代價模型計算所有就緒任務的執行代價T.cost及其平均執行代價avgCost。
(3)將RTL中的就緒任務按照出度從大到小排列,出度相等的按照執行代價從大到小排列。
(4)當可用工作線程數量num>0且RTL 中有未被調度的任務時,從RTL 中依次選擇任務,調用P-S算法進行并行方式選擇處理,并將處理后的任務移動到候選任務隊列STL,其中流水線阻塞任務存入候選阻塞任務隊列SBTL 中,更新可用工作線程數量num。重復該步驟直到num=0或RTL 為空。
(5)當可用線程數量num=0且RTL 中存在未被調度的任務時,對候選任務隊列STL 中的任務進行負載均衡處理。從RTL 中向負載小于avgCost 的工作線程追加適當粒度的任務到STL。
(6)將STL、SBTL 分別提交給執行模塊的執行任務隊列TQ、阻塞流水線隊列BTL。
其中,第(4)步處理過程中確定各個任務并行方式時的選擇策略參見3.2節中的并行方式選擇算法P-S。在執行模塊完成本次調度過程中所有任務的查詢處理后,判定DAG中所有的任務是否均已執行完畢,如果是,則返回XQuery的查詢處理結果,否則更新DAG 中各個任務節點的入度信息。重新調用第(1)步,完成下一次的調度處理。
在任務調度時需要確定每個候選任務的并行方式,本節給出任務T 并行方式的選擇算法。其中,num 代表當前可用工作線程個數;T.cost代表任務T 的執行代價;avg-Cost代表就緒任務的平均執行代價;T.pipeLength 代表任務T 所在流水線的長度。其中,不同并行方式的最大并行度計算公式如下

并行方式選擇算法P-S:
輸入:任務T、就緒任務平均執行代價avgCost、可用工作線程數量num、候選任務隊列STL、候選阻塞任務隊列SBTL
輸出:候選任務隊列STL、候選阻塞任務隊列SBTL
算法:


對于流水線并行任務,由于流水線上各個任務處理數據的速度的不一致性,容易造成一些流水線任務處于空閑等待狀態。為了減少各流水線任務的空閑等待時間,充分利用計算資源,本文建立了一種流水線局部并行自動機模型,用于描述一個流水線任務的工作狀態。如圖4 所示,各流水線任務根據要處理數據是否到達,將流水線任務的狀態分為就緒、阻塞、執行和完成4種狀態。執行模塊剝奪處于阻塞狀態的流水線任務所占用的計算資源,以滿足其它任務的執行。待阻塞狀態的流水線任務要處理的數據就緒時,再將該任務存入待執行任務隊列TQ 中,等待被其它工作線程獲取執行。其中,就緒、阻塞和執行狀態的任務分別處于系統中的TQ、BTL 和工作線程中。

圖4 流水線局部并行自動機模型
對于每一個流水線任務,均對應一個輸入數據緩沖區inBuffer和輸出數據緩沖區outBuffer。每個流水線任務均從inBuffer獲取數據進行查詢處理,并將處理后的結果存入outBuffer 中。對于流水線中非起始節拍任務而言,每一個流水線任務的輸入緩沖區均為前一節拍的輸出緩沖區。只有流水線任務對應的inBuffer中有數據時,該流水線任務才能進行執行。否則該任務處于數據未就緒狀態,即阻塞狀態。當流水線任務在執行時,如果輸入緩沖區數據尚未到達,則工作線程將當前流水線任務保存到執行模塊的BTL 中。當前工作線程從TQ 中重新獲取其它待執行的任務進行查詢執行。當流水線中非結束節拍的任務產生輸出數據時,判斷其后續任務是否處于阻塞狀態,如果是,則將后續任務從BTL 移入TQ 中,等待被工作線程執行。這樣一條包含n個流水線任務的流水線,可以看作n-1個流水線局部并行自動機。流水線任務的執行,可以理解為n-1個并行自動機的多次迭代,直到流水線任務執行結束,達到提高程序的查詢執行效率,減少由于不同流水線任務處理數據速率的差異帶來的性能影響的目的。
我們將第3節中的調度算法和并行方式選擇算法及流水線局部并行自動機模型在基于JDK6.0的XQuery并行查詢系統中進行了實現。根據不同并行方式的組合分別實現了任務并行、任務并行和數據并行、任務并行和流水線并行以及3種并行方式同時存在的4種版本。針對流水線優化的實驗,在3種并行方式共存的場景下,實現了包含流水線局部并行自動機模型和不包含流水線局部并行自動機模型的兩種版本。實驗平臺為HP Z600 工作站,E5504 CPU (4核×2),4GRAM。查詢案例采用W3C 的標準查詢 (http://www.w3.org/TR/2007/NOTE-XQuery-usecases-20070323/),所選取測試程序結構特點各異,充分考慮到了不同并行方式、單/多數據源的分布。實驗數據采用XML基準數據集XMark。不同并行方式下的性能對比和流水線優化性能對比采用的數據集大小為32 M,工作線程數量為4。而不同核數下算法性能對比實驗中,分別針對32 M 數據集和64 M 數據集在串行和工作線程分別為2、4、8共4種情況下進行測試。
表1給出了測試案例在不同并行方式下的查詢時間對比。通過實驗我們可以看出充分利用查詢任務中存在的并行方式可以提高查詢任務的執行效率。由于q7中查詢條件的約束,使得查詢結果規模較小,從而使得不同并行方式下的性能提升不明顯。以任務并行為基準,我們可以發現q1-q6中,任務并行、數據并行組合下性能提升最低的是q1為7.6%,最高的是q3為19.9%,平均提高的執行時間為13.4%;任務并行、流水線并行組合下性能提升最低的是q5為4.8%,最高的是q6為11.5%,平均提高的執行時間為8.8%;3種并行方式同時存在時,性能提升最低的是q5為11.3%,最高的是q1為19.3%,平均提高的執行時間為16.9%。由此,我們可以發現不同并行方式下性能的提升與XQuery程序結構有重要關系。不同測試案例在不同并行組合下的性能提升效果各不相同。綜合來看,數據并行方式帶來的性能提升優于流水線并行方式,而3種并行方式同時存在時對于XQuery整體并行的效率提升效果最明顯。
表2給出了測試案例在不同數據規模和不同工作線程數下的性能對比。同時,給出了本文算法與串行執行方式下的比較。通過實驗我們發現大部分查詢案例在本文算法下能夠很好利用多核環境的計算資源提高系統的執行效率,減少查詢程序的執行時間。其中存在個別測試案例沒有達到預期的執行效果,原因在于這些測試案例的查詢結構過于簡單,查詢結果規模較小。查詢結構簡單導致并行方式處理和數據并行時的數據劃分操作會帶來額外開銷。從表2中我們可以看出,隨著線程數量的增加,XQuery測試案例的執行時間在逐步減小,但減小的幅度在逐步降低,例如q5在32 M 數據集下的測試結果。另外,本文的調度算法具有較好的內存使用效率。對于測試案例q6在64 M 數據集下的測試中,串行方式下會出現內存溢出錯誤,不能得到執行結果。而在工作線程數量為2、4、8 時,通過流水線并行方式可以減少中間結果存儲,提高內存的使用效率,從而最終得到查詢結果。

表1 不同并行方式下的性能對比 (時間單位/ms)

表2 不同工作線程下的算法性能比較 (時間單位/ms)
表3給出了測試案例在含有流水線優化和不含流水線優化兩種情況下的性能對比。通過實驗數據,我們可以發現本文提出的流水線局部并行自動機模型能夠充分利用各流水線節拍之間的等待時間,提高系統資源的利用率,降低XQuery程序的整體執行時間。但同時我們也應該注意到,在q7測試案例中,查詢規模較簡單且查詢結果規模較小時,流水線局部并行自動機模型帶來的性能優化效果不明顯。

表3 流水線優化的性能比較 (時間單位/ms)
近年來,研究者根據DAG 中任務所適用的不同并行方式,提出了多種不同的調度方法。PF.Dutot等人針對任務并行方式研究了多核平臺下的調度問題[8]。T.Ntakpe等人研究了任務與數據并行方式下的并行調度算法,結合數據并行提高了系統的處理效率[9]。針對流水線并行方式,Antonino Tumeo等人研究了異構嵌入式系統中流水線并行調度問題,提出了一種基于貝葉斯的優化調度算法[10]。針對任務并行、數據并行及流水線并行同時存在的調度場景,H Yang等人提出了一種利用ILP 思想的調度算法[11]。然而,該調度算法所針對的任務圖模型具有一定的限制性,其模型假設各個任務的執行代價在調度開始時已知,各個任務執行時所采取的并行方式在調度之前是確定的,不能隨系統資源的改變而改變。當前的調度算法不能很好的利用XQuery內在的多種并行方式特點,將空間并行技術與時間并行技術相結合。同時,也不能很好的支持XQuery并行實現中根據系統資源動態改變任務的執行方式,滿足動態調度的需求。
本文針對XQuery整體并行查詢實現過程中的任務調度問題,提出了一種適用于基于共享內存多線程環境下的調度算法。利用函數式語言的特點,將XQuery程序劃分為可并行執行的計算任務,在運行時代價模型基礎上,針對不同任務采用任務并行、數據并行和流水線并行進行處理,在一種新型的綜合調度策略的指導下,提高了XQuery語言查詢處理XML數據的性能,滿足了動態調度的需要。實驗結果表明這種調度算法能夠很好解決共享內存多線程環境下,并行任務存在多種并行方式場景下的調度問題。本文提出的面向多種并行方式的調度算法比單一并行方式或兩種并行方式下的執行效率更好,與串行算法相比,本文算法能夠很好的利用多核環境的計算資源提高程序的執行效率。同時,在流水線并行優化方面,建立的流水線局部并行自動機模型,能夠充分利用計算資源,提高程序的局部并行效率,減少流水線各節拍之間的等待時間。然而,目前方案中負載均衡、流水線局部并行自動機模型等方面仍有改進的余地。如何針對XQuery程序的特點利用多線程技術來充分發揮多核環境的并行計算能力,仍然是值得探索課題。
[1]Lakshmanan K,Kato S,Rajkumar R.Scheduling parallel real-time tasks on multi-core processors[C]//Real-Time Systems Symposium,2010:259-268.
[2]Yang H,Ha S.ILP based data parallel multi-task mapping/scheduling technique for MPSoC [C]//Proceeding of ISOCC,2008:134-137.
[3]Tony Minoru Lopes,Andre Augusto Cire,Cid Carvalho Souza,et al.A hybrid model for a multi-product pipeline planning and scheduling problem [J].Constraints,2010,15 (2):151-189.
[4]Frederic Desprez,Frederic Suter.A bicriteria algorithm for scheduling parallel task graphs on clusters [C]//Proceedings of the 10th IEEE/ACM International Conference on Cluster,Cloud and Grid Computing,2010:243-252.
[5]Wang Zheng,Michael FP.Mapping parallelism to multi-cores:A machine learning based approach [C]//Proceedings of the 14th ACM SIGPLAN Symposium on Principles and Practice of Parallel Programming,2009:75-84.
[6]Stephen L Olivier,Allan K Porterfield.Scheduling task parallelism on multi-socket multi-core systems [C]//Proceedings of the 1st International Workshop on Runtime and Operating Systems for Supercomputers,2011:49-56.
[7]LIAO Husheng.The principle and implementation technology of XQuery language [M].Beijing:Science Press,2013:187-189 (in Chinese).[廖湖聲.XQuery語言原理和實現技術 [M].北京:科學出版社,2013:187-189.]
[8]Dutot PT,Ntakpe T,Suter F,et a1.Scheduling parallel task graphs on(almost)homogeneous multi-cluster platforms[J].IEEE Transactions on Paralllel and Distributed Systems,2009,20 (7):940-952.
[9]NtakpeT,Suter F.Concurrent scheduling of parallel task graphs on multi-clusters using constrained resource allocations [C]//Proceedings of the 10th IEEE International Workshop on Parallel and Distributed Scientific and Engineering Computing,2009:1-8.
[10]Antonino Tumeo,Marco Branca.Mapping pipelined applications onto heterogeneous embedded system:A Bayesian opti-mization algorithm based approach [C]//Proceeding of the 7th IEEE/ACM International Conference on Hardware/Software Codesign and System Synthesis,2009:443-452.
[11]Yang H,Ha S.Pipelined data parallel task mapping/scheduling technique for MPSoC [C]//Design,Automation and Test in Europe Conference and Exhibition.2009:69-74.