林德煜
(中移互聯(lián)網(wǎng)有限公司,廣東 廣州 510653)
消息隊(duì)列中間件是一種為不同系統(tǒng)或同一系統(tǒng)內(nèi)不同模塊提供可靠異步網(wǎng)絡(luò)通信的分布式框架,接收來(lái)自上游服務(wù)的消息,存儲(chǔ)后轉(zhuǎn)發(fā)至下游服務(wù),在系統(tǒng)架構(gòu)中起著承上啟下的作用[1]。Kafka 是一個(gè)處理海量數(shù)據(jù)的分布式消息系統(tǒng),具有高效的數(shù)據(jù)傳輸速率,相對(duì)于其他的消息隊(duì)列系統(tǒng)具有較高的性能,采用發(fā)布/訂閱模式,具有較強(qiáng)的可靠性、海量數(shù)據(jù)處理能力以及可拓展性,是不少業(yè)務(wù)平臺(tái)選型隊(duì)列和削峰填谷功能的很好選擇[2]。
通常為了避免單點(diǎn)問(wèn)題,高并發(fā)業(yè)務(wù)平臺(tái)通常需要滿(mǎn)足多節(jié)點(diǎn)部署。例如,業(yè)務(wù)平臺(tái)有2 個(gè)節(jié)點(diǎn)A和節(jié)點(diǎn)B 同時(shí)對(duì)外提供服務(wù),節(jié)點(diǎn)A 出現(xiàn)故障需要容災(zāi)切換時(shí),通常會(huì)將網(wǎng)關(guān)入口全部切換到B 節(jié)點(diǎn),如果A 節(jié)點(diǎn)中Kafka 隊(duì)列存在未消費(fèi)消息時(shí),為了不影響業(yè)務(wù),需要將A 節(jié)點(diǎn)的未消費(fèi)數(shù)據(jù)在用戶(hù)無(wú)感知的情況下同步到B 節(jié)點(diǎn),并在B 節(jié)點(diǎn)繼續(xù)消費(fèi)。
Kafka 支持集群部署,它的數(shù)據(jù)由broker 負(fù)責(zé)存儲(chǔ)和同步。Kafka broker 可以對(duì)隊(duì)列(Topic)進(jìn)一步分片,Producer 負(fù)責(zé)向broker 推送數(shù)據(jù),Consumer 負(fù)責(zé)從broker 消費(fèi)數(shù)據(jù)。
Kafka 支持消費(fèi)者以不同的消費(fèi)組消費(fèi)相同的Topic,如圖1 所示。圖1 中:每個(gè)Topic 可以分為多個(gè)分區(qū),如P0、P1、P2、P3;每個(gè)服務(wù)器負(fù)責(zé)部分分區(qū),一個(gè)消費(fèi)組中只有一個(gè)消費(fèi)者能消費(fèi)到特定分區(qū),不同的消費(fèi)組內(nèi)消費(fèi)者可以重復(fù)消費(fèi)同一個(gè)特定分區(qū),消費(fèi)者可以同時(shí)消費(fèi)多個(gè)分區(qū)。

