,,,,
(許繼電氣股份有限公司,河南 許昌 461000)
消息總線服務為進程間(多臺計算機之間和一臺計算機內部)的消息傳輸提供支持,具有消息主題的創建 / 刪除、消息的訂閱 / 發布等功能,以接口函數的形式提供給各類應用;具有組播、廣播和點到點傳輸形式,支持一對多、一對一的信息交換場合[1]。當發送者需要分發一組新的消息時,它向消息總線服務器發出一個請求,消息總線服務器會負責把請求遞送給所有的訂閱者。經過對當前各種開源軟件和其它行業軟件的分析對比, 配電系統決定采用 Kafka 作為消息總線的主體服務。
消息總線服務為配電主站系統的各個模塊提供基于 topic 的消息訂閱和發布,消息總線服務應考慮系統的集群部署、系統冗余功能等[2];消息總線服務應具備足夠的吞吐量,處理大量消息傳遞能力, 所以較高的性能是消息總線的基本要求。
消息總線采用 Qt/C++ 對 Kafka 的客戶端進行封裝,屏蔽各種網絡處理細節,給平臺其它應用一個簡單易用的接口[3]。整個消息處理組件主要包括兩部分內容:消息的發布和消息的處理。為了簡化應用,消息體現為函數參數,可以任意定義消息體結 構而不改變程序結構。消息的處理采用回調函數的方式,向系統注冊處理某種 topic 的回調函數即可。 下面具體介紹一下消息發布和消息處理的實現機制。
消息發布機制是對kafka客戶端提供的信息發送函數進行封裝,以達到向服務器上傳數據的目的[4]。由于事先不確定用戶上送的數據類型,封裝采用模板模式,支持任意數據類型。上送方式為單條上送,即一次只能上送一條數據,數據大小不做限制。并且接收服務器返回的上送數據狀態,成功返回0,失敗返回-1等。
消息處理采用的是向服務器注冊機制,對kafka客戶端提供的注冊函數進行封裝,以達到向服務器訂閱數據的目的。封裝技術與發布機制類似,采用模板形式。注冊可以分組注冊也可以同一組批量注冊。同一用戶可以注冊到服務器的不同組,但不能在同一組內多次注冊。
消息總線的性能指標主要包括:大數據容量、數據處理速率、消費者個數上限、消費者個數上限、穩定性等性能指標。以上幾項性能指標中大數據容量、數據處理速率、消費者個數上限和穩定性是最重要的,接下來我們具體分析這兩項性能指標。
大數據容量分為單機數據容量和集群數據容量,是指消息總線的數據吞吐量[5]。主要測試方法是采用控制變量的方法將測試用例中的消息數量和單個消息大小分別增大,將數據總量逐級增加,測試容量上限,記錄上限值與單機最大容量是否成正比例。在上限范圍內,單機或集群各項功能正常工作。
數據處理速率又包括:生產者數據處理速率、消費者數據處理速率、網絡風暴中單條生產者生產速率和網絡風暴中單條消費者接收速率。
生產者數據處理速率是指生產者生產數據的效率。主要測試方法是啟用測試用例,以50%容量上限值發送數據,記錄Producer的生產時間,計算生產者的數據處理速率;更改測試用例,逐步提升到80%容量上限值發送量,記錄Producer生產時間,計算生產者數據處理速率,直至發送量提升至容量上限值,借此測試生產者數據處理速率上限。在上限范圍內,集群各項功能正常工作。
消費者數據處理速率指消費者的接收效率。主要測試方法是啟用測試用例,以50%容量上限值發送數據,記錄Consumer的消費時間,計算消費者的數據處理速率;更改測試用例,逐步提升到80%容量上限值發送量,記錄Consumer消費時間,計算消費者數據處理速率,直至發送量提升至容量上限值,借此測試消費者數據處理速率上限。在上限范圍內,集群各項功能正常工作。
網絡風暴中單條生產者生產速率指網絡風暴情況下生產者生產一條數據的效率[6]。主要測試方法是更改測試用例,逐步提升到80%容量上限值發送量,記錄Producer生產時間,計算生產者單條數據處理速率,直至發送量提升至容量上限值,借此測試生產者單條數據處理速率上限。在上限范圍內,集群各項功能正常工作。
網絡風暴中單條消費者接收速率指網絡風暴情況下消費者接收一條數據的效率。主要測試方法是更改測試用例,逐步提升到80%容量上限值發送量,記錄Consumer消費時間,計算消費者單條數據處理速率,直至發送量提升至容量上限值,借此測試消費者單條數據處理速率上限。在上限范圍內,集群各項功能正常工作。
消費者個數上限又分為單機消費者個數上限和集群消費者個數上限。其中單機消費者上限是指單機消費者的最大上限,主要測試方法是啟動測試用例,使Producer以50%容量上限值持續生產數據,在一臺服務器上逐步增加Consumer個數同時消費,測試消費者個數上限;將測試用例調整至容量上限值,重復上述工作,測試消費者上限個數。在上限范圍內,各項功能正常工作。集群消費者個數上限是指整個集群的消費者上限者上限,主要測試方法是啟動測試用例,使Producer以50%容量上限值持續生產數據,在多臺服務器上逐步增加Consumer個數同時消費,測試消費者總個數上限;將測試用例調整至容量上限值,重復上述工作,測試消費者上限總個數,在上限范圍內,集群各項功能正常工作。
穩定性主要是指消息總線服務長時間穩定的運行和在高負荷下穩定的運行。具體長時間穩定運行是指最低保證50%容量上限值72小時穩定運行,具體測試方法是更改測試用例,50%容量上限值發送數據量,進行72小時拷機測試,拷機期間各項功能正常,程序不應產生異常或死機等現象,拷機測試期間觀測記錄CPU平均負荷率,拷機測試期間,觀測內存占有率,不發生內存泄露,拷機測試期間,各節點網絡平均負荷率。并且最低保證80%容量上限值30分鐘穩定運行,具體測試方法是更改測試用例,80%容量上限值發送數據量,進行30分鐘拷機測試,拷機期間各項功能正常,程序不應產生異常或死機等現象,拷機測試期間觀測記錄CPU平均負荷率,拷機測試期間,觀測記錄內存占有率,不發生內存泄露,拷機測試期間,記錄各節點網絡平均負荷率。高負荷穩定運行是指最低保證100%容量上限值5分鐘穩定運行,具體測試方法是更改測試用例,100%容量上限值發送數據量,進行5分鐘壓力測試,壓力測試期間各項功能正常,程序不應產生異常或死機等現象,拷機測試期間觀測記錄CPU平均負荷率,拷機測試期間,觀測記錄內存占有率,不發生內存泄露,拷機測試期間,記錄各節點網絡平均負荷率。
消息總線測試環境采用三臺服務器搭載Kafka服務、組成集群,系統采用單網千兆網絡進行連接,滿足2k+1配置,至少k+1臺服務器正常使用時,保證消息總線能正常工作。見圖1。
如圖2所示,消息總線軟件測試框架主要包括大數據生成、數據發送、數據接收和時間統計4個功能模塊。

