田翠姣,蘇義武
(武漢大學中南醫院,湖北 武漢 430071)
2020年2月15日國務院醫療救治組發文《關于做好武漢市新冠肺炎患者醫療數據報送工作的通知》,通知要求武漢市全部新冠肺炎定點收治醫院自2月15日起每天中午12點前上報新冠肺炎在院病例和出院病例的全量醫療數據。數據包括但不限于電子病歷數據、病程記錄數據、護理記錄數據、檢驗信息數據、醫囑信息數據、重癥系統數據、影像報告系統數據、手術麻醉系統數據、病案首頁數據。
數據上報采用VPN+FTP方式,數據文件要求每張表數據存放到獨立的CSV格式文件,并且文件首行為數據表對應字段名,每行的各個字段之間使用Tab符( )分隔,數據字符集要求UTF-8,最后文件用gz壓縮為單個壓縮文件[1]。
按照要求對任務進行了分析,上傳數據文件要求為CSV格式文件。CSV格式文件是一種通用的、相對簡單的文件格式,在商業和科學中應用廣泛,其特點是分隔的數據格式,字段之間的分割格式有逗號分隔和空格分隔兩種[2-3]。數據內容方面主要涉及到9個信息系統,4種不同的數據庫,進一步分解任務后得出需要上報的表格數量為22個(見表1)。

表1 上報數據信息數據庫格式
按照以往提取數據的方式,決定先用最熟悉的Oracle數據庫做測試,使用Oracle自帶的PL/SQL工具查詢出數據后,直接導出CSV文件。經過測試發現這些CSV文件默認使用逗號分隔符,字符集為GB2312,需要使用特定工具進行轉換。此種方法不僅繁瑣,費時費力,而且會出現轉換失敗的情況。經過不斷摸索,最后選擇使用DataX工具進行數據批量提取的方法。
DataX是阿里巴巴集團內被廣泛使用的離線數據同步工具,能夠實現包括MySQL、Oracle、SQL Server、MongoDB、HDFS、Hive、OceanBase、HBase、OTS、ODPS等各種異構數據源之間高效的數據同步功能[4-6]。DataX將復雜的網狀的同步鏈路變成了星型數據鏈路,DataX作為中間傳輸載體負責連接各種數據源。當需要接入一個新的數據源的時候,只需要將此數據源對接到DataX,便能跟已有的數據源做到無縫數據同步。更重要的是下載即可用,配置簡單,支持Linux和Windows,只需要短短幾步就可以完成數據的傳輸[7]。
另外需要引入的工具是Python,Python是一種跨平臺的計算機程序設計語言,是一種面向對象的動態類型語言,最初被設計用于編寫自動化腳本(shell),隨著版本的不斷更新和語言新功能的添加,越來越多地被用于獨立的、大型項目的開發[8]。再加上Java運行環境,組成了DataX+Python+Java環境,該文以Windows10+Datax3.0+Python2.7+jdk1.8組合環境進行說明。
先安裝jdk環境并配置系統環境變量,然后缺省安裝Python軟件并配置系統環境變量,最后解壓Datax壓縮包到指定目錄(此處路徑為D:datax),至此環境配置完成,整個安裝過程為純傻瓜式安裝,沒有特殊設置,簡單易行。
在DataX框架中,數據交換通過DataX進行中轉,任何數據源只要和DataX連接上即可以和已實現的任意數據源同步。
DataX作為離線數據同步框架,采用Framework+plugins架構構建。將數據同步過程中的讀取和寫入過程抽象為內部Reader/Writer插件,如圖1所示。

圖1 DataX框架結構
Reader:數據采集模塊的抽象,負責采集數據源數據并發送給Framework。Writer:數據寫入模塊,不斷從Framework取數據并寫入目標端。Framework:作為Reader和Writer的數據傳輸通道,處理緩沖流控、并發、數據轉換等核心技術問題。
框架提供了簡單的接口與插件接入機制,只需要任意加上一種插件,就可以無縫對接其他數據源。開放式的框架也讓開發者可以在短時間內開發一個新的插件,以快速支持新的數據源同步,以下結合此次數據上報應用具體說明。
此外,DataX內部運行采用Job模式運行機制(如圖2所示),DataX完成單個數據同步的作業,稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業同步過程。DataX Job模塊是單個作業的中樞管理節點,承擔了數據清理、子任務切分(將單一作業計算轉化為多個子Task)、TaskGroup管理等功能。

