宗楓博,趙宇海+,王國仁,季航旭
1.東北大學(xué) 計(jì)算機(jī)科學(xué)與工程學(xué)院,沈陽110169
2.北京理工大學(xué) 計(jì)算機(jī)學(xué)院,北京100081
隨著分布式技術(shù)的快速發(fā)展,人們對大數(shù)據(jù)的處理能力有了飛躍式的提高,眾多優(yōu)秀的大數(shù)據(jù)引擎也應(yīng)運(yùn)而生。大數(shù)據(jù)處理引擎有著許多與傳統(tǒng)數(shù)據(jù)庫相似的計(jì)算操作,同樣也有許多近似的優(yōu)化方法。同時大數(shù)據(jù)處理引擎也與傳統(tǒng)數(shù)據(jù)庫有很多質(zhì)的區(qū)別,大數(shù)據(jù)處理引擎不再依賴于企業(yè)級服務(wù)器而是可以通過部署在成百上千臺PC 上協(xié)同工作,網(wǎng)絡(luò)對運(yùn)行效率的影響相比傳統(tǒng)數(shù)據(jù)庫而言更加顯著。
作為大數(shù)據(jù)處理引擎,當(dāng)連接操作涉及到多個表時,連接排序?qū)档蛨?zhí)行成本有相當(dāng)大的效果。連接操作符通過兩個表的公共屬性將它們連接起來,優(yōu)化器需要找到一個成本最低或幾乎接近的執(zhí)行計(jì)劃。連接順序的優(yōu)化問題通常被認(rèn)為是一個NP-Complete問題,執(zhí)行計(jì)劃的個數(shù)隨著連接表的數(shù)量和數(shù)據(jù)源之間的連接關(guān)系數(shù)量增加而呈階乘級增加,目前為止針對連接順序的優(yōu)化已經(jīng)提出了許多方法,包括確定性算法、啟發(fā)式算法、隨機(jī)算法、混合算法等。但是與傳統(tǒng)數(shù)據(jù)庫不同的是,分布式大數(shù)據(jù)處理引擎的節(jié)點(diǎn)要遠(yuǎn)遠(yuǎn)多于傳統(tǒng)分布式數(shù)據(jù)庫,在MapReduce 框架下各節(jié)點(diǎn)間I/O 對網(wǎng)絡(luò)傳輸造成的影響會隨著中間數(shù)據(jù)量的增長而成倍放大,同時I/O 對執(zhí)行效率的影響也被大幅放大。而在傳統(tǒng)的連接優(yōu)化算法中,通常只考慮連接順序優(yōu)化,通過減少全局的連接計(jì)算量來減少執(zhí)行時間,而并不考慮同時優(yōu)化每次連接的投影或通過減少全局的投影列數(shù)來減少I/O 代價,如圖1 連接樹中包括三個表、、,傳統(tǒng)方法中的優(yōu)化主要將表作為最小優(yōu)化粒度考慮,比如通過表的行數(shù)直接估算比較??和??哪個為更優(yōu)連接策略,但若將表細(xì)分為列來考慮,表下的[0,1]表示表包含兩個不同屬性的列,相同的數(shù)字表示列的屬性相同。在保證最終連接結(jié)果不變的條件下,可將原連接樹改變連接順序?yàn)閳D2 所示連接樹,中間表由(σ(?))變?yōu)?σ(?)),由于投影到中間連接表的列由變?yōu)榱耍瑴p少了一列,則可能進(jìn)而減少了I/O 代價。為了盡可能減少全局中間連接表體積進(jìn)而降低I/O 代價,本研究設(shè)計(jì)了一個連接索引結(jié)構(gòu),結(jié)合動態(tài)規(guī)劃算法在優(yōu)化連接順序的同時對每個連接表的投影關(guān)系進(jìn)行優(yōu)化,同時針對連接計(jì)算代價和I/O 代價進(jìn)行了優(yōu)化,有效地減少了多連接任務(wù)的運(yùn)算時間和中間數(shù)據(jù)的傳輸量。

圖1 原連接樹Fig.1 Origin join tree

圖2 改變連接順序Fig.2 Modify join order
本文的主要貢獻(xiàn)如下:
(1)設(shè)計(jì)了一種描述連接表中列關(guān)系的索引結(jié)構(gòu),此索引結(jié)構(gòu)可以快速找到兩個表的連接關(guān)系及每一列是否冗余列。
(2)基于此索引,結(jié)合動態(tài)規(guī)劃算法在優(yōu)化連接順序的同時刪除每次連接的冗余列,盡可能地減少所有中間連接表的體積。
(3)通過基于Flink 系統(tǒng)的實(shí)驗(yàn)對本優(yōu)化系統(tǒng)進(jìn)行了驗(yàn)證,實(shí)驗(yàn)結(jié)果顯示本優(yōu)化系統(tǒng)有效地減少了多連接任務(wù)的執(zhí)行時間和中間數(shù)據(jù)的傳輸量。