圖2 消息總線性能測試軟件框架功能圖
數據生成模塊的主要功能是生成測試數據,為了便于統計數據大小,這里數據類型統一使用字符型。可以單個添加數據,也可以批量生成大量數據。數據發送模塊主要功能是與消息總線服務器建立鏈接并發送數據到服務器。數據接收模塊主要功能是消費者注冊和接收數據。時間統計模塊主要功能是統計并記錄消息總線處理數據的時間。由于要處理海量的數據,這里采用多線程技術,來保證測試框架界面的操作流暢。
Linux系統上采用 Qt/C++實現軟件框架。主窗口類設計如下:
class PUITest: public QMainWindow
{
Q_OBJECT
public:
explicit PUITest(QWidget *parent = 0);
~PUITest();
int m_DowNum,m_DowNum_reg,outcnt,totalcnt,cb_cnt;
outputDockWidget *m_InfoOutput;//操作信息輸出窗口
sendThread *sendmsgthread; //數據發送線程
runthread *runmsgthread; //海量數據發送線程
bool runOverflag;
int tId;//線程ID
void removeDockTitle(QDockWidget *docwidget);//去除頭標志
void timerEvent(QTimerEvent *event);//定時器自動調用函數
void InitTabWidget();
int test2(const QString str);
void Output(QString s);
void msgsendThread();
void magrunThread();
public slots:
void sendmsgover();
private slots:
void on_action_about_triggered();
void on_action_add_triggered();
void on_action_delete_triggered();
void n_action_reg_triggered();
void on_action_send_triggered();
void on_checkBox_clicked();
void on_action_unreg_triggered();
void on_action_addNumbers_triggered();
void on_action_save_triggered();
void on_action_open_triggered();
void on_action_run_triggered();
private:
Ui::PUITest *ui;
};
線程類設計如下:
class sendThread: public QThread
{
Q_OBJECT
signals:
void sendover();//發送數據結束信號
protected:
void run();//線程自動調用
};
框架實現主界面如圖3所示,主界面中有兩個分頁面,一個是發送信息配置,主要包括數據生成、發送次數設置、發送延時設置等。一個是接收信息配置,主要是注冊信息的設置等。數據可以單個添加,更可以批量生成。另外還有操作提示窗口,時間統計窗口等。

