劉旋律 顧進廣
(武漢科技大學計算機科學與技術學院 湖北 武漢430065)(智能信息處理與實時工業系統湖北省重點實驗室(武漢科技大學) 湖北 武漢430065)(武漢科技大學大數據科學與工程研究院 湖北 武漢430065)(國家新聞出版署富媒體數字出版內容組織與知識服務重點實驗室 北京 100038)
傳統的ETL是通過批處理的方式,利用晚上或者周末空閑的時間來完成提取、轉換和加載操作。然而,在如今競爭劇烈的商業環境下,面對著數據量大、數據需求多變、時間敏感等問題與挑戰[1],傳統的ETL無法應對。為了使數據能夠及時、連續地轉換到數據倉庫,以便用戶能夠快速做出決策,實時ETL被提出。在實時ETL系統中,高可用、低延遲和橫向可擴展是三個關鍵特性[2]。文獻[3]通過分析ETL過程,并行化執行同優先級的ETL操作的方式來減少ETL過程的執行時間,而文獻[4]通過將ETL過程運行到Hadoop計算引擎上,提高運行速度。文獻[5]使用面向服務的體系結構,將ETL過程中每一個操作封裝為一個RESTful API的方式來解耦ETL操作,通過升級和快速迭代服務的方式來解決數據需求多變的問題,通過集群部署的方式來提高ETL操作速度。
上述解決方案都能在一定限度上解決實時ETL中存在的問題。但是,以上解決方案都沒有考慮到數據的動態性。在很多實際場景中,數據源的數據生產速度隨著時間波動且波動區間較廣,例如,醫院藥房藥品數據、醫療傳感器實時數據、人群聽歌和購物的行為數據、物流倉庫中的物流數據。
在實時ETL系統中,每個ETL過程中的ETL操作都是常駐服務。在上述解決方案中,ETL過程初始化完成后各個操作的進程數不會改變,如果需要更改某個操作并行的進程數,需要重新初始化該ETL過程。對于數據生產速度波動較大的場景,如果ETL過程以數據源的數據生產速度最大值初始化ETL過程,則在大多數時間,ETL過程的資源利用率都不能達到理想狀態;如果ETL過程以低于數據生產速度最大值初始化,則存在一段時間,新增數據阻塞在數據源中;如果在允許的容忍時間范圍內,不能夠把新增數據及時轉換到數據倉庫中,則不利于用戶快速地做出商業分析。
針對現有方案未考慮在數據生產速度波動較大的場景下,ETL過程不能夠合理分配資源的問題,本文提出了一種基于穩定匹配的實時ETL彈性調度機制。
為了達到實時ETL的要求,文獻[2]提出了高可用、低延遲和橫向可擴展三個要求。Kim等[3]為了解決基于流的空間信息數據量過大的問題,采用分布式并行的方式設計并實現了一個空間ETL處理引擎來降低延遲。Zhang等[4]為了實時處理醫療傳感器收集的流式數據,使用Hadoop來解決該問題。文獻[4]監控了Map任務和Reduce任務的負載情況,如果某個MR任務節點負載高就分裂該任務。該方法具有一定的伸縮性,但未從數據源的角度和整體變化去考慮。另外,使用Map與Reduce組合的方式并不能靈活地表示ETL的各種復雜操作。Hsieh等[5]使用RESTful API提供數據遷移功能,該方法能夠花費較少的時間來應對數據格式不斷變更的問題,同時分布式的調度方式為該系統提供了可擴展性。Chen等[6]提出了一種基于多代理系統的并行ETL執行工作流框架,該系統通過初始化ETL過程時,將相同優先級的ETL操作并行化的方式來降低延遲。Santos等[7]為了能夠應對海量數據的數據分析,提出了一種基于Spark的ETL平臺。Diouf等[8]提出可以使用云上的技術來解決實時ETL帶來的問題,同時也指出當前研究沒有考慮到數據的動態性和資源使用不夠合理的問題。以上解決方案都通過并行度降低了延遲,通過分布式環境保證了橫向可擴展,但是都沒有考慮從整體上考慮資源的利用問題,ETL操作的可擴展性較低,導致在數據生產速度變動較大的時候,資源利用不夠合理。
匹配問題一直是眾多研究者討論的熱點問題。在兩個匹配集中,沒有比當前匹配關系更優的匹配關系存在,則當前匹配為穩定匹配。穩定匹配是沒有不穩定對的完美匹配。為了達到穩定匹配,多種匹配算法被提出。為了使云平臺節點負載更合理,Wang等[9]提出了針對虛擬機分配的匹配算法,該算法根據虛擬機和主機各自的偏好進行匹配。Islam等[10]提出了一種基于延遲接受的資源分配算法,用于將用戶蜂窩資源分配給D2D設備。Hamidouche等[11]提出了一種新的基于博弈論的匹配算法,用于解決小型基站和服務提供商服務器之間的多對多匹配博弈問題。Viet等[12]提出了一種雙向局部搜索算法,用于搜索穩定婚姻問題中的平等和性別平等穩定匹配。Zhou等[13]提出了一種迭代匹配算法,該算法首先基于固定偏好產生一個穩定的匹配,然后根據每次迭代的最新匹配結果動態更新偏好。本文的重點是實現ETL上下游操作中服務的關系匹配,保證數據的消費速度最大,同時網絡拓撲距離費用最低。
一個ETL過程(OP)表示從數據源(S)獲取數據,經過多次ETL操作(O)后,加載到目標數據倉庫(D)的過程。一個ETL過程包含多個ETL操作。每一個ETL操作在對應的ETL過程中都至少存在一個ETL服務(OS),多個相同的ETL服務構成一個ETL操作。
在實時ETL系統中,一個ETL過程初始化完成后,所有的ETL服務會常駐在系統中。本文通過調整ETL服務的個數來動態調整ETL過程的消費數據速度(CV)。根據調整的方式不同,本文將彈性調度機制分為彈性增長和彈性收縮兩種。首先,根據預測的數據生產速度(PV)來計算出ETL過程的最小消費數據速度(CVm)。然后,比較CVm與當前ETL操作的消費數據速度(CV)。如果CVm大于CV,則當前ETL過程需要彈性增長;如果CVm小于CV,則需要彈性收縮。
彈性增長流程需要判斷當前ETL過程中各個操作的消費數據速度(OCV)是否不小于CVm。若當前ETL操作不滿足條件,則計算當前操作需要調整的服務數量(SN),并添加相應數量的服務到該ETL過程中。最后,對于存在新增服務的操作Oi,重新確定與上下游操作Oi-1、Oi+1中各服務的匹配關系,保證所有的操作都能滿足CVm。
彈性收縮流程則是需要移除服務節省資源。本文不會將所有ETL操作的CV降低到CVm大小,這樣需要移除的ETL服務將覆蓋所有的ETL操作,執行代價太大。由于發生變更的ETL操作是少部分,因此本文只會移除新增的ETL服務。遍歷所有存在變更的ETL操作,根據計算的SN移除新增的服務。若SN不小于新增服務數量(IN),則只移除IN個服務;否則,按照加入順序依次移除SN個。
計算完SN后,需要調整ETL服務的數量。在分布式場景下,每個物理機器節點的性能不同,負載也不同,一次性添加或移除SN個ETL服務都會影響節點的負載。因此,需要考慮平衡物理節點的負載。
在調整完成后,需要確定與上下游ETL操作匹配關系。在分布式場景中,為了保證各節點的負載均衡,同一個ETL操作的ETL服務可能會分配到不同節點上。在數據量較大的情況下,跨節點ETL服務之間數據的轉發將增加網絡開銷,影響ETL過程的消費數據速度。因此,在調整ETL服務數量后,如何保證在新的匹配關系中,消費數據速度最大且網絡拓撲距離費用最小是需要考慮的另外一個問題。
ETL過程彈性調度流程如圖1所示。

