■廣州 張鵬 諶得志
計算資源是各類應用系統完成數據信息處理的物質基礎。通過有效調度處理任務,可以更加充分利用有限的計算資源,提高信息系統的工作效率。
原有的業務系統是由若干臺服務器組成的,每臺服務器又部署了若干業務處理軟件。這些處理軟件獨立完成從任務調度到數據處理的全過程。由于新增業務越來越多,系統逐漸呈現碎片化的趨勢。業務分布依靠人工調節,存在部分服務器資料占用率偏高,而部分服務器虛耗的現象。
為了改善原有業務系統存在的問題,更加有效的利用軟硬件資源,結合業務工作實際,提出了基于ActiveMQ 數據總線的分布式業務處理模型。
ActiveMQ 是Apache 軟件基金會所研發的開放源代碼消息中間件。它是一個純java 開發的軟件,具有良好的跨平臺特性。ActiveMQ 支持多種語言和協議,適用于異步調用、一對多通信、多應用解耦以及提高系統伸縮性等用途,在各類系統開發設計中廣泛使用。
將ActiveMQ 應用到分布式的業務處理平臺的設計過程中,可以通過其消息隊列功能實現多臺數據處理終端的協同工作,并且可以解除任務調度和任務處理的緊密耦合,提高系統的整體性。
采用ActiveMQ 作為數據總線,解決了任務調度和多機系統工作的問題。如圖1 所示,系統分為三部份:數據總線、生產者和消費者。
數據總線由ActiveMQ 實現,主要承擔接收任務調度信息,協調多機完成任務處理。生產者由若干觸發器程序組成。生產者緊貼業務需求開發,負責產生任務調度信息,向數據總線發出執行請求。消費者獲取執行任務和參數,完成最終數據處理任務。

圖1 系統結構圖
為了區分業務優先級,數據總線由多條隊列組成,業務調度信息被發送到不同的隊列中。再由數據總線將消息指派給消費者進行處理。消費者終端可以根據本地配置決定注冊到哪些消息隊列來獲取消息。
這樣設計的好處是可以為特別的消費者單獨指定消息隊列,這對于實時性要求特別高的或是運行環境有特別要求的業務,具有極其重要的意義。
不同業務處理的優先次序問題也是需要重點考慮的。對于實時性要求較高的處理任務,應該處于較高的處理優先級,并被盡可能快速的執行。而對于一些實時性要求不太高的任務,可以等到系統有空余資源時再處理。
此外,需要注意一些耗時較長的數據處理任務,有可能會大量占用處理資源,使得實時性要求較高的業務被堵塞在隊列中,進而導致數據處理時延增大,資料時效性降低。
分布式系統中多個消費者同時工作可以解決一部分的優先次序問題,因為多個消費者的存在從某種程度上減少了單個消費者被任務堵塞的現象。為了進一步解決數據處理優先級問題,消費者模塊設計采用兩級優先級控制機制,系統模型結構圖如圖2 所示。

圖2 消費模塊原理圖
第一級稱為源抑制機制。系統通過定義不同的ActiveMQ 通信隊列對業務進行區分,消費者模塊通過控制是否注冊到ActiveMQ 消息隊列中。
決定是否接受數據總線指派的任務。它的具體做法是消費模塊按照配置初始化監聽消息隊列,并將接收到的調度信息存儲到優先級隊列中。監聽器不斷的檢查優先級隊列,切斷低于最低優先級的消息隊列,直到優先級隊列被清空為止。這樣消費者模塊始終只可以接收到更高優先級任務,低優先級的任務會被指派給其他服務器或是保留在數據總線中,直到有空閑消費者注冊到該隊列時,任務才會被指派給消費者執行。
第二級就是優先級隊列機制。消費者接收到的消息被保存在一個優先級隊列中,而優先級是由本地監聽的隊列順序來決定的。監聽控制模塊按照優先級從高到底的順序把消息推送到通信隊列中。由于監聽控制模塊一次只推送一條消息到通信隊列中,以保證執行器獲取的始終是優先級隊列中的最高優先級任務。最后,執行器不斷的從通信隊列中獲取任務,并指派給工作進程來完成處理。
搭建該系統首先需要部署Apache ActiveMQ 軟件。該軟件配置比較簡單,這里就不詳細介紹了,具體內容可以參考官方網站說明。其次,就是消費模塊的實現。消費模塊采用python 語言開發。python 具備良好的跨平臺能力,而且是極佳的膠水語言,可以適應多種環境的任務調度需求。Python 語言支持通過stomp 協議訪問ActiveMQ,所以在開發前需要安裝python 的stomp 庫。
當 python 完成ActiveMQ 的注冊后,就可以從ActiveMQ 獲取消息了。
需要注意的是收到的消息被保存到queue 模塊的PriorityQueue 隊列中。監聽控制部分通過不斷檢查PriorityQueue 隊列來確定哪些隊列會被注冊(subscribe)或是注銷(unsubscribe)。
除了PriorityQueue 外還需要multiprocessing 模塊的消息隊列Queue,因為Queue 是進程安全的,適合用于監聽器和執行器之間傳遞消息。
監聽器只有在通信隊列Queue 為空時,才會向里推送消息。因為,Queue 時先進先出的,當多條消息同時位于通信隊列Queue 中,可能會造成任務優先級的混亂。特別是當個別任務處理耗時較長時,對信息處理優先級影響尤其明顯。最后由執行器從通信隊列中獲取消息后會將任務提交給進程池Pool完成處理。

圖3 ActiveMQ 隊列開放情況
為了驗證系統的優先級調度,定義了5 條消息隊列,分別為channel1 到channel5。先向channel2與channel3 發送消息,間隔1 秒時間再向channel1、與channel3 發送消息。
第一次發送向channel3的消息被最新消費,其次是第二次發送向channel3的消息被消耗,此時觀察ActiveMQ 監控頁面如圖3 所示。
第二次發送的channel1的信息被保留在了隊列中。接下來被消耗的才是第一次發送的channel2 的信息。最后是channel1 的消息。
經過多次測試,系統基本能夠保證多機系統工作和高優先級任務被優先執行。
基于ActiveMQ 的分布式業務處理平臺,依托數據總線實現多機的協同工作,是一種相對簡單的分布式數據處理的實現方法,但對于業務數據處理的效率提高是十分明顯的。采用多機協同處理,可以更加有效的利用閑置的處理資源,同時也可以避免單節點故障造成的業務中斷。對于新增業務只需遵守平臺規范開發觸發器(生產者)和業務處理邏輯,即可融入到原有的系統架構中,也避免新業務、新軟件造成的系統破碎化。
這一模型還有很大的優化空間,可以將更加復雜的控制邏輯引入監聽模塊的注冊控制機制中,讓系統更加智能。一套完善的日志系統更加有利于分析系統資源利用情況,提高系統故障排查效率。