戴振邦 江恩杰 劉力嘉 甘江偉 戴振邦 江恩杰 劉力嘉 甘江偉



摘要:集中化,合并任務的處理是分布式設計中的一個難點。由于不同任務中需要頻繁的數據交換,且需對不同任務的結果進行合并,使其難以使用目前主流的分布式設計模型。分布式管道設計模式是一種高度解耦合的分布式設計模型,適合處理業務粒度較大,數據交換頻發,IO瓶頸較高的任務。文章在分布式管道設計模式的基礎上,參考服務發現機制實現了一個高性能的管道服務框架。
關鍵詞:分布式;服務發現;動態負載均衡;管道任務;高解耦
中圖分類號:TP311? ? ? 文獻標識碼:A 文章編號:2096-4706(2021)07-0044-07
Design and Implementation of Pipeline Service Framework Based on
Distributed Pipeline Mode
DAI Zhenbang1,JIANG Enjie2,LIU Lijia2,GAN Jiangwei3
(1.School of Chemical Engineering,Northwest Minzu University,Lanzhou? 730124,China;
2.Information Engineering Department,Chengyi University College,Jimei University,Xiamen? 361021,China;
3.College of Physics and Electronic Information Engineering,Minjiang University,Fuzhou? 350108,China)
Abstract:Centralization and combined task processing is a difficulty in distributed design. Due to there are frequent data exchanges in different tasks and the results of different tasks need to be merged,it is difficult to use the current mainstream distributed design model. Distributed pipeline design pattern is a high decoupling distributed design model,which is suitable for the tasks with large business granularity,frequent data exchange and high IO bottleneck. Based on the distributed pipeline design pattern,this paper implements a high performance pipeline service framework with the reference of service discovery mechanism.
Keywords:distributed;service discovery;dynamic load balancing;pipeline task;high decoupling
收稿日期:2021-03-09
基金項目:大學生創新創業訓練計劃項目 (省級)(202013471013)
0? 引? 言
目前比較流行的分布式解耦合的解決方案中,如微服務,這是一種將應用程序構造為一組松散耦合服務的軟件開發技術,是面向服務的體系結構(Service-Oriented Architecture,SOA)架構樣式的一種變體[1]。在微服務體系結構中,其服務是細粒度的,并且協議是輕量級的[2],是一種高度解耦合的分布式設計思想,但微服務更適合用在解決模塊化可分離的任務中,可以在不同服務中通過相互調用的方式來完成不同服務中的協同。對于目前互聯網大多數分層業務來說,微服務是一個很好的解決辦法,但對于集中化的,需要合并處理任務來說,微服務無法有效地對任務進行分層。本文參考分布式管道設計模式的解決方案,設計一個用于分層合并任務的分布式解耦合任務處理框架。
本文設計了一個分布式解耦合任務處理框架(下文簡稱管道服務框架),主要解決分布式任務中的服務發現、動態負載均衡和管道粒度的拆分,最后通過實際業務中的應用效果驗證了該框架的可行性和有效性。
1? 分布式管道設計模式
分布式管道設計模式是一種去中心化的分布式設計模式,其特點是無須任務分配,每個獨立的管道(總任務的子集)都可以主動或被動地推送或接收數據,并且按照一定的規則對數據進行處理,然后執行流出或者暫時儲存的操作,管道還可以截留數據,在特定條件下對數據進行合并流出。在管道設計模式中,管道可以根據數據的標記,使用一種類似服務發現的方式主動地推送數據到處理管道中,即對于任何一個產生的數據,無須使用消息隊列等工具對任務進行推送,只需要交付到任何一個管道的入口由管道進行推送即可。
分布式管道設計模式中道管道類型有進程管道、協程管道、線程管道,不同的管道維護不同的上下文,同時有不同的處理方案,相對來說,進程管道做到了并行處理,適用于計算密集型任務;協程管道適用于高IO任務,可以幾乎不需要額外的消耗,進行操作系統回調。
2? 管道服務框架架構
如圖1所示,管道服務架構中,服務表示轉流層負責記錄每個處理管道的地址,以及管道數據交換以及日志的緩存。數據交換層主要負責相互隔離的2個管道的數據交換。管道工作層主要負責用戶邏輯的處理部分,數據的合并處理,數據的流出處理。日志層主要用于管道崩潰后,根據日志進行數據的恢復,以及管道massage的記錄。回調層,主要用于任務處理完畢或者管道合并后的結果輸出。
2.1? 服務表示轉流層
服務表示轉流層主要分為服務注冊中心、管道全局中心。服務注冊中心[3]在管道服務框架中使用,全局管道空間實現一個簡單的Key-value數據庫,當然可以使用ETCD[4]或者Redis[5]來作為服務注冊中心的載體。
服務注冊中心的作用主要是為了維護每個管道或者管道組的正常運行,維護每個管道或者管道組的信息,實際上在使用中會把多個管道合并為一個集合,包括管道集合所在的IP端口,以及它擁有的管道ID。并且每個管道集合必須定時的重新上報自己的情況,否則過期后,服務注冊中心會刪除這個管道集合的信息。
管道全局中心的主要作用是用于在不同分區中的管道交換數據時,會先在管道全局中心中緩存數據,發送管道先緩存數據,然后告知目標管道,目標管道激活后直接通過管道交換通道獲取到對應的數據,流入管道。值得注意的是,為了保證在發生網絡分區時,框架的數據恢復能力,管道對全局空間并不會直接操作,而是需要通過管道交換通道進行。
2.2? 數據交換層
數據交換層主要由多個管道交換通道組成,一般來說管道交換通道的數目與管道集合的數目相等,即一個集合一個管道交換通道,管道交換通道主要用于解決數據一致性問題;數據通道解決事務一致性問題,保證管道推送的數據一定被日志記錄以及被上傳到管道全局中心中。然后去通知接收管道,同時管道交換通道還負責負載均衡的實現,如果管道在流出數據時沒有指定目標管道的唯一標識ID,而是指定了管道服務ID,那么數據交換層會盡可能均衡的推送數據到管道集合中管道服務ID相同的分組。同時管道交換通道還支持動態均衡[4]策略,管道交換通道會跟蹤每一個管道分配的拉取時間,對于分配拉取管道拉取時間慢的管道集合地址更新更小的權重,根據權重來進行負載均衡。
交換層代碼為:
func (cr*ConsulResolver) resolveService(ctxcontext.Context) (string, connect.CertURI, error) {
health :=cr.Client.Health()
svcs, _, err :=health.Connect(cr.Name, "", true, cr.query Options(ctx))
iferr!=nil {
return"", nil, err
}
iflen(svcs) <1 {
return"", nil, fmt.Errorf("no healthy instances found")
}
// Services are not shuffled by HTTP API, pick one at (pseudo) random.
idx :=0
iflen(svcs) >1 {
idx=rand.Intn(len(svcs))
}
returncr.resolveServiceEntry(svcs[idx])
}
func (cr*ConsulResolver) resolveQuery(ctxcontext.Context) (string, connect.CertURI, error) {
resp, _, err :=cr.Client.PreparedQuery().Execute(cr.Name, cr.queryOptions(ctx))
iferr!=nil {
return"", nil, err
}
svcs :=resp.Nodes
iflen(svcs) <1 {
return"", nil, fmt.Errorf("no healthy instances found")
}
// Services are not shuffled by HTTP API, pick one at (pseudo) random.
idx :=0
iflen(svcs) >1 {
idx=rand.Intn(len(svcs))
}
returncr.resolveServiceEntry(&svcs[idx])
}
2.3? 管道工作層
管道工作層作為管道服務框架中實際的工作角色,用戶將給每個不同的管道定義不同的工作任務,同時定義管道的服務ID或者叫作任務ID,定義管道的流動條件,定義管道的推送條件,定義管道的合并條件。如在計算階乘和的任務中,我們可以分解出計算階乘和求和的子任務,這2個任務會被定義到2個不同的管道中并且有自己的任務ID,在這個任務中我們可以開啟多個計算階乘的管道,一個組合的管道,并且指示計算階乘的管道的流出是組合管道,在組合管道中會對階乘管道的求出結果做合并,最后輸出到流出器中等待回調得到最終的結果。管道的基本工作流出如圖2所示。
管道工作層代碼為:
typePolicystruct {
ID? ? ? ? ? ? ? ? ? ? string? ? ? ? ? ? ? ? ?`hcl:"id"`
Revision? ? ? ? ? ? ? uint64? ? ? ? ? ? ? ? ?`hcl:"revision"`
ACL? ? ? ? ? ? ? ? ? ?string? ? ? ? ? ? ? ? ?`hcl:"acl,expand"`
Agents? ? ? ? ? ? ? ? []*AgentPolicy? ? ? ? ?`hcl:"agent,expand"`
AgentPrefixes? ? ? ? ?[]*AgentPolicy? ? ? ? ?`hcl:"agent_prefix,expand"`
Keys? ? ? ? ? ? ? ? ? []*KeyPolicy? ? ? ? ? ?`hcl:"key,expand"`
KeyPrefixes? ? ? ? ? ?[]*KeyPolicy? ? ? ? ? ?`hcl:"key_prefix,expand"`
Nodes? ? ? ? ? ? ? ? ?[]*NodePolicy? ? ? ? ? `hcl:"node,expand"`
NodePrefixes? ? ? ? ? []*NodePolicy? ? ? ? ? `hcl:"node_prefix,expand"`
Services? ? ? ? ? ? ? []*ServicePolicy? ? ? ?`hcl:"service,expand"`
ServicePrefixes? ? ? ?[]*ServicePolicy? ? ? ?`hcl:"service_prefix,expand"`
Sessions? ? ? ? ? ? ? []*SessionPolicy? ? ? ?`hcl:"session,expand"`
SessionPrefixes? ? ? ?[]*SessionPolicy? ? ? ?`hcl:"session_prefix,expand"`
Events? ? ? ? ? ? ? ? []*EventPolicy? ? ? ? ?`hcl:"event,expand"`
EventPrefixes? ? ? ? ?[]*EventPolicy? ? ? ? ?`hcl:"event_prefix,expand"`
PreparedQueries? ? ? ?[]*PreparedQueryPolicy`hcl:"query,expand"`
PreparedQueryPrefixes[]*PreparedQueryPolicy`hcl:"query_prefix,expand"`
Keyring? ? ? ? ? ? ? ?string? ? ? ? ? ? ? ? ?`hcl:"keyring"`
Operator? ? ? ? ? ? ? string? ? ? ? ? ? ? ? ?`hcl:"operator"`
}
2.4? 管道工作層調度模型
在管道服務框架的實現中,管道的工作使用GMP模型,模型分為M、P、G三個角色,分別代表POSIX Threads,Processor和Go Pipeline。P可以理解為執行上下文,也就是context,P負責完成對G和M的調度,我們可以把M理解為操作系統資源的抽象,是真正的執行體;把G理解為要執行任務的抽象,是執行代碼和數據的集合。P用執行體M來執行G,并且維護了一個隊列來存放可執行的G,當前G執行結束,M就空閑了下來,P就可以從隊列的頂部取出下一個G在M上繼續執行。
當M去執行該系統調用時線程會阻塞并被操作系統掛起,這個時候P會把當前的G留在原來的M中處理,然后從隊列里取出下一個G并創建一個新的M對象來執行它。被的G-M對完成系統調用變成可執行狀態時,又會在合適的時機被重新調度執行。如圖3所示。
圖中,長矩形為全局可用資源,包括全局隊列、操作系統;圓角長矩形為局部可用資源,圖中為局部隊列;圓形為虛擬工作角色,即工作管道;正方形為具有調度功能的角色,圖中為P(虛擬調度器)以及CPU(調度器);三角形為實際工作內核線程;虛線標記部分標識其為虛擬整體,圖中相對于整個系統CPU為一個整體工作,實際上CPU獨立互不干涉。
使用以上模型,可以使管道用更小的開銷異步處理計算密集型以及IO密集型的任務,相對來說,更適合處理IO任務,并且解決了分布式管道設計模式中用戶的管道選擇問題,降低了管道設計模式的任務分層時的選擇復雜度,使得用戶只需要關注如何拆分任務。
2.5? 回調層
回調層主要為結果的輸出,用戶最終使用回調層提供的接口,如圖1中的流出器獲取到管道處理完畢的數據。值得注意的是,為了解決管道資源的占用問題,管道使用回調層流出數據后會立刻釋放當前的資源,認為該任務集合的本次任務已經完成。
2.6? 日志層
日志層主要分為異常日志和恢復日志:
(1)異常日志。異常日志主要用于記錄POSIX Threads這里可以理解為操作系統回調的錯誤日志,以及框架用戶在GOPipeline中手動定義,并且被拋出觸發的錯誤,異常任務可以對框架的執行情況進行簡要的分析,使得框架具有更好的錯誤檢測,中斷分析能力[6]。
(2)恢復日志。在管道設計模式中全局管道空間儲存每一次管道執行任務后得到的中間態數據,并且管道全局中心中的數據保存在內存中,一旦發生宕機或者進程意外終止等問題,管道全局中心中的數據會全部損失,對應的管道任務需要重新執行。為解決上述問題管道設計模式參考Redis的持久化策略[5]引入恢復日志來解決框架的容災問題。恢復日志的工作主要分為以下3個過程。
1)管道全局中心會在計算壓力較小的時候,對管道全局中心的數據進行磁盤持久化,被稱為備份,并且在恢復日志中打下一個版本標記。
2)管道交換通道每產生一次中間態數據,優先記錄操作到恢復日志中,保證管道全局中心以及恢復日志的原子性,一致性[7]。
3)一旦發生導致進程崩潰的問題,在管道服務框架重新啟動時優先從最新的備份中恢復數據到管道全局中心中,根據備份的版本號,到恢復日志中逐步對該版本號以后的操作進行恢復。
以下為日志層代碼:
funcSetup(config*Config, uicli.Ui) (*logutils.LevelFilter, *GatedWriter, *LogWriter, io.Writer, bool) {
// The gated writer buffers logs at startup and holds until it's flushed.
logGate :=&GatedWriter{
Writer: &cli.UiWriter{Ui: ui},
}
// Set up syslog if it's enabled.
varsyslogio.Writer
ifconfig.EnableSyslog {
retries :=12
delay :=5*time.Second
fori :=0; i<=retries; i++ {
...
// Create a log writer, and wrap a logOutput around it
logWriter :=NewLogWriter(512)
writers := []io.Writer{logFilter, logWriter}
varlogOutputio.Writer
ifsyslog!=nil {
writers=append(writers, syslog)
}
}
3? 管道服務框架設計與驗證
3.1? 任務說明
我們對巨潮網上市公司2020年期間的報告數據進行爬取,同時使用樸素貝葉斯算法進行主題抽取,獲取年報中財務分析部分的報表內容,并且對每個提取到數據進行儲存分析。
3.2? 任務瓶頸分析
通過對巨潮網爬蟲的年報進行估計,全國上市公司共3 869家其中深圳股市有2 241家,上海股市有1 628家,平均每個年度的報告數據為900條目,平均年報大小為1 350 kB,粗略估計總IO任務讀寫大小為4.05 TB,可以確定該任務為高IO復雜度任務。在本次實驗任務中,假設樸素貝葉斯模型的訓練過程已經完成,來分析樸素貝葉斯模型的計算壓力。利用式(1)對離散數據進行簡單化計算:
P(X|Ci)=P(xk|Cj)? ? ? ? ? ? ? ? ?(1)
式(1)中,P(X|Ci)為Ci文本屬于X的概率,P(xk|Cj)為Cj中包含詞條xk的概率。
樸素貝葉斯分類器[8],分類的設計復雜度大概為O(C× D),其中C為類別,D為特征數。在本次二分類問題中時間復雜度為O(2D)屬于線性時間復雜度,由于在本次實驗任務中,需要對每個上市公司的報告進行分段輸入樸素貝葉斯分類器中進行分類,根據對巨潮網的財報分析平均需要對3 000 byte(每個段落的大小)進行分割計算,所以本次任務也屬于高計算密集型任務。
綜上所述,本次設計目標為高IO高計算密集型任務,任務主要瓶頸為IO瓶頸,其次為計算瓶頸,最后為任務調度開銷。
3.3? 任務分層情況與管道設計
對于以上任務,在管道服務框架中,按照復雜度分離,可以概括為以下幾個任務:數據請求返回、原始數據保存、數據分割、樸素貝葉斯分類、報表數據統計。其中IO任務主要集中在數據請求,保存部分即數據采集部分。計算密集型任務主要存在于數據分割、樸素貝葉斯分類部分。報表數據產生的IO相對于數據采集部分IO的開銷極小,在本文中不做統計。經過以上分析可以建立以下管道任務模型,如圖4所示。
在圖4中,我們共定義了3個管道集合,管道集合的定義,可以把一類有較高瓶頸任務的組合在一起,如數據請求管道,數據采集保存集合代表了高IO任務,數據分段解析集合,數據分類集合都是一個高CPU密集型集合任務。在分布式管道設計模式理論中,將每個管道分離出來會有更好的處理能力,更小的粒度,但是在我們進行分機部署工作時,就需要指定大量的管道部署,這是一個非常復雜的過程,同時也會出現數據交換管道過多出現管道全局中心搶占問題。在本次設計的服務框架中,使用管道集合的方式集中的管理一些管道,這樣的做法會導致管道的高解耦合性有一定影響,但是在一定程度上我們可以在用戶對管道集合的設計部分分離出可能出現高耦合的任務。如圖4中分離出了文章分類管道,使得數據的處理和我們的分類任務分離使用不同的數據交換管道,來獲取更高的性能和錯誤恢復能力。
3.4? 對比實驗
本次實驗只采用完整任務的1/1 000的工作量,并且使用上述的管道服務框架,以及GoCollaborate框架,一個提供分布式服務管理搭建的輕量級通用框架,可以輕松地用它進行編程,構建擴展,以及創建自己的高性能分布式服務。使用上述的2個框架同時實現對平安銀行、東航物流、中科軟3個企業在2020的年報分析,并且對2個不同框架做,丟包率、內存使用情況、CPU負載情況、單位請求QPS做了比較研究。
3.5? 實驗結果
在實驗過程中使用pprof[9],一種Go自帶的工具,可以做CPU和內存的profiling。對2個不同框架對任務實現的內存以及CPU負載情況進行查看,結果如圖5所示。
圖5中可以觀察到管道服務框架的內存和CPU的占用率都要大于GoCollaborate框架,換言之,管道服務框架的開銷較大,這很有可能是管道服務框架的管道全局中心中,產生大量的數據交換,以及恢復日志的處理導致的。同時在6、7、8時間點,管道服務框架的CPU占用明顯下降經過分析,在6、7、8時間點,全局管道交換中出現高IO占用,管道服務框架,優先處理數據保存流出管道的拉取推送任務,而GoCollaborate框架是平衡的,在高IO任務到來時,它必須分配更多的CPU資源去管理IO任務。
在本次實驗中,為檢查每秒請求數(querypersecond,QPS)情況,使用自搭建Web服務器,只實現讀取接口,并對在單位時間內,本次實驗采用1分鐘作為運行時間,使用Gin框架[10](一個go語言實現的高性能web框架)對日志分析獲取單位時間內的QPS量。
QPS=req((請求數)/sec(秒)),在本次實驗中單位時間一定,QPS=req/60。得到QPS數據如表1所示。
在單純QPS的檢測中,框架性能相差不大,對于單純的高IO任務,管道服務框架和GoCollaborate框架在宏觀上來看都是平衡的,那么QPS的瓶頸在gin服務器中。
在本次實驗中丟包率以及總工作時間,使用系統運行日志進行分析,即在框架層面,每次發生數據傳輸丟失重新傳輸時,或者發生日志恢復時,認為發生丟包。在本次使用中的丟包率指的是在應用層發生重發的概率,忽略傳輸層的情況。使用W=重發數據包/發送數據包,來計算丟包率,結果如表2所示。
管道服務框架的丟包率要遠大于GoCollaborate框架,經過分析,這是由于管道服務框架在交互數據需要經過,管道、管道交換通道、管道全局中心、管道交換通道、管道,5個過程,GoCollaborate框架只需要從一個服務到另外一個服務,2個過程,并且GoCollaborate框架保證數據安全在傳輸層中解決,管道服務框架的數據安全保障在應用層中處理。在表現上管道服務框架的數據傳輸安全性較差,但在應用層中進行安全保證,使得在發生災禍時,管道服務框架有更好的容災能力。
4? 結? 論
特定的計算環境產生的不同特征的問題,本文的實驗過程在不同特點,不同計算性能要求情況下的處理能力往往不同。在真實應用環境下的性能可能達不到預期要求,隨著更多實際計算任務場景的出現,分布式管道如何更高效地解決計算機資源的處理、調度仍需更深層次的研究。通過結合本文研究的不足之處,須在以下方面進行相應改進:
(1)在連續性任務中,若中間某一類數據管道全部斷開連接,則數據將被迫留在管道全局中心中,可能導致全局管道空間數據溢出。對于此問題可以在全局管道空間中對每個集合管道的推送添加緩存最大值,同時在管道交換通道中添加緩存和通知機制,嘗試重新喚醒斷開連接的管道,最終解決管道全局中心數據溢出問題。
(2)在數據推送過程中,數據在原管道—原管道交換通道—全局管道空間—目標管道交換通道—目標管道轉移。在此過程中,由于網絡交換的次數較多數據更易丟失。對于此問題,可以在數據交換頻繁,單次計算量較小的部分定義一個快速管道(實現管道于管道之間直接通信,犧牲容災性),提高數據處理效率,降低框架丟包率。
(3)全局管道空間備份的時機。全局管道空間使用定時器對數據進行定期備份,且對數據加鎖防止其在保存過程中被篡改。在系統在處理高負載任務時若全局管道空間進行備份加鎖,使得管道交換通道無法進行數據傳輸,則會影響其高計算密集任務的工作,導致任務長期阻塞等待。在此提出一種偵察機制,使得系統在執行高IO任務時,有更高的備份優先級,一定程度避免在進行高CPU任務時進行全局管道備份,將其和定時器相結合,有效提高系統計算能力。
參考文獻:
[1] HU H R,FANG L L,YANG C H,et al. Research on Cloud Architecture of Enterprise Distributed Business Information System Based on SOA [J].Journal of Physics:Conference Series,2020,1684(1):1-8.
[2] 徐旻洋,高承勇,周向東,等.基于微服務架構的大型建筑設計企業生產業務平臺構建 [J].土木建筑工程信息技術,2019,11(3):89-95.
[3] 袁曉晨,張衛山,高紹姝,等.基于微服務架構的眾包圖像數據集標注系統 [J].計算機系統應用,2021,30(5):83-91.
[4] 鄧兆森.一種云平臺服務狀態顯示方法及相關裝置:CN1 12737882A [P].2021-04-30.
[5] DONG Y,ZHU P F,JIANG Z Y,et al. Real Time Data Distribution Technology of SCADA based on Redis [C]//Proceedings of 2016 International Conference on Computer,Mechatronics and Electronic Engineering (CMEE 2016).Beijing:DEStech Publications,2016:190-194.
[6] 賈統,李影,吳中海.基于日志數據的分布式軟件系統故障診斷綜述 [J].軟件學報,2020,31(7):1997-2018.
[7] 孫志龍,沙行勉,諸葛晴鳳,等.面向內存文件系統的數據一致性更新機制研究 [J].計算機科學,2017,44(2):222-227.
[8] 張晨躍,劉黎志,鄧開巍,等.基于MapReduce的樸素貝葉斯算法文本分類方法 [J].武漢工程大學學報,2021,43(1):102-105.
[9] 向勇,湯衛東,杜香燕,等.基于內核跟蹤的動態函數調用圖生成方法 [J].計算機應用研究,2015,32(4):1095-1099.
[10] 張晶,黃小鋒.一種基于微服務的應用框架 [J].計算機系統應用,2016,25(9):265-270.
作者簡介:戴振邦(2000—),男,漢族,福建莆田人,本科在讀,研究方向:分布式、數據挖掘、集群計算;江恩杰(2000—),男,漢族,福建漳州人,本科在讀,研究方向:人工智能、模式識別、云服務;劉力嘉:(2000—),男,漢族,福建漳州人,本科在讀,研究方向:虛擬化;甘江偉(2000—),男,漢族,福建永安人,本科在讀,研究方向:微電子。