確定性算法通過完全遍歷或在解空間中應(yīng)用一些啟發(fā)式剪枝方法,來執(zhí)行解空間的確定性搜索,如IBM 的System R 中使用的動態(tài)規(guī)劃算法,這個算法被幾乎現(xiàn)有的所有商用RDBMS 系統(tǒng)使用,該算法通過動態(tài)剪枝解空間來進(jìn)行完全遍歷,通過迭代已有的連接關(guān)系并盡可能剪枝一些次優(yōu)解來構(gòu)造所有備選連接樹,從而保證了動態(tài)規(guī)劃算法在解空間中找到最優(yōu)解。隨機(jī)化算法通過預(yù)定義的變換規(guī)則在解空間內(nèi)隨機(jī)移動,不斷尋找代價更低的點(diǎn),當(dāng)點(diǎn)不再移動的次數(shù)達(dá)到預(yù)定義的迭代次數(shù)閾值時,則認(rèn)為當(dāng)前的解為較優(yōu)的解。如模擬退火算法(simulated annealing algorithm),它允許當(dāng)前點(diǎn)向代價更高的點(diǎn)移動,從而降低了算法陷入局部最優(yōu)解的概率,文獻(xiàn)[6]中提出了一個算法參數(shù)來定義在給定時間點(diǎn)繼續(xù)搜索的可能性,并和當(dāng)前點(diǎn)與目標(biāo)點(diǎn)的代價比例共同決定點(diǎn)移動的概率。
基因算法通過模擬生物的進(jìn)化過程來尋找最優(yōu)解,通過將一個初始群體(解集合)中的個體(解)隨機(jī)交叉(crossover)和變異(mutation)產(chǎn)生下一代群體,將適應(yīng)度最高(通過代價函數(shù)定義)的部分個體保留,并重復(fù)交叉和變異的過程產(chǎn)生下一代群體,當(dāng)?shù)螖?shù)達(dá)到預(yù)定義的閾值或者群體適應(yīng)度已接近飽和,繼續(xù)進(jìn)化無法再顯著提高適應(yīng)度(通過閾值定義)時停止迭代過程。如PostgreSQL 中的Genetic Query Optimizer,它只考慮了左深樹的情況,實(shí)現(xiàn)了精英選擇算子(elitist selection operator),對于群體中的最優(yōu)解,只進(jìn)行交叉而不進(jìn)行變異,防止群體中的最優(yōu)個體在迭代到下一代時發(fā)生丟失,從而導(dǎo)致算法難以收斂到全局最優(yōu)解。
混合算法混合了以上兩種及以上的優(yōu)化策略,比如巡回模擬退火算法(toured simulated annealing),該算法首先通過確定性算法確定幾個初始點(diǎn),然后對每個初始點(diǎn)進(jìn)行模擬退火算法。
以上算法為主要基于傳統(tǒng)關(guān)系數(shù)據(jù)庫理論的多連接優(yōu)化,優(yōu)化的最小粒度為表,而在本文所提出的多表連接優(yōu)化方法中優(yōu)化的最小粒度為列。在傳統(tǒng)數(shù)據(jù)庫甚至一些節(jié)點(diǎn)數(shù)不高的分布式數(shù)據(jù)庫中,連接表的列數(shù),或者說連接表的體積對I/O 并不會造成顯著影響,而連接表的行數(shù),即連接表的元組數(shù)會對計(jì)算量產(chǎn)生很大影響,因此這些傳統(tǒng)的多連接優(yōu)化算法主要注重于對連接元組數(shù)進(jìn)行優(yōu)化以提高計(jì)算效率。隨著MapReduce 框架在大數(shù)據(jù)處理中的廣泛應(yīng)用,一個任務(wù)的計(jì)算節(jié)點(diǎn)可能達(dá)到成百上千個,那么節(jié)點(diǎn)間I/O 帶來的影響不再可以忽視。針對此問題,本文提出一種一般性的多表連接優(yōu)化方法,可以在優(yōu)化算法中同時考慮計(jì)算和I/O 的代價,并以動態(tài)規(guī)劃算法為例,在動態(tài)規(guī)劃算法的基礎(chǔ)上同時實(shí)現(xiàn)對計(jì)算和I/O 的優(yōu)化。并在實(shí)驗(yàn)中對多表多列數(shù)和多表低列數(shù)的情況進(jìn)行了測試,證明了本文的優(yōu)化方法不失一般性。
對于批數(shù)據(jù)連接計(jì)算,F(xiàn)link 系統(tǒng)在節(jié)點(diǎn)之間的傳輸有兩種策略:RR(repartition-repartition)和BF(broadcast-forward)。而節(jié)點(diǎn)中的本地批數(shù)據(jù)連接策略和傳輸策略是完全獨(dú)立的,在批數(shù)據(jù)傳輸完成后,每個連接實(shí)例將使用本地連接算法來連接分配的數(shù)據(jù),本地連接策略包括兩種:SM(sort-merge-join)和HH(hybrid-Hash-join)。
如圖3 所示,RR 策略將兩個批數(shù)據(jù)源R(等分為R1 和R2)和S(等分為S1 和S2)及它們的連接屬性通過相同的分區(qū)函數(shù)發(fā)送到各個節(jié)點(diǎn),每個節(jié)點(diǎn)被分配一個連接實(shí)例,確保有相同連接屬性的批數(shù)據(jù)被發(fā)送到相同節(jié)點(diǎn),RR 策略的代價即為將兩個數(shù)據(jù)集通過網(wǎng)絡(luò)進(jìn)行完全shuffle。

