詹杭龍 劉瀾濤 康亮環 曹東剛 謝 冰
(高可信軟件技術教育部重點實驗室(北京大學) 北京 100871)(北京大學(天津濱海)新一代信息技術研究院 天津 300450)(zhanhl@pku.edu.cn)
一種基于Actor模型的彈性可伸縮的流處理框架
詹杭龍 劉瀾濤 康亮環 曹東剛 謝 冰
(高可信軟件技術教育部重點實驗室(北京大學) 北京 100871)(北京大學(天津濱海)新一代信息技術研究院 天津 300450)(zhanhl@pku.edu.cn)
流處理是一種重要的大數據應用模式,在金融、廣告、物聯網、社交網絡等眾多領域得到了廣泛應用.在流處理場景中,流數據的產生速度往往變化劇烈且不容易預測.這時,如果數據流量峰值超過處理系統的承載能力,可能使得系統運行緩慢甚至崩潰,導致處理作業失效;如果為了應對數據流量峰值而過度配置資源,則可能在系統輕載時產生不必要的浪費.為了解決流處理中負載和資源的匹配問題,流處理系統應該具有彈性可伸縮的能力,一方面以高效的方式組織運算資源;另一方面能根據數據流量的實時變化自動地調整資源使用量.然而,現有的流處理框架對于彈性可伸縮的支持尚很初步.介紹了一種基于Actor模型的彈性可伸縮的流處理框架eSault.eSault首先基于Actor模型將批量的處理單元進行分層管理,通過2層路由機制實現了對伸縮性的支持;在此基礎上,設計一個基于數據處理延遲的過載判斷算法和基于數據處理速度的輕載判斷算法來指導系統對資源的有效使用,進而實現彈性可伸縮的流處理.實驗結果表明:eSault具有較好的性能,而且能夠很好地實現彈性可伸縮.
流處理;Actor模型;云計算;彈性可伸縮;2層路由機制
大數據時代數據規模不斷增加,數據產生的速度越來越快.在很多領域,數據的價值隨著時間的推移迅速流失[1],應用對數據時效性的要求越來越強,這就要求數據處理系統能夠對大量“新鮮”數據進行實時分析.例如社交網絡公司可能需要在幾分鐘內分析話題走向、廣告商可能需要實時分析哪些用戶點擊了廣告、服務運營商可能需要在幾秒內通過分析日志文件發現系統異常等.因此,流處理作為一種契合上述應用場景的處理模式得到了廣泛應用.流處理是指對一連串在時間上連續的消息數據進行實時分析、運算的處理過程[2].在流處理中,一個受到普遍關注的問題是,消息數據往往由外部產生,其流量經常處于變化之中,甚至會突然爆發式增長.例如,亞洲移動電話網絡的呼叫記錄在峰值時可以達到每秒幾十萬條記錄,而在低谷時只有每秒幾千條記錄;重大新聞引起新聞網站的訪問量突然增大[3].在這種場景下,如果高峰值的消息流量超過了流處理系統的承受能力,可能導致系統運行緩慢甚至崩潰;而如果為了應對消息流量峰值而過度配置資源,則可能在系統輕載時產生不必要的浪費.這些現象的本質其實是流處理中的運算資源無法與負載變化實現動態匹配.為了解決這個問題,一些流處理系統通過在負載過高時,隨機丟棄一些消息以應對流量峰值;另一些流處理系統通過重排消息或定義消息優先級,從而在系統負載較高時優先處理一些消息[4];此外,大部分流處理系統通過使用消息隊列對消息數據進行緩存[5],從而平滑輸入流量,但是這種方式違背了流處理實時性和低延遲性的需求,并沒有真正解決此問題.
近年來,云計算的發展為解決流處理中運算資源與負載變化的動態匹配問題提供了新的思路.云計算是一種基于互聯網的計算方式,其運算資源是按需聚合與彈性綁定的[6].在云環境中,一個作業在運行過程時具有獲取更多資源的能力.對于流處理作業,如果能夠在消息流量較大時向云環境申請更多的資源,在負載變低時合理釋放部分運算資源,這樣便能較好地實現運算資源與負載變化的動態匹配.這樣的流處理系統被稱為是彈性可伸縮的.彈性是云計算的基本屬性之一[7].云環境能夠將底層的分布式集群組織起來,通過虛擬機部署設施(如OpenStack)以及資源調度工具(如Yarn,Mesos等)為上層的處理作業提供彈性的運算資源.然而,僅有云環境的彈性支持是不夠的,上層的處理作業還需要根據運行時負載的大小實時地調整對運算資源的使用規模,這樣才能夠實現彈性可伸縮.
為了實現彈性可伸縮的流處理系統,有2個必備條件:1)流處理系統是可伸縮的.伸縮性是指系統可以利用變化的資源以調整負載承受力的能力[8].2)系統有一套自適應算法,能夠根據運行時負載的變化來決策如何對運算資源進行伸縮.然而,現有的流處理系統尚未完全支持這2方面的條件.典型的流處理框架系統如Apache S4[9],Storm[10],Mill-Wheel[11],Spark Streaming[12]等尚未完全支持彈性可伸縮.一些學術工作對彈性的流處理技術進行了研究,取得了一定進展.如Esc[13]通過中心式的負載監控器監控機器負載情況,根據MAPE loop自動分析負載情況并觸發彈性伸縮.Esc不支持有狀態處理單元的伸縮,并且在處理單元內部消息通過單點轉發給所有處理元素,效率較低.StreamCloud[3]將流處理單元劃分為子處理單元,并根據機器負載情況動態遷移子處理單元,從而實現彈性伸縮.但StreamCloud只提供有限的查詢操作,并不支持通用的流數據分析.SEEP[14]實現了狀態管理系統,通過中心式的負載監控實現了有狀態處理單元的動態擴展和狀態容錯.然而,SEEP缺乏自適應的調度機制來實現彈性可伸縮.
本文介紹了一種基于Actor 模型的支持彈性可伸縮的流處理框架eSault.eSault除了實現通用流處理框架的基本功能外,重點是支持了應用的彈性可伸縮運行.eSault首先基于Actor模型將批量的處理單元進行分層管理,通過2層路由機制實現了對伸縮性的支持;在此基礎上,設計一個基于數據處理延遲的過載判斷算法和基于數據處理速度的輕載判斷算法來指導系統對資源的有效使用,進而實現彈性可伸縮的流處理.
與現有彈性流處理系統的研究工作相比,本文的主要特點在于:
1) 同時支持彈性擴展和彈性收縮.現有研究多只關注彈性擴展的實現,而eSault基于處理元素動態創建退出機制和2層路由機制的彈性實現方式,在統一的解決方案下同時支持了彈性擴展和彈性收縮.
2) 基于消息處理延遲和速度的負載判斷算法.現有研究工作主要基于機器資源使用情況進行負載判斷,這種方式的限制在于:①資源使用情況的獲得需要底層資源管理系統的支持,這增加了框架層與資源層解耦的難度;②流處理應用需要綜合使用網絡、CPU、內存甚至磁盤等資源,較難設計一種能夠準確反映應用負載情況的綜合指標.而eSault設計的基于消息處理延遲的過載判斷算法和基于消息處理速度的輕載判斷算法,完全在應用層實現負載判斷,更加直接地監控應用的性能.
3) 完全基于Actor模型的設計與實現.eSault探索了基于Actor模型設計與實現彈性流處理框架的可行性,并得到了較好結果.
Actor模型是一種并發編程模型,由Hewitt等人[15]在1973年提出.它把“Actor”作為并發編程的基本元素,Actor可以根據收到的消息進行本地決策,用于創建更多Actor,發送更多消息和決定如何響應下一個消息.Actor模型如今已成為許多計算理論和并發系統的理論基礎.Actor模型具有許多特性,例如無共享狀態、簡單的高層抽象、異步非阻塞的事件驅動編程模型等,這些特性使其非常適合用來對并發程序建模.此外,目前大部分Actor模型的實現中都將Actor實現得非常輕量,可以快速地批量創建和銷毀,“幾十萬甚至上百萬進程同時并行運行十分常見,而且經常僅僅占用很少的內存”[16].這為實現支持彈性的流處理框架帶來了2點好處:
1) 簡化流處理框架的編程模型.在流處理應用中,數據流的key往往數量巨大.如果使用輕量級Actor實現處理元素,我們可以為每個key標識的數據子流啟動一個處理元素,使用戶在編寫處理元素的處理邏輯時可以直接對數據子流進行處理,而不需進一步進行數據分流,從而簡化流處理框架的編程模型.
2) 簡化彈性的實現機制.實現系統彈性的基礎是伸縮性,可以通過批量創建和銷毀輕量級Actor,實現處理元素的批量遷移,從而動態調整處理元素在集群中的分布.
因此,基于Actor模型對彈性流處理系統建模并予以實現,一方面可以簡化流處理系統的設計;另一方面可以充分利用Actor模型的特點簡化框架的編程模型并高效地實現彈性伸縮.在現有的基于Actor模型設計和實現的流處理框架中,S4尚未支持彈性;而Esc雖然支持了彈性,但一方面其論文中并未表明其彈性收縮支持,另一方面其作為原型系統實現較為初步,性能優化空間較大.
2.1 編程模型與系統架構
eSault的編程模型如圖1(a)所示,將流處理應用的處理單元根據功能的不同分為了Spout和Bolt 2種類型.一個流處理應用事實上是Spout與Bolt拼接成的DAG圖,Spout是圖的源節點,其他節點為Bolt,圖中的邊表示處理單元之間的數據路由.Spout產生Tuple格式的流數據,傳遞給Bolt處理,經過多級Bolt處理后生成最終結果從輸出端流出.其中,Tuple是框架中數據流的傳輸形式,它事實上是一個鍵值對(key/value),框架中的數據流都是由連續不斷的Tuple組成的.Spout是流處理應用的數據流來源,它源源不斷地生成Tuple形式的數據流交由后續的Bolt處理.Bolt是流處理應用負責數據處理的單元,它接收由上游傳來的Tuple數據,調用用戶定義的處理方法對數據進行處理后,將新產生的Tuple數據發送給下游Bolt進行處理.流處理應用的主要處理邏輯都在各階段的Bolt中實現.
eSault是基于Actor模型設計的,其各功能模塊及其子模塊在設計時都嚴格保證了無共享狀態,且只通過發送消息交換數據,每個模塊都可以抽象成1個Actor.eSault的系統架構如圖1(b)所示.應用驅動運行在用戶端,為用戶提供編程模型中應用程序的接口,使用戶得以構建、提交和控制流處理應用.框架驅動運行在集群中,框架的所有其他模塊都由框架驅動啟動并控制.Spout/Bolt處理單元:Spout和Bolt在集群中的運行實例,其包含分布在集群中的大量處理元素(processing element, PE).框架驅動模塊通過控制Spout與Bolt,使得流數據可以依據應用程序所定義的邏輯一步步進行處理.Ack(acknowledgement)服務器保證了所有在規定時間內處理完成的消息會被確認,而其他處理超時的消息將由Spout重發,從而保證了至少1次(at least once)的消息語義.資源接口封裝了資源管理器的管理接口,框架驅動通過調用資源接口申請和釋放資源,而不需考慮具體的下層資源管理器類型,從而保證框架與資源管理器解耦.

