朱小杰,趙子豪,2,杜 一,2*
(1.中國科學院計算機網絡信息中心,北京 100190;2.中國科學院大學,北京 100049)
(?通信作者電子郵箱duyi@cnic.cn)
伴隨著互聯網和物聯網技術的不斷發展,數據采集終端的種類和數目迅猛增加,數據量越來越大、產生速度越來越快[1]。同時,隨著越來越多的大科學裝置的建設和重大科學實驗的開展,科學研究也進入到一個數據密集型科學階段[2]。麥肯錫在2011 年的一份報告中稱,數據已經成為重要的生產要素[3]。然而,大數據時代的到來,使得大數據處理面臨數據復雜、計算復雜、系統復雜等問題[4],對數據處理軟件的實時性、靈活性、穩定性都提出了較高的要求,如金融行業的日常運營產生大量數據,這些數據產生于金融服務系統,時效性較強,需要在金融服務系統與數據處理系統之間流動并進行實時計算[5]。國家自然科學基金大數據知識管理服務平臺,需要融合來自MongoDB、Oracle、文件傳輸協議(File Transfer Protocol,FTP)等不同數據源上億條科研項目、科研成果、科研人員等數據[6]。
大數據處理過程包括數據采集、清洗、匯聚、分析等不同環節,不同環節之間具有較強的邏輯關系和執行順序,具有典型的流水線特征。構建大數據流水線有助于提高效率,但大數據流水線處理面臨如下問題:1)數據處理過程中,處理模塊由于缺乏合理的抽象,導致處理模塊的復用性低;2)相關工具選型太多導致開發復雜性增加;3)數據處理模塊缺乏統一的模型管理,導致數據處理框架或平臺的可維護性低。
針對大數據處理過程中遇到的上述問題,本文提出了一種支持大數據流水線機制的描述語言——PiFlowDL,實現了所見即所得的流水線設計,提出了基于分布式計算框架Apache Spark 的流水線執行機制,并給出了基于有向無環圖(Directed Acyclic Graph,DAG)的流水線執行調度策略。通過與主流工具的對比測試驗證了所提框架的可行性及性能。
模型驅動的開發方法是由對象管理組織(Object Management Group,OMG)提出的用以提高軟件開發效率的一種開發方法[7],其核心思想是將軟件系統建立在各種模型的基礎上,通過模型的變換來驅動系統的開發[8]。在軟件工程領域得到充分利用的同時[9],該方法被應用到了很多場景中。Vanderdonckt 等[10]給出用戶界面描述語言的概念,將模型驅動的開發方法應用到用戶界面設計與開發中。杜一等[11]改進了該方法,使用模塊化的描述方式重新設計了界面描述語言,將用戶界面設計與開發流程中多個角色進行了適配,提高了開發效率。隨后,可視化描述語言被提出,對任務、用戶、領域對象等抽象組成元素和表征、對話等具體組成元素,以及它們之間的映射關系進行描述[12],將模型驅動的開發方法應用于可視化系統開發中,極大地提高了系統開發的效率[13]。本文將模型驅動的開發方法引入大數據處理系統中,在大數據流水線的具體組成元素和界面模型中的抽象組成元素之間構建映射關系?;谟成潢P系設計大數據流水線模型描述語言PiFlowDL,將大數據流水線設計工具與執行引擎解耦合,從而提高了開發效率,實現了所見即所得的大數據流水線構建過程。
大數據技術的發展推進了以基于硬盤的MapReduce[14]和基于內存的Apache Spark[15]為代表的分布式計算模型的誕生。為了方便分布式處理,MapReduce 的任務之間數據不直接共享,大部分需通過網絡傳輸,這導致MapReduce在處理流程復雜任務時效率較低。批處理系統Hadoop[16]是基于MapReduce 的一種實現,其在傳統的批處理任務上具有較為廣泛的應用,但由于其自身延遲大的特點,并不適用于實時性要求較強的任務。有向無環圖框架在某種程度上解決了MapReduce 模型中任務間邏輯關系復雜的問題,目前在數據處理領域有大量對DAG 的應用和相關研究[17-19]。Apache Spark 是應用DAG 框架的典型代表,它將大數據處理任務在框架層以有向無環圖的模式進行組織,使得大數據開發人員可以更聚焦于數據處理邏輯本身。
而相關應用的發展,又對大數據處理的實時性提出了挑戰。Storm[20]、Flink[21]、S4[22]等一系列流式處理工具的誕生在一定程度上填補了數據實時處理領域的空白。不同大數據處理框架的對比如表1 所示。Storm 是一個開源的分布式實時計算系統,能夠可靠地處理無限的數據流,其拓撲結構靈活的編程方式和分布式協調大大提高了多步驟的數據處理流程的效率。Spark Streaming 用微批處理的方法實現流處理,Venkataraman 等[23]提出的流式處理工具Drizzle 具有比Spark Streaming 更高的效率。Apache Flink 同時支持了流計算和批處理,每個Flink 數據流以一個或多個源開始,并以一個或多個接收器結束。近年,有學者在Spark 的基礎上進行改進,增強了Spark 應用的實時性[24],提供了數據起源的相關支持[25]。然而,Spark提供的處理框架以及Storm、Flink等大數據開發工具,依然需要大數據開發人員理解如DataFrame、彈性分布式數據集(Resilient Distributed Datasets,RDD)等較多的框架本身的概念及開發模式,并編寫較多的數據處理邏輯,具有較高的使用門檻。為屏蔽不同數據處理工具之間在開發模式、編程接口上的差異導致的數據處理框架之間的遷移成本過高的問題,谷歌提出了Apache Beam(https://beam.apache.org/)框架,該框架統一了數據批處理和流處理的編程范式,使得開發者不需要了解底層數據開發接口,而直接通過Beam 軟件開發工具包(Software Development Kit,SDK)接口進行數據處理的加工與設計。然而,這些數據處理框架均沒有提供對應的數據處理工具,數據處理應用的開發門檻仍然較高,數據開發人員需要在理解業務的同時具有較高的大數據編程水平。

表1 大數據處理框架對比Tab.1 Comparison of big data processing frameworks
隨著相關技術的整體進步,部分數據大規模、無邊界、亂序的特點被放大。文獻[26]指出了物聯網應用對數據處理和分析的需求。傳統方法在處理無邊界數據集時,通常將其切分為有邊界數據集進行微批次處理,Google提出了Dataflow模型[27],在抽象層面提供了新的模型,以應對流數據的處理場景。
不同交互式流水線處理工具的對比如表2 所示。Google Cloud Dataflow[28]是對Dataflow 的具體實現,它是一種構建、管理和優化復雜數據處理流水線的工具,可以在不手工配置和管理MapReduce 集群的前提下構建復雜的pipeline,支持從批處理到流處理模式的無縫切換,使開發者可以將主要精力放在業務邏輯本身。StreamSets(https://streamsets.com/)是一種典型的大數據ETL(Extract-Transform-Load)工具,提供所見即所得的可視化數據流程配置界面,它將數據處理流程分為數據源(Origins)、執行器(Executors)、處理器(Processors)、數據存儲(Destinations)四類;但由于其規范了Processor 的類型,如一個pipeline中只能定義一個數據源,靈活性較差。這些工具由于封閉性、靈活性不足以及缺乏統一的模型支持,普遍存在擴展性較差的問題。Apache NiFi(http://nifi.apache.org/)是一個成熟的開源大數據流水線項目,它基于工作流式的編程理念,提供了較好的流水線定義和執行功能。Kafka[29]是一種具有較高吞吐能力的分布式消息系統,國外有學者在Kafka和Apache NiFi 的基礎上提出了一種數據流接收的框架[30]。然而,Apache NiFi 很難實現與Hadoop、Spark 等主流大數據框架的無縫集成,這導致其在處理海量數據時往往具有較差的性能。此外,NiFi在工作過程中會保存各個步驟的中間結果,導致磁盤I/O成為NiFi的瓶頸,這種機制導致的缺點在數據冗余量大的時候體現得尤為明顯。
基于以上相關工作,本文首先給出了基于模型驅動的大數據流水線描述語言PiFlowDL,該語言以模塊化、層次化的方式對大數據處理任務進行描述。同時,在PiFlowDL 基礎上,本文提出了一種模型驅動的大數據流水線框架PiFlow。該框架以所見即所得的方式配置流水線,同時為第三方處理模塊開發提供了易用接口,極大地提高了大數據處理環境的構建與開發效率。與同類框架在基準數據上的對比測試進一步驗證了所提框架在性能上的優勢。

表2 交互式流水線處理工具對比Tab.2 Comparison of interactive pipeline processing tools
大數據處理過程適合表示為流水線,不同的流水線之間在結構上差異較大,但在數據處理組件的層面上往往具有較強的相似性。文獻[31]提出了大數據分析即服務的模型驅動方法。當前的流水線描述語言缺少可復用性,擴展性較差,描述能力弱。針對這些問題,新的流水線描述語言的設計應遵循如下原則:
1)具有模塊化特征。模塊化的描述可以增加PiFlowDL的可讀性,同時能夠增加模塊在不同的數據處理流水線中的復用性。
2)具有層次化描述能力。能夠在數據處理模塊、數據處理單元等不同層次上對數據處理流水線進行描述。不同抽象層次的描述能力,進一步增加了描述語言的可擴展性,可擴展性的能力將傳導到基于該描述語言的框架實現中。
3)具有良好的擴展性。能夠支持新的數據處理組件的設計與引入。良好的可擴展性,可以使得框架能夠根據實際需求,快速進行處理單元的實現和復用,進一步提高大數據流水線系統的開發效率。
基于上述設計原則,本文提出了大數據流水線描述語言PiFlowDL。PiFlowDL 基于有向無環圖模型,將一條流水線的主要組成部分抽象為:一系列具有數據處理能力的數據處理組件Stop、數據在數據處理組件之間的流動方向Path,以及其他基本信息BasicInfo。其中,數據處理組件作為拓撲圖中的節點,數據在數據處理組件之間的流動方向及規則作為拓撲圖中的有向邊。PiFlowDL 主要部分的可擴展標記語言(eXtensible Markup Language,XML)Schema結構如圖1所示。圖1 中:每個矩形框表示一個節點;矩形框跟隨的加號表示節點可展開,減號表示節點不可展開;矩形框下方的數字表示節點允許出現的次數。另外,分別用“S”“C”“A”的矩形框表示XML Schema 描述時的“Sequence”“Choice”及“All”等三種模型。Flow 描述流水線信息,主要由3 個子模塊組成,包含基本信息模塊BasicInfo、數據處理組件模塊Stop 和數據流向模塊Path。其中,Flow 與BasicInfo 為一對一關系,與Stop 模塊和Path 模塊均為一對多的關系。BasicInfo 模塊是對流水線基本信息的描述,包括流水線名稱Name、流水線唯一標識UUID、流水線檢查點CheckPoint 和流水線運行模式RunMode,其中CheckPoint 和RunMode 是可選項。圖2 是BasicInfo 模塊XML Schema結構。