圖3 RR 傳輸策略Fig.3 RR ship strategy
如圖4 所示,BF 策略中,批數(shù)據(jù)源S 保留在本地節(jié)點(diǎn)不通過網(wǎng)絡(luò)傳輸,而每個節(jié)點(diǎn)的批數(shù)據(jù)源R 則被發(fā)送到其他所有節(jié)點(diǎn),保證每個節(jié)點(diǎn)都能收到完整的批數(shù)據(jù)源R,因此BF 策略的代價僅取決于批數(shù)據(jù)源R 的大小和連接實(shí)例的數(shù)量,與批數(shù)據(jù)源S 的大小無關(guān)。顯然當(dāng)批數(shù)據(jù)源R 的大小遠(yuǎn)小于批數(shù)據(jù)源S時,系統(tǒng)更傾向于使用BF 策略。

圖4 BF 傳輸策略Fig.4 BF ship strategy
如圖5 所示,SM 的工作方式是本地節(jié)點(diǎn)的兩個輸入數(shù)據(jù)集按照連接屬性分別進(jìn)行排序(sort),然后將排序后的數(shù)據(jù)集進(jìn)行合并(merge),最后對合并后的數(shù)據(jù)集進(jìn)行連接。排序過程在內(nèi)存中進(jìn)行,當(dāng)內(nèi)存資源耗盡后則將內(nèi)存中已排序完成的數(shù)據(jù)寫入本地文件系統(tǒng)中,然后繼續(xù)對傳入的數(shù)據(jù)在內(nèi)存中排序,全部數(shù)據(jù)傳入完成后在本地文件系統(tǒng)中會存放每個輸入流的若干不相交的有序子集,最后通過有序合并每個輸入流的有序子集,同時對合并后的兩個有序數(shù)據(jù)集進(jìn)行連接。

圖5 SM 連接策略Fig.5 SM join strategy
如圖6 所示,HH 方式將兩個輸入數(shù)據(jù)集分為build 端和probe 端,并且分為兩個階段工作:probe 階段和build 階段。在build 階段中,將build 端的數(shù)據(jù)根據(jù)連接屬性插入到哈希表中,若內(nèi)存資源耗盡則寫入到本地文件系統(tǒng)中。在probe 階段中,probe 端數(shù)據(jù)根據(jù)哈希表索引與build 端數(shù)據(jù)連接,若索引到內(nèi)存中的哈希表,則直接連接并輸出,若索引到本地文件系統(tǒng)中的部分,則同樣也將數(shù)據(jù)寫入到本地文件系統(tǒng)中。在probe 端數(shù)據(jù)讀完后,若本地文件系統(tǒng)中存在有溢出數(shù)據(jù),則刪除當(dāng)前內(nèi)存中的哈希表,通過存儲的build 端溢出數(shù)據(jù)重新構(gòu)建新的哈希表,并將本地文件系統(tǒng)中對應(yīng)的probe 端溢出數(shù)據(jù)通過新的哈希表索引并連接,遞歸地進(jìn)行此流程直到所有probe端數(shù)據(jù)都被處理完成。若內(nèi)存資源能夠容納完整的哈希表,則HH 方式相當(dāng)于普通的In-Memory Hash Join,有著極高的效率;即使在少量數(shù)據(jù)寫入到本地文件系統(tǒng)的情況下,HH 仍然有著良好的性能。

圖6 HH 連接策略Fig.6 HH join strategy
首先舉例說明需要解決的問題,如圖7 所示的一段連接關(guān)系的構(gòu)建中,表與表連接生成中間表,key 為列1,這時需要確定哪些列需要通過投影關(guān)系保留到中,刪除冗余列。

圖7 列保留示例Fig.7 Example of column retention
對于列0,發(fā)現(xiàn)還有未連接的數(shù)據(jù)源包含此列,那么此列可能在后續(xù)連接中作為key,則此次投影中保留此列。
對于列1,在作為此次連接的key 后,發(fā)現(xiàn)此列不再存在于還未連接的任何數(shù)據(jù)源中,也不存在于最終結(jié)果result中,則可判定此列為冗余列,此次投影不再保留,列2 同理。
對于列3,發(fā)現(xiàn)原連接指定的result 中包含此列,則在每次投影中都需要保留此列。
通過解決以上所提到的冗余列問題,在同時考慮到連接順序?qū)B接效率的條件下,對原連接樹圖8使用本文提供的優(yōu)化方法,優(yōu)化后的連接樹為圖9,連接使用的數(shù)據(jù)集為TPC-H 標(biāo)準(zhǔn)測試數(shù)據(jù)集,其與Source表的對應(yīng)關(guān)系如表1 所示。

