喬百友 朱俊海 鄭宇杰 申木川 王國仁
1(東北大學計算機科學與工程學院 沈陽 110819)2 (楊百翰大學計算機科學系 美國猶他州普若佛 84602)
?
一種基于Spark的多路空間連接查詢處理算法
喬百友1,2朱俊海1鄭宇杰1申木川1王國仁1
1(東北大學計算機科學與工程學院 沈陽 110819)2(楊百翰大學計算機科學系 美國猶他州普若佛 84602)
(qiaobaiyou@mail.neu.edu.cn)
針對云環(huán)境下空間數(shù)據(jù)連接查詢處理問題,提出了一種基于Spark的多路空間連接查詢處理算法BSMWSJ.該算法采用網(wǎng)格劃分方法將整個數(shù)據(jù)空間劃分成大小相同的網(wǎng)格單元,并將各類數(shù)據(jù)集中的空間對象,根據(jù)其空間位置劃分到相應的網(wǎng)格單元中,不同網(wǎng)格單元中的空間數(shù)據(jù)對象進行并行連接查詢處理.在多路空間連接查詢處理過程中,采用邊界過濾的方法來過濾無用數(shù)據(jù),即通過計算前面連接操作候選結果的MBR來過濾后續(xù)連接數(shù)據(jù)集,從而過濾掉無用的連接對象,減少連接對象的多余投影與復制,并采用重復避免策略來減少重復結果的輸出,從而進一步減少后續(xù)連接計算的代價.合成數(shù)據(jù)集和真實數(shù)據(jù)集上的大量實驗結果表明:提出的多路空間連接查詢處理算法在性能上明顯優(yōu)于現(xiàn)有的多路連接查詢處理算法.
云計算;Spark平臺;多路空間連接查詢;邊界過濾;重復避免
空間數(shù)據(jù)查詢處理技術一直是空間數(shù)據(jù)管理領域的研究熱點,而空間連接查詢是一種常用的空間查詢類型,也是該領域的重要研究課題之一.空間連接查詢作為一種基本空間操作,是最耗時的操作之一,由于其復雜性和重要性使之成為決定空間數(shù)據(jù)管理系統(tǒng)整體性能的重要因素.特別是近年來,隨著物聯(lián)網(wǎng)技術、對地觀測技術和基于位置的服務技術等技術的快速發(fā)展和廣泛應用,空間數(shù)據(jù)規(guī)模急劇增加,已成為一類非常重要的大數(shù)據(jù),在這種情況下,如何對這類大數(shù)據(jù)進行高效的空間連接查詢處理已成為當前研究的熱點之一.顯然,傳統(tǒng)的空間數(shù)據(jù)庫技術由于其擴展性問題而難以滿足這類大數(shù)據(jù)快速查詢處理的要求,而Spark[1]作為一種新型的超大規(guī)模數(shù)據(jù)分布式并行處理平臺而受到人們的廣泛重視,也是目前大數(shù)據(jù)處理的關鍵技術.然而由于Spark平臺并未對連接操作提供內(nèi)在的支持和優(yōu)化,因此研究如何利用Spark這種分布式并行處理平臺來實現(xiàn)對空間大數(shù)據(jù)的高效空間連接查詢處理,具有重要的理論研究意義和應用價值.
目前已有研究者在Hadoop平臺下,就空間連接查詢處理算法進行了較深入研究,并取得了一系列理論和應用成果,但這些成果主要集中在相似性連接查詢算法、2路空間連接算法和關系數(shù)據(jù)上的多路連接查詢算法等方面,而對于多路空間連接查詢處理算法的研究成果還相當有限,Spark平臺下的多路空間連接查詢處理算法的研究才剛剛開始.在通用多路連接查詢處理研究方面,Afrati等人[2]和Lin等人[3]主要就MapReduce框架下,關系數(shù)據(jù)上的多路連接查詢處理問題進行了研究,并給出了3種優(yōu)化策略;而Jiang等人[4]和王曉軍等人[5]的研究則主要針對MapReduce框架下多路連接查詢中的IO開銷問題,并提出了Map-Join-Reduce的編程框架及相關的優(yōu)化算法.Slagter等人[6]則主要從網(wǎng)絡流量角度入手,提出通過在多個Reducer之間重新分配元組,從而達到減少連接時間.上述研究工作主要基于Hadoop平臺,并且聚焦于通用多路連接查詢處理優(yōu)化方面,而本文的工作則主要聚焦于Spark平臺下的多路空間連接查詢處理問題.
在基于MapReduce的多路空間連接查詢處理方面,王璟玢等人[7]針對小規(guī)模的集中式多路連接查詢處理,提出了基于R樹連接的多路空間限制策略和多路平面掃描的優(yōu)化技術,顯然,并不適合大規(guī)模分布式的多路連接查詢處理;Gupta等人[8-9]提出了2種多路空間連接查詢處理算法Controlled-Replicate和ε-Controlled-Replicate.Controlled-Replicate將各類連接數(shù)據(jù)集中的空間對象劃分并復制到第4象限中的所有網(wǎng)格單元,然后進行多路連接運算.顯然這種方法造成了大量對象的復制,影響連接處理效率.為此作者又提出了改進的多路空間連接查詢處理算法ε-Controlled-Replicate,該算法減少了數(shù)據(jù)復制,在一定程度上提高了處理效率,但是還存在著復制過多的問題.
針對當前最新的代表性多路空間連接算法ε-Controlled-Replicate中存在的數(shù)據(jù)復制過多、影響查詢處理效率的問題,本文基于最新的分布式并行計算框架Spark,充分利用其內(nèi)存計算和RDD分布式彈性數(shù)據(jù)集的特性,從數(shù)據(jù)劃分和復制入手,提出了一種基于Spark的多路空間連接查詢處理算法BSMWSJ,該算法將數(shù)據(jù)空間劃分成大小相同的網(wǎng)格單元,并將數(shù)據(jù)集中的空間對象,按照其所在空間位置復制到與其相交疊的網(wǎng)格單元中,每個網(wǎng)格單元中的數(shù)據(jù)實現(xiàn)并行空間連接處理.在連接過程中,采用邊界過濾方法來減少無用連接數(shù)據(jù),首先,對劃分到每個網(wǎng)格單元的第1次連接所需的2類數(shù)據(jù)集執(zhí)行連接運算,并對所生成的連接候選結果中的一類待連接數(shù)據(jù)集,計算其MBR;其次,利用該MBR來實現(xiàn)對后續(xù)要連接數(shù)據(jù)集的過濾,從而過濾掉無結果的后續(xù)連接對象,減少后續(xù)連接的多余計算,以及連接對象的多余投影與復制,并采用重復避免策略來減少重復結果的輸出,從而全面減少后續(xù)連接計算的代價,提高多路連接查詢處理的效率.合成數(shù)據(jù)和真實數(shù)據(jù)集上的大量實驗結果表明,本文提出的多路空間連接查詢處理算法在性能上明顯優(yōu)于ε-Controlled-Replicate算法,具有良好的擴展性和適應性.
國內(nèi)外對MapReduce框架下的連接查詢處理算法及其優(yōu)化技術的研究已開展得較為廣泛,目前這些研究工作主要集中于相似性連接查詢算法、Theta連接和2路空間連接算法等方面.在多路連接查詢處理算法方面的研究相對開展的不多,而Spark環(huán)境下的多路空間連接查詢處理算法的研究成果更是相當有限.
在相似性連接查詢處理和Theta連接算法等方面,Luo等人[10]首次利用MapReduce模型來處理高維相似性連接查詢問題,提出了1種新穎的降維技術DAA,并給出了2種并行處理框架OSFR和TSFR,DAA雖然能夠減少高維向量之間的計算代價,但不能減少總的比較次數(shù).為此Ma等人[11]提出了有效減少DAA和初始向量計算次數(shù)的方法SAX和PAA,能夠?qū)⒏呔S向量分成不同的組,并提出了基于SAX和改進SAX的相似性連接查詢處理算法;文獻[12]則研究了基于MapReduce框架的Top-k相似性連接處理算法,提出了分治和剪枝策略,并在此基礎上提出了全分區(qū)方法和重要元組分區(qū)方法來最小化Map和Reduce任務之間數(shù)據(jù)通信量,從而達到減少后續(xù)計算代價的目的;文獻[13]則主要研究了MapReduce框架下的集合相似性連接算法,提出了3階段進行集合相似性連接的方法,實現(xiàn)了自連接、RS連接等示例,并提出了保證負載均衡和最小化復制的方法;文獻[14]主要研究了Hadoop下大規(guī)模不確定數(shù)據(jù)上的集合相似性連接方法,并結合前綴過濾原則,提出了Map端剪枝、Reduce端剪枝和混合剪枝3種方法來減少后續(xù)比較代價;文獻[15]提出了1種以Reducer為中心的代價模型和基于MapReduce框架的Theta連接模型,并在該模型基礎上提出了1-Bucket-Theta隨機算法,該算法在足夠的統(tǒng)計信息支持下具有較高Theta連接效率;文獻[16]則主要聚焦于非對稱分片復制連接問題,提出一種基于自適應分片的優(yōu)化算法AFR-AS,來降低MapReduce下任務啟動開銷以及非對稱分片復制連接中的數(shù)據(jù)廣播開銷;卞昊穹等人[17]提出了一種基于Spark的等值連接優(yōu)化算法,該算法結合了半連接與劃分連接的優(yōu)勢,并充分利用Spark內(nèi)存計算模型的特性,提高了等值連接處理性能.
在2路空間鏈接查詢處理方面,Wang等人[18]對2路空間連接算法進行了研究,提出了基于負載均衡的空間對象分區(qū)方法,并采用基于帶的雙向平面空間掃描技術來減少連接計算的代價;文獻[19-20]研究了基于MapReduce的2路空間連接算法,首次提出了2路空間查詢處理算法SJMR,但該算法沒有考慮過濾階段的優(yōu)化問題,導致了大量無用的計算操作,增加了查詢處理代價;為此,Qiao等人[21]改進了原有SJMR算法,提出了一種基于邊界過濾的空間連接查詢處理算法BFSJMR,該算能夠過濾掉無用的連接查詢代價,從而提高了2路空間連接查詢處理效率.
在多路連接查詢處理方面,Afrati等人[2]和Lin等人[3]主要就MapReduce框架下的多路連接查詢處理問題進行了研究,并給出了3種優(yōu)化策略,然而該研究工作主要針對關系數(shù)據(jù),顯然不適合于多路空間連接查詢處理.Jiang等人[4]和王曉軍等人[5]就MapReduce框架下多路連接查詢中巨大IO開銷問題進行了研究,提出了Map-Join-Reduce的編程框架及相關的優(yōu)化算法.雖然在一定程度解決了IO開銷問題,但由于對原有的MapReduce編程框架進行了較大修改,造成了兼容問題,不利于原有框架的完整性;Slagter等人[6]提出了一種網(wǎng)絡感知的多路連接方法,通過感知網(wǎng)絡流量實現(xiàn)多個Reducer之間元組的重新分配,從而達到減少連接時間;孫莉等人[22]則就列存儲數(shù)據(jù)中連接查詢優(yōu)化問題進行了研究,提出了基于規(guī)則的連接策略優(yōu)化方法,并設計了相應的優(yōu)化算法,在此基礎上提出了相應的代價估算模型,實現(xiàn)了策略的選擇;周國亮等人[23]針對聯(lián)機分析處理要求,提出一種能夠適合Spark環(huán)境并結合多維Bloom Filter的星型連接算法,該算法能夠避免事實表數(shù)據(jù)的移動,并利用多維布隆過濾器技術來減小需要廣播的數(shù)據(jù)量,該算法充分結合了廣播連接和重劃分連接的優(yōu)勢.
在基于MapReduce的多路空間連接查詢處理方面,王璟玢等人[7]提出了基于R樹連接的多路空間限制策略和多路平面掃描的優(yōu)化技術,然而該研究主要針對小規(guī)模的集中式多路連接查詢處理.Gupta等人[8-9]提出了2種多路空間連接查詢算法Controlled-Replicate和ε-Controlled-Replicate.Controlled-Replicate算法采用空間劃分方法,將各類數(shù)據(jù)集中的空間對象劃分并復制到第4象限中的所有網(wǎng)格單元,然后對每個網(wǎng)格單元中的數(shù)據(jù)分別進行多路連接運算.ε-Controlled-Replicate算法是在Controlled-Replicate算法基礎上提出的一種改進算法,主要是通過減少數(shù)據(jù)復制來降低通信代價,從而在一定程度上提高了多路空間連接查詢處理的效率.
針對現(xiàn)有多路空間連接查詢處理算法存在的問題,本文從減少數(shù)據(jù)復制和計算代價角度入手,結合Spark內(nèi)存計算框架的優(yōu)勢,提出了一種基于Spark的多路空間連接查詢處理算法,是一種類似于ε-Controlled-Replicate的多路空間連接查詢算法.
空間數(shù)據(jù)對象有多種類型,大多都是不規(guī)則的形狀,因此判斷2個空間對象是否符合某個查詢謂詞的代價非常昂貴.在空間連接查詢處理中通常采用最小邊界矩形(minimum bounding rectangle, MBR)來代表一個空間對象,僅當2個對象的MBR有交疊時,才進一步判斷這2個空間對象是否真正有交疊,這種分步的處理方法具有更高的處理效率.本文主要針對鏈式多路空間連接查詢,可以用一張圖來形象表示,圖中的節(jié)點對應空間數(shù)據(jù)集,圖中的邊對應于連接謂詞,這樣就形成了一個鏈圖.
鏈式多路空間連接查詢通常定義為:給定空間關系R1,R2,…,Rn(n>2),找到一組空間對象元組(r1,r2,…,rn),其中,r1∈R1,r2∈R2,…,rn∈Rn,空間對象r1和r2,r2和r3,…,rn-1和rn兩兩之間的幾何屬性存在相互交疊,可表示為
Overlap(P,R1,R2,…,Rn)=