圖1 Flow模塊的XML Schema的結構Fig.1 Structure of XML Schema of Flow module

圖2 BasicInfo模塊XML Schema的結構Fig.2 Structure of XML Schema of BasicInfo module
數據處理組件泛指在流水線中具有數據采集、處理等功能的全部組件,Stop模塊是對流水線數據處理組件的描述,其中包括數據處理組件名稱Name、數據處理組件唯一標識UUID(Universal Unique IDentifier)、數據處理組件包名Bundle、數據處理組件屬性Properties、數據處理組件數據入口Inports 和數據出口Outports。圖3 是Stop 模塊XML Schema 結構。其中:Properties 包含0 到多個Property,表示數據處理組件的多個屬性;Inports包含1到多個Inport,表示數據處理組件的多個輸入端口;Outports 包含1 到多個Outport,表示數據處理組件的多個輸出端口。Path模塊是對流水線數據流動方向的描述,表示數據在各個Stop模塊中的流動順序。圖4是Path模塊XML Schema 結構,該模塊包括源數據處理組件From、源數據處理組件From 的數據輸出端口Outport、目標數據處理組件To,以及目標數據處理組件To的數據輸入端口Inport。

圖3 Stop模塊XML Schema的結構Fig.3 Structure of XML Schema of Stop module
在一些復雜的場景下,單條流水線并不能滿足需求,有些復雜邏輯需要多條流水線共同配合來完成。如截止到2018年2月,國家自然科學基金大數據知識管理服務平臺[6]共設計12類93條流水線進行大數據的采集、清洗、匯聚,并且需要根據實際業務的變化不斷增加。流水線的運行涉及數據的采集、清洗、匯聚等過程,存在先后順序,支持流水線和流水線組間的調度,可大大減少人工干預,提高后期運維效率。PiFlowDL 提出了Project、FlowGroup 和Flow 三層概念,并支持調度。Flow 是流水線調度的基本單位,在同一個項目中具備相似功能的流水線組織進一個FlowGroup中,Project是在項目層面對FlowGroup和Flow的集合。

