姜海韜 羊帆 陳媛媛 付繼發 張輝國
摘 要:文章對R語言已有的并行計算框架進行系統分類、比較,分析了適用于基于R+MPI與基于R+Hadoop的應用場景。在此基礎上,結合兩者優勢提出了一種基于R+MPI的優化計算環境,該環境可以使用戶僅修改少量代碼就可以將原有的串行程序并行執行。最后,通過一個多元線性回歸模型的代碼實例展示編程的便捷性,通過與現有工具包的速度比較驗證了其有效性。
關鍵詞:代碼并行化;分布式算法;MPI;Hadoop;多元線性回歸
中圖分類號:TP312 文獻標志碼:A 文章編號:2095-2945(2018)23-0037-04
Abstract: In this paper, the existing parallel computing frameworks in R language are classified and compared, and the application scenarios based on R+MPI and R+Hadoop are analyzed. On this basis, and in view of the advantages of both, an optimized computing environment based on R+MPI is proposed, which enables users to execute the original serial program in parallel only by modifying a small amount of code. Finally, the convenience of programming is demonstrated by a code example of a multivariate linear regression model, and its effectiveness is verified by comparing with the speed of the existing toolkits.
Keywords: code parallelization; distributed algorithm; MPI; Hadoop; multivariate linear regression
引言
由于人們數據收集能力的提高以及“數據即資產”意識的發展,數據集的規模與維度已今非昔比。以使用二維矩陣或者表格儲存的數據為例,其屬性數(列數)與樣本數(行數)都達到了前所未有的數量級,這使得并行編程與分布式存儲與計算必然地成為了處理大數據問題的主流解決方案。
R語言是一款基于GNU協議的開源軟件,致力于統計計算、數據可視化以及數據分析。該語言擁有龐大的用戶社區,在數據挖掘和機器學習盛行的今天發展速度極快。對于研究較多的數學模型,CRAN或GitHub上多有現成的R語言實現。但與此同時,R語言也有自身的不足之處:R語言以單線程運行,不能發揮目前多核處理器的全部計算能力;對于單臺計算機內存無法存儲的數據集,R語言無法直接處理。這些問題的解決方案之一是將R語言與其他輔助軟件結合使用。
Message Passing Interface(MPI)是一個基于Socket的跨語言通訊協議,用于編寫高性能的并行程序。調用MPI的R工具包就可以使多個R語言進程間具有數據交換的能力,從而能夠分布式地對數據進行高效的計算處理。
Hadoop是一款開源的商業數據庫,使用Hadoop Distributed File System(HDFS)儲存構架,其良好的效率、可拓展性與高可用性吸引了很多工業界與學術界的科研工作者。基于HDFS的MapReduce計算框架為分布式計算提供了一種備選方案:R語言的接口包rhdfs,rmr允許R調用Hadoop進行海量的數據處理。
由于任務的差異性,目前并不存在統一的、將串行實現的代碼并行化的解決方案,因此目前并不存在一個能高效解決一切并行計算問題的“完美系統”。但對于規模相對不太大的數學模型,在能發揮R語言現有優勢的情況下提供一個相對統一的解決方案卻能大大減少任務并行化的工作量。
本文在探討目前R語言并行計算框架的基礎上,提出一種基于R+MPI的優化并行計算環境,并以實例展示,驗證了運用該環境編碼的便捷性和有效性。
1 現有的并行框架分析
目前,能夠為R語言提供并行計算能力的工具箱大體可分為兩類:第一類是以parallel、Rmpi為代表的并行工具包,該類工具箱的實質是打開多個R終端并搭建終端之間的通訊網絡;第二類是以rhdfs、rmr為代表的接口包,這類工具箱提供了R語言與其他數據庫環境的接口。
一個較為理想的環境需要同時考慮單節點的計算能力、節點之間的數據傳輸能力、平臺兼容性與代碼的可移植性,其中單節點的計算能力受制于R語言自身,平臺的兼容性受制于使用的具體軟件。因此下文將從數據傳輸能力、代碼可移植性兩個方面討論適宜兩種工具包的應用場景。
1.1 有關數據傳輸能力的討論
討論串行任務的并行化時,最核心的一個問題就是并行任務的粒度大小。所謂粒度,即將任務并行化時“切分”的細度。對于同一個數學模型,一般粗粒度的并行化方法會對單節點的計算能力有較高的要求,而細粒度的方法卻要求結點之間有較強的數據傳輸能力。
并行工具包的運行模式可以歸結為:主節點向各子節點分發數據與需要運行的程序,子節點計算后返回結果,經過主節點整合后得到最終結果[1]。為了達到負載均衡的目的,一些并行函數采用了所有數據傳輸均通過主節點的解決方案(如“parallel::parApply”),易造成主節點通訊管道的堵塞。這類函數更適于粗粒度的、計算量大而傳輸量較小的任務(比如迭代任務)。對于中間數據傳輸要求較高的任務,這類函數在數據傳輸過程中會消耗較多時間,從而影響了運行效率。
而大多數接口包的本質是借用商業數據庫的強勁數據處理能力,使R語言能處理遠大于單機內存容量規模的數據。目前對于Hadoop平臺上的任務并行化已經有較多的研究,如[2-3],通過R語言接口包rhdfs操作hdfs就可以使用R平臺進行數學模型的計算。但商業數據庫多基于高訪問、低更改的業務特性進行優化,因此這些接口包更擅長于對海量數據進行求和、篩選等具有數據庫特點的細粒度操作,卻不適合每次都需要將特定數據進行更改的迭代任務。以分布式矩陣乘法為例,Hadoop將原數據切分為大小一定大小(默認為128Mb)的塊,以多備份的方式存入HDFS。由于在計算(Reduce)過程中需要整合矩陣中特定的(aij,bjk)進行計算,沒有針對矩陣計算的特點進行儲存優化的自動分配地址會使shuffle數據處理過程需要較長的時間[5],這使其計算速度遠遠不及理論上的并行粒度。
1.2 有關代碼可移植性的討論
R語言擁有龐大的用戶社區與開源的可用程序包。同時,R語言又是一種向量化的語言,基本操作為向量/矩陣之間的計算。因此若能做到向量/矩陣計算的并行化,就能盡可能地利用R語言的現有代碼,縮短軟件的開發周期。
以Parallel為代表的并行工具包已經實現了向量計算的并行化,在不考慮運行效率的情況下,這些包僅通過少量的代碼更改即可實現并行版本的向量計算[4]。但目前并沒有廣泛使用的、用于矩陣并行化計算的軟件包。若使用Parallel包中的現有函數,不但要在每次計算前將矩陣切分為列表(list)儲存,還必須面對數據傳輸速度的瓶頸。
文獻[5-7]探討了Hadoop平臺上的矩陣操作問題,使用R語言的接口包調用Hadoop進行矩陣計算,并讀取計算結果至R中就可以使R語言擁有分布式矩陣計算的能力。但基于此方案的所有代碼需要修改為數據庫語言,這會使代碼的可移植性變弱,難以發揮R語言所擁有的龐大用戶社區、海量開源腳本的優勢。
1.3 適宜場景小結
使用R語言進行分布式并行處理的兩種方法的優缺點淺析見表1,可以發現兩種方式各有其優缺點。并行包有較強的代碼可移植性,從而能發揮R大量現有工具包的優勢,但數據傳輸能力有待提高;接口包的數據傳輸能力相對較強,能夠突破內存容量的限制,但數據庫語言的使用使其代碼可移植性降低。
2 并行框架設計
本文搭建的并行環境基于R語言與MPI,由于使用的工具都具有跨平臺的優良特性,該環境在Windows與linux操作系統上均可使用,具有很好的操作系統兼容性??紤]到Windows系統是目前被廣大高校所使用的,也是開發者保有量最多的環境,且R語言在該系統下可用的分布式環境一直較少,因此后文均在Windows系統上討論并行環境的搭建與應用。
2.1 基本框架
本環境中的并行計算運行流程可以用圖1表示。首先將計算環境初始化,然后將計算所需的矩陣存入環境中。至此就可以對已存入系統的矩陣進行并行迭代計算,而且每一步的計算結果仍然以相同的格式儲存在系統中,這樣可以做到靈活地調用、輸出計算過程中任意一步的結果。體現了本環境在使用過程中的友好性。
對于主從節點拓撲網絡的設置,選擇二維的矩形排布,其中節點標號以行優先。特別地,為了增加本系統的通用性,對于存入環境的矩陣均按塊狀劃分,所有矩陣行與列的切割策略相同。這是由于塊狀劃分的分布式矩陣計算方法具有較高的執行效率[8],同時可以保證存入的矩陣無論作為矩陣乘法的被乘矩陣或者乘子矩陣都可以直接進行矩陣計算,而且計算結果還可以繼續迭代直接參與計算。避免了每一步計算之前都對矩陣進行切分并重復發送的繁瑣操作,為需要迭代或操作較為復雜的任務提供了較多的便利。
2.2 函數設計
本工具箱中的函數分為兩類:第一類是基本函數,用于環境的啟動、將數據存入/取出并行環境與資源的回收。第二類是附加函數,用于對存入本環境中的數據進行操作,可以根據具體需要靈活增刪。以第三節要實現的多元線性回歸為例:需要加入矩陣預分配函數、數據傳輸函數、數據計算函數并包裝成統一的并行矩陣乘模板。
基本函數:
(1)初始化函數Mpinit(nrow,ncol)用于mpi環境的初始化、二維網格的設置。用戶可以自由設置二維網格的行列結點數量,一共開啟nrow*ncol個mpi進程。初始化完成后會返回“done”。
(2)發送函數SendtoSlave(Data,Name),用于將變量分發給各個子節點、存入并行環境。若Data為矩陣,則Name為一個字符串代表儲存進入環境中的變量名;若Data為一個列表,則Name也是一個長度相同的列表代表每個變量的名字。運行完畢后會返回成功存入環境中的矩陣數量。
(3)回收函數GetfromSlave(Name),用于從各個子節點回收特定變量名的數據,并以矩陣類型返回。
(4)退出函數mpiexit(),用于釋放空間,退出mpi環境。
附加函數(例):
(1)函數getGrid(Rank,nrow,ncol),在輸入子節點編號與網格配置后返回當前結點的二維坐標。(2)函數getPos(Pos,nrow,ncol),在輸入子節點二維坐標與網格配置后返回當前結點的編號。
(3)函數DDD(Name1,Name2,Name3),用于分布式計算已經存在于環境中的名為Name1與Name2的矩陣的乘積,結果以名字Name3存入環境。返回值為當次計算所消耗的時間。
3 數值實驗
3.1 多元線性回歸
回歸分析作為統計學的一個重要研究方向,在生物、醫學、農業、林業、金融等領域都有廣泛應用。而多元回歸分析是最為基本的一種回歸模型,利用所得到的回歸方程,可以評價單個預測變量的重要性,也可以預測一組給定的已知變量所對應的目標值[9]。伴隨大數據時代所帶來的樣本在激增、數據量巨大等問題,使得多元回歸分析的應用受到了制約。因此,在并行優化環境中實現回歸分析無論是在理論上還是在應用上都有重要的意義。
假設y是可觀測的隨機變量,共有p個影響因素,分別為x1,x2,…,xp。對于n組獨立觀測樣本:(yi;xi1,xi2,…,xip),i=1,2,…n可以建立如下p元線性回歸模型:
在樣本量n與屬性數p較大的時候,如何計算XT X與XT Y是需要考慮的主要問題。這里在實現了分布式矩陣乘(DDD算法[10])的前提下,給出本環境中實現多元線性回歸的R語言關鍵代碼(見代碼1)。
代碼1顯示了在本環境中實現數學模型的便捷性——只需要將大矩陣存入環境,并用環境自帶的矩陣計算函數進行計算,最后從環境中取出計算結果即可。取定n=10000,p=30,運行該代碼后觀察圖2可以發現回歸系數的估計值基本以微小的誤差均勻分布在真值的兩側,可以認為是噪聲ε的影響,回歸模型的輸出結果是正確的。
3.2 矩陣乘法速度比較
本節通過比較R語言現有的Parallel工具包與本環境中實現的分布式矩陣乘法(DDD算法)來驗證本環境的高