圖8 原連接樹Fig.8 Origin join tree

圖9 優(yōu)化后的連接樹Fig.9 Optimized join tree

表1 Source表對應(yīng)的TPC-H 表Table 1 TPC-H table corresponding to Source table
為了執(zhí)行此優(yōu)化策略,本文設(shè)計(jì)了一種關(guān)于列和表的投影關(guān)系索引,通過此索引能夠快速判斷兩個表的連接鍵值,以及連接后所需要保留的必要列。建立投影關(guān)系索引主要由兩個模塊組成:對列關(guān)系的溯源和列與連接關(guān)系索引的建立。
在得到投影關(guān)系索引后,可以基于此索引對每次連接的投影進(jìn)行判斷,及時刪除冗余列減小I/O負(fù)擔(dān),具體實(shí)現(xiàn)方法為3.3節(jié)基于投影關(guān)系索引的投影判斷。
為了衡量連接的優(yōu)劣,需要設(shè)計(jì)一個綜合考慮CPU 和I/O 的代價函數(shù),基于前文2.1 節(jié)Flink 中的批數(shù)據(jù)傳輸策略可以得到I/O 的代價函數(shù),基于前文2.2節(jié)Flink 中的批數(shù)據(jù)連接策略介紹可以得到CPU 的代價函數(shù)。通過將代價函數(shù)與動態(tài)規(guī)劃算法結(jié)合可以得到連接順序的優(yōu)化結(jié)果,具體實(shí)現(xiàn)方法為3.4 節(jié)基于投影關(guān)系索引的連接順序優(yōu)化。
此算法目的在于:對于連接樹中指定一個表的一個列,能夠找到以此列為key 具有連接關(guān)系的所有Source表的集合。
如算法1 所示,給定一個表和此表中的一個列,若表為Source 表,則創(chuàng)建一個記錄節(jié)點(diǎn)(行3),記錄此表的id 和列在此表中的位置(行4),將記錄節(jié)點(diǎn)加入到中(行5);若表為Join 表,則對投影到列的輸入表進(jìn)行遞歸溯源,將記錄節(jié)點(diǎn)加入到中(行9),若列同時為此Join 表的key,那么同樣要對另外的輸入表進(jìn)行遞歸溯源,將記錄節(jié)點(diǎn)加入到中(行11)。
Trace
輸入:,the table currently accessed;,the column to be traced to the current access table.
輸出:,the set of root tables found by traversing.


舉例說明,與通過列6 為key 進(jìn)行連接,需要得知哪些Source表包含了列6,這些Source表均有以列6 為key 的連接關(guān)系。
如圖10 所示,對于中的列6 進(jìn)行溯源,為Join表且列6為key,則列6的來源包括和,遞歸地對表溯源,找到和,則以列6 為key 具有連接關(guān)系的Source 表集合為{4,5,6}。

圖10 對列6 的溯源結(jié)果Fig.10 Result of tracing column 6
此算法的目的在于,對于連接過程中的列,在每次連接的投影關(guān)系策略中,需要保留可能會在之后的連接中作為key 的列和包含在Result 中的列,為此建立一個十字鏈表形式的索引結(jié)構(gòu),由Source 表索引、Key 節(jié)點(diǎn)索引和連接信息節(jié)點(diǎn)構(gòu)成,每個連接信息節(jié)點(diǎn)中記錄所有能夠作為同一連接key 的列,通過這個索引可以快速地確認(rèn)當(dāng)一次連接完成后哪些列在投影操作中可以被刪除,減小對I/O 的影響。
這里說明用于建立所有key 列與Source 表的關(guān)系索引的算法2,對于所有Join 表的集合,遍歷其中的每個Join 表,對的連接key 進(jìn)行溯源得到記錄節(jié)點(diǎn)集合(行3),創(chuàng)建一個新的連接信息節(jié)點(diǎn)記錄這個集合(行4),遍歷集合中的每個記錄節(jié)點(diǎn),通過Source 表索引查找,若存在已有的連接信息節(jié)點(diǎn)包含(行7),則說明所記錄的key 和所記錄的key 是等價的,因?yàn)檫@些key 所源自的Source 表互相存在連接關(guān)系,所以可以將與合并(行8),并用重新指向合并后的節(jié)點(diǎn)(行9);若不存在于對應(yīng)Source 表的鏈表中,則說明這是一個有關(guān)于此Source 表的新的連接關(guān)系,將其加入到Source表的鏈表中(行12)。而若中的每個記錄節(jié)點(diǎn)都不存在于已有連接信息節(jié)點(diǎn)中,則說明所代表key 不與已有的任何key 存在等價關(guān)系,則將其作為一個新的連接信息節(jié)點(diǎn)加入到key 節(jié)點(diǎn)索引中(行15)。
Key columns index
輸入:,the set of JoinNode;,the linked list array of the relationship between the source table and the key information node.
輸出:,the index array of key information node.