Fig. 1 The architecture and programming model of eSault圖1 eSault的系統架構與編程模型
2.2 系統架構與編程模型
流處理單元是eSault的數據處理模塊,其處理邏輯由用戶定義.在編程模型層面一個流處理單元是一個整體,但在實際運行過程中,框架會在集群中啟動大量處理元素,使它們并行地執行用戶定義的處理邏輯.用戶通過應用驅動將實現的流處理單元代碼提交給流處理框架,然后通過應用驅動提供的方法創建流處理單元實例并對流處理單元進行動態拼接,從而實現流處理應用.
2.2.1 子處理單元與處理元素
在流處理系統運行過程中,處理單元會在集群中啟動并管理數量巨大的處理元素.如果處理單元對這些處理元素進行集中管理,會使處理單元的邏輯變得復雜,運行時負載也較高,很容易導致處理單元運行異常.所以eSault將處理單元劃分為多個子處理單元,實現分層管理.子處理單元是處理單元的組成部分,與處理單元不同的是,其只運行在1臺機器上,并且在該機器上啟動和管理大量的處理元素.處理單元通過啟動和管理多個子處理單元,間接地管理分布在集群中的大量處理元素.圖2所示Spout與Bolt中對處理單元進行分層管理的結構圖.其中,子處理單元管理器是處理單元的功能模塊,其負責啟動和管理所有子處理單元;PE管理器是子處理單元的功能模塊,其負責在子處理單元所在的機器上啟動和管理大量的處理元素.添加子處理單元后,所有處理元素均由子處理單元管理,處理單元只需管理數量有限的子處理單元即可.這樣,處理單元將主要的處理元素管理邏輯下放至子處理單元,從而分散負載并簡化了管理邏輯,使系統變得更加穩定,也有利于提高路由效率.