圖4 Path模塊XML Schema的結構Fig.4 Structure of XML Schema of Path module

圖5 Project模塊XML Schema的結構Fig.5 Structure of XML Schema of Project module
Project 包含BasicInfo(Name、UUID 等)、多個Flow、多個FlowGroup 和Condition,如圖5 所示。同樣的,FlowGroup 包含BasicInfo(Name、UUID 等)、多個Flow 和Condition。Condition表示Flow 和FlowGroup 之間,以及Flow 和Flow 之間的調度策略,Current 表示當前節點,After 表示執行當前節點Current 之前必須執行完畢的節點。FlowGroup 和Condition 模塊的結構如圖6所示。

圖6 FlowGroup和Condition模塊XML Schema的結構Fig.6 Structure of XML Schema of FlowGroup and Condition module
項目中可能涉及Project、FlowGroup、Flow 的協同調度,如數據定時采集與清洗,PiFlowDL 的XML Schema 的結構如圖7所示。其中:Expression 表示定時調度策略,Entity 為執行實體,包括Project、FlowGroup和Flow中的一種,如圖8所示。

圖7 PiFlowDL的XML Schema的結構Fig.7 Structure of XML Schema of PiFlowDL

圖8 調度實體模塊XML Schema的結構Fig.8 Structure of XML Schema of scheduling Entity module
基于流水線描述語言PiFlowDL 的PiFlow 系統架構如圖9所示。該系統由5 個模塊組成,分別是可視化引擎、RESTful API、執行引擎、監控和日志。

