時曉旭 賴俊業 謝江 余燁 黃逸飛



摘要:隨著大數據時代的到來,數據處理的需求不斷增長,尤其是對傳感器產生的海量數據的處理需求,以及數據生產和消費之間的速度不同步問題日益突出。同時數據安全問題日益受到關注,國產操作系統的應用也逐漸增多。因此在國產操作系統環境下,需要一款國產大數據中間件解決數據生產和消費之間速度差的問題。基于Java技術,采用發布-訂閱模型作為設計模式,實現了消息的發送者和接收者可以獨立地演化和擴展,解耦數據的發送和接收過程,有效平衡數據生產和消費之間速度差。
關鍵詞:大數據;數據交換中間件;國產操作系統;傳感器數據
中圖分類號:TP311.1? ? ? 文獻標識碼:A
文章編號:1009-3044(2024)08-0052-04
開放科學(資源服務)標識碼(OSID)
0 引言
隨著物聯網的迅速發展和傳感器數據的激增,數據生產和消費之間的速度不同步問題日益突出。傳統的數據緩沖方式之一是將數據暫時存放在數據庫。然而,由于不同的應用系統需要調用數據庫中數據,可能導致存在數據泄露的風險,存在安全隱患[1]。同時,頻繁的I/O操作也會導致數據庫讀寫性能下降[2]。Kafka是目前業界常用的開源數據交換中間件,它由Apache軟件基金開發,能夠進行分布式流處理[3]。但其是由國外廠商開發,對于中國特有的應用場景可能存在一定的適配問題,并且可能會面臨技術授權的問題,進而影響后續的軟件升級和服務,同時也存在數據隱私和安全方面的風險。因此,本文基于Java技術,采用發布-訂閱模式,開發適用于國產操作系統的數據交換中間件。
1 中間件設計采用的相關技術和數據格式
1.1 Java
Java是一個廣泛應用于軟件開發的編程語言。選擇Java作為開發語言最主要的原因是其具有跨平臺特性。Java的跨平臺特性源于其獨特的編譯和執行方式。Java源代碼首先被編譯成字節碼而不是機器碼,然后在目標平臺上由Java虛擬機(JVM) 解釋執行字節碼。這種機制使得Java程序能夠在安裝了Java虛擬機的任何平臺上運行,而無須重新編譯[4]。
1.2 發布-訂閱模式
發布-訂閱模式是一種常用的軟件設計模式,用于實現組件之間的松耦合通信。在該模式中,通常存在兩個主要角色:發布者和訂閱者。發布者負責產生事件或消息,并將其發送到一個或多個訂閱者。訂閱者則注冊自己對特定類型的事件或消息感興趣,并在發布者發送相應事件或消息時進行相應的處理。發布-訂閱模式的核心思想為解耦發布者和訂閱者之間的關系,使它們能夠獨立地演化。發布者不需要知道訂閱者的存在,也不需要關心具體的訂閱者是誰。同理,訂閱者也不需要知道發布者的存在,只需要注冊自己感興趣的事件或消息即可。這種解耦使得系統更加靈活和可擴展[5]。在發布-訂閱模式中,發布者和訂閱者之間通過一個稱為消息隊列或事件總線的中介來進行通信。發布者將事件或消息發送到消息隊列或事件總線中,而訂閱者則從中獲取感興趣的事件或消息進行處理。這種中介的存在使得發布者和訂閱者之間的通信變得簡單和高效。發布-訂閱模式在許多領域都有廣泛的應用,例如消息中間件、事件驅動系統、GUI開發等。它能夠有效地解耦組件之間的依賴關系,提高系統的可維護性和可擴展性。同時,它還能夠實現異步通信,提高系統的響應性和性能。
1.3 數據格式
該中間件主要面對的數據為傳感器產生的檢測數據。在傳感器端的服務器中,會將數據進行預處理操作,將數據格式轉化為JSON格式,每條數據將包含以下內容:時間戳(記錄數據采集的時間點)、傳感器類型、采樣頻率、數據值和數據單位。
2 系統架構設計
中間件總體結構如圖1所示,主要由三大模塊構成:生產者、消費者和數據交換池。生產者是向數據交換池發送數據的客戶端應用程序,同時對外提供開發和調用接口,開發者可以調用其中包含的接口,向數據交換池發送數據或者設置數據交換池參數;消費者是一個從數據交換池中獲取數據的客戶端應用程序,同時對外提供開發和調用接口,開發者可以調用其中包含的接口,從數據交換池拉取數據;數據交換池部署于服務器中,主要作用為緩存數據和多線程處理數據讀寫任務,是中間件的核心模塊。
2.1 生產者模塊
生產者模塊結構如圖2所示。生產者是為開發者提供的接口集合,開發者通過調用生產者提供的接口,將數據發布到數據交換池相應的Class中,通過配置消息分區方式和選擇合適的消息序列化器實現可靠性設置,包括確認機制和重試策略。首先是開發者通過生產者創建Class,用戶向數據交換池發送帶有參數的請求,參數包括Class的名稱以及分區數量等,以便在數據交換池開辟空間和設定數據管理的模式,同時向Class目錄表添加新的Class記錄;數據分區Class創建完成后,開發者通過調用生產者程序向指定的Class里寫入數據。當數據開始流入時,首先要經過攔截器處理,攔截器的作用為:第一對數據的大小進行限制,若超過規定大小就要進行數據切分操作,第二對流入的每一條數據消息數據附加上事件時間戳(event time) 和UUID,形成每條數據的標識id,后期將以標識id作為數據的唯一標識對數據進行相關處理。
生產者與服務器的通信是基于Java的Socket通信和消息隊列來實現的。當數據流出攔截器,會由sender線程發送到服務器,當服務器通過應答機制確認收到數據后會反饋給sender然后繼續數據傳輸,若服務器沒有收到數據服務器也會反饋給sender,sender就會重新發送數據,這里sender中設置最大重試次數和最大重試延遲時間。
應答機制具體內容:該應答機制支持三種模式,1) 當參數為0:生產者將不會等待任何確認信號,直接將消息發送到數據交換池并認為發送成功。這種方式是最快的,但是也最不可靠,因為如果消息未處理成功,則無法得知。2) 當參數為1:生產者會等待來自數據交換池中分區的確認信號。當分區成功寫入消息后,生產者會收到確認信號。這種方式比參數0更可靠,但仍可能出現數據丟失的情況。3) 當參數為all或者ac當參數為-1:生產者會等待所有的副本都完成了消息的寫入才會收到確認信號。這種方式是最可靠的,但同時也會影響性能,因為需要等待所有副本的確認。
2.2 消費者模塊
消費者結構如圖3所示。消費者是為開發者提供的接口集合,開發者通過調用消費者提供的接口,訂閱服務器中的Class,并從指定的Class中讀取消息,可以配置消息消費的起始偏移量、消費組等參數。開發者調用消費者接口向數據交換池發送消費數據請求,請求的參數包括要消費的Class名稱等。消費請求經過Socket通信進入請求隊列中等待服務器響應。
數據交換池響應后,首先到Class目錄表中查詢是否存在對應的Class,若不存在則報錯,若存在,繼續查詢消費表中要消費的Class的分區是否正在被同一消費者組中的consumer消費。若該分區正在被消費,則需要在Class下選擇其他未被占用的分區進行消費,同時在消費表中生成一條記錄,記錄包括本次消費的Class和分區的名稱,以及讀取的最后一條數據的內容及時間等。數據讀出后經過攔截器(這里的攔截器的主要作用為去除數據上的標識id) ,然后傳遞到開發者手中。
2.3 數據交換池模塊
數據交換池結構圖如圖4所示。數據交換池部署于中央服務器,數據交換池內部的數據分類和分區設置是由開發者調用生產者相關接口進行設置的。開發者通過生產者上傳的數據按照一定的規則存放在指定空間中,并按照開發者的設置定期清理數據。當開發者通過消費者提出消費數據的請求時,數據交換池會按照請求準備好相關數據。
數據交換池采用的存儲策略為時間窗口策略。當任意一條數據進入服務器時,其自身攜帶的標識id已經帶有自身的時間戳(event time),是每條數據的儲存標識。用戶可以通過指定時間段,也可以使用中間件默認的時間段,將同一時間段內接收到的數據寫入同一個文件中。
在數據交換池內部采用以下存儲模式:當開發者通過調用生產者相應接口,向數據交換池發送創建Class命令時,數據交換池會創建Class文件夾(Class名稱由用戶指定),在Class文件夾下又會建立幾個分區文件夾(數量由用戶決定),分區文件夾中存放著寫有數據的文本文件。每個數據文本文件的名稱以文件中所有數據里最小event time命名,分區文件夾名稱以分區文件夾中最小數據文本文件名稱命名。
數據交換池采取多種措施保障其安全、穩定和可靠:1) 備份機制。為保障數據可靠性,為每個Class創建相同結構的備份文件。2) 日志文件機制。數據交換池內部存在兩張表,分別是Class目錄表和消費表,由數據交換池內部的listener負責管理。Class目錄表記錄生產者所有的操作,以及服務器上所有Class的名稱、其創建的時間、最新一條數據的標識id和哪些分區文件夾正在寫入,目的是快速地查詢所需的Class和內部數據的更新情況,同時保證分區文件夾的負載均衡。消費表記錄消費者所有的操作,以及正在被消費的分區文件夾、正在消費的消費者信息、每個消費者每次消費的Class名稱和最后一條消費數據等消費信息,此表的目的是保證在任何時間點只有一個消費者可以處理特定的分區。3) 內存緩沖讀寫機制。當生產者對數據文本文件進行寫操作時,為每個生產者在內存中開辟一個長度可變的緩存空間。先將輸入的數據寫入緩存中,等待寫入操作完畢后再從內存空間寫入數據文本文件,并回收內存空間。這樣既能夠保證多并發操作,又不會降低寫入速度。4) 消費者組機制。若多個消費者消費同一個Class的數據,則將這些消費者劃為同一個消費者組并分配group id,確保每個分區只被消費者組中的一個消費者消費。在任何時間點只有一個消費者可以讀取特定的分區文件夾,并且如果該消費者失敗或離線,則另一個消費者可以立即接管。5) 定期刪除機制。數據交換池會保存一定時間內的數據,并對過期數據進行定期刪除,保存數據的時間可由開發者調用生產者相應接口設置。6) 監聽者機制。在交換池內部設有一個監聽程序(listener) ,主要負責監控并記錄生產者和消費者的所有操作,以及對Class目錄和消費表的管理。
2.4 系統測試
服務器測試環境為麒麟銀河V10操作系統,系統界面如圖5所示。測試環境通過VMware Workstation Pro軟件搭建,配置了2個處理器,每個處理器擁有4個內核,分配了8GB內存和50GB的硬盤。在此環境中部署中間件的交換池模塊,并創建用于測試的Class分區。
使用Java編寫一個隨機數據生成程序,通過調用生產者提供的相關接口向數據交換池中寫入數據。數據生成程序的可設定參數包括每秒生成數據條數,數據格式為JSON,每條數據包含時間戳(記錄數據采集時間)、傳感器類型、采樣率、數據值和數據單位。測試結果如表1所示。
使用Java編寫程序,模擬實際情況中通過調用消費者提供的相關接口,從數據交換池中讀取數據。測試結果表2所示。
3 結束語
本文主要圍繞數據中間件的設計和功能實現展開論述,包括所采用的相關技術、理論概念,以及中間件的系統架構設計和最終測試。該中間件具備適配國產操作系統的能力,能夠滿足本土化需求,為國內相關企業在選擇同類型中間件時提供更多選擇。未來,該中間件的消費者模塊可擴展鏈接不同數據庫的接口,使開發人員能夠更便捷地將數據直接存儲到數據庫中。
參考文獻:
[1] 郭瓊.計算機數據庫的信息安全管理策略分析[J].電子技術,2023,52(10):326-327.
[2] 張志強,王偉鈞,周利軍,等.數據庫讀寫策略在文本挖掘中的優化研究[J].成都大學學報(自然科學版),2015,34(3):262-265,274.
[3] 余建忠,譚任深.基于Kafka的海上風電場數據傳輸系統設計與實現[J].科技創新與應用,2023,13(31):26-31.
[4] 董正言.Java跨平臺特性的實現原理[J].科技資訊,2014,12(18):20-21.
[5] 吳雯君.基于模式挖掘的發布/訂閱分布式系統異常檢測技術研究[D].南京:東南大學,2021.
【通聯編輯:謝媛媛】