Fig. 2 Hierarchical management of processing unit圖2 Spout與Bolt中處理單元的分層管理
2.2.2 2層路由機制
一個典型的流處理應用通常由許多處理單元組成,而每個處理單元在同一時間會啟動大量的處理元素.在如此大規模的處理元素之間路由消息,保證消息嚴格按照key進行分發,并且使這個過程高效、動態、可靠是非常困難的.為了保證消息轉發效率,同時又使路由表可以在運行過程中動態進行更改,eSault提出了2層路由轉發機制.
如圖3所示,eSault的2層路由轉發機制的主要思想就是結合集中路由和分布路由,在子處理單元之間進行分布路由,在子處理單元內部進行集中路由.源處理單元的所有處理元素均將產生的數據發送給所在子處理單元的輸出路由器;輸出路由器將數據按照key值路由給相應的目標子處理單元的輸入路由器;目標子處理單元的輸入路由器收到數據后,將數據轉發給相應的處理元素.輸入路由器和輸出路由器是eSault的2層轉發機制的核心構件,兩者內部各保存有1張路由表用來進行數據路由.這2張路由表的設計對于eSault的消息轉發效率影響巨大,下面分別介紹根據輸入路由器和輸出路由器各自的功能特點設計和路由表的數據結構.

Fig. 3 Two-ayer routing forwarding mechanism圖3 eSault的2層路由機制
輸入路由表使用散列(Hash)表實現,表的鍵是輸入數據流的key,表的值是處理該key所標記的數據流的處理元素的索引PEindex.當有輸入數據時,輸入路由器在路由表中查找數據的key所對應的路由表項,從而得到該數據對應的處理元素,并將該數據轉發給該處理元素.在大規模的數據量下進行快速地增刪改查,Hash表是一個非常理想的選擇,因為理想情況下Hash表的增刪改查的平均時間復雜度都為O(1),與表項數目無關,所以使用Hash表可以高效地實現輸入路由表.
輸出路由表的作用是將所有key盡可能平衡地分給所有目標處理單元的子處理單元,并保證路由效率盡可能的高.eSault的輸出路由表使用線索2叉樹表示的類似區間樹的數據結構實現,2叉樹中的節點由key和PEindex組成,其中key表示[key, 后繼節點的key)的區間范圍,若沒有后繼節點,則表示[key, 最大整數INT_MAX]的區間范圍;PEindex則表示該區間對應的子處理單元的索引.該數據結構的主要特點是可以將在查詢某個整數所在的子區間、分裂任意區間和任意相鄰區間的時間復雜度控制在logn以內,其中n為樹中存儲的子區間個數.
圖4展示了區間分裂的過程,初始情況圖4(a)中總的區間范圍為[0,INT_MAX];圖4(b)中通過插入INT_MAX2+1節點,實現了對區間[0,INT_MAX]的分裂操作;圖4(c)中插入了INT_MAX4+1,實現了對區間[0,INT_MAX2]的分裂操作;圖4(d)中進一步插入了INT_MAX×34+1,實現了對區間[INT_MAX2+1,INT_MAX]的分裂操作.當有輸出數據時,輸出路由器首先對數據中的key在[0,INT_MAX]的區間內進行重新散列(rehash),然后在2叉樹中查找小于等于散列值的最大節點,之后取出該節點對應的子處理單元,即為輸出數據的目標子處理單元.該操作在logn時間內完成,n為子區間的個數也即子處理單元的個數,因為子處理單元的數量一般與集群規模在同一數量級,最多達到數百數千的級別,所以該時間開銷是可以接受的.使用線索2叉樹實現輸出路由表的最大作用在于配合輸入路由表可以非常方便的實現彈性伸縮,這在2.3節中會進一步介紹.