圖9 大數據流水線系統PiFlow的系統架構Fig.9 System architecture of big data pipeline system PiFlow
可視化引擎以可視化的方式提供數據處理組件配置、數據流向配置和屬性配置等功能。PiFlowDL生成器將用戶通過可視化方法配置的流水線轉換為流水線描述語言。Web Service通過RESTful API將流水線描述語言發送給執行引擎。執行引擎包括PiFlow 解析器、執行模塊和調度模塊。PiFlowDL解析器將以流水線描述語言表示的流水線轉化為對應的有向無環圖模型。執行模塊實現流水線的執行策略,包括流處理和批處理。在調度子模塊中,執行引擎對基于Project、FlowGroup、Flow 三層概念組織的流水線進行調度,被調度流水線在Yarn 上執行。監控模塊對流水線運行情況進行監控,保存監控數據,并向上提供接口。日志模塊負責采集流水線運行過程中的日志,通過獲取Yarn 日志并解析,向上提供接口。
執行引擎是大數據流水線系統PiFlow的核心模塊。用戶配置的流水線在執行引擎中轉換為可執行流水線,等待被調度并執行。執行引擎中包括PiFlowDL 解析器、執行子模塊、調度子模塊等組件。PiFlowDL解析器將流水線描述語言所描述的流水線解析成DAG Graph。執行子模塊采用圖遍歷的執行策略對流水線進行批處理或流處理。調度子模塊對流水線進行調度。
3.1.1 PiFlowDL解析器
PiFlowDL解析器將可視化引擎生成的流水線模型描述語言轉換成流水線的有向無環圖DAG 供執行子模塊執行。模型描述語言采用XML 形式化表示,經PiFLowDL 解析器解析生成FlowBean、StopBean、PathBean 類對象,并最終生成AnalyzedGraph 類對象。AnalyzedGraph 類對流水線數據處理組件,組件間關系進行結構化定義,并提供流水線執行接口。PiFlowDL 解析流程如圖10所示,該圖僅以執行某條Flow為例進行說明。