(1)
其中,P代表交疊連接謂詞,Overlap(P,ri,ri+1)表示空間對象ri和ri+1之間滿足連接謂詞P.
空間連接查詢處理通常分為過濾和精化2個階段,在過濾階段,通過檢查2個空間對象的MBR來消除不可能成為結果的元組,從而產(chǎn)生候選結果集合;精化階段則是對候選元組集合進行進一步檢測,需要使用計算密集型的幾何算法來實現(xiàn),確定其空間屬性是否真正滿足其連接謂詞.本文所提出的算法重點聚焦于提高過濾階段的處理效率.
本文主要從減少計算量和避免過度復制的角度來優(yōu)化多路空間連接查詢處理,提出了一種Spark平臺下的多路空間連接查詢處理算法BSMWSJ,該算法采用邊界過濾方法,重點是減少過濾階段的數(shù)據(jù)復制量和計算量,下面分別從空間劃分、數(shù)據(jù)投影與復制、過濾和重復避免等方面來對算法進行詳細描述.
3.1 空間劃分和編碼
在Spark環(huán)境下,實現(xiàn)大規(guī)模并行空間連接查詢處理,首先涉及到的是并行任務的劃分,需要將整個算法任務拆分成多個子任務并行執(zhí)行,這就涉及到數(shù)據(jù)的分區(qū)和編碼:首先需要將數(shù)據(jù)劃分到多個分區(qū)并進行編碼,然后在每個分區(qū)上做多路空間連接運算,從而實現(xiàn)并行處理,并降低整個連接操作代價.本文采用網(wǎng)格劃分方法,將整個數(shù)據(jù)空間劃分成許多大小相等的網(wǎng)格,每個網(wǎng)格被稱為一個分區(qū)單元,并對每個分區(qū)單元進行編碼,然后將數(shù)據(jù)投影到各個分區(qū)單元中,從而實現(xiàn)數(shù)據(jù)劃分.利用Z-order填充曲線對每個分區(qū)單元進行編碼,從而更好地保持數(shù)據(jù)之間的空間緊鄰關系,并通過Hash方式將每個分區(qū)單元映射給多個Executor,Z-order曲線編碼配合Hash的映射方案,可以讓Executor得到更均勻的任務映射,并且分區(qū)單元數(shù)量越多,數(shù)據(jù)分配的越均勻,有助于解決數(shù)據(jù)傾斜的問題.投影到分區(qū)單元中的多類空間數(shù)據(jù)對象會被作為Value值交給相應Executor進行處理.
圖1所示為一個劃分編碼的例子,整個數(shù)據(jù)空間被劃分為16個分區(qū)單元,采用Z-order填充曲線進行編碼,編號依次從0~15.劃分之后分區(qū)單元連同投影到各個分區(qū)單元上的數(shù)據(jù)被分別映射給3個Executor任務進行并行連接處理.