圖1 Kafka 多消費(fèi)組消費(fèi)
在實(shí)現(xiàn)數(shù)據(jù)同步的過(guò)程中,按照數(shù)據(jù)源節(jié)點(diǎn)的流量主要分為單向同步和雙向同步[3]。對(duì)于要求實(shí)現(xiàn)業(yè)務(wù)雙活節(jié)點(diǎn)的平臺(tái),一般需要實(shí)現(xiàn)雙向同步。
Kafka 數(shù)據(jù)包含隊(duì)列數(shù)據(jù)(包括隊(duì)列自身的Offset)和消費(fèi)組Offset 數(shù)據(jù)。這2 部分?jǐn)?shù)據(jù)的同步對(duì)一個(gè)高可用系統(tǒng)來(lái)說(shuō)至關(guān)重要。數(shù)據(jù)同步指當(dāng)某一節(jié)點(diǎn)服務(wù)器產(chǎn)生一條數(shù)據(jù)時(shí),需要把該數(shù)據(jù)實(shí)時(shí)同步到其他的節(jié)點(diǎn)中,以便其他節(jié)點(diǎn)完成必要的工作或提供相關(guān)服務(wù)[4]。基于性能考慮,通常采用定期同步的方式,將隊(duì)列數(shù)據(jù)和Offset 數(shù)據(jù)從A 節(jié)點(diǎn)同步到備用B 節(jié)點(diǎn)。
為實(shí)現(xiàn)Kafka 不同節(jié)點(diǎn)的數(shù)據(jù)同步,可以在2 個(gè)節(jié)點(diǎn)之間引入中間件,模擬Kafka A 節(jié)點(diǎn)的消費(fèi)者和B 節(jié)點(diǎn)的生產(chǎn)者,定期將Kafka 對(duì)應(yīng)的消息數(shù)據(jù)、Topic 最新Offset 和消費(fèi)組消費(fèi)Offset 同步到B 節(jié)點(diǎn)。
了解了Kafka 的消費(fèi)原理之后,可以選擇官方/開(kāi)源的第三方框架實(shí)現(xiàn)異地節(jié)點(diǎn),也可以自己開(kāi)發(fā)一套中間件實(shí)現(xiàn)類(lèi)似功能,二者的優(yōu)缺點(diǎn)都很明顯。
(1)選擇官方或開(kāi)源的第三方工具。其優(yōu)點(diǎn)是可靠性相對(duì)較高,具有一定的生態(tài)成熟度,資料文檔相對(duì)完善,接入業(yè)務(wù)的時(shí)間相對(duì)較短。其缺點(diǎn)是相比自研工具,工具版本的更新相對(duì)不可控。
(2)選擇自研工具。其優(yōu)點(diǎn)是版本更新迭代和代碼完全自主可控,缺點(diǎn)是開(kāi)發(fā)周期時(shí)間長(zhǎng),成熟度需要時(shí)間。
綜合時(shí)間、成熟度、業(yè)務(wù)需求考慮,本次研究采用了基于官方方案做優(yōu)化改進(jìn)的策略。
目前,最常用Kafka 跨節(jié)點(diǎn)同步工具是Kafka官方自帶的Mirror Maker。Mirror Maker 在異地?cái)?shù)據(jù)同步中廣泛使用,可靠性和成熟度較高。目前,Mirror Maker 最新的版本為Mirror Maker2(以下簡(jiǎn)稱(chēng)MM2)。MM2 基于Kafka Connect 實(shí)現(xiàn),支持跨節(jié)點(diǎn)復(fù)制Topics 數(shù)據(jù)以及配置信息,也支持復(fù)制消費(fèi)組及其消費(fèi)Topic 的Offset 信息;MM2 相比Mirror Maker有較大的優(yōu)化和改善,對(duì)于同一個(gè)Topic 在不同節(jié)點(diǎn)中配置不同的前綴,同步時(shí)識(shí)別消息歸屬,從而解決回環(huán)問(wèn)題。
通常Kafka 同步組件自身并不具備良好的進(jìn)度檢測(cè),僅監(jiān)控組件自身進(jìn)程無(wú)法確定Kafka 是否已完成數(shù)據(jù)的同步。在實(shí)際應(yīng)用中,引入一種基于滑動(dòng)時(shí)間窗口的同步延遲檢測(cè)算法,基于該算法開(kāi)發(fā)腳本工具M(jìn)Q_Sync_Monitor,只需要在源節(jié)點(diǎn)部署一套,負(fù)責(zé)從源節(jié)點(diǎn)A 到目標(biāo)節(jié)點(diǎn)B 的同步延遲檢測(cè)。
滑動(dòng)窗口指以固定窗口為單位不斷進(jìn)行更新,如果滑動(dòng)窗口已滿(mǎn),那么最先進(jìn)入滑動(dòng)窗口的一個(gè)固定窗口被刪除,滑動(dòng)窗口隨之更新一次[5]。
MM2 消費(fèi)組Offset 同步時(shí)間配置字段為sync.group.offsets.interval.seconds,定義定期同步的時(shí)間為mq_sync_interval_seconds(以下簡(jiǎn)稱(chēng)MQ 同步時(shí)間MST)。該參數(shù)通常配置等于sync.group.offsets.interval.seconds,MQ 同步工具負(fù)責(zé)啟動(dòng)MQ 數(shù)據(jù)的同步,因?yàn)樯婕癆 節(jié)點(diǎn)和B 節(jié)點(diǎn)兩邊跨節(jié)點(diǎn)的輸入輸出(Input/Output,I/O)操作,該操作通常需要超過(guò)1 s 才能完成。
本算法處理周期保持跟MST 一致,稱(chēng)其為算法處理時(shí)間PT。
MQ 同步的Offset 數(shù)據(jù)不一致:MQ 對(duì)應(yīng)的數(shù)據(jù)從A 節(jié)點(diǎn)同步到B 節(jié)點(diǎn)時(shí),假設(shè)A 的某個(gè)Topic 最大Offset 為A_maxOffsetLast=10 000,消費(fèi)者Z 對(duì)應(yīng)的消費(fèi)組Offset 為A_consumeOffset=8 000。
完全同步到B 節(jié)點(diǎn)之后,B 節(jié)點(diǎn)查詢(xún)得到的該Topic 最大Offset 和消費(fèi)者Z 對(duì)應(yīng)的消費(fèi)組Offset 可能為B_maxOffsetLast=5 000、B_consumeOffset=3 000。因此,A 和B 對(duì)應(yīng)的Offset 數(shù)據(jù)通常不對(duì)等。
算法原理:MQ 同步非實(shí)時(shí),所設(shè)計(jì)B 的跨度時(shí)間要包含A 的跨度時(shí)間,假設(shè)A 的Offset 增加值為A_Sub,如果B 在跨度時(shí)間內(nèi)的Offset 增加值小于A_Sub,則說(shuō)明存在同步延遲問(wèn)題。滑動(dòng)時(shí)間活動(dòng)窗口如圖2 所示。考慮間隔MPT 的同步操作可能剛好在算法處理時(shí)間PT 之前1 s 內(nèi)執(zhí)行,而通常同步操作可能需要超過(guò)1 s 才完成,所以即使是B 的2 次算法處理時(shí)間PT,Offset 同步上限仍然沒(méi)辦法確保包含A的1 次算法處理時(shí)間PT,因此需要計(jì)算B 的3 次算法處理時(shí)間PT。