舉例說明,在對表中的key 列3 進(jìn)行溯源后,得到如圖11 所示索引。左側(cè)0~7 的縱表為Source表索引,對于一個節(jié)點(diǎn),節(jié)點(diǎn)頭(3)表示此key 列在整體連接樹中的列編號為3,在所有的key 列中編號為2。指向節(jié)點(diǎn)內(nèi)容1 表示此key 列在中的位置為1。指向節(jié)點(diǎn)內(nèi)容0 表示此key 列在中的位置為0。

圖11 對Join2 表中的列3 溯源得到索引Fig.11 Index of tracing column 3 in Join2 table
如圖12 所示,在對表中的key 列6 進(jìn)行溯源后,得到記錄節(jié)點(diǎn)集合{列1,列1},通過Source 索引查詢,發(fā)現(xiàn)并不與現(xiàn)存的連接信息節(jié)點(diǎn)有交集,那么作為新的連接記錄節(jié)點(diǎn){(6),列1,列1}加入到key 節(jié)點(diǎn)索引中。

圖12 對Join3 表中的列6 溯源得到索引Fig.12 Index of tracing column 6 in Join3 table
如圖13 所示,在對表中的key 列6 進(jìn)行溯源后,得到記錄節(jié)點(diǎn)集合{列1,列1列0},通過Source 索引查詢,發(fā)現(xiàn)與已有的節(jié)點(diǎn)存在交集,則將記錄集合與節(jié)點(diǎn)合并。

圖13 對Join4 表中的列6 溯源得到索引Fig.13 Index of tracing column 6 in Join4 table
通過此方法,最終得到如圖14 所示連接樹的索引結(jié)構(gòu)。

圖14 原連接樹的連接關(guān)系索引Fig.14 Join relations of origin join tree
同樣,對于最終結(jié)果Result 表的所有列采取類似的算法得到關(guān)于投影到Result 表上所有列與Source表的關(guān)系索引。
此算法的目的在于,基于已有的投影關(guān)系索引,確定每次連接時需要保留的列。
如算法3 所示,對于兩個輸入表和,按照Source 索引,將中的所有已連接的Source 表所在的記錄節(jié)點(diǎn)標(biāo)記為finished(行7),將記錄節(jié)點(diǎn)所在的KeyNode 節(jié)點(diǎn)標(biāo)記為processed 表示已被遍歷(行6);對進(jìn)行同樣的操作,若有KeyNode 被兩次遍歷,則說明此KeyNode 所代表的列同時存在于和中,即為和的連接key 列(行14~15)。若有Key-Node 中的所有列信息節(jié)點(diǎn)均被標(biāo)記為finished,則說明此KeyNode 所代表的key 列連接Source 表已全部連接,那么此key 列在后續(xù)連接中不會再被使用(行12~13)。同時若此key 列不存在于Result 索引中,即與最終投影結(jié)果無關(guān),那么在此次連接中不再對此列進(jìn)行投影保留(行22)?;诖怂惴梢源_保每次連接投影所保留的列只包括可能會在后續(xù)連接中所用到的key 列和最終結(jié)果的來源列。
NewJoinNode
輸入:,InputNode0;,InputNode1;,linked list array of the relationship between the source table and the key information node;,index array of key information node;,index array of result information node.
輸出:,the new JoinNode obtained by connectionand.


舉例說明,如圖15 所示,連接和表,通過Source 索引查詢,發(fā)現(xiàn)和表同時包含列,則為和的連接key,連接結(jié)果為表,投影的key 列包括列。連接表和表,通過Source 索引查詢表和表同時包含列,則為和的連接key,同時發(fā)現(xiàn)節(jié)點(diǎn)中的所有Source 表都已被連接,節(jié)點(diǎn)中的表還未連接,則此次連接投影的key 列包括列,不再投影保留列,并從索引中刪除節(jié)點(diǎn),刪除后的索引如圖16 所示。

圖15 連接索引查詢Fig.15 Query by join index

圖16 刪除冗余列及所在索引Fig.16 Delete redundant columns and indexes
連接順序的優(yōu)化需要基于每次連接的代價計(jì)算,包括計(jì)算代價和I/O 代價。
對于輸入,估算其表體積()為公式:

其中,()為的元組數(shù),為經(jīng)過投影關(guān)系索引判斷后所保留的列。根據(jù)2.1 節(jié)中的傳輸策略介紹,可得RR和BF兩種傳輸策略對應(yīng)的傳輸代價公式:



在假設(shè)和均勻分布的情況下,根據(jù)2.2 節(jié)中的連接策略介紹,可得關(guān)于Hybrid-Hash-Join 和Sort-Merge-Join 的CPU代價公式及I/O代價公式,其中為分配給Hash Table的內(nèi)存大小。

據(jù)此可以得到CPU 代價公式和I/O 代價公式,其中為選擇因子,1 表示選擇Hash 策略,0 表示選擇Sort-Merge策略。

據(jù)此得到每次連接的代價(?),其中、分別為節(jié)點(diǎn)間傳輸和節(jié)點(diǎn)與本地文件系統(tǒng)間傳輸?shù)腎/O 代價權(quán)重,為CPU 計(jì)算代價權(quán)重,三者均與系統(tǒng)硬件相關(guān)。

在假設(shè)輸入表中連接屬性的值在表中均勻分布的情況下,兩表連接結(jié)果的元組數(shù)(?) 估算為公式:

其中,為連接鍵值,(,)表示在中不同值的數(shù)量。
通過代價公式和連接元組數(shù)計(jì)算,可以得到基于動態(tài)規(guī)劃的連接順序重排算法4。已知Source 表數(shù)量為,初始化狀態(tài)矩陣,將每個Source 表依次加入到中(行4),對于矩陣中的每個節(jié)點(diǎn),其行編號為(已連接的Source 表數(shù)-1),列編號為該連接樹生成時起始的Source 表編號。對于D的連接樹,需要連接的表數(shù)共為+1,且包含Source,那么可連接的備選項(xiàng)為之前已計(jì)算得出的局部最優(yōu)連接樹D,D(<,<)(行9~10),具體是否可連接,是否為重復(fù)連接,通過3.3 節(jié)等多個條件判斷(行11)。通過此方法可以得到D節(jié)點(diǎn)的最優(yōu)連接策略,這樣從到D逐行完成狀態(tài)轉(zhuǎn)移矩陣,D中的最小代價的連接策略即為結(jié)果(行20)。此算法的最差時間復(fù)雜度為(),其中為連接表數(shù)。
Dynamic programming
輸入:,number of source tables for all connections;,index of the source table.
輸出:,optimal connection diagram.

對于本文提出的投影和連接順序優(yōu)化方法,在Flink 系統(tǒng)上進(jìn)行了實(shí)現(xiàn),通過修改Plan 層的源碼,對比優(yōu)化后與優(yōu)化前的執(zhí)行時間及中間結(jié)果傳輸量。通過對比不同體積、不同數(shù)量、不同冗余度的連接樹在優(yōu)化前后的執(zhí)行時間和中間數(shù)據(jù)傳輸量對優(yōu)化方法進(jìn)行了分析。
本文采用的數(shù)據(jù)集為TPC-H 標(biāo)準(zhǔn)數(shù)據(jù)集,TPCH 是TPC 委員會發(fā)布的用于決策支持的測試標(biāo)準(zhǔn)。它由一套面向業(yè)務(wù)的即席查詢和并發(fā)數(shù)據(jù)修改組成,其數(shù)據(jù)具有廣泛的行業(yè)相關(guān)性,其中TPC-H 表的關(guān)系模式及表的內(nèi)容如圖17 所示。本文實(shí)驗(yàn)中所用到的不同規(guī)模的數(shù)據(jù)集均由TPC-H Tools 指定數(shù)據(jù)體積生成,其中體積分布如表2 所示。

圖17 TPC-H 關(guān)系模式圖Fig.17 Relational schema of TPC-H

表2 TPC-H 表數(shù)據(jù)體積及分布Table 2 Data size and distribution of TPC-H tables
在5 表到8 表的連接實(shí)驗(yàn)中,默認(rèn)連接策略皆為優(yōu)先連接體積較大的表,實(shí)驗(yàn)記錄如無特殊聲明并行度皆為36。
本文所描述的相關(guān)技術(shù)細(xì)節(jié),均在Flink 1.8.0 中進(jìn)行實(shí)現(xiàn),采用Java 語言進(jìn)行編寫。實(shí)驗(yàn)的運(yùn)行環(huán)境及軟硬件環(huán)境如下。
(1)硬件環(huán)境:實(shí)驗(yàn)采用的分布式環(huán)境由4 臺服務(wù)器組成,1 臺主節(jié)點(diǎn),3 臺從節(jié)點(diǎn)。從節(jié)點(diǎn)采用的服務(wù)器配置為:①CPU,Intel Xeon E5-2603 V4 *2,核心數(shù)目6 核心,主頻1.7 GHz。②內(nèi)存64 GB,主頻2 400 MHz。③硬盤400 GB SSD。主節(jié)點(diǎn)配置為:①CPU,Intel Xeon E5-2603 V4 *2,核心數(shù)目6 核心,主頻1.7 GHz。②內(nèi)存128 GB,主頻2 400 MHz。③硬盤400 GB SSD。
(2)軟件環(huán)境:①操作系統(tǒng)Centos 7。②存儲環(huán)境MySQL 5.6.45。③Flink 版本1.8.0,JDK 版本1.8.0。
這里定義一個連接任務(wù)中的冗余列c和冗余率,c指此列在此后的連接中不再作為key,且不存在于結(jié)果列中,表示所有冗余列數(shù)量和占所有Join 表中的列數(shù)量的比例。