圖10 PiFlowDL解析Fig.10 PiFlowDL parsing
3.1.2 流水線執行模塊
流水線執行模塊基于一定策略執行流水線任務。有向無環圖的遍歷有自頂向下和自底向上兩種邏輯。流水線執行模塊可采用自頂向下的圖遍歷方式,自頂向下的流水線執行機制可對入度為0 的數據處理組件并行執行,以提高執行效率。但PiFlow 支持數據的合流(Merge)、分流(Fork)、連接(Join)等操作,數據處理組件需滿足上游組件全部執行完畢的條件才能被執行,以獲得上游組件處理完畢的數據進行相關操作。自頂向下的執行策略會增加邏輯的復雜性。
本文采用自底向上的串行執行邏輯,通過遞歸調用的方式完成流水線的執行,可以保證每條流水線執行且執行一次,簡化執行邏輯。對基于有向無環圖遍歷執行策略的描述如下文偽代碼所示,該策略的核心思想是:在DAG 中找到所有出度為0 的數據處理組件,逐個遍歷,判斷當前stop 是否有入邊,若有入邊,則逐個遞歸調用入邊所有stop,之后執行當前stop;若沒有入邊,則直接執行當前stop。這種策略可以保證流水線中的每個數據處理模塊都得到執行。同時,因為某個數據處理組件的下級stop之間數據互不影響,所以對出度為0的stop的遍歷順序不影響執行結果。


3.1.3 批量處理與流式處理
本文實現了對PiFlow流水線進行批處理和流處理的兩種方法。圖11是流水線批處理示意圖。該示例流水線包含7個Stop:A、B、C、D、E、F、G,6 個Path:A→B,B→D,C→D,D→E,E→F,E→G。具體執行邏輯如圖11所示。
PiFlow 采用微批處理策略實現流處理。圖12(a)給出了流處理流水線配置方式。其中:流水線頂部為Streaming 類型數據處理組件;虛框中表示由普通數據處理組件組成的批處理流水線BatchProcess。Streaming 類型處理組件設置批處理時間窗口BatchTime,每隔BatchTime 時間產生一批數據傳遞給下游BatchProcess進行批處理,具體見圖12(b)。

圖11 PiFlow批處理Fig.11 Batch processing in PiFlow