Fig. 1 Demonstration of data partition and encoding圖1 數(shù)據(jù)劃分與編碼示意圖
3.2 數(shù)據(jù)投影與復制操作
整個數(shù)據(jù)空間被劃分成多個網(wǎng)格單元后,空間連接對象需要根據(jù)其所在的位置被映射這些網(wǎng)格單元,然后分配給多個Executor并行執(zhí)行連接運算,這首先涉及到數(shù)據(jù)的投影和復制問題.本文采用簡單策略,根據(jù)空間連接對象與網(wǎng)格單元的交疊情況進行投影,如果空間對象和網(wǎng)格單元有相交則將其投影到相應的網(wǎng)格中.在多路連接查詢處理過程中,生成的中間結果需要根據(jù)后續(xù)連接要求將其整體復制到相應的網(wǎng)格單元,以進行后續(xù)連接處理.下面詳細介紹空間對象投影和數(shù)據(jù)復制操作.
1) 空間對象投影.將空間數(shù)據(jù)對象根據(jù)其所在位置映射到相應的網(wǎng)格單元中.設C=(c1,c2,…,cn)代表一個劃分,ci代表每一個網(wǎng)格單元;設R為一類待連接處理的空間對象集合.若一個空間對象u∈R,其MBR與網(wǎng)格單元ci(ci為該網(wǎng)格單元的Z-order編碼)有交疊,則將對象u映射到網(wǎng)格單元中ci中,并生成相應鍵值對(ci,u),如果一個空間對象和多個網(wǎng)格單元有交疊,則會形成多個鍵值對.投影操作可以表示為
Project(u,C)→{(ci,u)},?i, s.t.u∩ci≠?.
(2)
2) 數(shù)據(jù)復制操作.在多路連結查詢處理中,需要多個數(shù)據(jù)集之間進行多次連接,數(shù)據(jù)復制操作則主要是將當前網(wǎng)格單元上的前1次連接的中間結果復制到相關的其他網(wǎng)格單元,從而進行后續(xù)的連接操作,其結果與投影操作類似.若t∈T為連接中間結果集中的元組,t.u為將要進行下一次空間連接的對象,則數(shù)據(jù)復制操作可以表示為
Replicate(t,C)→{(ci,t)},?i,t.u∩ci≠?.
(3)
圖2為投影與復制操作的示例,從圖2中可以看出,對象r1被投影到6號和12號網(wǎng)格單元,r2被投影到9號和12號單元,r3則被投影到9號和11號單元.即Project(r1,C)={(6,r1),(12,r1)},Project(r2,C)={(9,r2),(12,r2)},Project(r3,C)={(9,r2),(11,r2)}.當執(zhí)行r1,r2和r3依次進行多路連接時,由于r2和網(wǎng)格單元9有交疊,因此網(wǎng)格單元12中的對象r1和r2的連接中間結果(r1,r2)要被復制到網(wǎng)格單元9中,從而形成鍵值對(9,(r1,r2)),實現(xiàn)與網(wǎng)格單元9中的空間對象r3的后續(xù)連接操作,避免了連接結果的丟失.