圖1 彈性調度流程圖
本文將整個調度流程分為以下四個步驟:(1) 預測數據生產速度PV;(2) 計算ETL操作中需要調整的服務數量SN;(3) 調整服務數量;(4) 確定上下游ETL操作匹配關系。本文中出現的主要的符號見表1。

表1 符號表

續表1
不管是彈性增長還是彈性收縮,只有預測出數據生產速度,才能執行彈性調度的策略。在數據生產速度波動較大的場景中,數據生產速度并不會一直增加,而是達到最大值后下降或者維持一段時間。所以,ETL過程的CV與PV不要求相等。當生產數據速度增長時,系統可以容忍一段時間內新增的數據不用立即處理而是緩存到數據源中。如果數據速度增長后長期保持不變,則CV不能繼續小于PV,而是需要繼續增長避免未被消耗的數據數量持續增長。
這里引入容忍時間(DTT)和容忍容量(DTC)兩個變量,ETL過程可以容忍一個DTT后,新增數據的容量不超過DTC的大小。令C為當前未被消費的數據總量。
當C≤DTC時,此時可以容忍CV小于PV,即:
DTT×(PV-CV)≤DTC-C
(1)
當C>DTC時,此時為了恢復到容忍容量以內,所以CV應大于PV,即:
DTT×(CV-PV)≥C-DTC
(2)
為了使得ETL過程滿足要求,只有計算出需要調整的服務數量才能夠向相應的ETL過程中添加或移除服務。一個ETL過程包含多個ETL操作,每個ETL操作包含多個相同的ETL服務。在實時ETL中,數據是通過數據流的方式在操作之間流轉。因此,ETL過程的消費數據速度取決于屬于該ETL過程i的所有操作中消費數據速度最小的操作。即:
CVi=min{OCV0,OCV1,…}
(3)
式中:CVi為ETL過程i的消費數據速度;OCV0為ETL操作0的消費數據速度。
對于ETL操作j,其消費數據速度OCVj為該操作中所有的服務消費數據速度之和,即:
(4)
式中:OCVj為ETL操作j的消費數據速度;OMVjk為ETL操作j的ETL服務k的最大消費數據速度。
在分布式環境中,一個ETL操作的多個服務雖然邏輯相同,但是由于分布不同,其消費數據的速度不是絕對的相等,而是會在某個固定范圍內波動。本文使用該ETL操作的所有服務的消費數據速度的平均值作為添加的服務所使用的消費數據速度,即:
(5)
式中:αj表示ETL操作j的ETL服務平均消費數據速度;OCVj為ETL操作j的消費數據速度;OMVjk為ETL操作j的ETL服務k的消費數據速度。
調整SN個數量的ETL服務,會使得多個節點的負載不夠均衡。一個節點的負載由該節點上的服務個數和服務類型決定,即:
Wm=WS1×n1+WS2×n2+WS3×n3+…
(6)
式中:Wm表示節點m使用資源量;WS1表示ETL服務類型1的資源量;n1表示ETL服務類型1的在節點m上的數量。這里需要考慮如何調整SN保證各節點的負載能夠均衡。
一個ETL過程的消費數據速度不僅僅由ETL操作最大消費數據速度決定,也與操作之間的關系匹配有關。以圖2和圖3兩種匹配為例:ETL操作j的最大消費數據速度為:OMVj1+OMVj2+OMVj3=83;ETL操作j+1的最大消費數據速度為:OMVj+1,1+OMVj+1,2=95。