圖12 PiFlow流處理配置方式及處理流程Fig.12 PiFlow flow processing configuration mode and processing flow
3.1.4 組件擴展
PiFlow 通過數據處理組件擴展子模塊支持用戶自定義開發數據處理組件Stop。圖13 是組件擴展模塊的統一建模語言(Unified Modeling Language,UML)類圖,通過將上下游數據進行抽象并封裝成JobInputStream 和JobOutputStream,以統一的input/output格式(本文采用Spark數據格式DataFrame)增強了數據處理組件的擴展性和可復用性。自定義的Stop需繼承抽象類ConfigurableStop,并實現相應接口。其中:inportList、outportList 為自定義數據處理組件的數據入口列表和出口列表;initialize 為流水線初始化函數;perform 為實現組件功能的具體函數,通過JobInputStream.read(inport)獲取上游數據,JobOutputStream.write(outport,newDataFrame)將數據通過端口outport傳遞給下游組件。同時,支持用戶自定義屬性,通過setProperties 函數對屬性進行初始化,getPropertyDescriptor 給出自定義屬性的描述信息。getIcon 和getGroup 提供組件的圖標和組信息。自定義流數據處理組件需繼承ConfigurableStreamingStop,并實現getDStream等接口即可。
3.1.5 調度模塊
調度模塊負責流水線的調度,圖14 展示了某一Project 內流水線配置,包含F1、F5、F6 三條流水線和一個流水線組G1,G1內包含F2、F3和F4三條流水線,實線箭頭表示After條件。
流水線設計等待Waiting、開始Started、完成Completed、錯誤Error 四種狀態,不同狀態之間具有邏輯關系不可隨意轉換,如:等待狀態只能轉換為開始狀態,不能直接轉換為完成狀態。在大數據流水線中,通常由等待狀態轉換為開始狀態、開始狀態轉換為完成狀態或錯誤狀態。本文采用基于流水線狀態的調度機制,其具體調度策略如圖15 所示。所有流水線根據狀態放置到四個資源池中,分別為等待調度資源池Waiting Pool、正在運行資源池Started Pool、已完成資源池Completed Pool、錯誤資源池Error Pool,其中Waiting Pool 為初始狀態。同時設置兩個Monitor:Condition Monitor 和Task Monitor。Condition Monitor 負責拉取Waiting Pool、Completed Pool 和Error Pool 中流水線,判讀滿足After 條件的流水線,啟動該流水線并放置到Started Pool 中。Task Monitor 拉取Started Pool 中流水線,若任務完成則放置到Completed Pool中,若任務失敗則放置到Error Pool 中。直到所有流水線全部進入Completed Pool或者Waiting Pool中只剩余Error Pool中流水線的后續流水線,整個調度任務結束。
PiFlow 流水線還包括可視化引擎、流水線監控模塊、流水線日志模塊以及RESTful API。
可視化引擎提供所見即所得方式配置流水線的功能,具體架構見圖9,其中包括數據處理組件配置和流水線配置。數據處理組件子模塊Stop,負責加載、配置后端Stop 信息。流水線子模塊Flow,負責配置流水線信息,監控流水線,獲取日志。為增強復用性,流水線相關信息可保存為模板,模板子模塊Template 負責流水線模板的保存、下載、上傳、加載。流水線監控模塊通過Listener 機制對流水線及每個數據處理組件進行監控。監控包括流水線開始時間、結束時間和狀態,以及每個數據處理組件開始時間、結束時間和狀態。流水線狀態定義為三種:STARTED、COMPLETED、FAILED,數據處理組件狀態定義為四種:INIT、STARTED、COMPLETED、FAILED。具體監控機制如圖16所示。
針對每個數據處理組件,Listener 在數據處理組件的初始化、開始、結束以及異常情況提供接口進行監控。同時考慮輕量級部署,監控數據存儲到內嵌式數據庫H2DB 中。PiFlow 日志模塊通過調用Yarn API 獲取日志,通過日志解析器解析日志,并向上提供接口。RESTful 采用Client/Server 架構,將客戶端和服務器解耦合。統一的接口要求客戶端和服務器之間通信的方法必須統一化,提高交互的可見性。鑒于此,PiFlow 采用RESTful API 方式暴露接口,供用戶和可視化引擎使用。接口具體包括啟動流水線startFlow、停止流水線stopFlow、獲取流水線狀態getFlowInfo、獲取流水線日志getFlowLog,以及獲取數據處理組件相關信息的接口如getAllStops 等。同時,包括啟動、停止Project 和FlowGroup 等接口。

圖13 組件擴展UML類圖Fig.13 UML class diagram of component extension

圖14 Project流水線樣例Fig.14 Project pipeline sample
基于PiFlowDL 的系統原型如圖17~18 所示。圖17 展示了流水線配置界面,左側欄分組展示數據處理組件,通過拖拽方式可將Stop 放入到畫布中央,右側欄展示Stop 的基本信息和需要設置的屬性信息。圖18 展示流水線運行監控頁面,監控流水線及數據處理組件的執行時間及狀態。同時,考慮遷移問題PiFlow 支持模板功能,將流水線Flow 保存成模板,新環境中導入模板即可完成遷移工作。

圖15 調度示意圖Fig.15 Dispatching schematic diagram