Fig. 2 An example of project and replicate operations圖2 投影與復制操作示例
3.3 多路空間連接查詢算法的總體流程
根據(jù)Spark并行分布式處理平臺特點及其編程模型,本文提出了基于Spark的多路空間數(shù)據(jù)連接查詢處理算法(BSMWSJ).該算法按照Spark中有向無環(huán)圖的思想,將算法中的每個操作作為有向無環(huán)圖中的節(jié)點,依次進行連接操作.多路空間連接查詢Qn=Overlap(R1,R2,R3,…,Rn),根據(jù)定義,可以表示為Qn=Overlap(…Overlap(Overlap(R1,R2),R3),…,Rn).
圖3為BSMWSJ多路空間連接查詢處理算法的處理流程,這里僅以4路空間連接Q4=Overlap(R1,R2,R3,R4)的處理過程為例來進行說明.
Q4=Overlap(R1,R2,R3,R4)的多路空間連接查詢算法的總體處理流程主要包括4個操作步驟:
步驟1. 根據(jù)網(wǎng)格劃分編碼方法對R1,R2,R3,R4數(shù)據(jù)集進行投影,并將編碼值作為Key值,將每個空間對象的標識及其MBR等屬性信息作為Value值,形成一系列的鍵值對,并分別將數(shù)據(jù)集R1,R2,R3,R4的投影結果放到彈性分布式數(shù)據(jù)集RDD1,RDD2,RDD3和RDD4中.
步驟2. 計算Overlap(R1,R2),即對RDD1和RDD2執(zhí)行Cogroup操作,將RDD1和RDD2中的數(shù)據(jù)根據(jù)Key值聚集到一起得到RDD12,對RDD12中對象執(zhí)行空間連接運算.在運算過程中,首先利用邊界過濾策略對RDD12進行過濾,去掉不可能有結果的數(shù)據(jù)對象,然后進行實際空間連接運算,執(zhí)行重復避免策略,并形成連接中間結果;對連接中間結果執(zhí)行數(shù)據(jù)復制操作,形成中間結果數(shù)據(jù)集RDDresult12.

Fig. 3 The processing flow of BSMWSJ multi-way join algorithm圖3 BSMWSJ多路連接算法處理流程
用一個例子來說明該復制操作的具體處理過程.假設2個空間對象r1∈R1,r2∈R2,若r1與r2有交疊,則說明它們是連接中間結果,此時利用復制操作Replicate計算出r2所跨的所有分區(qū)單元,并將其編碼作為Key值,將r1和r2的MBR屬性信息等組合在一起作為Value值,形成一組Key-Value鍵值對放到中間連接結果RDDresult12中.若r1與r2沒有交疊,則不進行處理,這樣就避免了無用數(shù)據(jù)的復制,減少了后續(xù)計算代價.
步驟3. 按照與步驟2相同的計算方法計算RDDresult12和RDD3之間的連接運算,最終得到R1,R2,R3的連接結果RDDresult123.
步驟4.RDDresult123與RDD4執(zhí)行Cogroup操作,生成RDD1234,在此基礎上進行邊界過濾、連接運算處理,并將結果直接輸出,形成RDDresult1234,并保存到HDFS文件系統(tǒng).由于是最后一步連接操作,故不在需要進行復制操作.
上述為BSMWSJ多路空間連接算法的處理流程,從中可知除了開始和結束步驟,中間處理步驟是相同的,這也是由鏈式多路連接查詢的性質(zhì)決定的.
3.4 過濾策略
空間連接查詢處理通常由過濾和精化2個階段構成,BSMWSJ算法主要是從減少數(shù)據(jù)復制和降低計算代價的角度出發(fā),對過濾階段進行優(yōu)化.在BSMWSJ算法中,多路連接實際上被拆分成多個2路連接來依次并行執(zhí)行連接運算,在執(zhí)行連接運算的過程中,采用邊界過濾策略,去掉不可能產(chǎn)生結果的元組,并僅對可能有結果的元組進行復制,大大減少存儲和后續(xù)計算的代價,具體包括2種策略.
1) 邊界過濾.在進行連接執(zhí)行過程中,利用前面已完成的連接結果來過濾即將要連接的數(shù)據(jù)集,即首先統(tǒng)計前1次已完成連接結果中相關連接對象的邊界MBR,并利用該MBR來過濾掉后續(xù)要連接數(shù)據(jù)集中不可能有結果的空間對象,從而減少后續(xù)連接計算代價.具體操作可以表示為
Filter(ti,c)→{(c,ti)},?i, s.t.c.mbrs∩ti≠?,
(4)
其中,c代表一個分區(qū)單元,ti為劃分到分區(qū)單元c中的一個空間對象(ti∈T),T為將要進行連接的數(shù)據(jù)集.若Jc=R…S為分區(qū)單元c中已經(jīng)執(zhí)行完成的多次空間連接操的結果集,則c.mbrs為集合Jc中相對應的集合S中的空間對象的邊界MBR.
圖4所示是一個邊界過濾的例子,其中3個數(shù)據(jù)集R,S,T依次進行3路連接運算RST,投影到網(wǎng)格單元3中的空間對象如圖4所示,RS的結果分別為(r1,s1),(r1,s2),(r1,s3),可以得到本次連接結果集中的對應S集合中的對象為s1,s2,s3,其邊界MBR為圖4中虛線所示,在與數(shù)據(jù)集T中對象進行連接運算時,可以直接過濾掉投影到網(wǎng)格單元3中的與該MBR不相交的空間對象t1,t4,t5,從而避免了這些空間對象分別與s1,s2,s3進行連接運算,大副減少了計算代價.