冗余率為0,即連接性能幾乎只受連接順序影響的情況,不指定數(shù)據(jù)傳輸策略和連接策略,由系統(tǒng)自動選擇。
如圖18 所示,連接順序優(yōu)化在表數(shù)較少的情況下優(yōu)化效果并不明顯,因?yàn)榈捅頂?shù)下連接順序的解空間同樣非常小,最優(yōu)解與默認(rèn)解的執(zhí)行效率也很少會出現(xiàn)較大差距。同時可以看出,5個table在700 MB規(guī)模以下的數(shù)據(jù)集執(zhí)行時間幾乎一致,可以判斷系統(tǒng)啟動執(zhí)行的最小時間為14 s 左右,且5 個table 在700 MB 規(guī)模以下的數(shù)據(jù)集對系統(tǒng)資源消耗極小,沒有顯著優(yōu)化效果。且通過優(yōu)化前后幾乎無差別的執(zhí)行時間可以看出,5 個table 下的優(yōu)化程序執(zhí)行時間同樣可以忽略不計(jì),即使在小規(guī)模數(shù)據(jù)集計(jì)算上,優(yōu)化程序仍不會對整體運(yùn)算造成可見的負(fù)擔(dān)。

圖18 5 表連接的優(yōu)化前后執(zhí)行時間Fig.18 Execution time before and after optimization in 5 tables join
如圖19 所示,在6 個table 下,大規(guī)模數(shù)據(jù)集對計(jì)算時間的影響已非常明顯,并可以看出一定的優(yōu)化效果。由于解空間仍然很小,且已加入的連接表之間的體積差距并不足夠顯著,雖然可以明顯看到優(yōu)化效果,但是優(yōu)化前后的執(zhí)行時間差距并不是很大。

圖19 6 表連接的優(yōu)化前后執(zhí)行時間Fig.19 Execution time before and after optimization in 6 tables join
通過圖20、圖21 可以看出,隨著表數(shù)量的增加,優(yōu)化效果變得非常顯著。通過圖22 可以看出,隨著表數(shù)量的增加,優(yōu)化效果變得更加明顯,最大達(dá)到了58.69%。這是由于新加入的連接尾表體積非常小,大表與小表之間的體積差距非常小,連接順序?qū)B接效率的影響非常顯著,且由于解空間的大小隨著連接表的數(shù)量呈階乘級增加,更容易找到效率遠(yuǎn)高于默認(rèn)解的最優(yōu)解。

圖20 7 表連接的優(yōu)化前后執(zhí)行時間Fig.20 Execution time before and after optimization in 7 tables join

圖21 8 表連接的優(yōu)化前后執(zhí)行時間Fig.21 Execution time before and after optimization in 8 tables join

圖22 不同表數(shù)下平均執(zhí)行時間優(yōu)化比例Fig.22 Average execution time optimization ratio of different tables
通過圖23~圖26 可以看到,優(yōu)化程序?qū)Χ啾磉B接的中間數(shù)據(jù)傳輸有著固定的優(yōu)化效果,由于各節(jié)點(diǎn)的數(shù)據(jù)傳輸量僅和連接樹有關(guān),不受系統(tǒng)基礎(chǔ)運(yùn)行時間影響,即使在低連接表數(shù)下也可以看到顯著的優(yōu)化效果。通過圖27 可以看到,不同數(shù)據(jù)集體積下各表有著幾乎一致的優(yōu)化比例,6~8 個table 的連接優(yōu)化比例在不同數(shù)據(jù)集下的差別均在1%左右浮動。由于本文采用的優(yōu)化算法為確定性算法,對于同一連接樹的優(yōu)化策略是固定的,可以確定對于同一分布的數(shù)據(jù)集,在相同的優(yōu)化策略下,中間數(shù)據(jù)傳輸優(yōu)化比例與數(shù)據(jù)集體積無關(guān)。

圖23 5 表連接優(yōu)化前后的中間數(shù)據(jù)傳輸體積Fig.23 Intermediate data ship size before and after optimization in 5 tables join

圖24 6 表連接優(yōu)化前后的中間數(shù)據(jù)傳輸體積Fig.24 Intermediate data ship size before and after optimization in 6 tables join

圖25 7 表連接優(yōu)化前后的中間數(shù)據(jù)傳輸體積Fig.25 Intermediate data ship size before and after optimization in 7 tables join