圖2 檢測(cè)滑動(dòng)窗口
實(shí)現(xiàn)中以4 個(gè)MQ 同步時(shí)間MST 為滑動(dòng)時(shí)間窗口,假如MQ 的A 間隔一次算法處理時(shí)間PT 對(duì)應(yīng)的Offset 差有變動(dòng)(假設(shè)差值為A_Sub),而B(niǎo) 對(duì)應(yīng)的差值小于A_Sub,即B[times]-B[times-3]<A_SUB,則產(chǎn)生警告。
算法邏輯如圖3 所示。

圖3 MQ 同步延遲檢測(cè)
首先,進(jìn)行初始化。其代碼為
針對(duì)每個(gè)topic,包含4 個(gè)long 類(lèi)型數(shù)組和一個(gè)long 類(lèi)型參數(shù)
A_maxOffset//代表A 節(jié)點(diǎn)某topic 對(duì)應(yīng)的Offset
A_consumeOffset//代表A 消費(fèi)Offset
B_maxOffset//代表B 某個(gè)topic 對(duì)應(yīng)的Offset
B_consumeOffset//代表B 消費(fèi)Offset
times=0 算法的實(shí)現(xiàn)需考慮times 超過(guò)最大值的處理。同時(shí),建議數(shù)組采用循環(huán)數(shù)組方式,只保留最新4 個(gè)元素。
其次,定時(shí)執(zhí)行進(jìn)度監(jiān)控,間隔時(shí)間為PT,該值和MPT 時(shí)間保持一致。
通過(guò)調(diào)用MQ 提供的腳本查詢(xún)A 和B 節(jié)點(diǎn)MQ對(duì)應(yīng)Topic 的Offset 和對(duì)應(yīng)客戶(hù)端消費(fèi)組Offset。
對(duì)于單位時(shí)間內(nèi)A 節(jié)點(diǎn)自身的Offset 變化較小(如變化為0),為節(jié)省計(jì)算資源,可以選擇不做判斷處理,分別引入Min_Delay_Latest_Offset 和Min_Delay_Consume_Offset 作為topic 同一個(gè)分片最新Offset 和消費(fèi)組消費(fèi)Offset 同步延遲檢測(cè)判斷閾值。
如果times ≥3(B 的PT 最少要從3 開(kāi)始計(jì)算),則啟動(dòng)判斷:判斷A 節(jié)點(diǎn)的A_maxOffset[times-2]和A_maxOffset[times-3]的差值A(chǔ)_maxOffsetSub 是否大于Max_Delay_Max_Offset,如是,則判斷B_maxOffset[times]和B_maxOffset[times-3]的差值是否小于A_maxOffsetSub,如是,提示當(dāng)前topic 最大Offset 同步延遲告警;判斷A 節(jié)點(diǎn)的A_consumeOffset[times-2]和A_consumeOffset[times-3]的差值A(chǔ)_consumeOffsetSub是否大于Max_Delay_Max_Offset,如是,則判斷B_consumeOffset[times] 和B_consumeOffset[times-3] 的差值是否小于A_consumeOffsetSub,如是,提示當(dāng)前topic 消費(fèi)組Offset 同步延遲告警。無(wú)論如何,執(zhí)行times++。
在實(shí)際生產(chǎn)中,為了實(shí)現(xiàn)消息隊(duì)列跨節(jié)點(diǎn)容災(zāi),采用Kafka MM2 實(shí)現(xiàn)消息數(shù)據(jù)從源節(jié)點(diǎn)到目標(biāo)節(jié)點(diǎn)的同步。同時(shí),采用腳本語(yǔ)言,基于上述MQ 同步延遲檢測(cè)算法實(shí)現(xiàn)了MQ 同步延遲檢測(cè)工具M(jìn)Q _Sync_Monitor。
在每個(gè)源節(jié)點(diǎn)部署一套MQ _Sync_Monitor 腳本工具,負(fù)責(zé)同步延遲的檢測(cè)。需要注意的是,要實(shí)時(shí)監(jiān)控MQ 同步工具和MQ _Sync_Monitor 腳本工具自身進(jìn)程。生產(chǎn)中,可基于配置zabbix 或Prometheus 等工具實(shí)現(xiàn)進(jìn)程的監(jiān)控。通過(guò)實(shí)現(xiàn)該算法,當(dāng)MQ 出現(xiàn)容災(zāi)異地切換時(shí),可以較好地保障消息數(shù)據(jù)的一致性,從而保障業(yè)務(wù)的高可用性。
為了實(shí)現(xiàn)中移互聯(lián)網(wǎng)有限公司業(yè)務(wù)消息隊(duì)列異地容災(zāi),采用Kafka 官方MM2 同步工具實(shí)現(xiàn)源節(jié)點(diǎn)到目標(biāo)節(jié)點(diǎn)的數(shù)據(jù)同步,并引入了一種基于滑動(dòng)時(shí)間窗口的同步延遲檢測(cè)算法,基于該算法實(shí)現(xiàn)Kafka 跨節(jié)點(diǎn)同步檢測(cè)工具。該工具在生產(chǎn)實(shí)踐中很好地解決了高可用分布式系統(tǒng)中Kafka 集群跨節(jié)點(diǎn)同步延遲檢測(cè)的盲區(qū),并為內(nèi)部其他項(xiàng)目的容災(zāi)提供了新的思路和借鑒。