Fig. 4 An example of boundary filtering圖4 邊界過濾示例
2) 復制階段過濾.在多路連結查詢處理過程中,需要對前1次連接處理之后的中間結果進行數(shù)據(jù)復制操作,將其復制到其他可能會產(chǎn)生連接結果的網(wǎng)格單元中,執(zhí)行后續(xù)連接操作,避免丟失連接結果.在對中間連接結果復制中,僅對涉及跨網(wǎng)格連接對象的中間結果進行復制,這樣減少了數(shù)據(jù)復制和計算量,提高了系統(tǒng)的整體性能.
設C=(c1,c2,…,cn)表示一個數(shù)據(jù)空間劃分,若某個網(wǎng)格單元cj∈C上,其前m個數(shù)據(jù)集的連接結果集合S=R1R2…Rm;則對于任意si∈S,si=(r1i,r2i,…,rm i),若空間對象s.rm i與其他網(wǎng)格單元ck存在交疊,則保留si,并調(diào)用Replicate(si,ck)復制操作將其復制到ck網(wǎng)格單元,并生成相應的鍵值對;否則將其過濾掉.具體操作可以表示為
Filter(si,cj)→{(si)},
?k,si.rm i∩ck≠?ck≠cj,
(5)
Replicate(si,ck)→{(ck,si)},
?k,si.rm i∩ck≠?ck≠cj.
(6)
在圖4的示例中,網(wǎng)格單元3中數(shù)據(jù)集R和S的連接結果內(nèi)僅有元組(r1,s2)中的s2對象和網(wǎng)格單元4相交疊,因此僅將(r1,s2)復制到網(wǎng)格單元4中,以便與數(shù)據(jù)集T中的空間對象進行后續(xù)連接操作.可見這種復制階段的過濾策略能夠減少中間數(shù)據(jù)的復制量,從整體上減少了系統(tǒng)的計算代價.
3.5 重復避免策略
在Spark環(huán)境下的多路空間連接查詢處理中,數(shù)據(jù)被劃分到多個網(wǎng)格單元中,進行并行處理.由于在數(shù)據(jù)劃分編碼過程中,跨越多個分區(qū)的空間對象被投影到多個分區(qū),并且對部分中間結果需要進行復制,如果不采取措施,就會導致多個網(wǎng)格單元輸出相同的結果,這就需要進行去重操作,從而增加系統(tǒng)開銷、降低了系統(tǒng)效率,因此需要進行重復避免.在BSMWSJ算法中,采用了重復避免策略,僅讓一個網(wǎng)格單元來負責輸出結果,具體策略為在2個跨多個網(wǎng)格單元的空間對象進行連接時,僅讓這2個空間對象相交疊而成的左下角交點所在的網(wǎng)格單元負責輸出連接結果,這樣就避免了結果的重復輸出,減少了后續(xù)處理代價.
圖5所示為重復避免的例子,其中集合S中的對象s1被投影到其所交疊的網(wǎng)格單元2,3,6,8,9,12,R集合中的對象r1則被投影到網(wǎng)格單元3,6,9,12,r2對象被投影到了4個網(wǎng)格單元8,9,10,11中,如果不進行重復避免,在進行連接處理中,網(wǎng)格單元3,6,9,12就會輸出相同的連接結果(r1,s1),而網(wǎng)格單元8和9也會輸出相同連接結果(r2,s1),顯然出現(xiàn)了重復.

Fig. 5 An example of duplication avoidance圖5 重復避免示例
根據(jù)所提出的重復避免策略,如圖5所示,對象交疊部分所形成的對象的右下角(圖5中點P和點Q)所在的網(wǎng)格單元負責輸出,即由網(wǎng)格單元3負責處理輸出r1和s1的連接結果,網(wǎng)格單元8負責處理輸出r2和s1的連接結果,顯然該策略避免了重復處理和結果的重復輸出,降低了計算代價.
3.6 多路空間連接查詢處理算法
基于Spark分布式大數(shù)據(jù)處理框架,結合上述多路空間連接查詢處理思路,設計實現(xiàn)了多路空間連接查詢處理算法.下面以3路空間連接查詢處理為例來給出具體的多路空間連接查詢處理算法,算法描述如算法1.
算法1. 多路空間連接查詢處理算法(BSMWSJ).
輸入:3類待連接數(shù)據(jù)集、數(shù)據(jù)空間范圍、分區(qū)數(shù)量和輸出目錄(dataSet1,dataSet2,dataSet3,dataspaceRange,partitionNumber,outputFileDir);
輸出:連接結果集.
① defprojectOperation(mbr:MBR,extend:MBR,partitionNumber:Int)={
②ZOrder.getZOrder(mbr,dataspaceRange,PartitionNumber).map(splitNum? (splitNum,mbr))};
③ defcreateRdd(sc:SparkContext,filePath:String):RDD[(Int, MBR)]={
④sc.textFile(filePath).map(line?{
⑤ valmbr=MBR(line.split(" "))
⑥mbr.flatMap(cur?projectOperation(cur,dataspaceRange,partitionNumber))})};
⑦RDD1=createRdd(sc,dataSet1);
⑧RDD2=createRdd(sc,dataSet2);
⑨RDD3=createRdd(sc,dataSet3);
⑩RDDresult12=RDD1.cogroup(RDD2);










