中圖分類號:TP391 文獻標志碼:A 文章編號:1001-3695(2025)08-028-2460-07
doi:10.19734/j. issn.1001-3695.2024.11.0490
Dynamic time window-based optimization method for unified stream and batch processing in data warehouses
Chen Binlin, Tang Xiaoyong? (Scholof Computer amp;CommunicationEnginering,Changsha UniversityofScience amp; Technology,Changsha 41O114,China)
Abstract:Data warehouse is the coreof enterprise data management,and batch processing and stream procesing are he two coredata processing paradigmsforbigdataanalytics.Toaddressthehighlatencyandresourceconsumptionoftraditionalbatch processing technologies,aswellasthedataqualitychallenges faced bystreamprocessingtechnologies whenhandlingmultistreamdataassociationand historicaldatacomputation,this studyproposedaunified streamandbatch procesing method.The proposedmethodanalyzedthechangesofthedatasetindiferenttime windows,andintegratedthedynamic timewindowdivisionbasedonschedulingtimewiththesimplestdatasetsearchbasedontheDFS(depth-firstsearch)algorithm.Experimental results demonstrate that,comparedwiththe mainstreammicro-batch procesing method,this methodreduces overallcomputation time by 57.2% and memory consumption by 24.2% ,while ensuring strong data consistency. This method holds important referencevalueforenterprises inbuildingdata warehouses withhigh procesing eficiencyandlowresourceconsumption,integrating stream and batch processing.
Key words:data warehouse;data streamprocessing;dynamic time windows;minimal dataset;stream-batch integration
0引言
數據倉庫是企業數據管理的核心,它幫助企業從大量復雜的數據中提取有價值的信息,提升企業競爭力和市場響應速度[1]。互聯網發展、數字化普及和網絡技術進步導致數據量激增,數據倉庫技術成為企業利用這些數據的關鍵[2]。通過ETL流程整合數據,數據倉庫確保了數據的一致性、可靠性和高效訪問[3]。進一步地,數據查詢、統計分析和可視化幫助企業發現商業機會、優化流程、改進決策[4]。大數據時代,數據倉庫處理能力增強,但業務需求對數據處理時效性提出更高要求,傳統的 t+1 或小時級的更新頻率已無法滿足實時分析需求。業務場景需要分鐘級甚至秒級的數據反饋,以提供及時的業務洞察和優化用戶體驗。如何在確保數據質量的同時,靈活滿足時效性要求,減少系統資源消耗,是當前數據處理領域面臨的一個重大挑戰[5]
批量處理和流式處理是數據倉庫領域中數據處理的兩大核心范式,學術界對此進行了廣泛研究[1]。數據倉庫中的數據集被視為無界限的數據流,每個事件的發生時間是固定的,而處理時間則因數據在處理流程中的流動而動態變化[6]。批量處理和流式處理根據業務時間將數據流劃分到不同的時間窗格中,并基于時間窗格處理數據,實現對無界限數據流的有效消費[7]。在數據集成時,數據流從多個異構源同步到數據倉庫,由于來源和時間戳的差異,數據可能非順序到達,引發亂序問題[8]。為了解決數據亂序問題,一種常見的方法是通過設定數據時間邊界,僅當所有在邊界時間之前發生的事件均已到達后,才觸發批量作業。這種數據邊界的確定依賴于監控數據流中的數據到達信號,但在信號延遲或同步丟失的極端情況下,可能導致數據調度延遲或持續等待。另一種方法是水位線機制,它通過比較當前時間與已接收事件的發生時間間隔來判斷數據是否全部到達[9,10]。當時間間隔超過預設閾值時,系統認為所有數據已到達,然而水位線機制的時間間隔設定對數據質量有顯著影響,不當的設定在數據同步延遲時可能導致數據質量問題[11]
流式計算在滿足業務對實時數據處理的需求中扮演著關鍵角色,其主要優勢在于處理速度的快速響應,盡管這可能以犧牲一定的數據準確性為代價[12.13]。文獻[14]提出了一種基于Flink等技術棧的實時流數據處理方法。該方法通過將實時數據分配到不同的時間窗口中,并利用時間窗口函數對窗口內的數據進行處理,從而實現對數據流的即時分析[15]。盡管此方法在提升業務系統響應速度方面表現卓越,但尚未充分解決由數據延遲到達引發的問題,同時,該方法還面臨著新的挑戰,包括多實時流數據的關聯處理和歷史時間窗數據的回算,這些問題可能導致流式計算無法有效滿足此類計算需求[11]
批量計算適用于數據量大且對處理時延要求不高的場景,尤其在處理歷史數據方面具有優勢[1]。然而,它在處理數據延遲和實時場景需求方面存在局限。為了克服這些限制,學者們提出了Lambda架構。該架構通過整合批處理和流處理來實現數據處理,其中批處理負責歷史數據,流處理則負責實時數據,旨在平衡數據的實時性和準確性[16~18]。當流處理在數據準確性和完整性方面存在不足時,可以依賴批處理來調整和補充數據。但是Lambda架構需要維護兩套系統,這增加了維護成本和資源消耗。為了解決這些問題,Kappa架構應運而生,它消除了批處理層,通過統一的代碼和引擎處理所有數據[19]。盡管Kappa架構在追求時效與數據質量的場景中提供了一種解決方案:微批調度,即通過高頻調度批量代碼來滿足時效與質量要求,但這種方式會消耗大量計算資源,并受到數據時延的限制,導致微批調度的頻率無法進一步提升[7]。此外,調度頻率的變更伴隨著代碼的變更,增加了編程成本。
在數據庫性能評估領域,文獻[20,21提出了一種全面的數據庫模型評估框架。該框架評估數據庫模型性能不僅考量CPU、內存和磁盤空間等資源消耗,還考慮維護成本和潛在代價。這種方法有助于更精確地評估數據庫模型的總體擁有成本(totalcostof ownership,TCO),為數據庫技術的選型提供更全面的視角[22]。在數據處理調優方面,文獻[23]深人探討了數據集與SQL操作之間的映射關系,并展示了SQL查詢如何轉換為集合操作。研究者提出了一種創新框架,充許在數據庫內部直接表示和查詢集合相關性,完全使用純SQL實現,無須依賴額外的數據庫功能或插件。同時其理論分析部分表明,通過調整過濾閾值,可以顯著提升查詢性能,同步不影響查詢結果的準確性,這一發現為數據庫查詢優化提供了一種有效策略。文獻[24]進一步提出了數據庫高階語義的統一化概念,強調通過高階語義定義數據處理邏輯,以適應多種業務場景,這種策略有助于降低數據庫技術在開發和運維階段的成本,提高系統的整體效率。
針對前述挑戰,本文提出了一種創新的流批一體處理算法。該算法依據數據到達的實際時間來劃分時間窗口,并利用深度優先搜索(DFS)算法來實現最簡處理數據集的搜尋;通過基于調度時間的動態時間窗口設定,在確保數據質量的同時,實現了流處理和批處理在高階語義層面的統一。這種一體化方法不僅優化了數據倉庫的流批處理流程,而且顯著降低了系統資源的消耗和運維成本。
1流批一體數據倉庫研究框架
本文研究框架如圖1所示,總體上分為四個部分:a)基礎設施即服務(infrastructureasaservice,LAAS)。提供了虛擬化的硬件資源,包括虛擬服務器、存儲和網絡資源等。b)平臺服務(platformasaservice,PAAS)。包括流批一體數據倉庫管理、作業調度管理、流批一體計算引擎、存儲、監控運維工具。其利用作業調度管理來動態控制調度頻率,實現時間窗口的靈活調整,同時,數據倉庫通過搜索最簡處理數據集來優化處理流程,計算完成后,將結果回寫入數據倉庫。此外,流批一體的數據倉庫管理系統提供了多樣化的流批一體計算引擎,以適應不同的計算需求和場景。c)數據服務(data as a service,DaaS)。流批一體數據倉庫負責統一管理數據,通過為業務系統數據建模,將數據劃分為操作數據存儲(ODS)數據倉庫細節層(DWD)數據倉庫匯總層(DWS)和應用數據存儲(ADS)等層次,以實現數據的清晰理解和高效管理。統一數據服務則負責集中管理數據消費過程,確保數據使用的一致性和便捷性。通過這種整合,DAAS提供了一個全面的數據管理平臺,旨在簡化數據操作,提高數據的可訪問性和可用性。d)數據應用。在數據倉庫的基礎上,針對分析和決策需求,構建多樣化的業務智能(BI)應用、人工智能(AI)應用、實時監控預警系統以及客戶數據平臺(CDP)應用,以支持全面的數據分析和決策制定。
2基于DFS算法的多實時流關聯場景最簡數據集搜索
2.1數據流關聯關系的模擬研究
本節將集中探討實時流數據處理中涉及的數據集間的相互關系。數據集被視作一個無界限、持續流動的數據流,其中每個事件的發生時間是確定的。一旦事件在業務系統中生成,它將被數據集成工具捕獲,并集成到數據處理系統中,以便于進一步的分析和處理。這種處理方式確保了數據流的連續性和實時性,為實時分析和決策提供了堅實的數據基礎。鑒于數據流的連續性和無限性,數據處理任務不能等到所有數據完全到達才啟動計算,通常會采用劃分時間窗口的方法,將無限的數據流切分為有限的、可管理的數據子集進行處理。數據流的時間窗劃分是指將連續的數據流按照時間順序劃分為若干個離散的時間區間,每個區間稱為一個時間窗。這種劃分允許對數據流中的事件和模式在特定時間段內進行分析和處理。每個時間窗內的數據集合包含了該時間范圍內的所有數據點,從而為實時或歷史數據分析提供了結構化的框架。通過定義不同的時間窗長度和滑動策略,可以靈活地適應各種數據處理需求和應用場景。在傳統的數據處理框架中,批處理和流處理主要依據業務日期來劃分時間窗口,然而本文提出了一種創新的方法,即以數據的實際到達時間為基準來劃分數據窗格。
如圖2所示,該圖模擬了兩個數據集以流式方式連續輸入至數據處理系統,并進行處理的過程。 x 軸表示事件進入數據處理系統的時間, y 軸代表不同的數據集。數據到達后,基于這兩個數據集進行的數據加工將產生計算結果,這些結果將被用于后續的處理流程和數據應用場景。數據集 A 和 B 之間存在關聯關系,這種關聯通過序號來區分,例如,A1和B1因具有相同的序號1而表示它們之間的關聯關系。在 x 軸上,劃分了最新時間窗、歷史時間窗和未來時間窗,其中 (α-∞,1) 代表歷史時間窗, (1,now) 代表最新時間窗,而 (now,+∞ )代表未來時間窗, now 表示最新調度批次的處理時間點,而1表示最新計算批次的時間下限。數據集 A 和 B 進入處理系統的時間落在 (1,now) 內,這些是需要處理的數據。
Fig.2Data stream processing simulation
2.2多實時流的數據關聯定義
定義1時間窗格劃分。在每次調度周期,數據處理邏輯將依據當前時間窗內的最新數據集執行處理任務,一旦處理完成,隨著下一個調度周期的到來,系統將遵循既定的時間窗劃分規則,劃分新的最新時間窗,上一個周期的最新時間窗數據將被歸檔至歷史時間窗中。
定義2數據處理任務的數據集。在數據處理任務中,涉及多個數據集,其中每個數據集至少與另一個數據集存在關聯關系。數據集中的數據元素可以通過元素內的鍵值進行匹配關聯,并且兩個數據集中的元素之間僅存在一對一的對應關系。
定義3基于關聯關系和時間窗的數據元素重分類。依據數據集之間的關聯關系和它們所處的時間窗口,對圖2中的數據元素進行系統性的重新分類,將它們劃分為不同的集合,如式(1)所示。
SA1={A1,A2,A3,A5}
SA2={A6}
SA3={Ah,A7}
SB1={B1,B2,B4,B6}
SB2={B3}
SB3={Bh,B7}
SBF={Bf}
其中: SA1 是數據集 A 在(1,now)時間區間到達的數據元素集合; SA2 是數據集 A 在 時間區間到達的數據元素集合,這些數據元素與數據 B 在最新時間窗格內的數據元素有關聯; SA3 為數據集 A 在 (?-∞,1) 時間區間到達的數據元素集合,這些數據元素與數據集 B 在最新時間窗格內的數據元素無關聯,數據元素中的 h 為任意正整數; SAf 為數據集 A 在當前時間點 now 之后到達的數據元素集合,代表未來數據,元素中的 f 為任意正整數; SB1 是數據集 B 在(1,now)時間區間到達的數據元素集合; SB2 是數據集 B 在 (-∞,1) 時間區間到達的數據元素集合,這些數據元素與數據集 A 在最新時間窗格內的數據元素有關聯; SB3 為數據集 B 在 (-∞,1) 時間區間到達的數據元素集合,這些數據元素與數據集 A 在最新時間窗格內的數據元素無關聯,數據元素中的 h 為任意正整數; SBf 為數據集 B 在當前時間點now之后到達的數據元素集合,代表未來數據,元素中的 f 為任意正整數。
定義4在當前的調度周期內,根據既定的時間窗劃分規則,明確界定了最新時間窗,與無界限的歷史時間窗相比,歷史時間窗所涵蓋的數據集規模顯著大于當前時間窗內的數據集。
2.3調度周期內最簡處理數據集理論
當前調度周期內,對應最新時間窗(1,now)中的now由變量轉為常量,從而使得時間區間變得明確。相應地,該時間區間內所包含的數據集也隨之確定。基于這個最新時間窗的數據集,進一步研究與之相關聯的數據元素分布情況,以圖2的數據流處理過程為例,與最新時間窗數據集相關聯數據元素的位置關系如下:a)位于最新時間窗{(A1,B1),(A2,B2);b)位于歷史時間窗{(A3,B3), (A6,B6){};c) 位于未來時間窗(204號 {(A5,B5),(A4,B4)} ,其中 B5?Bf,A4?Af
根據上述分析可以了解到,當前調度批次的計算涉及到的數據元素分布在當前時間窗、歷史時間窗以及未來時間窗三個不同的時間窗口。對于未來時間窗,將其視為無關聯處理,未來的數據將在未來的批次中進行關聯計算,因此當前批次的計算只需專注于當前時間窗和歷史時間窗的數據元素。
依據式(1),將最新時間窗內到達的數據集合定義為 SA1∪ SB1 ,而與之相關聯的歷史時間窗的數據集合定義為 SA2∪SB2 ,對于當前調度批次的計算,可以通過取所有參與計算的數據元素的并集得到最簡處理數據集,具體的關系可以表示為
φ(S)=SA1∪SA2∪SB1∪SB2
其中: φ(S) 代表當前調度批次的最簡處理數據集。這一概念同樣適用于多實時流數據的關聯處理,即:在多實時流數據關聯的場景中,每個當前調度批次都會存在一個最簡處理數據集φ(S) 。
由于最新時間窗 (1,now) 是一個有界的時間區間,所以其對應的數據元素數量是有限的。設當前最新時間窗內到達的數據集合 SA1∪SB1 的數據元素個數為 N ,根據定義2,數據元素之間存在一對一的關聯關系。因此,當所有關聯數據元素都位于歷史時間窗時,當前調度批次的最簡數據集 φ(S) 有最大的元素個數,即2N;相反,如果關聯數據元素不在歷史時間窗,那么當前調度批次的最簡數據集 φ(S) 有最少的元素個數,即 N 根據定義4,歷史時間窗涵蓋的數據集 SA3∪SB3 規模遠大于當前時間窗內的數據集,因此歷史時間窗的數據元素個數也遠大于當前調度批次最簡數據集,有如下關系:
SA3∪SB3gt;gt;φ(S)
因此,在當前調度批次中,通過精確識別并處理最簡數據集,可以顯著減少需要處理的數據量,進而減少數據計算所需的資源消耗,并提升計算速度。
2.4參與計算數據集時間窗分布
在數據倉庫和數據庫管理系統中,事實表和維度表是兩個核心概念,事實表主要存儲業務過程中的度量或事實數據,而維度表提供描述性和補充性信息,豐富事實表內容。圖3展示了事實表和維度表參與計算數據集合的比例分布。觀察發現,與最新時間窗內的事實表數據相關聯的維度表數據集合,在不同時間窗中均勻分布;與最新時間窗內的維度表數據相關聯的事實表數據,大多集中在最近時間窗內,歷史時間窗中數據較少。這種差異與它們的性質有關:事實表基于已有維度記錄業務事件,而維度表調整可能影響相關歷史事實數據。
圖3當前調度批次最簡數據集元素分布比例 Fig.3Distribution proportion ofminimal data set elementsin current schedulingbatch
2.5基于DFS算法的最簡數據集搜索
深度優先搜索算法(depth-first-search,DFS)是一種用于遍歷或搜索樹或圖的算法[25]。它沿著樹的深度遍歷樹的節點,盡可能深地搜索樹的分支。在圖的遍歷中,DFS從某個頂點開始,深入探索圖的每條路徑,直到無法前進為止,然后回溯。
在數據倉庫的應用中,經常需要對多個數據集進行關聯分析以得出計算結果。數據集之間的關聯可以類比為深度優先搜索(DFS)算法中的圖路徑搜索。當兩個數據集之間存在關聯關系時,表明這兩個數據集(視為圖的頂點)之間具有連通性。基于這一概念,本文提出了MDS-DFS(minimaldatasetsearchviadepth-firstsearch)算法,旨在實現數據集中最簡關系的搜索,具體過程如算法1所示。
算法1基于深度優先遍歷的最簡數據集搜索算法(MDS-DFS)a)初始化。選擇一個起始數據集作為搜索的起點,并標記為已訪問。b)遞歸遍歷。對于當前訪問的數據集,遍歷其所有未訪問的關聯數據集(鄰接節點)。c)回溯。當前數據集的所有關聯數據集都被訪問后,從棧中彈出當前數據集,并返回到上一個數據集(父節點)。d)結果收集。在遍歷過程中,收集每個數據集關聯到的數據元素,追加寫入到最簡處理數據集中。e)繼續遍歷。繼續訪問棧中下一個未完全遍歷的數據集,并重復步驟b~d),直到棧為空,表明所有可達的數據集都已被訪問。f)結束條件。當棧為空,且沒有更多的數據集可以訪問時,算法結束。
3基于動態時間窗格的流批一體數據倉庫優化
3.1動態時間窗下的最簡處理數據集變換
如圖4所示,調整時間窗劃分規則,將時間窗格的寬度從(1,now) 調整為 (x,now) 時,時間窗格的上限從1擴展到 x ,這種變化對定義3中各個數據元素集合的影響具體如下:
SA1={A1,A2,A3,A5}SA1={A1,A2,A3,A5,A7}
SA3={Ah,A7}SA3={Ah}
SB1={B1,B2,B4,B6}SB1={B1,B2,B4,B6,B7}
SB3={Bh,B7}SB3={Bh}
時間窗的調整會導致數據元素被分配到不同的時間窗格中,進而在不同的時間窗格內進行計算,這意味著,時間窗格的變化會直接影響到數據元素被處理的具體時間點。
圖4時間窗格變換下的流式數據關聯關系
Fig.4Streamdataassociationundertimewindowtransformatior
當同一個主鍵的數據在業務系統中經歷更新操作時,僅記錄變更后的結果,因此在不同的時間節點,可以觀察到同一主鍵對應的不同數據快照,計算結果始終基于當前時點數據快照進行,從全局視角來看,最終只需要展示最后一次計算的結果。例如,元素B2在不同時間點的狀態分別為 B21,B22 和 B23 ,當新的時間點的數據到達時,它會覆蓋之前記錄的數據,假設B21,B22 和 B23 之間的時間間隔超過了時間窗格的長度,那么這些狀態分別獨立地進入了不同階段的批處理時間窗。當前調度批次最新處理時間窗調整為 (x,now) 時, B22 和 B23 落在同一個時間窗內,此時系統將僅保留最終狀態的數據,即 B23 ,從整體上看,這意味著在這一時間窗內處理的元素數量減少了。這種變化可以表示為 {B21,B22,B23}{B21,B23} ,因此在存在具有相同主鍵的元素跨越相鄰處理時間窗的情況下,擴展處理時間窗的長度可能會導致所需處理的最簡數據集規模減小,但是不影響數據處理的最終結果。
3.2動態時間窗對計算結果的影響
動態時間窗下最簡處理數據集變換的結論可以擴展到多個數據集場景,設 n 個有關聯關系的實時流全量數據集表示為Set1,Set2,Set3…,Setn ,其中每個數據集 Seti 對應的時間窗格的數據集合記為 Setij , i 表示不同數據集合的編號, j 表示對應數據集合的時間窗格編號。根據式(1),單次運算所需處理的最簡數據集合可以表示為
φ(S)=?i=1,j=1i=n,j=mSetij
由動態時間窗下的最簡處理數據集變換分析可知,調整當前調度批次最新時間窗的間隔,并不會影響數據處理的最終結果。增大時間窗格的間隔長度可能會導致所需處理的最簡數據集規模減小,結合式(2)和(5),可以推導出多實時流最簡處理數據集在動態時間窗變化中的關系:
其中: Seti=NewSeti mgt;p
因此,在多實時流的場景中,時間窗格的變化雖然會影響最新時間窗格內的數據集合,但不會改變最終的計算結果。這意味著,通過采用基于最簡處理數據集的搜索算法,不僅能夠在大規模數據集中高效定位目標處理數據集,而且能夠將時間窗劃分規則的調整與計算結果的準確性解耦。這種方法有助于解決因業務日期劃分數據計算時間窗而導致的數據質量問題。
3.3基于調度時間的動態時間窗格劃分方法
如圖5所示,固定時間窗格是將時間流劃分為等長且相互獨立的時間段,各時間段之間無重疊。滑動時間窗格同樣是等長的時間段,但每次向前滑動時,步長需小于時間窗格的間隔。語義時間窗格則是依據業務語義來確定計算的時間范圍,從而劃分時間窗格。由于數據來源多樣,數據集成過程中存在時延,多流程關聯的實時流數據難以確保在同一時間參與計算。所以,固定時間窗格與滑動時間窗格需依賴上游數據的完成信號或時延水位線來確定調度時間,而起調時點通常會晚于當前時間窗格的上限,這種方式不僅會導致起調時延,還難以保證數據質量。在語義時間窗格方式中,恰當的語義選擇可避免數據質量問題,但隨著時間推移,時間窗格內的數據元素會逐漸增加,同一數據元素可能被反復計算,這將浪費大量計算資源。
基于3.2節的分析結果,時間窗格間隔調整不會影響計算結果的準確性。本文提出了一種基于任務調度時間劃分數據時間窗格的方法,將上次的起調時點作為時間窗格的下限,當前起調時點作為時間窗格的上限,實現根據調度時點動態調整時間窗格,解決批量計算和流計算中的數據質量問題和計算頻率變化問題。從狹義看,流批一體指流處理和批處理共用一套代碼;廣義上,它指一套代碼能滿足調度作業,靈活調整周期且確保數據一致性。基于批量調度時間劃分數據時間窗格,可統一批量和流式計算。如時間步長為1天,效果等同于傳統批處理;若步長極短,近似實時流處理。因有最簡數據集,數據一致性得以保障,這支持了數據倉庫中流批一體化處理的實現。
3.4基于動態時間窗格的流批一體調度融合算法
通過基于任務調度時間驅動的數據時間窗格劃分方法,實現了動態時間窗格下的流批一體數據倉庫的構建,使得流式計算與批量計算無縫融合,核心算法過程如圖6和算法2所示。
算法2基于動態時間窗格的流批一體調度融合算法
a)業務系統變更捕獲。業務系統中相關數據集的新增、修改、刪除操作都通過變更數據捕獲(changedatacapture,CDC)機制被捕獲。捕獲的變更數據隨后被傳遞至數據處理系統。b)時間戳標記。在數據處理系統中,每一條數據元素在固化存儲時都會被標記上到達的時間戳,以實現數據到達時間的精確標記。c)時間窗格劃分。調度系統根據設定的調度頻率,將涉及的數據集劃分為不同的時間窗格。這些時間窗格按照固定窗口方式進行劃分。d)最新時間窗格數據獲取。從劃分的時間窗格中獲取最新的時間窗格數據,形成一個數據集合。e)獲取最簡數據集。通過MDS-DFS算法實現最簡處理數據集搜索。f)數據處理計算。最后,最簡處理數據集通過數據處理引擎進行必要的計算。
圖6基于動態時間窗格的流批一體調度融合算法流程 Fig.6Flowchart of the stream-batchunified scheduling fusion algorithmbasedondynamictimewindows
4地產行業營銷駕駛艙對比實驗
4.1需求背景
某地產公司需統計當日營業數據,并基于過去兩年業務數據進行深入分析,包括各級機構的營業額同比環比增長,以便管理團隊快速掌握業務動態并調整營銷策略,增加利潤。由于業務數據頻繁變動,需處理數據的撤銷和更新,確保準確性和一致性。數據開發團隊采用主流微批方法處理,每分鐘調度一次,重新計算兩年內的業務數據,但隨著業務量增加,調度執行時間延長,無法滿足業務敏捷性需求,急需新的數據處理方案優化流程,提升效率和響應速度。
4.2營銷駕駛艙實施方案設計
如圖7所示,業務系統選用MySQL存儲結構化數據,鑒于業務數據量未達TB級別,所以引人StarRocks數據庫。Star-Rocks有著卓越的數據處理能力,可作為數據處理系統的核心引擎及在線分析處理(OLAP)引擎,同時通過解耦調度系統與代碼管理,增加了調度頻率設置的靈活性。
通過FlinkCDC,業務系統將數據實時集成到StarRocks中。在同步過程中,對源數據庫的Binlog的操作op進行分類,實現增量同步,并能識別刪除操作,針對刪除的操作,系統會給存量數據打上硬刪除標識。硬刪除標識是實現處理邏輯增量更新的必要條件之一,無此標識,無法保證數據的完整性。
圖7地產行業數據處理技術棧關系
Fig.7Real estate industrydata processing technology stack relationship
4.3 實驗結果及分析
4.3.1對比方法與實驗數據集
為了驗證本文方法中基于DFS算法實現的最簡數據集搜索在性能和資源上的優化效果,以及基于調度時間的動態時間窗格劃分方法對數據準確性和流批一體轉化代價的影響,選取了其他三種前沿的流批一體化處理方法進行對比,具體如表1、2所示。BWS-SPM[11]通過設置水位線來處理延遲數據,從而提升準確率;BSP-SPM[13]通過保存所有狀態來解決加工準確率問題;BBDC- ?MB[26] 通過業務時間劃分固定時間窗格,以滿足業務固定語義下的準確率要求。BDTWMDS-MB是本文方法。
表1對比方法與簡稱Tab.1 Comparison methods and abbreviations
表2評估方法特性對比
Tab.2 Comparison of evaluation method characteristics
4.3.2處理時延與資源消耗對比
執行耗時是衡量數據處理方法產出時延的核心指標,內存消耗和CPU耗時則是評估方法硬件成本的關鍵因素,而掃描行數和掃描大小則反映了方法運行時的效率。
如表3、4所示,BWS-SPM在數據處理時延方面表現最為出色,且運行時處理的數據量級最小。BWS-SPM和BSP-SPM為了保持流式特性,采用持續占用內存和CPU的模式,且需要預先估算內存大小。其中,BWS-SPM由于設置了水位線,所以對內存的需求較少;而BSP-SPM因要保留所有數據狀態,故需事先準備足夠大的內存。相比之下,BBDC-MB和本文BDTWMDS-MB采用微批調度方式,執行完畢后會釋放資源,從資源消耗角度來看,這兩種方法遠低于持續占用模式,具有更低的硬件成本。
表3各方法的處理時延與資源消耗對比Tab.3Comparison of processing latency and resource consumption for each method
在同為微批調度的方法中,BDTWMDS-MB因基于DFS實現了最簡數據集搜索,其掃描大小和掃描數據量均低于BBDC-MB,這一優勢也體現在內存消耗、CPU耗時和執行耗時上。BDTWMDS-MB不僅使產出時延較BBDC-MB降低了 57.2% ,內存消耗還降低了 24.8% ,CPU耗時降低了 56% 。
表4各處理流程耗時對比
Tab.4Comparison of elapsed time for each processing procedurr
4.3.3各方法的準確率比較
從表5可以看出,BWS-SPM在單實時流場景下準確率100% ,這與該場景不涉及歷史數據變更的特性密切相關,合理設定水位線可保數據準確。但在多實時流關聯場景,其準確度最低僅 24.98% ,這是由于水位線方法難以滿足多實時流關聯時的歷史時間窗數據計算需求。BSP-SPM因保留全數據狀態,準確率恒為 100% 。BBDC-MB基于業務時間滿足固定語義,處理單實時流和多實時流關聯任務時,因部分數據超出業務語義范疇,部分流程結果準確率未達 100% ,本文BDTWMDS-MB根據數據到達時間進行動態時間窗格劃分,全局保持數據一致性,使準確率達到 100% 。
表5各方法的準確率對比
Tab.5Comparison of accuracyratesamong methods
4.3.4實驗整體效果評估
流式可視為特殊微批,兩者在特定條件下可相互轉換。表6對比了四種方法,在確保準確率的前提下,BSP-SPM時延表現最優,但資源消耗巨大,且因需調整代碼及重新回刷歷史數據,其流批轉化代價最高。相比之下,BDTWMDS-MB憑借基于調度時間的動態時間窗格劃分,僅需調節調度頻率,便能完成流批轉化,流批轉化及資源消耗成本均為最低。與主流的BBDC-MB相較,BDTWMDS-MB的處理時延大幅降低,降幅達57.2% 。
表6整體效果評估
Tab.6 Overall effectivenessevaluation
5結束語
在商業環境不斷演變的背景下,企業對數據處理的頻率和質量提出了更高的要求。針對傳統離線數據倉庫在處理效率上的不足,本文深入探討了數據處理的特性,揭示了數據到達的延遲問題,以及單次運算所需處理的數據集合分散在不同時間窗口的現象。基于這些發現,本文提出了一種創新的數據倉庫流批一體化優化方法。該方法在數據集成階段引入了數據到達時間的概念,并據此動態劃分時間窗口,實現了數據處理邏輯與調度頻率的解耦。這不僅使得在保證準確性的前提下,通過調整調度頻率來滿足批處理和流處理的需求成為可能,還在語義層面上統一了流處理和批處理邏輯,實現了數據倉庫中處理代碼的流批一體化。此外,本文通過MDS-DFS實現了最簡處理數據集的搜索,進一步降低了數據處理所需的資源,顯著減少了系統資源消耗和運維成本。對于企業在數字化轉型過程中構建高效數據處理平臺具有重要的參考價值。
本文提出的流批一體化處理方法在數據關聯處理場景中表現出色,但在處理含回撤數據流的聚合任務時存在局限,無法搜尋最簡處理數據集;且在資源利用效率上還有改進的余地。未來的研究可以從兩個方向著手:a)基于數據集成的op操作進一步優化實現流程,以更好地適應聚合場景;b)運用預測技術來預測數據處理結果的消費模式,實現調度時間的自適應調整,從而更有效地節省計算資源。
參考文獻:
[1]魏莉,龔燦,謝慧.基于數據倉庫技術的數據分析實踐[M].北 京:中國原子能出版社,2022.(WeiLi,GongCan,XieHui. Practice of data analysisbased on datawarehouse technology[M]. Beijing:China Atomic EnergyPress,2022.)
[2]Lamer A, Saint-Dizier C,Paris N,et al.Data lake,data warehouse, datamart,and feature store:theircontributionsto thecompletedata reusepipeline[J].JMlR Medical Informatics,2024,12: e54590.
[3]Dinesh L,Devi KG.An efficient hybrid optimization of ETL process indata warehouse of cloud architecture[J].Journal of Cloud Computing,2024,13(1):12.
[4]王國鋒,張文,武麗君,等.面向數據分析的數據倉庫設計與應 用[J].數字通信世界,2023(8):114-116.(WangGuofeng, ZhangWen,Wu Lijun,etal.Designandapplicationofdatawarehousefordataanalysis[J].Digital CommunicationWorld,2023 (8):114-116.)
[5]Bojicic I,Marjanovic Z,Turajlic N,et al.Domain/mapping model: anovel data warehouse data mode [J]. International Journal of ComputersCommunicationsamp;Control,2017,12(2):166.
[6]Zhang Yu, Zhang Feng,Li Hourun,et al.Data-aware adaptive compression for stream processing[J]. IEEE Transon Knowledge and DataEngineering,2024,36(9):4531-4549.
[7]MencagliG,DazziP,Coppola M.Springald:GPU-accelerated window-based aggregates over out-of-order data streams[J].IEEE Transon Parallel and Distributed Systems,2024,35(9):1657- 1671.
[8]Mehmood E,Anees T. Challenges and solutions for processing realtimebig data stream:a systematic literature review[J].IEEE Access,2020,8:119123-119143.
[9]呂鶴軒,黃山,艾力卡木·再比布拉,等.Flink水位線動態調整 策略[J].計算機工程與科學,2023,45(2):237-245.(Lyu Hexuan,HuangShan,ZaibulaElikamu,etal.Adynamicwatermark adjustment strategyin Flink cluster[J].Computer Engineeringamp; Science,2023,45(2):237-245.)
[10]舒玨淋,謝紅韜,周毅,等.一種基于Flink動態水位線調整的 方法及相關設備:中國,CN117609751A[P]2024.02.27.(Shu Jueling,Xie Hongtao,Zhou Yi,et al.A method and related equipmentfordynamicallyadjustingFlinkwatermarks: China, CN117609751A[P].2024-02-27.)
[11]ZhangHang. Research and design of real time big data visualization analysisplatform based on Flink[J].Journal of Physics:Conference Series,2023,2504(1):012053.
[12]TantalakiN,Souravlas S,Roumeliotis M.A review on big data realtimestream processing and its scheduling techniques [J]. International Journal of Paralel,Emergent and Distributed Systems, 2020,35(5):571-601.
[13]DeepthiBG,RaniKS,KrishnaPV,etal.An efficient architecture forprocessing real-time traffic data streamsusing Apache Flink[J]. Multimedia Toolsand Applications,2024,83(13):37369- 37385.
[14]曹云柯.一種基于Flink實時數倉的系統設計及功能實現研究 [J].電子技術與軟件工程,2022(6):219-222.(CaoYunke. Research on system design and function realization of real-time warehouse countingbased onFlink[J].Electronic Technologyamp; Software Engineering,2022(6):219-222.)
[15]SahalR,KhafagyMH,OmaraFA.Big data multi-query optimisation with apache Flink[J]. International Journal of Web Engineering and Technology,2018,13(1):78.
[16]Numnonda T.A real-time recommendation engine using lambda architecture[J].ArtificialLife andRobotics,2018,23(2):249- 254.
[17]Sampé J,Sanchez-ArtigasM,Vernik G,et al.Outsourcing data processing jobs with lithops [J]. IEEE Trans on Cloud Computing, 2023,11(1):1026-1037.
[18]Pereira FH,BezerraFE,OlivaD,etal.Forecast model update based on areal-time data processing lambda architecture for estimating partial discharges in hydrogenerator[J].Sensors,2020,20(24): 7242.
[19]Cuzzocrea A,Ferreira N,Furtado P.A rewrite/merge approach for supporting real-time data warehousingvia lightweight data integration [J].The Journal of Supercomputing,2020,76(5):3898- 3922.
[20]Ascherson N.Data valuation approach and application in view of data fulllifecycle[J].BigDataResearch,2023,9(3):39-55.
[21]DiTriaF,LefonsE,TangorraF.Cost-benefit analysis of data warehouse design methodologies[J].Information Systems,2O17,63: 47-62.
[22]Singh T,Patidar V,Singh M.A novel metric for assessing structural complexity of data warehouse requirements models[J]. Expert SystemswithApplications,2024,255:124754.
[23]Gao Fei,Song Shaoxu,Chen Lei,etal.Efficient set-correlation operator inside databases[J].Journal of Computer Science and Technology,2016,31(4):683-701.
[24]DebNathRP,RomeroO,PedersenTB,etal.High-levelETLfor semantic datawarehouses[J].Semantic Web,13(1):85-132.
[25]Baswana S,Goel A,KhanS.Incremental DFSalgorithms:a theoretical and experimental study[C]//Proc of the29th Annual ACMSIAM Symposium on Discrete Algorithms. New York:ACM Press, 2018:53-72.
[26]Gueddoudj EY,Chikh A,AttiaA.OS-ETL:ahigh-efficiency, open-scala solution for integrating heterogeneous data in large-scale datawarehousing[J].Ingénierie Des Systemes d Information, 2023,28(3):557-565.