甄凱成,黃 河,宋良圖
1.中國科學院 合肥智能機械研究所,合肥230031
2.中國科學技術大學,合肥230026
由于云計算和數(shù)據(jù)分析技術的高速發(fā)展,使得智慧城市[1]、精細農(nóng)業(yè)[2]、車聯(lián)網(wǎng)[3]、環(huán)境保護、工業(yè)監(jiān)測[4]等物聯(lián)網(wǎng)廣泛應用領域的數(shù)據(jù)獲取變得愈發(fā)重要,進而促使各種物聯(lián)網(wǎng)采集終端設備數(shù)量爆炸增長。設備數(shù)量及其感知的數(shù)據(jù)量的快速增長為用戶帶來了更精準的趨勢預測和更便利的生活服務,但同時,也為服務端數(shù)據(jù)收集帶來挑戰(zhàn)。如何滿足大量終端接入并快速處理終端上傳的數(shù)據(jù)成為亟待解決的問題。
物聯(lián)網(wǎng)數(shù)據(jù)上層網(wǎng)絡通信協(xié)議雖然在應用層上有多種不同的體現(xiàn),但其在傳輸層及下層網(wǎng)絡中基本上是遵從TCP/IP 網(wǎng)絡協(xié)議的[5]。因而用來接收物聯(lián)網(wǎng)數(shù)據(jù)的服務可以參考一般的網(wǎng)絡通信服務。
Netty 由于性能優(yōu)越、安全性高、使用簡便、社區(qū)資源豐富等特點使得其從諸多網(wǎng)絡通信庫中脫穎而出,成為高性能的通信領域的首選框架。當前,Netty 已成功應用在諸多流行項目中,如Cassandra、Spark[6]、Hadoop[7]等大數(shù)據(jù)領域;Dubbo、gRPC 等分布式通信框架均有Netty 的身影。在物聯(lián)網(wǎng)相關領域,Duan 等人[8]基于Netty 專用協(xié)議棧研究的基礎上,結(jié)合責任鏈的網(wǎng)絡協(xié)議體系結(jié)構,在遠程醫(yī)療監(jiān)護系統(tǒng)上實現(xiàn)了醫(yī)療數(shù)據(jù)業(yè)務的快速傳輸。還有基于Netty實現(xiàn)的車輛遠程監(jiān)控通信服務系統(tǒng),該系統(tǒng)從網(wǎng)絡設計模式、協(xié)議解析模式、共享數(shù)據(jù)同步和線程池四個方面進行優(yōu)化[9]。此外,在滑坡等地質(zhì)災害監(jiān)測系統(tǒng)中,Netty 也被用來處理全球?qū)Ш叫l(wèi)星系統(tǒng)終端上傳的大量、高頻數(shù)據(jù)[10]。
但是,在高并發(fā)環(huán)境下,為處理終端上傳的數(shù)據(jù),大量的同步耗時操作特別容易導致程序阻塞,從而帶來通信的響應時間過長、吞吐量低等情況。使用消息中間件的異步傳送方式將消息接收與寫入數(shù)據(jù)庫這種耗時操作解耦[11],可以避免上述情況的發(fā)生。Kafka 作為一款優(yōu)秀的消息系統(tǒng)也已被應用于物聯(lián)網(wǎng)相關領域中。如在CO2監(jiān)測系統(tǒng)的數(shù)據(jù)使用Kafka流平臺分發(fā)到數(shù)據(jù)庫中[12]。俄勒岡老齡科技中心在臨床監(jiān)測系統(tǒng)中使用Kafka使得其數(shù)據(jù)處理能力實時性更高[13]。在電網(wǎng)設備數(shù)據(jù)接入系統(tǒng)中,也大量使用到Kafka消息隊列以保證其數(shù)據(jù)處理的實時性[14]。
基于上述討論,本文使用Netty 網(wǎng)絡通信框架和Kafka消息隊列搭建了可供物聯(lián)網(wǎng)采集終端使用的數(shù)據(jù)接入系統(tǒng)。本文還自行設計一個可供參考的終端設備與消息收集端的通信協(xié)議,在設計協(xié)議過程中充分考慮了當前的實際應用易用性以及未來升級的可能性等因素。此外,多線程、線程池、堆外內(nèi)存、垃圾回收機制等方面的優(yōu)化,使得系統(tǒng)的性能得到提升。
目前高性能網(wǎng)絡通信服務大多是基于epoll機制和多線程模型組合的實現(xiàn)。而Netty可依據(jù)用戶自定義的程序啟動參數(shù)調(diào)整其運行期間的線程模型。Netty官方推薦使用主從Reactor多線程模型。其主要特點是擁有多個線程池,其中主線程池是處理新的客戶端連接,處理完新連接后將新建的Socket 綁定到從線程池中的某個線程中;從線程池將負責后續(xù)對這個Socket 的讀寫、編解碼、業(yè)務處理工作。設計主從Reactor 多線程模型的目的是將監(jiān)聽端口服務與處理數(shù)據(jù)功能剝離開來,從而提高處理數(shù)據(jù)的能力[15]。在實際應用中,Netty 支持添加多個從線程池,可按照業(yè)務特性將不同的業(yè)務分配到不同的從線程池處理,或若干個特性相似的業(yè)務分配到同一個從線程池。如圖1所示,主線程池負責響應新連接接入,從線程池1 負責編解碼業(yè)務,從線程池m 負責數(shù)據(jù)讀取業(yè)務,做到按特性分配,輔以合理的線程池參數(shù),可令Netty的性能更出色。
Kafka的commit log隊列是Kafka消息隊列概念的具體實現(xiàn)。生產(chǎn)者向commit log 隊列中發(fā)送流式消息,其他消費者可以在毫秒級延時處理這些日志的最新信息。每個數(shù)據(jù)消費者在commit log 中有一個自己的指針,并獨立移動,從而促使消費者們在分布式環(huán)境下能可靠、順序地處理隊列中的消息。commit log可以被多個生產(chǎn)者和消費者所共享,并覆蓋集群中的多臺機器,為集群中機器提供容錯保障。Kafka 作為一個現(xiàn)代的分布式系統(tǒng)還可以便捷地水平擴張和縮小。此外,Kafka 的消息代理(broker)能支持TB 級消息的持久化。上述特性使得Kafka 能夠?qū)⑵鋺梅秶粌H局限于消息系統(tǒng)。
系統(tǒng)的整體架構設計如圖2 所示。采集終端一般由眾多可連接互聯(lián)網(wǎng)的嵌入式設備組成。消息收集端暴露IP地址和端口,供采集終端連接,當有新的TCP連接或者新的消息發(fā)送時,都將觸發(fā)消息收集端的網(wǎng)絡通信處理程序。對于需要進一步處理的消息將由消息收集端通過異步方式推送到Kafka集群中。之后,再由不同的Kafka consumer 進程按照不同的業(yè)務需求來處理被推送到Kafka 集群中的消息。這些消息或被持久化到數(shù)據(jù)庫,或進行其他實時計算。此外,Zookeeper用來監(jiān)測Kafka集群的運行狀態(tài),協(xié)調(diào)管理Kafka集群;同時Zookeeper還可預留作為協(xié)調(diào)管理收集端服務水平擴展業(yè)務的服務軟件。
圖1 Netty多Reactor模型示意圖
引入Kafka 消息隊列來處理數(shù)據(jù)轉(zhuǎn)發(fā)業(yè)務有以下幾點原因:(1)若消息被收集端解析后,直接寫入數(shù)據(jù)庫或者進行數(shù)據(jù)分析等耗時操作,將會阻塞一條采集終端的連接,進而降低系統(tǒng)的并發(fā)能力。(2)當前數(shù)據(jù)的持久化操作一般會將熱點數(shù)據(jù)寫入如Redis 的內(nèi)存數(shù)據(jù)庫,常規(guī)數(shù)據(jù)寫入如MySQL的傳統(tǒng)關系型數(shù)據(jù)庫。對于這種數(shù)據(jù)消費多目的端的處理邏輯,利用多個Kafka 的consumer group 機制可以輕易地并發(fā)實現(xiàn)。(3)當采集終端的數(shù)據(jù)上傳速率較消息處理速率快時,Kafka 內(nèi)部自帶數(shù)據(jù)持久化功能,能保證消息不丟失,且Kafka 集群還支持動態(tài)擴容能擴展消息處理業(yè)務吞吐量。(4)當單一的收集端服務不足以支撐大量采集終端的數(shù)據(jù)上傳時,也可利用Kafka 的多Producer 便捷地實現(xiàn)收集端服務的水平擴展。(5)考慮到采集終端上傳的數(shù)據(jù)具有典型的流式數(shù)據(jù)特征,使用Kafka可以方便地拓展未來對數(shù)據(jù)進行實時分析處理業(yè)務。
通信協(xié)議作為采集終端與收集端消息傳送的“模板”,在業(yè)務中有著十分重要的地位,是采集終端與收集端消息傳送的重要組成部分。雖然MQTT[16]等協(xié)議已被大量使用,但是其在點對點通信、群通信以及負載均衡等方面仍有缺陷。而定義私有協(xié)議可更具靈活性,可按照業(yè)務需求有針對性地優(yōu)化。故本文設計了一個協(xié)議以供參考。通信協(xié)議在設計過程中除了考慮常規(guī)的業(yè)務需求,也需考慮工程實現(xiàn)的難易程度、編解碼性能等,還需兼顧未來對業(yè)務擴展與升級的影響。
本文通信協(xié)議的設計如圖3所示。協(xié)議由校驗碼、版本號、指令號、數(shù)據(jù)長度、數(shù)據(jù)五部分組成。
協(xié)議中各個組成部分的作用如下:
(1)校驗碼
設計校驗碼的主要意義是對采集終端進行鑒權,即剔除非本應用的采集終端連接,以防止惡意占用收集端資源。若校驗不通過,收集端將直接關閉該條TCP 連接。本文校驗碼設計成一個定值0x43218765。校驗碼不使用定期更換策略,原因是簡單的定值校驗碼驗證已經(jīng)能夠滿足大多情況下的校驗;且定期更換策略需要收集端及時推送校驗碼到各個采集終端,對收集端的性能是種損耗。
(2)版本號
協(xié)議版本號用來定義當前通信協(xié)議的版本,是為將來對通信協(xié)議的升級預留位。此處借鑒常用的網(wǎng)絡通信協(xié)議的設計,如IP 數(shù)據(jù)報中前4 個比特為協(xié)議版本號。
(3)指令號
采集終端與收集端通信過程中有許多不同的消息類型來應對不同的業(yè)務邏輯。例如,采集終端消息發(fā)送間隔會因需求不同而不同,對于發(fā)送間隔較短的一些應用使用長連接的方式會顯著降低TCP 連接構建過程中的網(wǎng)絡帶寬損耗;對于發(fā)送間隔較長的應用使用短連接的方式會降低收集端IO資源占用。因而需要針對不同的業(yè)務場景選擇合適的處理方式。指令號用來確定收集端的業(yè)務處理邏輯以及消息回傳的格式。
(4)數(shù)據(jù)長度
圖2 系統(tǒng)架構示意圖
圖3 通信協(xié)議結(jié)構示意圖
本文設計的通信協(xié)議是基于TCP 傳輸。網(wǎng)絡本身的不穩(wěn)定性,可能會導致消息重傳;以及某些情況下因單條消息過大而導致的TCP 底層拆包等意外情況的出現(xiàn)。為防止消息編解碼錯誤,本文使用在消息頭標識該條消息傳輸數(shù)據(jù)長度的方式來處理操作系統(tǒng)內(nèi)核對消息可能出現(xiàn)的拆包/粘包操作。相較于指定分隔符方式在二進制編解碼過程中的大概率重復導致拆包/粘包失敗以及固定長度方式的局限性,在消息頭標識消息總長度的方式是最佳選擇。此外Netty框架還對該種方式的工程實現(xiàn)提供了非常友好的支持。
(5)數(shù)據(jù)
數(shù)據(jù)即為消息實體實際所要傳輸?shù)膬?nèi)容。數(shù)據(jù)內(nèi)部的格式與指令號關聯(lián)性較強,不同的消息指令會有不同的數(shù)據(jù)格式。例如,心跳維護消息包中,數(shù)據(jù)這一部分為空,而在某些具體的業(yè)務消息中,數(shù)據(jù)就是采集終端實際所要傳輸?shù)母袷胶蛢?nèi)容。
本節(jié)主要介紹收集端大體的消息處理流程,對于采集終端的失敗重連機制,消息發(fā)送異常的多次重發(fā)機制等通信協(xié)議上處理流程不做過多介紹。
消息處理流程如圖4 所示。收集端服務在啟動時將綁定并監(jiān)聽一個網(wǎng)絡端口。當有連接發(fā)送新消息時,收集端將完成以下步驟:(1)若連接發(fā)送消息完成,將觸發(fā)收集端的讀數(shù)據(jù)操作。(2)對消息頭的校驗碼進行驗證,若驗證不通過則關閉連接,若驗證通過則繼續(xù)流程。(3)按照3.2節(jié)介紹的通信協(xié)議對讀取到二進制的數(shù)據(jù)進行解析,獲取到消息對象。(4)獲取消息指令,選擇與之對應的Handler。(5)在Handler 內(nèi)部進行業(yè)務處理。業(yè)務包含構建應答消息、日志記錄、按需將接收到的消息推送至Kafka等。
圖4 消息處理流程示意圖
為進一步提升收集端服務性能,本文針對以下幾個方面做出優(yōu)化:(1)Netty會為每個連接構建一個內(nèi)部節(jié)點為ChannelHandler類型的雙向鏈表pipeline,默認情況下數(shù)據(jù)會從鏈表頭節(jié)點依次傳輸?shù)芥湵砦补?jié)點,實際上多數(shù)業(yè)務不需要眾多中間節(jié)點處理,故使用ctx.write-AndFlush()方法改變事件傳播源能有效壓縮事件傳播路徑,加速數(shù)據(jù)流轉(zhuǎn)。(2)將業(yè)務邏輯與網(wǎng)絡、數(shù)據(jù)庫等耗費資源相關的ChannelHandler 分配到獨立的線程池工作,避免長時間程序阻塞。(3)調(diào)整部分Netty 與TCP網(wǎng)絡相關的啟動參數(shù)如設置TCP_NODELAY 來禁用nagle[17]算法降低延遲,設置SO_RCVBUF、SO_SNDBUF來調(diào)整接收緩沖區(qū)和發(fā)送緩沖區(qū)的大小避免擁塞。(4)收集端服務程序由java實現(xiàn),在程序運行期間JVM的垃圾回收機制會導致程序短暫的假死,設置JVM 參數(shù),合理分配堆、老年代、年輕代、堆外內(nèi)存大小避免頻繁Full GC。
使用一臺Dell Optiplex 9020 搭建消息收集端服務;在一臺Dell R720 上使用Xen 虛擬機構建集群環(huán)境并部署Kafka 和Zookeeper 應用。此外,還使用多臺壓測機器來模擬大量TCP 連接,以對系統(tǒng)進行性能測試。所有機器之間通過一個百兆交換機連接。消息收集端配置信息參數(shù)如表1所示。
表1 消息收集端機器配置信息
由Dell R720內(nèi)三個虛擬機組成的集群各節(jié)點環(huán)境參數(shù)如表2 所示。所有虛擬機節(jié)點的配置信息均一致。MySQL數(shù)據(jù)庫安裝在R720宿主機中。
表2 集群節(jié)點配置信息
消息收集端的并發(fā)處理能力與資源消耗情況將會極大限制整個系統(tǒng)的處理能力,因而需要測試在收集端性能以及當前的網(wǎng)絡環(huán)境。
使用多臺壓測機器與消息收集端建立若干個TCP連接,每個連接以1 s 為間隔向消息收集端發(fā)送數(shù)據(jù)實體為當前時間戳的數(shù)據(jù)包。收集端接收到數(shù)據(jù)包后將時間戳取出并返回至各自壓測機,壓測機接收到返回的數(shù)據(jù)包后取出原始的時間戳并與當前的系統(tǒng)時間比較,從而計算平均響應時間。以此來測試當前的網(wǎng)絡環(huán)境以及消息收集端的處理性能。在實驗時,需要修改操作系統(tǒng)的局部與全局文件句柄數(shù)限制,以防操作系統(tǒng)限制連接數(shù)量。
實驗結(jié)果如表3所示,隨著連接數(shù)增大QPS有所升高,但其占連接數(shù)的比例沒有太大變化。平均響應時間會隨著連接數(shù)的增加略有增大,需要注意的是響應時間會因網(wǎng)絡狀況而有所波動。內(nèi)存在不同的連接數(shù)下有所波動,是由于JVM 的垃圾回收機制所導致的。CPU資源的消耗隨著連接數(shù)的增大而升高。在實驗過程中發(fā)現(xiàn)當一臺壓測機連接數(shù)超過5 000 時,發(fā)現(xiàn)其響應時間大幅增加,其原因是壓測機的硬件資源消耗過多導致系統(tǒng)卡頓。在使用多臺壓測機并將連接分散到不同機器上后,響應時間恢復為正常狀態(tài),這也是6 000個TCP連接下響應時間略好于4 000個TCP連接的原因。
表3 消息收集端性能測試
實驗與4.2 節(jié)類似,使用多臺壓測機與消息收集端建立連接,并模擬發(fā)送真實的數(shù)據(jù)包,每個連接發(fā)送時間間隔為5s。收集端在接收到數(shù)據(jù)后將其推送到Kafka集群中,再由多個Kafka consumer將集群中數(shù)據(jù)消費到MySQL 數(shù)據(jù)庫中。平均響應時間獲取方式與4.2 節(jié)相同。以此驗證系統(tǒng)在局域網(wǎng)環(huán)境下不同連接數(shù)的性能。
圖5 響應時間對比圖
實驗結(jié)果如圖5 所示。平均響應時間保持在5 ms以下,說明在百兆局域網(wǎng)環(huán)境下系統(tǒng)具有良好的性能。與4.2 節(jié)對比發(fā)現(xiàn)其響應時間未明顯增加,其原因是使用異步方式將數(shù)據(jù)推送至Kafka 集群不會對線程運行造成過大影響。此外,又對不使用Kafka消息隊列的情況進行實驗,即消息收集端接收到數(shù)據(jù)后直接寫入數(shù)據(jù)庫。發(fā)現(xiàn)在不使用Kafka且連接數(shù)低于8 000的情況下系統(tǒng)性能正常,但當連接數(shù)為10 000 時,響應時間明顯上升,連接數(shù)在12 000 時響應時間更是超過1 000 ms,遠超出了實際應用的要求。
通過監(jiān)測收集端服務進程,發(fā)現(xiàn)連接數(shù)為12 000時,在不使用Kafka 的情況下,盡管使用了數(shù)據(jù)庫連接池技術,仍有大量線程阻塞,如圖6所示,每個線程阻塞時間占總運行時間35%左右。在線程阻塞的同時,還有大量數(shù)據(jù)不斷涌入,又進一步加劇了線程阻塞,進而導致響應時間急劇增加。
圖6 連接數(shù)為12 000不使用Kafka情況下線程阻塞情況
本文基于Netty 和Kafka 實現(xiàn)了一個面向物聯(lián)網(wǎng)應用的數(shù)據(jù)接入系統(tǒng)。簡潔的通信協(xié)議與諸多性能優(yōu)化措施使得系統(tǒng)在百兆局域網(wǎng)環(huán)境的測試下,可穩(wěn)定保持萬級別長連接和毫秒級別的響應速度,在物聯(lián)網(wǎng)數(shù)據(jù)接入服務與應用領域有一定的現(xiàn)實意義。但目前的工作僅是使用一個收集端服務,系統(tǒng)的整體性能會受限于此。下一步可搭建收集端服務集群,并利用負載均衡策略將采集終端的連接分散到集群的不同機器上去,以減少單臺機器硬件資源的限制對系統(tǒng)性能造成影響,從而滿足未來更大規(guī)模的應用。