圖16 PiFlow監控機制Fig.16 Monitoring mechanism of PiFlow
DBLP(DataBase systems and Logic Programming)是計算機領域內對研究的成果以作者為核心的一個計算機類英文文獻的集成數據庫系統。按年代列出了作者的科研成果,包括國際期刊和會議等公開發表的論文。以DBLP 數據為例,使用PiFlow 進行采集、清洗、入庫的流水線,如圖17 所示。首先下載壓縮文件dblp.xml.gz,然后解壓生成dblp.xml,最后針對dblp.xml 中 Article、Inproceedings、Proceedings、Book、Incollection、WWW(World Wide Web)、Mastersthesis、Phdthesis不同標簽進行解析并寫入Hive 數據倉庫。圖18 展示了數據處理過程流水線監控頁面。
為驗證PiFlow 的性能,針對該場景與Apache NiFi 進行了對比測試,共設計了4 條流水線,測試流水線如表3 所示。F1功能為DBLP 數據采集入庫,F2~F4 為讀取Oracle 數據寫入Hive。測試使用5 臺物理機搭建的集群環境,每臺物理機為32核CPU,內存為128 GB。性能對比測試結果見表4,針對每條流水線的運行時間分別進行了三次測試,結果取平均值(見平均耗時)。其中PiFlow 性能提升比例公式為:PiFlow 性能提升比例=NiFi平均耗時/PiFlow 平均耗時-1。Apache NiFi所需資源如表5,PiFlow 所需資源如表6。Apache NiFi 采集Oracle數據庫的策略為將數據進行分頁,每頁10 000條數據,以頁為單位進行并發讀寫。PiFlow 采用分區方式進行讀寫,所采用的線程數與Apache NiFi 相同。由于Apache NiFi 基于FlowFile 文件形式計算,而PiFlow 基于內存計算,針對設計的4 條流水線PiFlow 相較Apache NiFi 平均性能提升了5 倍,且數據量越大優勢越明顯。

圖17 DBLP數據采集流水線配置頁面Fig.17 DBLP data collection pipeline configuration page

圖18 DBLP數據采集流水線監控頁面Fig.18 DBLP data collection pipeline monitoring page

表3 流水線測試樣例Tab.3 Pipeline test samples
通過將微生物組學數據,微生物組、酶、合成元件、代謝產物數據,微生物資源文獻,專利報告等微生物大數據資源進行采集、清洗、匯聚形成微生物大數據平臺,并通過關聯數據對促進微生物技術領域知識發現(如新酶的開發、新的物種和功能的挖掘)具有重大意義。利用PiFlow提供的組件擴展功能,本實例設計了10 類可復用的不同的數據處理組件。通過組合10類擴展的微生物領域的數據處理組件以及原生的4類數據處理組件,實現了10 類微生物數據的解析、處理、存儲。相關數據處理組件如表7所示。

表4 性能對比測試結果Tab.4 Test results of performance comparison

表5 NiFi性能對比測試資源Tab.5 Resources of NiFi performance comparison test
下面以GenBank 數據為例進行說明。GenBank 數據來源為FTP服務器(ftp://ftp.ncbi.nlm.nih.gov/genbank),數據類型為*.seq.gz 壓縮文件。該實例的需求為將GenBank 數據下載、解析、存儲到ElasticSearch 中。流水線流程為:1)下載FTP數據LoadFromFtpToHDFS;2)解壓文件UnzipFilesOnHDFS;3)解析數據GenBankData;4)存儲到ElasticSearch 中PutEs。圖19展示了PiFlow的流水線配置。

表6 PiFlow性能對比測試資源Tab.6 Resources of PiFlow performance comparison test

表7 微生物相關數據處理組件Tab.7 Processing components of microorganism related data

圖19 GenBank數據采集流水線Fig.19 GenBank data collection pipeline
本文首先設計了一種大數據流水線模型描述語言PiFlowDL,在此基礎上,提出了基于PiFlowDL 的大數據流水線系統。該系統以所見即所得的方式配置流水線,支持實時監控流水線運行狀態,查看流水線運行日志,同時提供模板功能。該系統提供了豐富的數據處理組件,同時集成了科學大數據領域的相關算法。但PiFlow 還存在一些不足:首先,PiFlow 雖然提供了對Streaming 源的支持,但在流計算時不支持多個Streaming 源;其次,流水線間調度,不支持流計算流水線。我們將在上述方面進行下一步工作。