3.7 算法正確性分析
鏈式多路空間連接查詢本質(zhì)是一個迭代求解的處理過程,本文提出的BSMWSJ算法同樣采用迭代方式來處理鏈式多路空間查詢;然而BSMWSJ算法充分利用了Spark處理架構的并行處理特性,首先將各類數(shù)據(jù)集進行劃分,然后在劃分后的子空間中進行并行迭代連接處理,從而從總體上提高了多路空間連接查詢的處理效率.下面以3路空間連接查詢S=R1R2R3為例來說明BSMWSJ算法的正確性.
根據(jù)BSMWSJ算法的查詢處理過程,首先將R1,R2和R3數(shù)據(jù)集投影到各個網(wǎng)格單元,由于采用簡單的投影策略,即只要某一空間對象Oi和某個網(wǎng)格單元Cj有交疊就將其投影到Cj中.因此只要2個空間對象a和b(a∈R1,b∈R2)存在相互交疊,則a和b必然被投影到相同的一個或多個網(wǎng)格單元,由于采取重復避免策略,因此只由某個網(wǎng)格單元Ci負責進行連接計算,并輸出連接結果(a,b),同時(a,b)會被復制到與空間對象b有交疊的其他網(wǎng)格單元.在進行后續(xù)第2次連接運算中,元組(a,b)中的對象b又會和數(shù)據(jù)集R3中空間對象進行空間連接運算,其執(zhí)行過程與第1次空間連接類似,由于根據(jù)R2集合中對象的交疊情況對第1次的連接結果進行了復制操作,因此不會發(fā)生丟解的情況,故本文提出的算法是正確的.
為了驗證本文所提出的多路空間連接查詢處理算法的有效性,在真實數(shù)據(jù)集和合成數(shù)據(jù)集上做了一系列實驗,并和當前最新的多路空間連接查詢處理算法ε-Controlled-Replicate進行了比較分析.由于目前沒有找到有關基于Spark平臺的多路空間連接查詢處理算法的研究工作,而ε-Controlled-Replicate算法的研究內(nèi)容和目標與本文提出的算法最相似,但該算法是在Hadoop環(huán)境下實現(xiàn),為此在Spark下重新實現(xiàn)了該算法,并和本文提出的算法進行了比較,下面就具體實驗環(huán)境及結果對比情況進行詳細說明.
4.1 實驗環(huán)境
實驗環(huán)境由15臺IBM PC機架式服務器組成的Spark集群構成,其中1臺為管理節(jié)點,其余為計算節(jié)點.每臺服務器的配置為E5-2620 CPU(6核,2.0 GHz)、32 GB內(nèi)存和6 TB的硬盤,每臺服務器都安裝了Centos6.4系統(tǒng)和相應的Spark集群計算軟件.
4.2 實驗結果分析
本文采用真實數(shù)據(jù)和合成數(shù)據(jù)對算法的性能進行了測試,真實數(shù)據(jù)來自Census2000 TIGER地圖文件數(shù)據(jù)集,其中道路數(shù)據(jù)的數(shù)量有2 092 079個,水文數(shù)據(jù)的數(shù)量為37 950個.合成數(shù)據(jù)由腳本生成,模擬真實數(shù)據(jù)分布(高斯分布),分別合成了3類數(shù)據(jù)集,3類數(shù)據(jù)集的大小相同,個數(shù)均為250萬個空間對象,整體數(shù)據(jù)空間范圍為100 000×100 000,每個空間對象的最大MBR為100×100.本文首先就網(wǎng)格劃分粒度、任務數(shù)量對BSMWSJ算法的影響進行了分析,之后與ε-Controlled-Replicate算法進行了比較,下面給出具體的實驗結果.
1) 網(wǎng)格劃分粒度對算法性能的影響
由于數(shù)據(jù)實際分布存在數(shù)據(jù)傾斜的現(xiàn)象,因此數(shù)據(jù)空間的不同劃分粒度對算法的性能具有一定的影響,因此選擇合適的劃分粒度至關重要.圖6為3組數(shù)據(jù)集數(shù)據(jù)個數(shù)分別為300萬、450萬和600萬個空間對象,采用BSMWSJ算法執(zhí)行3路空間連接時,其執(zhí)行時間隨劃分粒度的變化情況.

Fig. 6 Execution time of BSMWSJ with the number of grid cells圖6 BSMWSJ算法執(zhí)行時間隨網(wǎng)格單元數(shù)量變化情況
從圖6可以看出,隨著劃分粒度的增大,連接查詢執(zhí)行時間逐漸變小,到一定程度后又開始增大.這是因為劃分粒度小時,由于存在數(shù)據(jù)傾斜導致數(shù)據(jù)分配不均勻,個別任務的運行時間較長,影響了整體的性能.當劃分粒度變大時,網(wǎng)格單元中的數(shù)據(jù)對象能夠更加均勻地分配給任務去執(zhí)行,因而時間減少,但隨著劃分的網(wǎng)格數(shù)量的進一步增加,就導致了跨網(wǎng)格單元對象越來越多,造成投影和復制的數(shù)據(jù)量大大增加,從而造成了計算量的增加,因此劃分粒度要適合,這里選擇劃分4 096個網(wǎng)格單元為最佳選擇.
2) 執(zhí)行時間隨任務數(shù)量的變化情況
Spark環(huán)境下,通常任務數(shù)越多表示并行度越高,執(zhí)行時間就越快.圖7是網(wǎng)格單元數(shù)為64的情況下,在3組不同大小的數(shù)據(jù)集(分別為300萬、450萬和600萬個空間對象)上分別執(zhí)行BSMWSJ算法時,當并行任務數(shù)量不同時的算法執(zhí)行時間變化情況.

Fig. 7 Execution time of BSMWSJ with the number of tasks圖7 BSMWSJ算法執(zhí)行時間隨任務數(shù)量的變化情況
從圖7可以看出,當數(shù)據(jù)量一定時,隨著任務個數(shù)的增加,執(zhí)行時間下降,但下降的幅度慢慢趨緩,到一定程度后,執(zhí)行時間不再下降,這主要是由于任務的開啟會帶來一定的代價,增加任務的數(shù)量能夠提高算法的并行度,降低查詢的響應時間,但這種降低并不是線性的.這也說明在數(shù)據(jù)集大小一定的情況下,任務的數(shù)量不一定越多越好,因此任務數(shù)量要適當.
3) 算法性能比較
圖8是網(wǎng)格單元個數(shù)為64、Spark任務數(shù)為64時2種算法的執(zhí)行時間隨數(shù)據(jù)集大小變化的情況.從中可以看出,2種算法的執(zhí)行時間都隨著數(shù)據(jù)集數(shù)量的增加而增大,這和理論預期是一致的.然而BSMWSJ算法明顯優(yōu)于ε-Controlled-Replicate算法,這是因為BSMWSJ算法在投影和復制操作中進行了相應的優(yōu)化,在連接處理中采用了邊界過濾方法進行過濾,減少了數(shù)據(jù)復制操作的數(shù)量,從而降低了實際計算代價.