Fig. 4 Interval splitting procedure of fan-out routing table圖4 eSault輸出路由表的分裂過程
2.3 彈性可伸縮機制
實現伸縮性的關鍵是能夠在有新可用資源時,在新資源上處理任務,從而利用新資源提高系統并行度;在資源減少時,將被減少資源中的任務重新調度到可用資源上處理,從而使系統正常運行[17].對于流處理應用來說,也就是在有新資源時,能夠將數據流分流至新資源上進行處理;在資源減少時,能夠將被減少資源處理的數據流導流至可用資源進行處理.
eSault伸縮機制的主要設計思想是在處理單元的層面,以子處理單元為單位實現伸縮.當有新資源時,負載較高的處理單元會在新資源上創建子處理單元,并將部分數據流分流至新的子處理單元,以提高處理能力;當資源減少時,處理單元會將受影響的子處理單元進行遷移,或直接將其輸入數據流合流至未受影響的子處理單元.
圖5(a)完整地描述處理單元的動態擴展過程:
① 處理單元在新資源上創建新的子處理單元;
② 處理單元修改子處理單元路由表,將被分流子處理單元對應的區間進行分裂;
③ 處理單元將新的子處理單元路由表發送給所有源處理單元;
④ 源處理單元收到路由表后用其更新所有子處理單元的輸出路由表;
⑤ 輸出路由表的變化使一部分數據被導流至新建的子處理單元;
⑥ 子處理單元動態創建新的處理元素處理數據流;
⑦ 被分流的子處理單元中的處理元素因為超時退出.
至此,處理單元的動態擴展過程完成.處理單元的動態收縮過程與動態擴展過程相似,唯一的區別是需要將路由表中受影響子處理單元對應的區間與其相鄰區間進行合并,從而實現將其輸入數據合流入其相鄰區間對應的子處理單元,故在此不再贅述.
在伸縮性的基礎上,如果系統能夠自適應地根據負載情況申請和釋放資源,并自動地觸發伸縮,則實現了彈性伸縮.eSault通過消息延遲監控器監控消息的處理延遲和處理速度,并根據基于消息處理延遲的過載判斷算法和基于消息處理速度的輕載判斷算法,實現了自動根據負載情況申請和釋放資源并觸發伸縮機制,從而最終實現了彈性可伸縮.