圖26 8 表連接優(yōu)化前后的中間數(shù)據(jù)傳輸體積Fig.26 Intermediate data ship size before and after optimization in 8 tables join
通過圖27 與圖28 對比可以得知,中間數(shù)據(jù)傳輸?shù)膬?yōu)化比例遠(yuǎn)高于執(zhí)行時間的優(yōu)化比例,這是因?yàn)镕link 在執(zhí)行時節(jié)點(diǎn)之間默認(rèn)采用的為Pipeline 傳輸策略。這個策略規(guī)定了一個節(jié)點(diǎn)產(chǎn)生數(shù)據(jù)后立刻發(fā)送到下一個節(jié)點(diǎn)進(jìn)行處理,Pipeline 策略一定程度上可以緩解由于大表在前產(chǎn)生的上游數(shù)據(jù)阻塞的情況。

圖27 冗余率為0 時的中間數(shù)據(jù)傳輸體積優(yōu)化比例Fig.27 Intermediate data ship size optimization ratio in redundant ratio 0

圖28 冗余率為0 時的執(zhí)行時間優(yōu)化比例Fig.28 Execution time optimization ratio in redundant 0
這里選取8個table在不同冗余率(=0%,9.70%,19.40%,38.70%,51.60%)下的默認(rèn)連接策略與優(yōu)化后的連接策略進(jìn)行對比,由于優(yōu)化程序會清除連接中的冗余列,優(yōu)化后的連接策略可視為冗余率為0。
通過圖29 可以很明顯地看到冗余率的不同對執(zhí)行時間的影響。同樣由于100 MB 體積數(shù)據(jù)處理時間的優(yōu)化瓶頸不在于連接策略,優(yōu)化比例和500 MB以上的數(shù)據(jù)存在較大差異。

圖29 不同冗余率下執(zhí)行時間優(yōu)化比例Fig.29 Execution time optimization ratio in different redundant ratios
由于冗余列的存在并不影響計(jì)算效率,不同冗余率對運(yùn)算時間的影響僅來自于中間數(shù)據(jù)傳輸量對網(wǎng)絡(luò)傳輸時間的影響,不同冗余率下執(zhí)行時間優(yōu)化比例明顯和冗余率成正相關(guān),但是彼此間差距并不巨大,冗余率從0%提升到51.60%,而時間優(yōu)化比例僅提升了10%。
通過圖30 可以看出,不同數(shù)據(jù)集體積下各表在同一冗余率下有著幾乎一致的優(yōu)化比例,這點(diǎn)與前文根據(jù)圖27 所得到結(jié)論是相符的。在相同的優(yōu)化策略和冗余率下,中間數(shù)據(jù)傳輸優(yōu)化比例與數(shù)據(jù)集體積無關(guān)。

圖30 不同冗余率下數(shù)據(jù)傳輸優(yōu)化比例Fig.30 Data ship optimization ratio in different redundant ratios
同時可以看到冗余率對中間數(shù)據(jù)傳輸?shù)挠绊懸葓?zhí)行時間的影響更大,冗余率從0%到51.60%,優(yōu)化比例最大提升了19%。
表3 對比了0%和9.70%冗余率下500 MB 體積的8 個table 在默認(rèn)連接策略下的中間節(jié)點(diǎn)的數(shù)據(jù)發(fā)送量??梢悦黠@看到冗余列導(dǎo)致了不必要的數(shù)據(jù)傳輸,且冗余列若不及時通過投影清除會增加后續(xù)節(jié)點(diǎn)無用的數(shù)據(jù)發(fā)送量。

表3 0%和9.70%冗余率下中間節(jié)點(diǎn)的數(shù)據(jù)發(fā)送量Table 3 Data ship of intermediate nodes in 0%and 9.70%redundant ratios
圖31 對比了不同并行度下的連接執(zhí)行時間優(yōu)化比例。同樣在100 MB 規(guī)模的小體積數(shù)據(jù)上優(yōu)化效果并不明顯,隨著數(shù)據(jù)體積的增大,在數(shù)據(jù)體積相同時,優(yōu)化比例明顯隨著并行度增加而提高。

圖31 不同并行度下執(zhí)行時間優(yōu)化比例Fig.31 Execution time optimization ratio in different parallelism
本文針對大數(shù)據(jù)連接處理中投影關(guān)系的優(yōu)化設(shè)計(jì)了一種基于投影關(guān)系索引的優(yōu)化方法,并結(jié)合動態(tài)規(guī)劃算法在Flink 中進(jìn)行了實(shí)現(xiàn)。實(shí)驗(yàn)結(jié)果表明,本文方法所針對的冗余列問題確實(shí)對連接效率有著顯著影響,隨著冗余率的增加,連接效率會明顯降低。本文所提供的優(yōu)化方法在優(yōu)化連接順序的同時考慮到了冗余列的影響,可以大大減少連接任務(wù)的執(zhí)行時間和中間數(shù)據(jù)傳輸量。