Fig. 8 Comparison of execution time with different dataset size圖8 不同數(shù)據(jù)量下算法執(zhí)行時間比較
圖9是當空間數(shù)據(jù)對象大小增大情況下2種算法的執(zhí)行時間變化情況比較,其中3類空間連接數(shù)據(jù)對象的數(shù)量分別為200萬個,共計600萬個空間對象,劃分的網(wǎng)格單元數(shù)量為64個,任務個數(shù)為64,空間對象的最大MBR依次設置為100×100到500×500.

Fig. 9 Comparison of execution time with max length of MBR圖9 不同MBR最大長度下的運行時間比較
從圖9可以看出,2種算法的執(zhí)行時間隨著空間對象MBR最大長度的增加而快速增加,這主要是由于當空間對象的MBR增大時,對象之間的交疊增加,投影和復制的數(shù)據(jù)對象就會越來越多,其計算量必然大幅增大,從而造成執(zhí)行時間的大幅增加.
圖10是在真實數(shù)據(jù)集上執(zhí)行3路空間連接查詢,3個數(shù)據(jù)集中空間對象個數(shù)分別為200萬、3.7萬和200萬,網(wǎng)格單元個數(shù)為64時,2種算法的執(zhí)行時間隨著任務數(shù)量變化的情況比較.

Fig. 10 Execution time of the algorithms varying with the number of tasks圖10 算法執(zhí)行時間隨任務數(shù)量變化情況
從圖10中可以看出,隨著空間任務個數(shù)的增加,查詢執(zhí)行時間快速下降,但下降的幅度慢慢趨緩,到一定程度后執(zhí)行時間不再下降,這主要是由于任務的開啟會帶來一定的代價造成的,這同理論分析相一致.但從2種算法執(zhí)行時間比較來看,本文提出的BSMWSJ算法要優(yōu)于ε-Controlled-Replicate,這主要是由于BSMWSJ算法采取的數(shù)據(jù)投影方式避免了數(shù)據(jù)的大量復制,其邊界過濾策略能夠過濾掉一部分不會成為結果的對象,降低了計算和通信代價.
從2個方面的比較可以看出,BSMWSJ算法的性能明顯要高優(yōu)于ε-Controlled-Replicate算法.
本文針對現(xiàn)有云環(huán)境下的多路空間連接查詢處理算法存在的性能優(yōu)化方面的不足,提出了一種基于Spark的多路空間連接算法BSMWSJ,該算法采用網(wǎng)格劃分方法對數(shù)據(jù)空間進行劃分,并基于空間對象所在的位置來進行數(shù)據(jù)投影和復制,計算過程中采用邊界過濾方法來過濾掉無用的連接對象,并通過縮小復制范圍來減少連接對象的多余復制,從而減少算法的計算代價.實驗表明:本文所提出的多路空間連接查詢處理算法要明顯優(yōu)于ε-Controlled-Replicate算法,并具有良好的性能和擴展性.在后續(xù)工作中將進行更大規(guī)模的實驗研究,并進一步改進相關算法,大幅提高數(shù)據(jù)投影、復制和過濾的效果,從而提高算法性能,同時也將考慮結合索引技術來進一步提高算法的性能.
[1]Apache. Apache SparkTMis a fast and general engine for large-scale data processing[EBOL]. 2012[2016-07-26]. http:spark.apache.org
[2]Afrati F N, Ullman J D. Optimizing multiway joins in a Map-Reduce environment[J]. IEEE Trans on Knowledge and Data Engineer, 2011, 23(9): 1282-1298
[3]Lin Yuting, Agrawal D, Chen Chun, et al. Llama: Leveraging columnar storage for scalable join processing in the MapReduce framework[C]Proc of the 2011 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2011: 961-972
[4]Jiang Dawei, Tung A K H, Chen Gang. MAP-JOIN-REDUCE: Toward scalable and efficient data analysis on large clusters[J]. IEEE Trans on Knowledge and Data Engineering, 2011, 23(9): 1299-1311
[5]Wang Xiaojun, Sun Hui. Research of optimizing multiway joins based on MapReduce[J]. Computer Technology & Development, 2013, 23(6): 59-66 (in Chinese)(王曉軍, 孫惠. 基于MapReduce的多路連接優(yōu)化方法研究[J]. 計算機技術與發(fā)展, 2013, 23(6): 59-66)
[6]Slagter K, Hsu C, Chung Y, et al. SmartJoin: A network-aware multiway join for MapReduce[J]. Cluster Computing, 2014, 17(3): 629-641
[7]Wang Jingfen, Peng Zhixing. Research of optimization algorithm for multi-way spatial Join[J]. Journal of Chinese Computer Systems, 2013, 34(11): 2431-2436 (in Chinese)(汪璟玢, 彭志星. 多路空間連接優(yōu)化算法研究[J]. 小微型計算機系統(tǒng), 2013, 34(11): 2431-2436)
[8]Gupta H, Chawda B, Negi S, et al. Processing multi-way spatial joins on Map-Reduce[C]Proc of the 16th Int Conf on Extending Database Technology. New York: ACM, 2013: 113-124
[9]Gupta H, Chawda B.ε-Controlled-Replicate: An improved controlled-replicate algorithm for multi-way spatial join processing on map-reduce[C]Proc of the 15th Int Conf on Web Information Systems Engineering. Berlin: Springer, 2014: 278-293
[10]Luo Wuman, Tan Haoyu, Mao Huajian, et al. Efficient similarity joins on massive high-dimensional datasets using MapReduce[C]Proc of the 13th IEEE Int Conf on Mobile Data Management. Piscataway, NJ: IEEE, 2012: 1-10
[11]Ma Youzhong, Meng Xiaofeng, Wang Shaoya. Parallel similarity joins on massive high-dimensional data using MapReduce[J]. Concurrency & Computation Practice & Experience, 2015, 28(1): 166-183
[12]Kim Y, Shim K. Parallel Top-ksimilarity join algorithms using MapReduce[C]Proc of the 28th IEEE Int Conf on Data Engineering. Piscataway, NJ: IEEE, 2012: 510-521
[13]Vernica R, Carey M, Li C. Efficient parallel set-similarity joins using MapReduce[C]Proc of the 2010 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2010: 495-506
[14]Ma Youzhong, Meng Xiaofeng. Set similarity join on massive probabilistic data using MapReduce[J]. Distributed and Parallel Databases, 2014, 32(3): 447-464
[15]Okcan A, Riedewald M. Processing theta-joins using MapReduce[C]Proc of the 2011 ACM SIGMOD Int Conf anaon Mgement of Data. New York: ACM, 2011: 949-960
[16]Pan Wei, Li Zhanhuai, Chen Qun, et al. An optimization for processing MapReduce-based asymmetric fragment and replicate join[J]. Journal of Computer Research and Development, 2012, 49(l): 296-302 (in Chinese)(潘巍, 李戰(zhàn)懷, 陳群, 等. 面向MapReduce的非對稱分片復制連接算法優(yōu)化技術研究[J]. 計算機研究與發(fā)展, 2012, 49(1): 296-302)
[17]Bian Haoqiong, Chen Yueguo, Du Xiaoyong, et al. Equi-join optimization on Spark[J]. Journal of East China Normal University: Natural Science, 2014, 2014(5): 263-280 (in Chinese)(卞昊穹, 陳躍國, 杜小勇, 等. Spark上的等值連接優(yōu)化[J]. 華東師范大學學報: 自然科學版, 2014, 2014(5): 263-280)
[18]Wang Kai, Han Jizhong, Tu Bibo, et al. Accelerating spatial data processing with mapreduce[C]Proc of the 16th IEEE Int Conf on Parallel and Distributed Systems. Piscataway, NJ: IEEE, 2010: 229-236
[19]Zhang Shubin, Han Jizhong, Liu Zhiyong, et al. Spatial queries evaluation with MapReduce[C]Proc of the 8th IEEE Int Conf on Grid and Cooperative Computing. Piscataway, NJ: IEEE, 2009: 287-292
[20]Zhang Shubin, Han Jizhong, Liu Zhiyong, et al. Sjmr: Parallelizing spatial join with MapReduce on clusters[C]
Proc of 2009 IEEE Int Conf on Cluster Computing and Workshops. Piscataway, NJ: IEEE, 2009
[21]Qiao Baiyou, Zhu Hunhai, Shen Muchuan, et al. A boundary filtering based spatial join query processing optimization algorithm[C]Proc of the 12th Int Conf on Fuzzy Systems and Knowledge Discovery. Piscataway, NJ: IEEE, 2015: 1764-1769
[22]Sun Li, Li Jing, Liu Guohua. Join strategy optimization in column storage based query[J]. Journal of Computer Research and Development, 2013, 50(8): 1647-1656 (in Chinese)(孫莉, 李靜, 劉國華. 列存儲數(shù)據(jù)查詢中的連接策略優(yōu)化方法[J]. 計算機研究與發(fā)展, 2013, 50(8): 1647-1656)
[23]Zhou Guliang, Sa Churila, Zhu Yongli. Star join algorithm based on multi-dimensional bloom filter in Spark[J]. Journal of Computer Applications, 2016, 36(2): 353-357 (in Chinese)(周國亮, 薩初日拉, 朱永利. Spark環(huán)境下基于多維布隆過濾器的星型連接算法[J]. 計算機應用, 2016, 36(2): 353-357)