Fig. 5 Workload monitoring and scaling of eSault圖5 eSault的伸縮機制與延遲監控器
2.3.1 消息處理延遲監控
在對系統負載進行動態監控時,資源使用情況是最直接的衡量指標,一些研究彈性的流處理系統,例如SEEP,Esc等,也都使用這一指標.但是,使用其作為負載衡量指標也存在一些缺陷:一方面資源使用情況的獲得需要底層資源管理系統的支持,這使框架難以與資源管理系統解耦;另一方面,流處理應用需要綜合使用網絡、CPU、內存甚至磁盤等資源,很難設計一種能夠準確反映應用負載情況的綜合指標.因此,eSault選擇在應用層通過性能監測負載,因為無論底層任何資源成為瓶頸,最終表現都是應用無法達到性能要求,而且這也有助于實現框架與資源層解耦.eSault認為消息處理延遲是非常理想的負載衡量指標,因為其他指標都只能部分地反映負載情況.例如,輸入消息數量很大時,如果每條消息實際處理時間很短,則處理負載并不一定會高,也就很難確定一個消息數量作為負載限額;同理,使用輸入數據吞吐量和消息隊列長度也會出現類似的情況.然而,消息處理延遲綜合反映了網絡傳輸時間、排隊時間和處理時間,而且因為流處理應用的核心價值就在于在線實時處理從而降低處理延遲,所以消息處理延遲是流處理應用的非常理想負載衡量指標.
eSault的負載監控是由每個處理單元獨立進行的,即每個處理單元監控自身各子處理單元的負載,并根據負載情況作出彈性伸縮的決策.eSault在Bolt中添加了延遲監控器模塊.如圖5(b)所示,延遲監控器周期性地給Bolt的所有子處理單元發送探針(probe),探針流經輸入路由器、最近一次處理數據的處理元素和輸出路由器后,返回延遲監控器,之后延遲監控器即可通過探針發出時間和返回時間,判斷對應子處理單元的消息處理延遲.
2.3.2 基于消息處理延遲的過載判斷算法
算法的思路如下:延遲監控器每隔PROBE_PERIOD向所有子處理單元發送探針,并監控探針的處理時間是否超過MAX_LATENCY,如果有一個子處理單元在OVERLOAD_REACTION_TIME個采樣周期內的總超時次數所占比例超過OVER_LOAD_FACTOR,則認為該子處理單元過載,需要申請新的資源,并觸發伸縮機制將該子處理單元分裂至新資源上.
算法中的4個關鍵變量:MAX_LATENCY,PROBE_PERIOD,OVERLOAD_REACTION_TIME和OVERLOAD_REACTION_FACTOR需要應用配置指定,以改變算法的額外開銷、靈敏度等屬性.
1)MAX_LATENCY.算法允許的最大探針處理延遲.改變該變量,可以改變算法可容忍的最大消息處理延遲,應根據處理單元的任務類型和應用對消息處理處理延遲的要求合理配置該值.
2)PROBE_PERIOD.發射探針的采樣周期.改變該變量,可以調整算法的額外開銷和靈敏度.增大該值,會使采樣周期變長,采樣次數變少,從而使發送和處理探針帶來的額外開銷減小,但也會使算法對過載的反應時間變長,算法靈敏度下降;反之,會使算法的額外開銷增大,反應時間變短,靈敏度提高.
3)OVERLOAD_REACTION_TIME.過載判斷的反應時間.改變該變量可以調整算法的反應時間,從而調整算法靈敏度.增大該值,算法需要更長的時間才能確定過載,因而靈敏度下降;反之,算法靈敏度上升.
4)OVERLOAD_REACTION_FACTOR.允許的OVERLOAD_REACTION_TIME內超時記錄占總記錄數的比例.改變該變量,可以調整算法判斷條件的嚴格程度,從而調整算法靈敏度.增大該值,算法允許的超時次數增大,過載判斷條件更為嚴格,算法靈敏度下降;反之,算法靈敏度上升.
2.3.3 基于消息處理速度的輕載判斷算法
消息處理延遲在判斷過載時非常有效,但在判斷輕載時卻無法顯著反映負載情況.消息處理延遲主要由網絡傳輸時間、排隊時間和處理時間組成,動態擴展的主要目的是降低排隊時間和處理時間.在高負載情況下排隊時間和處理時間成為消息處理延遲的主要部分,所以其可以顯著反應排隊時間和處理時間,從而反映系統負載;而在輕載情況下網絡傳輸時間成為消息處理延遲的主要部分,其不再顯著反映排隊時間和處理時間,從而無法顯著反映系統負載.
為了解決這個問題,eSault設計了基于消息處理速度的輕載判斷算法,通過消息處理速度是否顯著低于峰值,判斷子處理單元是否處于輕載狀態.算法的主要思想是:延遲監控器依然每隔PROBE_PERIOD向所有子處理單元發送探針;子處理單元的輸入路由器會統計2次探針之間,子處理單元處理的消息總數,并在收到探針時將該結果存入探針中;延遲監控器根據探針中的消息總數是否低于LOW_WATERMARK×消息處理峰值判斷子處理單元是否輕載;如果有1個子處理單元在UNDERLOAD_REACTION_TIME個采樣周期內的輕載次數所占比例超過UNDERLOAD_REACTION_FACTOR且在OVERLOAD_REACTION_TIME個采樣周期內超時次數為0,則認為該子處理單元輕載,需要觸發伸縮機制將該子處理單元與相鄰單元合并,并釋放資源.
在該算法中,當消息處理速度顯著提高時,算法會將所有峰值信息重置,因為其認為這意味著系統的工作負載發生了顯著變化,算法應當使用最新的峰值來做決策.在基于消息處理速度的輕載判斷算法中,當探針超時的時候,說明系統進入了新一輪高負載運行狀態,算法通過使歷史峰值信息無效化來適應新的工作負載.算法中的4個關鍵變量:LOW_WATERMARK,PROBE_PERIOD,UNDERLOAD_REACTION_TIME和UNDERLOAD_REACTION_FACTOR同樣需要應用配置指定,4個變量的作用與基于消息處理延遲的過載判斷算法基本一一對應.
至此,eSault通過在處理單元中增加延遲監控器監控各子處理單元的消息處理延遲和處理速度,并通過基于消息處理延遲的過載判斷算法和基于消息處理速度的輕載判斷算法自動分析子處理單元負載,從而實現了根據負載自適應地分配和釋放資源,并自動地觸發伸縮,最終實現了彈性可伸縮.
3.1 系統實現
本文使用編程庫Akka實現了eSault的原型系統*https://github.com/pkusei/Sault.Akka是一個運行在JVM上的基于Actor模型的開源工具包和運行時.Akka具有輕量級Actor,Actor位置透明、消息分發高效等特點,非常適合構建高效的分布式并發應用.在實現eSault的過程中,所有的功能模塊均使用Akka的Actor進行刻畫,這使得eSault的結構非常簡單直觀.
3.2 彈性可伸縮效果驗證
本實驗的主要目的是驗證eSault彈性可伸縮的能力,即證明eSault上運行的流處理應用可以隨輸入流量的變化自動調整資源使用量,并保證處理延遲的穩定.實驗的主要思路是:階段性地調整Emitter生成單詞的速度,并在此過程中監測單詞生成速度、單詞平均處理延遲和Counter的子處理單元數目的變化情況.其中輸入流量對應單詞生成速度,并行度對應Counter的子處理單元數,延遲對應單詞平均處理延遲.
如圖6(a)所示實驗過程中,輸入流量共經歷了2次上升和下降的變化周期,每個周期為時約250 s.在每個周期內,輸入流量在前30 s內,每10 s提升約50 000 tuples/s的單詞生成速度,并在達到峰值后穩定約70 s;此后每20 s下降約50 000 tuple/s的單詞生成速度,并在達到谷值后穩定約70 s.

