




摘要:面對部分國產非公開數據庫日志格式的數據庫數據高實時共享同步問題,文章設計了一種基于ThreadLocal和雙切面的數據同步共享方案。通過在Dao層切面采集Web請求響應過程中對數據庫的所有變更記錄,暫存在ThreadLocal中;在Controller層切面攔截響應,從ThreadLocal提取出所有變更記錄,最后通過傳輸管理模塊將變更記錄共享發送給其他系統,實現數據庫數據的增量共享。實驗對比發現方案采集時間、發送時間相對于業務操作時間占比極低,這是因為數據采集、發送等操作均在本地內存中進行,而業務操作依賴于數據庫網絡IO,所以該方案能夠實現國產數據庫數據同步共享,具備較高的實時性和較低的侵入性。
關鍵詞:數據共享;切面;ThreadLocal;國產數據庫;高實時
中圖分類號:TP39 "文獻標志碼:A
0 引言
數據是維系企業生存的“血液”[1],要想提高企業生命力,必須重視數據的同步共享。通常的數據同步共享方式可分為同源數據同步和異構數據同步2種。同源數據庫解決方案主要包括基于數據庫觸發器或者數據庫日志系統進行數據同步共享,如劉鵬等[2]利用canal基于數據庫日志系統實現數據雙向同步;異構數據同步解決方案主要為基于ETL工具進行數據同步,如陶慧玲等[3]采用DataX進行異構數據同步,王天舉等[4]基于Kettle實現鐵路系統異構數據同步。
在國產化背景下使用國產關系型數據庫存儲業務數據,根據業務需要將國產關系型數據庫中數據實時同步共享出去。其中,基于數據庫觸發器數據同步方式耗費數據庫資源高[5],基于數據庫日志系統同步方式不適用于非公開數據庫日志格式的場景[6],基于ETL異構數據源共享的機制決定了其難以解決高實時性問題[7]。而筆者面臨場景就是常規的Web請求下,使用國產關系型數據庫持久化業務數據,但并未提供數據同步工具,也未開放數據庫日志格式,同時又對數據實時共享要求非常高。傳統的方法不管是ETL還是利用公開日志格式進行數據同步共享均不能很好地滿足需求,因此需要設計一種新的高實時非公開日志格式的國產關系型數據庫同步共享方案,該方案要具較高的實時性同時對業務代碼侵入較低。
1 原理
1.1 相關理論
在典型的Web請求-響應流程中,一個Web Request進入服務端到Response返回前端的生命周期中,依次經過Controller、Service、Dao等組件層,通過Dao層將數據變更寫入數據庫,再反過來經Dao、Service、Controller層,完成一次Web請求響應過程。通常請求-響應模式中,為提高響應效率,服務端請求響應線程由服務端從線程池中分配而來,響應結束后又歸還給服務端線程池,由單個線程完成整個處理過程。該方案的技術路徑是通過依次采集該Web請求響應過程中對數據庫的所有變更記錄(如增加、修改、刪除等操作),然后將變更記錄同步共享出去,完成數據的同步共享,整體過程如圖1所示。
如果直接在程序中通過編碼的方式記錄下對數據庫的操作,無疑會對原有業務代碼侵入較大,導致業務邏輯不清晰,后期維護成本增大,因此須要采用 Aspect Oriented Programming(AOP),即面向切面編程[8]。將與核心業務無關卻多次使用的功能模塊抽取分離成切面,和核心業務模塊進行解耦,降低代碼復雜度,提高程序的后期可維護性[9]。現代程序大量使用切面編程方式,在實現一些與核心業務弱相關功能的同時又降低了程序的復雜性。這里將數據庫變更記錄和采集過程使用AOP技術從業務中剝離出來,能夠提高程序可讀性以及可維護性。
為了提高程序的實時性,結合請求-響應的單線程過程,這里采用線程局部量(ThreadLocal)技術。通過在線程內部存儲變量,能夠減少同一個線程內多個函數或者組件之間一些公共變量傳遞的復雜度[10]。白曉濤等[11]用ThreadLocal存儲當前用戶信息,方便處理過程中進行存儲使用。線程局部量存儲的數據可以很方便地在本線程運行過程獲取,不會受到其他線程的影響。因此,使用ThreadLocal存儲數據庫變更記錄,能夠提高程序的響應速度,同時進一步降低對業務代碼的侵入。
1.2 實現原理
該解決方案由Controller層切面、Dao層切面、ThreadLocal管理、數據傳輸管理等模塊組成,如圖2所示。
Controller層切面架設在Controller和Service之間,攔截進入的請求,初始化ThreadLocal,清理該ThreadLocal中的臟數據(來自線程池中的復用線程,會攜帶上次請求處理中的臟數據)。Dao層切面架設在Service和Dao之間,采集Dao層對數據庫所做的所有變更操作,抽象出變更的數據內容(add操作、delete操作、modify操作)并將其存儲在線程局部量ThreadLocal中,以便于后續的數據打包發送。在請求返回Controller層切面時,提取Dao層切面采集的變更記錄數據,將其序列化打包,通過數據傳輸模塊將數據變更記錄發送出去,通過Kafka或者其他數據中心將數據變更記錄同步到共享出去,通過這4個模塊的配合實現高實時國產數據庫數據同步共享功能,具體實現原理如圖3所示。
2 技術實現與實驗
2.1 偽代碼
筆者使用Spring后端服務框架,實現Controller層切面、Dao層切面,ThreadLocal管理、數據傳輸管理等模塊。
2.1.1 ThreadLocal管理
線程局部量ThreadLocal中保存的是解決方案的核心數據結構,包含操作的數據對象(table)、操作方式(add、delete、modify)、操作內容(變更的內容)等信息。通過ThreadLocalDataHolder管理器可以方便地在同一個線程中任何時間地點存入和取出變更數據內容,進一步降低程序內部邏輯的耦合。
public class ThreadLocalData {//線程局部量
private Listlt;DataContentOperatergt; dataOperaterLst = new ArrayList();//數據操作集合
private String operaterDescription;// 數據操作說明
public class DataContentOperater {
private int operator = -1;// 操作標識,1:新增 2更新 3刪除
private Object object;// 操作實體
private String themeName;// 操作表對象
}
}
public class ThreadLocalDataHolder {//ThreadLocal管理器
private static final ThreadLocallt;ThreadLocalDatagt; THREAD_threadLocalData = new ThreadLocal() {
protected ThreadLocalData initialValue() {
return new ThreadLocalData(); }
};
public static ThreadLocalData getThreadLocalData() {
return (ThreadLocalData)THREAD_threadLocalData.get();
}
}
2.1.2 Controller層切面
Controller層切面攔截Web請求,初始化ThreadLocal變量,在請求返回時,提取ThreadLocal變量中保存的數據變更記錄,然后將變更記錄數據發送出去。在該模塊能夠計算出整體耗時。
@Aspect
public class ControllerAop {//Controller層切面
@Autowired
SendService mSendService;
@Pointcut(\"execution(* controller.*.*(..)) \")
public void Pointcut() {}//Cortoller層切面定義,攔截切面數據
@Around(\"Pointcut()\")
public ReturnBean Around(ProceedingJoinPoint point) {
ReturnBean ret = 1;
long startTime = System.currentTimeMillis();
Object returnObject = point.proceed(); //調用業務邏輯
ret = (ReturnBean)returnObject;
if (ret.getCode() == 200) {//處理失敗,無須處理數據
return ret;
} else {
this.mSendService.sendMessgae(); //同步共享數據
long AllTime = System.currentTimeMillis() -startTime;//計算整體耗時
}
return ret;
}
}
2.1.3 Dao層切面
Dao層切面攔截數據庫操作,通過反射技術采集增加、修改、刪除等操作的數據庫對象[12],保存在ThreadLocal中。記錄單次數據庫操作的時間以及單次采集時間。
@Aspect
public class DaoAop {//Dao層切面
@Pointcut(\"execution(* *.dao.*.*(..))\")
public void Pointcut() {} //攔截所有save、update、delete方法
@After(\"Pointcut()\")
public void Around(JoinPoint point) {
long startTime = System.currentTimeMillis();//單次業務操作數據庫耗時
Object returnObject = point.proceed();
long endTime = System.currentTimeMillis();
long businessTime = System.currentTimeMillis() -startTime;
MethodSignature signature = (MethodSignature)point.getSignature();
Method method = signature.getMethod();
Object[] args = point.getArgs();
if (!BlankUtil.isBlank(args)) {
DataContentOperater datacontentoperater = new DataContentOperater();
String methodName = method.getName();
Classlt;?gt; clazz = point.getTarget().getClass();
datacontentoperater.setThemeName(clazz.getSimpleName());
if (methodName.contains(\"save\")) {//提取add數據
datacontentoperater.setOperator(1);
datacontentoperater.setObject(args[0]);
} else if (methodName.contains(\"update\")) {//提取modify數據
datacontentoperater.setOperator(2);
datacontentoperater.setObject(args[0]);
} else if (methodName.contains(\"delete\")) {//提取delete數據
datacontentoperater.setOperator(3);
datacontentoperater.setObject(args[0]);
}
if (datacontentoperater.getOperator() != -1) {
ThreadLocalDataHolder.getThreadLocalData().getDataOperaterLst()
.add(datacontentoperater);
}
}
long extractTime = System.currentTimeMillis()-endTime; //采集耗時
}
}
2.1.4 數據傳輸管理
數據傳輸管理是對數據進行打包發送,該方案采用多線程技術進行優化,將打包和發送過程放在單獨的發送線程中,和請求響應線程隔離開,以進一步降低對程序的侵入影響,縮短程序返回時間,提高程序響應速度。
public class SendDataService {//數據傳輸管理
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS);
@Autowire
DataSenderProxy proxy;
public void sendMessgae() {
ThreadLocalData threadLocalData = Thread LocalDataHolder
.getThreadLocalData(); ""http://提取線程局部數據
long startTime = System.currentTimeMillis();
Runnable task = new SendDataService.sendDataTask(data, proxy);
this.executor.execute(task);
long sendTime = System.currentTimeMillis() -startTime;//發送時間
}
private class SendDataTask implements Runnable {
private ThreadLocalData data;//數據內容
private DataSenderProxy proxy; //數據中心代理
public SendDataTask(ThreadLocalData data, DataSenderProxy proxy) {
this.data = data; this.proxy = proxy;
}
public void run() {
Codec codec = ProtobufProxy.create(ThreadLocalData.class);
byte[] dateByte =codec.encode(data);
proxy.send(dateByte); //數據發送--
}
}
}
2.2 實驗過程
以Add場景為例,通過swagger頁面[13],模擬業務請求,并在Service層循環調用Add操作,調用次數分別為100次、200次、300次,400次、500次,采集整體運行時間、業務操作時間(對單次Add操作數據庫時間進行累加)、數據采集時間(對單次Add操作數據采集時間進行累加)、數據發送時間,計算數據采集時間、數據發送時間、業務處理時間相對于整體運行時間占比,詳細的實驗過程如圖4所示,所得結果如表1所示。
2.3 實驗結果分析
執行Add操作100次、200次、300次,400次、500次,業務操作時間/整體運行時間從98.98%上漲至99.90%,采集和發送時間相對整體運行時間的占比隨著操作次數增多,整體呈下降趨勢,從不足1%到不足0.1%,說明該方案對原有程序的影響較低,業務邏輯越復雜,影響越低。
業務處理過程依賴于數據庫網絡IO,該過程較為耗時;數據的采集過程完全在內存中進行,采集的數據是原始數據的引用,因此數據采集時間占比較低;數據發送過程,對傳輸發送過程進行多線程優化,發送過程中數據打包以及網絡傳輸等耗時操作均在多線程中進行,不占用原有請求響應線程時間,因此發送時間占比也較低。綜上,該方案利用雙切面和ThreadLocal實現了國產數據庫數據同步共享,具備較高的實時性和較低的侵入性。
3 結語
在國產化背景下,面對部分未開放數據庫日志格式國產數據庫系統,難以高實時共享同步數據庫數據的問題,本文設計了一種高實時國產數據庫同步共享方案,在Dao層切面采集Web請求響應過程中對數據庫的所有變更記錄并放入ThreadLocal;在Controller層切面攔截響應,從ThreadLocal提取出所有變更記錄,最后通過傳輸管理模塊將變更記錄同步共享給其他系統,實現數據的高實時性增量共享。通過實驗對比發現采集時間、發送時間相對于業務操作時間占比極低,這是因為數據采集、發送等操作均在本地內存中進行,而業務操作依賴于數據庫網絡IO,所以該方案能夠實現國產數據庫數據同步共享,同時具備較高的實時性和較低的侵入性。
參考文獻
[1]彭雅芳.FC上舞動的NVMe:存儲部署的創新之舉[J].計算機與網絡,2015(24):62.
[2]劉鵬,李凡平,王堃.一種基于canal的數據庫雙向同步方法,介質及設備:CN202211313441.8[P].2024-06-09.
[3]陶慧玲,馬依琳,王曄,等.基于微服務的研究生信息系統數據同步方案研究與設計[J].華東師范大學學報(自然科學版),2024(2):42-52.
[4]王天舉,許丹亞,尹文志,等.基于Kettle的鐵路數據接入的設計與實現[J].無線互聯科技,2023(8):79-82.
[5]劉勝.基于增量ETL的分布式數據交換平臺的研究與實現[D].長沙:國防科學技術大學,2024.
[6]滕歡.基于Spring3的數據讀寫分離技術研究[D].哈爾濱:哈爾濱工程大學,2016.
[7]龍濤,戴牡紅.Improving the Quality of Data in the Data Warehouse:2009國際信息技術與應用論壇論文集(下)[C].重慶:計算機科學,2009.
[8]PATEL S,KATIYAR S K,SHARMA N.Metric Analysis for AOP and OOP programming paradigm[J].Journal of The Institution of Engineers,2023(1):215-220.
[9]唐瑤.基于AOP攔截技術的精準信息推送服務研究[D].哈爾濱:哈爾濱工程大學,2016.
[10]魏可源.深入分析ThreadLocal內存泄漏問題[J].計算機與網絡,2018(21):56.
[11]白曉濤,陶智勇.高效Java后臺程序緩存用戶信息的研究[J].網絡新媒體技術,2020(5):35-38.
[12]周志明.深入理解Java虛擬機[M].北京:機械工業出版社,2011.
[13]王志軍,姚文達.Web API后端接口管理與應用[J].智能計算機與應用,2024(5):247-251.
(編輯 "沈 強)
High real-time data synchronization sharing scheme of domestic database
CUI" Yu, HE" Wei, LI" Gaoshang, YAO" Wanhua, YAO" Ke
(The 28th Research Institute of China Electronics Technology Group Corporation, Nanjing 210028, China)
Abstract: Faced with the problem of high real-time sharing and synchronization of database data in log format of some domestic non-public databases, a data synchronization sharing scheme based on ThreadLocal and dual aspects was designed. Collect all change records to the database in the process of Web request response in the Dao layer aspect, and temporarily store them in ThreadLocal; Intercept the response at the Controller layer, extract all change records from ThreadLocal, and finally share the change records to other systems through the transmission management module to achieve incremental sharing of database data. Through experimental comparison, it is found that the acquisition time and transmission time of the scheme account for a very low proportion compared with the business operation time. This is because the data acquisition and transmission operations are carried out in the local memory, and the business operation depends on the database network IO. Therefore, the scheme can realize the synchronous sharing of data in the domestic database, with high real-time and low intrusion.
Key words: data sharing; aspect; ThreadLocal; domestic database; high real-time