Qiao Baiyou, born in 1970. PhD and associate professor in Northeastern University. Member of CCF. His main research interests include cloud computing, virtualization technology, big data and spatial data management.

Zhu Junhai, born in 1989. Master. His main research interests include big data management and spatial data management.

Zheng Yujie, born in 1993. Master candidate. Her main research interests include big data management and spatial data management.

Shen Muchuan, born in 1992. Master. His main research interests include cloud computing, virtualization technology and big data.

Wang Guoren, born in 1966. Professor and PhD supervisor in Northeastern University. Senior member of CCF. His main research interests include cloud computing, big data, memory computing, and database theory.
A Multi-Way Spatial Join Querying Processing Algorithm Based on Spark
Qiao Baiyou1,2, Zhu Junhai1, Zheng Yujie1, Shen Muchuan1, and Wang Guoren1
1(SchoolofComputerScienceandEngineering,NortheasternUniversity,Shenyang110819)2(DepartmentofComputerScience,BrighamYoungUniversity,Provo,Utah,USA84602)
Aiming at the problem of spatial join query processing in cloud computing systems, a multi-way spatial join query processing algorithm BSMWSJ is proposed, which is based on Spark platform. In this algorithm, the whole data space is divided into grid cells with the same size by grid partition method, and spatial objects in each type data set are distributed into these grid cells according to their spatial locations. Spatial objects in different grid cells are processed in parallel. In multi-way spatial join query processing, a boundary filtering method is proposed to filter the useless data, which calculates the MBRs of the candidate results generated by the previous join processing, and uses these MBRs to filter the subsequent join data sets. This allows it to filter out the useless spatial objects, and reduce the redundant projection and replication of spatial objects. At the same time, a duplication avoidance strategy is applied to reduce the outputs of redundant results, and further minimizes the cost of the subsequent join processing. Many experiments on synthetic and real data sets show that the proposed multi-way spatial join query processing algorithm BSMWSJ has obvious advantages and better performance than the existing multi-way spatial join query processing algorithms.
cloud computing; Spark platform; multi-way spatial join query; boundary filtering; duplication avoidance
2016-08-02;
2016-10-20
國家自然科學基金項目(61073063,61332006);國家海洋公益性行業(yè)科研專項經(jīng)費項目(201105033) This work was supported by the National Natural Science Foundation of China (61073063, 61332006) and the National Marine Industry Research Special Funds for Public Welfare Projects (201105033).
TP311.13