Fig. 6 Verification of elastic effect圖6 彈性效果驗證
通過分析輸入流量、處理延遲和并行度三者的變化關系,可以發現如下符合實驗預期的現象:
1) 實驗過程中的大部分時間,消息處理延遲基本穩定在較低水平.如圖6(b)所示,在整個實驗過程中,應用的消息處理延遲基本穩定在100 ms以下,即使在輸入流量達到峰值后,消息處理延遲在大部分時間也穩定在100 ms以下.
2) 并行度隨輸入流量的變化趨勢明顯.圖6(c)中可以觀察到Counter的初始并行度為2,隨著輸入流量提高,其并行度快速增加以適應負載增加,最終達到8;隨著輸入流量降低,其并行度逐漸減少以釋放多余資源,最終達到1.上述實驗結果基本滿足實驗預期,可以證明eSault基本支持了流處理應用的彈性可伸縮.
3.3 彈性可伸縮的必要性驗證
本實驗的主要目的是驗證上述彈性效果實驗中,系統能夠根據數據流負載自動作出資源調整,從而保證系統不會因突然的數據流量高峰崩潰,也不會在數據流量較低時浪費資源.實驗的主要思路是:使用與彈性效果實驗中相同的Emitter,測試不同并行度的Counter,觀察單詞平均處理延遲的變化情況,并與彈性效果實驗中的延遲變化情況進行比較,從而驗證彈性伸縮的必要性.
圖7展示了在預設為不同并行度以及彈性可伸縮執行的情況下,處理延遲隨著輸入流量而變化的情況.由于并行度為1,2的情況較為特殊,單獨在圖7(a)中展示;其余并行度以及彈性執行的情況在圖7(b)中展示;圖7(c)詳細展示了輸入流量達到峰值且彈性伸縮延遲穩定后(60~110 s)不同并行度情況下的延遲比較.