圖2 DataX各功能模塊關系
DataX Job啟動后,會根據不同的源端切分策略,將Job切分成多個小的Task(子任務),以便于并發執行。Task便是DataX作業的最小單元,每一個Task都會負責一部分數據的同步工作。
切分多個Task之后,DataX Job會調用Scheduler模塊,根據配置的并發數據量,將拆分成的Task重新組合,組裝成TaskGroup(任務組)。每一個TaskGroup負責以一定的并發運行完畢分配好的所有Task,默認單個任務組的并發數量為5。
每一個Task都由TaskGroup負責啟動,Task啟動后,會固定啟動Reader→Channel→Writer的線程來完成任務同步工作。
DataX作業運行起來之后,Job監控并等待多個TaskGroup模塊任務完成,等待所有TaskGroup任務完成后Job成功退出。否則,異常退出,進程退出值非0。
DataX使用一個JSON文件來描述一個Job[9-10],通過配置JSON格式的文件,使用Python命令就可以啟動DataX執行任務。這個Job本身配置并不復雜,Reader插件和Writer插件基本就是配置連接數據庫使用的連接串、用戶名、密碼、查詢表、字段名。接著,按照JSON編碼規則[11-12],編寫如下腳本,文件名為demo.json,存儲目錄為D:datax{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
//數據庫用戶名
"username": "db_user",
//數據庫密碼
"password": "db_pass",
//以下為查詢字段,若查詢全部字段,可以使用*代表字段列表"
"column": ["patientno","deptcode","name","rptext"],
"connection": [
{
//數據源數據表名稱
"table": ["pacs_report"],
//數據庫連接串",更多串配置參考官網示例
//MySQL數據庫連接串
//"jdbcUrl": ["jdbc:mysql://IP地址:3306/database"]
//Oracle數據庫連接串
//"jdbc:oracle:thin:@IP地址:1521/實例名",
//SQLServer數據庫連接串
"jdbc:sqlserver://IP地址:3433;DatabaseName=dbname"
}
]
}
}
"writer": {
"name": "txtfilewriter",
"parameter": {
//導出數據文件輸出路徑
"path": "D:\datax\myoutput ",
//導出數據文件名稱
"fileName": "pacs_report",
//數據內容字符集
"encoding": "UTF-8",
//覆蓋之前同名文件模式
"writeMode": "truncate",
//日期格式
"dateFormat": "yyyy-MM-dd HH:mm:ss",
//導出數據文件格式
"fileFormat": "csv",
// csv分隔符為 ,即tab方式分割
"fieldDelimiter": " " ,
//壓縮格式gzip,能減少生成文件的大小
"compress": "gzip",
//表頭設置
"header":["patientno","deptcode","name","rptext"],
}
}
}
]
}
}
腳本運行也非常簡單,在CMD窗口模式下,輸入如下語句執行:python D:dataxindatax.py D:dataxpacs_report.json,其中datax.py為python安裝后自帶的解析腳本。整個命令大意為使用python命令利用datax.py腳本執行pacs_report.json配置的任務[13-14]。此時在JSON腳本中設置的目錄D:\datax\myoutput下會出現pacs_report.csv.gz壓縮文件,壓縮文件內為pacs_report.csv文件。其余的21個文件,按照相同的方式配置job,最后將所有運行腳本形成批處理文件,一鍵執行后生成所需22個數據文件。
經過自己檢測,數據內容、文件格式、分隔符、字符集、壓縮形式均符合上報要求,上報后經國家醫政醫管局工程師檢驗,亦沒有問題。效率方面,原先手工生成方式,每次都需要執行大量的SQL語句,并且需要轉換、壓縮、校驗,一個文件生成至少需要2分鐘,并且是在數據量較小的前提下,費時費力而且容易出錯。使用基于DataX的工具環境后,實際執行33萬條費用明細數據全表導出,只需要27秒即完成,全部22個數據文件生成平均花費3分鐘,極大提高了數據收集的效率。
隨著疫情發展,疫情數據報送內容、對象、形式不斷變化,高峰時期對外報送報表數目多達20余個。如每日8點郵件報送住院人數、發熱門診人數,每日10點網絡報送核酸檢測數量、中醫藥治療病例數,每日18點郵件報送住院病人明細等等(部分任務清單如表2所示)。報送對象涉及國家、省、市、區的衛計委相關部門或者各級防控指揮部。