圖3 消息總線測試軟件框架主界面
采用前幾節中提出的測試方法及設計的測試框架,對消息總線性能進行測試。測試中使用三臺服務器組成消息總線集群,兩臺客戶端安裝測試程序。具體軟硬件信息如表1和表2所示。

表1 軟硬件配置環境

表2 軟件消息
通過測試框架的測試軟件自動生成海量的測試數據,并自動統計數據處理時間和讀取服務器資源占用信息。以下所有記錄均在HP DL560 G8 服務器上進行測試,測試10次求平均值,性能測試結果如表3~6所示。

表3 生產者生產速率

表4 消費者消費速率

表5 網絡風暴中單條生產信息生產時間

表6 網絡風暴中單條消費消息收時間
表3是生產者生產效率的的測試結果,從該測試結果中可以發現,發送量一定的情況下,隨著單條數據字節數的增加,發送效率逐漸降低。表4是消費者消費效率的測試的測試結果,從該測試結果中可以發現,接收量一定的情況下,隨著單條數據字節數的增加,接收效率逐漸降低。通過對比發現,消費效率要明顯高于生產效率。表5是網絡風暴中單條信息的生產效率測試結果,從該測試結果中可以發現,不同程度網絡風暴情況下,生產單條數據的效率相對穩定。也就是說網絡風暴對生產者的生產效率影響不大。表6是網絡風暴中單條消費信息的消費效率的測試結果,從該測試結果中可以發現,不同程度網絡風暴情況下,消費單條數據的效率不穩定。也就是說網絡風暴對消費者的效率影響較大。
消費者上限測試,當消費者為10 000時,系統各項指標均正常運行。這以遠遠超出我們實際應用時的上限,新型配電主站系統實際應用時的消費者數量不超過1 000,所以這里不再具體測試。
穩定性測試時接入不少于 10 000臺配電終端,每臺終端定義100個“ 三遙” 數據信息接入被測系統,系統應不間斷運行維持 72 h, 被測系統 cpu及內存使用情況如下:
1)當50%發送速率拷機,單條消息字節長度為1 024 byte時,內存占用率和網絡負荷負荷率正常;
2)當80%發送速率拷機,單條消息字節長度為1 024 byte時,內存占用率和網絡負荷負荷率正常;
3)當100%發送速率拷機,單條消息字節長度為1 024 byte時,內存占用率和網絡負荷負荷率較高。
經性能測試,除了掌握以上性能規律外,還發現了兩個有價值的問題,問題1:當向消息總線服務器發送海量數據,導致服務器存儲空間不足時,消息總線服務直接崩潰;問題2:向消息總線服務器長時間發送1 024 byte大小的數據流時,服務器內存占用過大。以上性能數據及問題得到了設計人員的高度認可,充分體現了文中設計的測試框架的價值。
隨著配電信息數據規模的不斷擴大,對消息總線的性能要求越來越高,而數據吞吐量和數據處理效率是消息總線性能的重要指標。本文通過對kafka為服務主體的消息總線性能進行分析后,提出了效果較好的測試方法,設計了實用性很強的測試框架。經實際應用證明,該測試框架簡單易用,在很大程度上提高了性能測試的效率。該框架有較好的通用性,其他消息總線的測試也可以使用。