圖2 不合理匹配 圖3 合理匹配
如果按圖2匹配,則當前ETL過程的最大消費數據速度為OMVj+1,1+OMVj3=78;
如果按圖3匹配,則當前ETL過程的最大消費數據速度為OMVj1+OMVj2+OMVj3=83。

另外,為了保證各節點的負載,同一個ETL操作的ETL服務可能分布在不同的節點上。因此,在數據量大的情況下,節點之間數據傳輸的帶寬消耗不容忽視。因此,如何確定匹配關系保證網絡拓撲距離費用最小也是需要考慮的問題。本文節點間網絡距離的計算參照Hadoop中網絡拓撲距離計算定義的方法,即:

(7)
步驟一預測數據生產速度。數據源的數據生產速度的變化與其應用實際場景有關,可以將數據生產速度預測問題轉為一個網絡流量預測問題,通過構建時序預測模型來解決。Local Regression Robust(LRR)算法是文獻[8]提出的用來預測虛擬機關鍵主機用量的算法。該算法是一種自適應預測檢測算法,它通過趨勢多項式擬合當前的最后k個觀測值的方式來預測下一個觀測值。由文獻[8]中給出的結果可以看出,該算法優于其他算法。因此,本文使用LRR算法來預測數據源生產數據速度。
步驟二計算調整服務數量。
根據式(1),可以推導出:
(8)
根據式(2),可以推導出:
(9)
因此,對于預測數據生產速度PV,要求的最小ETL過程消費數據速度為:
(10)
彈性增長流程中,若對于ETL過程i,存在ETL操作j的消費數據速度OCVj (11) 彈性收縮流程中,此時CV>CVm,對于存在變更操作j,存在ETL服務n個,新增的服務數量為INj,該操作理應移除的服務數量Uj為: (12) 本文只是對存在變更的ETL操作進行移除,且移除數不會超過新增的服務數量,因此ETL操作j應該移除的服務數量為: (13) 步驟三調整ETL服務數量。為了保證在調整ETL服務數量的時候,各個物理節點的負載均衡,本文提出了基于資源的貪婪負載均衡(Greedy Load Balance,GLB)算法。在彈性增長流程中,GLB算法選擇資源使用最小的節點分配。另外,GLB算法不會將SN個數量一次都分配到每個節點,而是采用依次分配的方式保證負載的均衡。在彈性收縮流程中,GLB算法選擇存在該ETL服務的最大資源使用節點移除。算法過程如算法1所示。 算法1GLB 輸入:ETL操作的當前消費數據速度v;ETL操作目標速度t;當前ETL操作數量n;ETL操作的平均速度a;每個節點的資源利用集合U。 輸出:被選擇節點的索引。 1.begin 2.ifv 3.sn←calculated adjusted number(v,t,n,a) 4.fork←0tosndo 5.i←find minimum resource usage node(U); 6.endfor 7.else 8.sn←calculated adjusted number(v,t,n,a) 9.fork←0tosndo 10.a←resource usage array sorted by maximum(U); 11.forl←0tolength(a)do 12.ifexist ETL service in lthen 13.i←l 14.break 15.endif 16.endfor 17.endfor 18.endif 19.returni; 20.end 步驟四ETL操作關系匹配。對于ETL操作,其各個ETL服務處理完數據后可以隨機轉發給下游的任意服務。因此,一定存在一種匹配關系使得操作j與操作j+1的整體消費數據速度為min{OCVj,OCVj+1}。當ETL操作j中添加了服務,需要確定Oj與Oj-1、Oj與Oj+1的匹配關系。通過步驟二已經保證各個操作的消費數據速度都滿足條件。因此,只需要確定匹配關系使得{Oj-1,Oj}和{Oj,Oj+1}這兩組的消費數據速度CV分別為min{OCVj-1,OCVj}和min{OCVj,OCVj+1}即可滿足條件。 從可行流開始增廣時,最終的增廣量是一定的。所以為了滿足最小費用,只需要每次找最小費用的增廣路即可。本文提出基于Dicnic算法的改進(Dicnic Improved,DI)算法來解決該問題。傳統的Dicnic算法通過先廣度優先搜索(BFS)分層,后深度優先搜索(DFS)的方式來找尋增廣路。本文求解的流量網絡是一個由Oj、Oj+1兩個操作的所有服務組成的網絡,其具有二分圖的性質,每個操作即可為一層。因此,DI算法不需要通過BFS分層。由于操作已經分層,因此,可以直接使用層次遍歷的方式來找尋最小費用增廣路。算法的步驟如算法2所示。 算法2DI算法 輸入:上層服務流量集合S;基礎服務層流量集合T; 輸出:匹配關系矩陣G;匹配成本C;最大消費數據速度V。 1.begin 2.s←calculate total service traffic(S), t←calculate total service traffic(T), needTranspose←false 3.ifs>tthen 4.M←S,S←T,T←S,needTranspose←true 5.endif 6.i←0,C←0,V←0,G←[length(S)][length(T)] 7.whiletruedo 8.whilei 9.i++ 10.endwhile 11.ifi 12.I←MAX,J←MAX,minCost←MAX 13.fork←itolength(S)do 14.ifS[i] has trafficthen 15.forj←0tolength(T)do 16.ifT[j] has trafficthen 17.cost←calculatecost(i,j) 18.ifminCost>costthen 19.minCost←cost,I←i,J←j 20.endif 21.endif 22.endfor 23.endif 24.endfor 25.C←C+minCost, 26.minStream←min(S[i],T[j]) 27.S[i]←S[i]-minStream,T[j]←T[j]-minStream, G[I][J]←minStream,V←V+minStream 28.else 29. break 30.endif 31.endwhile 32.ifneedTransposethen 33. transpose(G) 34.endif 35.returnG,C,V; 36.end 本文基于容器調度平臺kubernetes構建了一個分布式調度平臺。每個ETL服務都被封裝成為一個RESTful API的服務并轉化為一個docker鏡像保存到鏡像倉庫中。當需要調整ETL服務數量的時候,通過kubernetes來進行調整。該平臺配置如表2所示。 表2 平臺配置 在實時ETL系統中,ETL服務常駐在ETL系統中,數據以數據流的方式在ETL服務之間流轉。通過隨機構造了100個正態分布函數用來模擬數據生產速度隨時間發生波動較大變化的場景,并以此變化的數據生產速度不斷地向初始化后的ETL過程提供實時數據。在彈性調度方面,與傳統的調度方案在資源利用方面進行了比較。在匹配問題方面,與貪婪(greedy)匹配算法和輪詢(poll)匹配算法在消費數據速度和網絡距離費用方面進行了比較。 為了驗證實時ETL平臺彈性調度比傳統的調度在數據波動較大的場景下,資源的利用具有優越性,本文構建速度浪費μ和堆積浪費ε兩個變量。 如果當前的PV>CV,則ETL過程以最大消費數據速度運行,因此速度浪費μ=0,但是會造成堆積浪費,其值為: (14) 如果當前的PV (15) 式中:Δt為單位時間;W為ETL過程投入的內存大小。本文使用速度浪費和堆積浪費之和作為資源代價的衡量指標。結果越大,代表浪費越多,資源利用率越低。 本文匹配算法的提出是為了確定ETL操作匹配關系使得在消費數據速度最大的同時網絡距離費用最小。如果消費數據速度越大,或者網絡距離費用越小,則該匹配越穩定。因此,本文構建了穩定度θ來表示匹配是否達到穩定。其值為: (16) 式中:COST表示的是匹配后網絡距離費用。θ越大表示匹配關系越穩定。 圖4給出了傳統的實時ETL調度機制中,以消費數據速度最大值調度(max)、以消費數據速度的平均值進行調度的代價變化(avg)及本文彈性調度機制的代價變化(paper)曲線圖。圖5給出了三種調度方式隨著時間增加消費的數據總量變化圖。從圖4中可以看出,在生產數據速度發生波動的時候,彈性調度機制的代價一直都是低于傳統的調度方式的代價。從圖5中可以看出,彈性調度的方式并不會影響數據消費的速度,其消費數據的總量和最大消費數據速度配置的ETL過程差不多。因此,本文的彈性調度機制與傳統的調度機制相比,在數據生產速度波動較大的場景下,其消費數據速度與最大消費數據配置的ETL過程相差不多,但資源代價更小。 圖4 不同的調度機制資源代價變化曲線 圖5 不同的調度機制消費數據總量變化圖 圖6給出了相同的ETL過程增長相同的服務個數后,使用不同算法確定匹配關系的消費數據速度變化圖。圖7給出了不同算法匹配后網絡拓撲距離費用變化圖。圖8給出了不同算法匹配后匹配度的變化圖??梢钥闯觯疚奶岢龅钠ヅ渌惴軌虮WC消費數據速度最高的同時費用最小,匹配的穩定度最高,匹配更穩定。 圖6 不同匹配算法消費數據速度變化圖 圖7 不同匹配算法網絡距離費用變化圖 圖8 不同匹配算法的穩定度變化圖 圖9給出了實驗環境中三個工作節點在數據波動過程中資源消耗情況。由圖9可以看出,使用GLB算法來調度服務的個數,能夠保證節點的負載均衡。 圖9 GLB算法節點資源消耗變化曲線圖 本文提出了一種基于穩定匹配的實時ETL彈性調度機制,用于處理在數據波動較大的情況下,傳統實時ETL方案資源配置不合理的問題。該機制通過預測生產數據速度來計算需要調整的ETL服務個數。然后使用GLB算法和DI算法來分別完成服務的調度問題及服務的匹配問題。通過實驗驗證了方案的有效性,也為ETL過程調度提供了參考。針對實時ETL平臺中多個ETL過程的資源彈性調度是下一步工作的重點。
4 實 驗
4.1 實驗環境

4.2 評測方法
4.3 結果分析






5 結 語