表2 疫情數據上報任務清單
從表2任務清單可知,疫情數據上報大多采用網頁填報或郵箱報送方式,并且對實時性要求高。由于并未提供數據接口,初期采用人工方式從各信息系統將數據整理出來,十分繁瑣、耗時。而且涉及到的系統比較多,數據庫類型也各異,為此,利用DataX工具強大的離線同步功能將所需數據匯總到一個專用數據庫(此處使用Oracle)中,然后引入網頁數據填寫成熟技術或者Python工具,梳理出網頁填報數據與匯總數據庫中信息數據對應關系。進行數據統計上報時,自動提取匯總數據庫中相關數據,填充到衛生部門下發的各類疫情報表。最后系統啟動代碼或工具將數據填寫到網頁上的相應元素域或郵箱中,數據填寫完畢后,自動提交并退出網頁[15-16]。通過該方案實現了大部分數據的自動化填報,極大減少了數據上報的工作量。
精準及時的數據有利于疫情處理。此次疫情對醫療資源消耗巨大。為了提高床位利用效率、緩解醫療資源緊張的形勢,根據醫院差異化定位分級接診、促使患者快速分流勢在必行,這離不開全面數據分析的支持。
為了能夠讓院領導實時獲取所有院區各項關鍵指標,便于快速精準決策,工程師利用大數據分析手段開發了移動BI平臺,用于展示疫情專題數據(如圖3所示)。并在此基礎上開發出院隔離患者初篩報表,按照診療標準中的出院條件自動化標識擬離院患者,然后將報表實時推送給相關領導,使得管理者對全院心中有數、決策有據,為改善出院管理、床位調配、轉院交接等方面發揮了作用。重視效率的同時,質量和安全也毫不松懈,針對住院日、吸氧、ICU入住、轉歸等情況開展實時統計,如若發現負性指標及時通報同時追查。

圖3 方艙醫院疫情專題主界面
BI平臺需要實時抽取各類系統大量的數據,這涉及到多個無法回避的問題。第一,一些數據,比如患者重癥情況、吸氧情況、CT檢查結果、核酸檢查結果分布在不同的數據庫中,無法通過聯合SQL查詢的方法及時獲取數據;第二,由于數據的及時性要求非常高,如果采取直接從生產數據庫中抽取數據的方式,勢必會造成生產庫壓力驟增,進而影響全院信息系統的運行效率,從而對醫療業務產生影響。
DataX工具在此時發揮了巨大的作用,按照數據分型編寫好數據抽取腳本后,根據數據產生規律定制化Job計劃任務。數據實時同步到BI數據倉庫,BI前端展示只需要查詢數據倉庫即可快速查詢數據,而且由于是高效率地實施同步模式,數據延遲在可控范圍內。
疫情結束后,數據上報會停止,如果僅僅為了解決這一次上報問題,花費精力去配置工具有些得不償失,但是發現在醫院實際工作中經常有各種數據上報,上報數據格式大多數都是CSV格式。比如國家衛生統計4-1報表上報病案首頁數據,HQMS醫療數據上報、全國三級公立醫院績效考核醫療數據上報、流感數據上報等都涉及到CSV格式數據報送,特別是有一些數據還涉及到不同數據庫表聯合查詢,普通的數據庫查詢導出方式很難實現,且容易出現數據丟失、錯位的問題。實踐表明,DataX不僅能快速導出數據,而且能將不同數據庫的數據抽取到目標庫,形成數據整合,數據導出高效、準確,能很好地滿足醫療機構日常工作中各類數據上報需求。
DataX作為一個服務于大數據的ETL工具,除了提供數據快照搬遷功能之外,還提供了豐富數據轉換的功能,讓數據在傳輸過程中可以輕松完成數據脫敏、補全、過濾等數據轉換功能,另外還提供了函數,讓用戶自定義轉換函數。結合Python和JDK運行環境[17],不僅部署簡單,而且操作方便,能夠根據需要整合各類格式數據,甚至是txt文本、ftp文件,對于醫學統計專業常用的dbf格式文件,也能突破常規256列數據的限制,對于醫療大數據分析意義重大[18]。在此次新型冠狀病毒引起的肺炎疫情中,本院也將該方案推薦給其他醫療機構,及時、高效地完成了數據上報工作,為消滅疫情提供了堅實可靠的數據支撐。