Fig. 7 Necessity validation of elasticity圖7 彈性必要性驗證
通過觀察實驗結果可以發現,當并行度固定為1和2時,系統在輸入流量上升后產生了嚴重的消息堆積,最終導致底層通信機制因來不及處理心跳信息而出錯,使系統無法正常運行.而彈性伸縮情況下,雖然Counter的初始并行度也為2,但其通過自動提高并行度度過了流量高峰,并保持了延遲的基本穩定.這說明較低的并行度無法在規定延遲內處理數據流量高峰,而彈性伸縮機制可以通過增加資源應對流量高峰,此現象符合實驗預期.
如圖7(b)所示,當并行度固定為4和6時,系統可以承受數據流量高峰,且沒有彈性伸縮的延遲波動期.但如圖7(c)所示,在彈性伸縮的延遲穩定后,并行度為4和6的情況下,處理延遲會高于彈性伸縮的情況.并行度為8的情況下,延遲均值、方差和最大值均顯著低于并行度為4和6的情況,而僅略高于并行度為10的情況.這說明8是該流量峰值下合適的并行度,而eSault的彈性機制確定的并行度恰為8,這說明eSault的彈性伸縮機制準確地找到了最合適的并行度.不過因為彈性伸縮機制本身有一定開銷,所以最終彈性情況下的延遲略高于并行度為8的情況.
上述實驗結果基本滿足實驗預期,可以證明eSault的彈性支持可以準確地根據輸入流量找到最佳并行度,既能應對流量高峰,又能在流量低谷節約資源.
本文基于Actor模型設計與實現的彈性流處理框架eSault,除了實現其他通用流處理框架的基本功能外,還重點支持了應用的彈性伸縮.實驗證明了eSault可以準確根據輸入流量決定資源使用量,既能在流量高峰時保持延遲穩定,又能在流量低谷時節約資源,達到了預期效果.
未來的工作包括2個方面:
1) 更加智能的自適應算法,使得參數配置可以根據數據流歷史情況挖掘流量變化規律自動調整,對于流量波動較少但幅度較大的數據流,采用激進的參數配置;對于流量波動幅度較小但頻繁的數據流,采用穩健的參數配置.
2) 有狀態處理單元的彈性伸縮,進一步研究如何將狀態管理原語以盡可能透明地方式納入eSault的編程模型,并在框架內支持彈性伸縮過程中的狀態遷移,從而實現對有狀態處理單元彈性伸縮的原生支持.
[1]Cheng Xueqi, Jin Xiaolong, Wang Yuanzhuo, et al. Survey on big data system and analytic technology[J]. Journal of Software, 2014, 25(9): 1889-1908 (in Chinese)(程學旗, 靳小龍, 王元卓, 等. 大數據系統和分析技術綜述[J]. 軟件學報, 2014, 25(9): 1889-1908)
[2]Cui Xingcan, Yu Xiaohui, Liu Yang, et al. Distributed stream processing: A survey[J]. Journal of Computer Research and Development, 2015, 52(2): 318-332 (in Chinese)(崔星燦, 禹曉輝, 劉洋, 等. 分布式流處理技術綜述[J]. 計算機研究與發展, 2015, 52(2): 318-332)
[3]Gulisano V, Jimenez-Peris R, Patino-Martinez M, et al. Streamcloud: An elastic and scalable data streaming system[J]. IEEE Trans on Parallel and Distributed Systems, 2012, 23(12): 2351-2365
[4]Hummer W, Satzger B, Dustdar S. Elastic stream processing in the cloud[J]. Wiley Interdisciplinary Reviews: Data Mining and Knowledge Discovery, 2013, 3(5): 333-345
[5]Qi Kaiyuan, Han Yanbo, Zhao Zhuofeng, et al. MapReduce intermediate result cache for concurrent data stream processing[J]. Journal of Computer Research and Development, 2013, 50(1): 111-121 (in Chinese)(亓開元, 韓燕波, 趙卓峰, 等. 支持高并發數據流處理的MapReduce中間結果緩存[J]. 計算機研究與發展, 2013, 50(1): 111-121)
[6]Lu Xicheng, Wang Huaimin, Wang Ji. Internet-based virtual computing environment (iVCE): Concepts and architecture[J]. Scientia Sinica: Informationis, 2006, 49(6): 681-701
[7]Buyya R, Broberg J, Goscinski A. Cloud Computing: Principles and Paradigms[M]. New York: John Wiley & Sons, 2011: 457-490
[8]Herbst N R, Kounev S, Reussner R. Elasticity in cloud computing: What it is, and what it is not[C] //Proc of the 10th Int Conf on Autonomic Computing. Berkeley, CA: USENIX Association, 2013: 23-27
[9]Neumeyer L, Robbins B, Nair A, et al. S4: Distributed stream computing platform[C] //Proc of the 13th Int Conf on Data Mining Workshops. Piscataway, NJ: IEEE, 2010: 170-177
[10]Toshniwal A, Taneja S, Shukla A, et al. Storm@ twitter[C] //Proc of the 2014 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2014: 147-156
[12]Zaharia M, Das T, Li H, et al. Discretized streams: An efficient and fault-tolerant model for stream processing on large clusters[C] //Proc of the 4th USENIX Conf on Hot Topics in Cloud Computing. Berkeley, CA: USENIX Association, 2012: 10
[13]Satzger B, Hummer W, Leitner P, et al. Esc: Towards an elastic stream computing platform for the cloud[C] //Proc of 2011 Int Conf on Cloud Computing. Piscataway, NJ: IEEE, 2011: 348-355
[14]Fernandez R, Migliavacca M, Kalyvianaki E, et al. Integrating scale out and fault tolerance in stream processing using operator state management[C] //Proc of the 2013 ACM SIGMOD Int Conf on Management of Data. New York: ACM, 2013: 725-736
[15]Hewitt C, Bishop P, Steiger R. A universal modular actor formalism for artificial intelligence[C] //Proc of the 3rd Int Joint Conf on Artificial Intelligence. San Francisco, CA: Morgan Kaufmann, 1973: 235-245
[16]Cesarini F, Thompson S. Erlang Programming[M]. Sebastopol, CA: O’Reilly Media, Inc, 2009
[17]Zhan Hanglong, Kang Lianghuan, Cao Donggang. DETS: A dynamic and elastic task scheduler supporting multiple parallel schemes[C] //Proc of the 8th Int Symp on Service Oriented System Engineering. Piscataway, NJ: IEEE, 2014: 278-283
An Elastic Scalable Stream Processing Framework Based on Actor Model
Zhan Hanglong, Liu Lantao, Kang Lianghuan, Cao Donggang, and Xie Bing
(Key Laboratory of High Confidence Software Technologies (Peking University), Ministry of Education, Beijing 100871)(Peking University Information Technology Institute(Tianjin Binhai), Tianjin 300450)
In the era of big data, stream processing has been widely applied in financial industry, advertising, Internet of things, social networks and many other fields. In streaming scenarios, the generation speed of stream data tends to be fluctuant and difficult to predict. If the streaming peak is larger than system capacity, the system may run slowly or even crash, which leads to job failure. If excessive resources are provided in case of streaming peak, there can be unnecessary waste under light load. In order to solve the matching problem between stream processing load and resources, stream processing system should be elastically scalable, which means that provided resources can be adjusted automatically according to the real-time change of stream flow. Although some researches have made great progress in stream processing, it is still an open problem that how to design an elastic scalable system. This paper introduces eSault, an elastically scalable stream processing framework based on Actor model. eSault firstly manages the processing units stratified hierarchically based on Actor model, and realizes scalability with two-layer routing mechanism. On this basis, eSault proposes an overload judgment algorithm based on data processing delay and light load judgment algorithm based on the data processing speed to efficiently allocate the resources, and achieve elastically scalable stream processing. Experiments show that eSault has good performance, and can achieve flexible scalability well.
stream processing; Actor model; cloud computing; elastic scalable; two-layer routing mechanism

