


摘 ?要:Apache Kafka作為一種分布式的消息隊列中間件,由于其具有高可靠性、高吞吐量、可持久化、可擴展性好等特點。在大數據項目中,如日志聚合、流數據處理等應用場景中被廣泛使用。由于Kafka的消息需要持久化到磁盤中,磁盤故障會影響Kafka的使用,嚴重時會造成數據丟失。所以基于Kafka的存儲特性,通過復盤和分析由于磁盤問題導致的Kafka集群故障,提出了一系列的磁盤故障處理方法,從而縮短Kafka集群故障的恢復時間。
關鍵詞:Kafka;分布式;消息隊列;磁盤故障;處理方法
中圖分類號:TQ587.22;TP309.3 ? ? 文獻標識碼:A 文章編號:2096-4706(2020)13-0148-03
Abstract:Apache Kafka is a distributed middleware used for message queue. It has merits of high reliability,high throughput,data persistence,good scalability,and therefore has be widely used in big data project such as log aggregation,streaming data processing and so on. The messages of Kafka are persisted to disk,so Kafka is not work when its disk malfunction. Some severe cases may result in subsequent loss of data. Therefore,based on the storage characteristics of Kafka,this paper proposes a series of methods to deal with the failure of Kafka cluster through the re-disk and analysis of Kafka cluster failure caused by disk problems,so as to shorten the recovery time of Kafka cluster failure.
Keywords:Kafka;distributed;message queue;disk malfunction;solution
0 ?引 ?言
Apache Kafka[1],最初由LinkedIn公司開發,并于2011年開源[2]。2012年被孵化成為Apache軟件基金會頂級項目。如今,Kafka應用于眾多大數據項目中,很多互聯網公司也在自己的生產環境中將Kafka作為消息中間件使用。
1 ?Kafka組件及架構
Kafka作為一種分布式的消息隊列中間件,部署多采用若干節點構成集群的方式。在這個Kafka集群中,每個節點被稱作Broker,可以理解為Kafka提供服務的一個實例。在消息(message)隊列系統中,通常都會有生產者(Producer)發送消息,消費者(Consumer)消費消息,這樣就構成了一個消息“流水線”的上下游,如圖1所示。每條被Producer發布到Kafka集群的消息都屬于一個Topic。
2 ?Kafka中的文件存儲介紹
Topic經過Producer發布到Kafka集群中,這條Topic會根據配置被劃分為多個分區(Partition),這些分區又會被均勻地分布到Kafka集群所有的Broker節點上。這樣做可以通過增加分區的數量來橫向增加Topic的存儲數據量,并且均勻分布也可以起到負載均衡的作用。
在存儲層面,任何發布到此分區的消息都會被追加(append)到數據文件的尾部,文件以“.log”為后綴。消息被追加到分區中因為是順序寫入(write)磁盤的,因此效率非常高。如圖2所示,圖中不同顏色的數據文件對應的是不同的分區數據,append操作正在寫入對應虛線數據文件。
除了log文件,分區中還有一個以“.index”為后綴的索引文件,它們共同組成段(Segment)文件。在分區中會存在多個段文件,它們大小相等,但其中包含的消息數不一定相等。這種特性方便舊的段文件可以被快速刪除,這樣可以清理空間供新的消息進行存儲,提高磁盤利用率。
作為分布式系統,Kafka在設計上也充分考慮了高可用,從Broker的多節點到Topic的多副本。Topic的副本機制則是通過分區的副本實現的,被稱為Replica,即在另一個或多個Broker節點上存在這個分區的副本。
3 ?故障復盤與分析
在公司某生產環境里的Kafka集群中,一個Broker節點的磁盤發生故障,導致這個Broker節點的進程退出[3],進而影響了Kafka中的某一個Topic的正常使用。
如果啟用副本,Kafka至少不會因為單個節點不能對外服務而發生Topic不能正常使用的情況,這就是Topic的高可用性。本次故障影響使用的主要原因就是Topic沒有設置副本,采用系統默認值1。在Broker節點發生磁盤故障停止服務時,由于這個Topic在故障Broker的分區沒有可以使用的副本,導致了此Topic不能正常寫入和消費數據的問題。
當發生磁盤故障,通??焖倩謴蚄afka服務的方法就是修改Kafka的server.properties配置中log.dirs參數,將故障磁盤從配置中刪除,Broker就可以啟動了。Broker啟動之后,節點上故障磁盤的分區會在此Broker的其他磁盤中創建。但對于這次的Kafka故障還遇到了下文提到的兩種意外情況。
3.1 ?啟動時觸發了特定版本Kafka的bug
啟動Broker時,日志出現異常報錯,顯示讀取index文件損壞,不能啟動,如圖3所示。遇到這種問題時一般是刪除拋出異常的index文件。
index文件存放的元數據指向對應的log文件中消息的物理偏移地址,如圖4所示。
那為什么index會發生損壞呢?這是因為index文件是一個索引文件映射,它不會對每條消息都建立索引,而是間隔indexIntervalBytes大小之后才寫入一條索引條目,所以是一個稀疏文件。Kafka運行時會創建一個log.index.size.max.bytes大小的index文件,向其中寫入稀疏索引,內容達到閾值后會進行滾動覆蓋。根據社區jira的內容[4],在Kafka非正常退出后會出現index損壞的情況,而在0.8及以前版本,Kafka在讀取這個損壞的index文件后會出現報錯退出無法啟動的問題,在0.9版本中對此問題進行了修復[5],處理的邏輯是自動清理這個文件后重建,不拋出異常。
3.2 ?轉移的分區將磁盤空間寫滿
將故障磁盤從配置文件中刪除后重新啟動Broker,故障磁盤中所有Topic的分區副本會在剩余磁盤中重新創建,并同步消息數據,此時出現了多個大數據量的分區副本被放入同一個磁盤中,導致磁盤空間被迅速寫滿。這種情況下,就不能再使用剔除磁盤的方法了。緊急處理時可以采取縮短Topic的保存時間,從物理上減小Topic數據大小,然后分階段刪除磁盤中過期數據,最后重啟Broker節點恢復。
4 ?實驗驗證
對于磁盤故障這類服務器常見問題,如何能將故障對Kafka集群的影響減少至最低是研究的重點。對此總結了如下的恢復步驟,并通過實驗進行了驗證,可供參考使用。
4.1 ?緊急恢復故障時可以剔除故障磁盤后重啟
Step1:刪除Kafka配置文件config/server.properties中損壞的磁盤(例如data2為故障磁盤,原配置為:“log.dirs=/data0/Kafka-logs,/data1/Kafka-logs,/data2/Kafka-logs,/data3/Kafka-logs...”,更改后配置為:“log.dirs=/data0/Kafka-logs,/ data1/Kafka-logs,/data3/Kafka-logs...”)。
Step2:重啟Kafka進程。
結果:原“/data2/Kafka-logs,”目錄下的分區會被重新分配到當前Broker的其他磁盤上。
影響:會產生數據傾斜的情況,大數據量的分區疊加到同一個磁盤,可能造成個別磁盤被寫滿。
4.2 ?最小化影響恢復故障
集群可允許一個Broker下線時,可暫不重啟Kafka進程,待磁盤更換完成后直接重啟Kafka進程。
如果當前Broker有多余的一塊磁盤作備盤。當Kafka進程下線時,修改配置文件config/server.properties(例如data2為故障磁盤,data9為備盤,原配置為:“log.dirs=/data0/Kafka-logs,/data1/Kafka-logs,/data2/Kafka-logs,/data3/Kafka-logs...”,替換后配置為:“log.dirs=/data0/Kafka-logs,/data1/Kafka-logs,/data9/Kafka-logs,/data3/Kafka-logs...”,用data9替換data2),直接啟動Kafka進程。之后在最近的系統維護周期時間點更換壞盤。
4.3 ?具體實驗驗證過程
環境:測試集群共3個Broker(Broker1至Broker3),每個主機上掛載5塊磁盤(data0至data4)。Kafka配置文件config/server.properties配置了4塊磁盤,即data0、data1、data2、data3,data4作為備盤。
Step1:新建測試Topic為testKafka,配置分區為12,副本數為2。此時分區均勻分布,每個Broker中8個分區。
Step2:測試Topic testKafka,進行正常的信息生產和消費,此時查看Broker3中data3目錄下面文件,存在2個分區,如圖5所示。
Step3:直接刪除data3目錄,模擬磁盤故障,Kafka進程退出。此時,修改Kafka配置文件config/server.properties,將data3換成data4,即四塊磁盤變成了data0、data1、data2、data4。
Step4:上述步驟完成后,重啟Broker3服務,此時會發現消費Topic數據時會有短暫告警打印,后續恢復正常。
結果:磁盤data3中的2個分區轉移到備用磁盤data4中,如圖6所示。
5 ?結 ?論
本文描述了在磁盤損壞后導致Kafka集群出現的幾種異常情況,提出了在這些情況下的幾種故障處理方法,并通過實驗進行模擬驗證。這些方法可以應用于日常運維Kafka集群的工作中,有效提高了Kafka集群可用性,為避免數據丟失提供了參考方案。
參考文獻:
[1] KREPS J,NARKHEDE N,RAOJ.Kafka:Adistributied messaging system for log processing [C]//Proceedings of the NetDB11.[S.l.:s.n.],2012:129-140.
[2] GOODHOPE K,KOSHY J,KREPS J,et a1. Building LinkedIns Real—time Activity Data Pipeline [J].Data Engineering,2012,35:33-45.
[3] ASF JIRA. Shutdown Kafka when there is any disk IO error [EB/OL].(2011-07-19).https://issues.apache.org/jira/browse/KAFKA-55.
[4] ACHANTA V S. Corrupt index after safe shutdown and restart [EB/OL].(2014-11-20).https://issues.apache.org/jira/browse/KAFKA-1791.
[5] PALINO T. Broker should automatically handle corrupt index files [EB/OL].(2015-03-09).https://issues.apache.org/jira/browse/KAFKA-2012.
作者簡介:汪濤(1990—),男,漢族,江西九江人,工程師,碩士,研究方向:系統運維。