Zhan Hanglong, born in 1989. Received his PhD degree from the School of Electronics Engineering and Computer Science, Peking University in 2016. His main research interests include big data, system software, parallel and distributed computing, etc.

Liu Lantao, born in 1990. Received his MSc degree from the School of Electronics Engineering and Computer Science, Peking University in 2015. His main research interests include big data, system software, parallel and distributed computing, etc.

Kang Lianghuan, born in 1986. Received his PhD degree from the School of Electronics Engineering and Computer Science, Peking University in 2015. His main research interests include distributed systems, concurrent programming structures and languages, etc.

Cao Donggang, born in 1975. Received his PhD degree from the School of Electronics Engineering and Computer Science, Peking University in 2004. Currently associate professor at Peking University. His main research interests include system software, parallel and distributed computing, etc.

Xie Bing, born in 1970. Received his PhD degree from the School of Computer, National University of Defense Technology in 1998. Currently professor and PhD supervisor at Peking University. His main research interests include software engineering, formal methods and software reuse, etc (xiebing@pku.edu.cn).
2015-12-09;
2016-08-08
國家“八六三”高技術研究發展計劃基金項目(2015AA01A202);國家“九七三”重點基礎研究計劃基金項目(2011CB302604);國家自然科學基金項目(61272154,61421091);百度云服務開放平臺示范項目(2015年) This work was supported by the National High Technology Research and Development Program of China (863 Program)(2015AA01A202), the National Basic Research Program of China (973 Program)(2011CB302604), the National Natural Science Foundation of China (G61272154, G61421091), and the Baidu Cloud Service Open Platform Demonstration Project (2015).
曹東剛(caodg@pku.edu.cn)
TP391