999精品在线视频,手机成人午夜在线视频,久久不卡国产精品无码,中日无码在线观看,成人av手机在线观看,日韩精品亚洲一区中文字幕,亚洲av无码人妻,四虎国产在线观看 ?

基于Flink的電子疾病檔案數據處理模型設計與實現

2023-10-25 10:46:50季英凱
無線互聯科技 2023年16期
關鍵詞:數據處理模型

劉 瀟,季英凱

(江蘇省疾病預防控制中心 公共衛生信息所,江蘇 南京 210009)

0 引言

隨著疾控信息化工作的不斷深入,疾控的傳染病、公共衛生突發事件、慢性病、免疫規劃、精神衛生等業務條線的信息系統在不斷地建立與完善,由于業務系統在建設之初缺乏總體規劃,各自為政,各類數據難以支撐有效的業務協同服務,造成信息孤島[1]。在當前各級疾控業務協作日漸緊密、內部一體化集成日漸成熟的大背景下[2],為了有效地對業務數據進行匯聚與利用,中國疾病預防控制中心制定的《疾病預防控制信息系統建設指導方案(2018年版)》要求,以國家和省統籌區域兩級建設為重點,依托全員人口信息庫等基礎設施,構建實時共享的動態電子疾病檔案(electronic diseases records,EDR)[3-4],以個人健康為核心,貫穿整個生命周期,以出生和死亡2個重要的生命節點為開始和結束,全程記錄疾病發生、發展及轉歸的監測信息,形成以個人基礎信息(人口學信息、出生登記、死亡登記)為基礎,包含體檢篩查史、疾病診斷史、檢驗檢測史、治療隨訪史、流行病學史和預防接種史等內容的主題數據目錄[5],以支撐疾控各類業務的交互協同,為政府決策分析提供有效的支持。如何利用現有的業務系統實現各類業務數據的匯集,以形成實時共享的電子疾病檔案成了疾控信息化建設面臨的一個新的問題。

當前,疾控的各類業務系統于不同的時間由不同的開發公司建設,所采用的技術架構、業務流程以及業務數據的格式各不相同,各業務數據與電子疾病檔案的數據標準均存在一定程度的差異。基于這些問題,為了實現電子疾病檔案所需數據的抽取并進行有效的匯集,本文基于流式計算框架Flink建立一個電子疾病檔案數據的實時處理模型,使用消息中間件Kafka實現各類業務主題數據的發布與訂閱,使用Flink實現各類業務數據按電子疾病檔案主題目錄數據的實時處理,使用HBase作為數據庫實現電子疾病檔案數據的匯集與有效關聯,通過實驗證明該模型的可行性與高效性。

1 技術簡介

1.1 Flink介紹

Apache Flink是一個低延遲、高吞吐的分布式計算框架[6],可對無界數據流與有界數據流進行計算[7-8],相對于以MapReduce為代表的批處理計算框架、以Spark Streaming為代表的微批處理方案、以Storm為代表的流處理計算框架,Flink提供了DataStream API與DataSet API分別對流處理與批處理予以支持,其將批數據看成是有界的流數據,本質上還是進行數據流的處理[9]。這種批流一體的特性使得Flink在執行計算時具有極高的靈活性與極低的延遲性。

Flink可在單獨集群中運行,也可以在Yarn、Mesos等資源調度與管理框架上運行[10]。一個Flink集群總是包含一個JobManager以及一個或多個TaskManager[11]。JobManager負責處理Job提交、Job監控以及資源管理;TaskManager運行worker進程,負責實際任務Tasks的執行。這些任務共同組成了一個Job。Flink應用程序中的任務由用戶定義的算子轉換而來的流式Dataflows所組成,以構成有向無環圖(DAG)以對數據流行處理,如圖1所示。Flink的Dataflows由Source、Transformation、Sink3大部分組成。Source可從數據源不斷獲取數據,Transformation通過Flink提供或用戶自定義的各類算子靈活組合將獲取的數據流進行各種業務邏輯的處理,最終由Sink輸出到外部。Source與Sink階段都提供了包括Kafka、 ElasticSearch、MySQL、InfluxDB等多種數據庫引擎的專用算子用以獲取或輸出數據,使用十分方便。

圖1 Flink Dataflow

Flink的開發同時支持Java和Scala語言,提供了4個不同抽象層級的API,提供給用戶以開發應用程序。第一層的ProcessFunction API是其他三層的調用與封裝的基礎。Flink在這個API上實現最基礎的流式處理能力。在該層用戶可以進行有狀態編程,實現復雜的時間語義處理。第二層是Core APIs。該層包含應用于無界或有界數據流的DataStream API和應用于有界數據集的DataSet API,提供各種形式的用戶自定義轉換(transformations)、聯接(joins)、聚合(aggregations)、窗口(windows)和狀態(state)等操作。第三層是Table API。該層通過定義數據的schema提供類似于關系模型中的操作,比如 select、join、group-by 和 aggregate 等。第四層是SQL API。這層在語義和程序表達式上都類似于Table API,但其程序都是通過SQL表達式的形式來實現。

1.2 Kafka介紹

Kafka是一個分布式的消息發布-訂閱模式[12]的中間件系統。Kafka在主題中保存消息的信息,生產者向主題寫入數據,消費者從主題讀取數據,從而實現數據的傳遞。這種隱式調用的風格使生產者與消費者相互解耦,使其可以相互獨立的變化。

高性能、高吞吐、低延時是Kafka的顯著的特性,雖然Kafka的消息保存在磁盤上,但由于采用了順序寫入、MMFiles(Memory Mapped Files)、Zero Copy、批量壓縮等技術優化了讀寫性能[13],使其可以突破傳統的數據庫、消息隊列等數據引擎所受限的磁盤IO瓶頸。因此即使是部署在普通的單機服務器上,Kafka也能輕松支持每秒百萬級的寫入請求[14],讀寫速度超過大部分的消息中間件,這種特性使得Kafka在海量數據場景中應用廣泛。

2 設計與實現

2.1 架構設計

本文設計的電子疾病檔案數據處理模型采用Kafka+Flink+HBase建立實時數據流的處理框架。在Kafka中建立主題Topic,疾控的各個業務系統作為生產者按規定的格式向主題中生產數據,通過這種模式將數據生產的任務交給各個數據系統的開發維護單位,避免直接操作多個業務系統帶來不可預知的風險。Flink程序作為消費者去Kafka的主題中獲取數據并發送至下游進行數據類目的識別、分流以及格式轉換等業務邏輯的處理操作,最終寫入Hbase進行數據的持久化,并通過HBase表結構的設計實現以個人信息為基礎,各類電子疾病檔案主題目錄數據的關聯。模型架構設計如圖2所示。

圖2 模型架構

2.2 模型設計與實現

2.2.1 消息中間件設計

由于疾控業務系統多,電子疾病檔案所包含的數據類目繁雜且數據格式各異,而兒童預防接種等業務系統在某些時間段內產生的實時數據量很大,使用消息隊列Kafka作為中間件來接收各類業務系統產生的數據是一個很適合的選擇。一方面,利用Kafka出色的性能提高業務數據寫入的響應時間,保證穩定性;另一方面,使得各個業務系統與Flink數據處理程序解耦。數據生產層的業務系統既可以將業務系統完成審批后的實時數據流接入消息隊列,也可以批量對歷史數據進行處理,業務數據生產者與消費程序各自獨立運行。

數據生產層的各個業務系統按約定的格式將電子疾病檔案系統需要的數據寫入Kafka的主題中,為了使下游的消費者Flink程序能正確解析與處理,數據結構可以分為3個部分:第一部分代表該數據所屬的電子疾病檔案主題目錄;第二部分代表該條數據的操作類型(新增、修改或刪除);第三部分為業務數據所包含的具體內容,各個數據項統一使用制表符分隔(見表1)。

表1 生產者數據結構

2.2.2 Flink程序設計

Flink程序使用Scala語言設計并實現,程序的Source階段使用Flink框架自帶的FlinkKafkaCon-sumer不斷地從Kafka的主題內消費數據。在Transformation階段,對獲取的數據按電子疾病檔案的主題目錄進行分流,在各個主題目錄對應的數據流中對其所屬的數據進行各自不同的操作類型識別、數據格式檢查、數據格式轉換等操作。由于從Kafak獲取的數據包含了電子疾病檔案的各類數據目錄,程序根據數據的前6位字符判斷該條數據所屬的主題目錄,利用Flink提供的旁路輸出Side Output功能對數據流進行切分,每類主題目錄的數據被發送至其對應的子數據流中調用自定義的各類主題目錄的數據處理函數進行處理,最終在各個子數據流的Sink階段,將處理好的電子疾病檔案數據寫入HBase進行持久化。由于Flink沒有提供已封裝好的HBase的Sink算子,自定義算子實現Flink提供的RichSinkFunction接口來完成相關的對HBase的操作,Flink程序的拓撲結構,如圖3所示。

圖3 程序拓撲結構

以死亡報告主題目錄數據為例,Flink程序關鍵代碼如下:

val environment = StreamExecutionEnvironment.getExecutionEnvironment//創建可執行環境

val outsideStream = environment.addSource(new FlinkKafkaConsumer[String](topic,new SimpleStringSchema(),props))//使用FlinkKafkaConsumer 從Kafka消費數據

.process(new ProcessFunction[String,String] {//在基本處理函數 ProcessFunction中使用Side Output函數對數據進行分流

override def processElement(source:String,context:ProcessFunction[String,String]#Context,collector:Collector[String]):Unit = {

Source.substring(0,6)match {//截取數據前六位獲取所屬主題目錄

case "320601" => context.output(death01,source.substring(7)) //判斷所屬主題目錄為死亡報告數據后獲取剩余數據部分并標記其對應的數據流標識death01進行分流

...//其他主題目錄判斷與獲取

case _ => context.output(or,source.substring(7))

}

}

}).setParallelism(1)//分流算子并行度設置

val deathStream = outsideStream.getSideOutput(death01) //分流后主題目錄數據流的獲取

....

deathStream.map(deathTransformation(_)).setParallelism(2) //在自定義的deathTransformation函數中完成其對應主題目錄數據的操作類型識別、格式檢查、轉換操作

.addSink(new deathHbaseSink()) //自定義deathHbaseSink函數對其對應主題目錄數據進行持久化操作

2.2.3 HBase設計

在HBase電子疾病檔案所屬的命名空間Name-space中為各類主題目錄建立各自所屬的表,考慮到之后的業務需要通過身份證號以個人信息為單位對疾病檔案進行綜合展示與分析,對唯一標識表中的一行數據的RowKey[15]使用身份證號加業務主鍵號的方式來生成,這樣既保證了表中RowKey的唯一性,又可以使未來的業務端調用Hbase的API時,通過其提供的前置過濾器將身份證提取出來進行匹配與查詢,而無需再建立二級索引或借助其他的工具。在Flink程序進行數據持久化時,使用Protobuf工具提供的功能對電子疾病檔案數據進行序列化壓縮后,再關聯其對應的RowKey寫入Hbase,使每個RowKey在表中只存儲一個,提升空間的利用率。

3 仿真實驗

在疾控內部局域網部署應用模型進行測試,使用的各類軟件信息,如表2所示。集群硬件環境包含10臺Cpu8核,內存32 G,操作系統為64位Linux Centos 7.7的虛擬機。其中,7臺為Hadoop與Hbase集群的部署提供支持。Hadoop集群中包含了2個互為主從的管理節點NameNode,5個計算節點DataNode,2個資源調度管理節點ResourceManager;HBase集群中包含了兩個互為主從的HMaster節點,3個RegionServer節點;Flink集群部署于DataNode所在的5個計算節點,使用Yarn Session的模式對Flink作業進行調度與管理。Kafka與ZooKeeper等中間件部署在另外三臺虛擬機上。

表2 軟件版本信息

實驗從疾控內網數據庫批量取出死亡報告信息與兒童預防接種信息各10 000條寫入Kafka的電子疾病檔案主題內,對Flink程序中的分流算子以及死亡報告信息與兒童預防接種信息的數據處理算子分別設置不同的并行度的情況下,記錄數據全部寫入HBase的運行時間情況,運行時間皆為5次實驗后的平均值,結果如表3所示。

表3 實驗結果

從實驗結果可以看出,在數據量較大的情況下,該模型的數據處理的實時性良好,且合理的提高分流算子或數據處理算子的并發度能有效地提高模型的數據處理能力,實驗證明該模型具有良好的數據實時處理能力及彈性擴展能力。

4 結語

為了建設電子疾病檔案系統,實現疾控各個業務條線數據的有效匯集,進而為業務協同以及輔助決策提供數據支持,本文根據疾控業務信息化建設的現狀設計了一個數據實時處理模型,并使用消息中間件Kafka,流式計算框架Flink及寬列數據庫HBase給出了模型的具體實現。實踐表明,模型具有良好的數據處理能力,滿足預計的設計目標,其分層設計的架構風格使得每一層都可以靈活根據需求獨立的變化而不影響其他層。在業務生產數據量比較大的場景,可以單獨為某類業務數據另設一個Kafka以提高數據吞吐量。在Flink程序的Source階段,先進行數據的合流后再進行接下來的處理;而對于某些實時處理的數據量較大、數據處理任務較復雜的業務,也可以提高該業務數據處理算子的并行度以提高數據處理速度。目前,該模型已成功應用在江蘇省疾控中心的死亡報告、預防接種等部分業務條線數據的處理與匯集中,下一步的工作是根據電子疾病檔案的數據規范要求進一步匯集數據,并結合統計分析工具與相關算法進行數據的利用與分析。

猜你喜歡
數據處理模型
一半模型
認知診斷缺失數據處理方法的比較:零替換、多重插補與極大似然估計法*
心理學報(2022年4期)2022-04-12 07:38:02
ILWT-EEMD數據處理的ELM滾動軸承故障診斷
水泵技術(2021年3期)2021-08-14 02:09:20
重要模型『一線三等角』
重尾非線性自回歸模型自加權M-估計的漸近分布
3D打印中的模型分割與打包
MATLAB在化學工程與工藝實驗數據處理中的應用
FLUKA幾何模型到CAD幾何模型轉換方法初步研究
Matlab在密立根油滴實驗數據處理中的應用
基于POS AV610與PPP的車輛導航數據處理
主站蜘蛛池模板: 久久久国产精品无码专区| 国产欧美精品一区二区| 丝袜国产一区| 亚洲男人的天堂在线| 日韩成人在线一区二区| 青青青国产免费线在| 欧美黑人欧美精品刺激| 色精品视频| 青青热久麻豆精品视频在线观看| 免费三A级毛片视频| 老司机aⅴ在线精品导航| 亚洲国产精品无码AV| 国内黄色精品| 激情午夜婷婷| 制服无码网站| 4虎影视国产在线观看精品| 毛片免费视频| 亚洲欧洲日本在线| 一区二区三区国产| 国产麻豆精品手机在线观看| 波多野结衣久久高清免费| 成人在线欧美| 免费在线一区| 精品久久久久成人码免费动漫| 国产爽歪歪免费视频在线观看 | 色吊丝av中文字幕| 免费国产高清精品一区在线| 国产成人精品在线1区| 国产成人精品在线1区| 午夜色综合| 91成人免费观看| 色综合日本| 免费看a级毛片| 色哟哟精品无码网站在线播放视频| 日本午夜影院| 四虎永久在线| 国产麻豆91网在线看| 波多野结衣中文字幕一区二区| AV无码一区二区三区四区| 激情五月婷婷综合网| 97se亚洲综合| 日韩欧美中文字幕在线韩免费| 国产人成乱码视频免费观看| 亚洲首页在线观看| 一区二区三区成人| 免费国产无遮挡又黄又爽| 国产精品va| 国产日韩精品欧美一区灰| 香蕉综合在线视频91| 国产精品亚洲一区二区在线观看| 国产色婷婷| 精品福利视频导航| 久久久波多野结衣av一区二区| 亚洲看片网| 在线视频一区二区三区不卡| 国产理论一区| 精品无码人妻一区二区| 国产一二三区视频| 亚洲第一视频免费在线| 亚洲香蕉久久| 欧美成人精品一级在线观看| 日韩精品一区二区三区大桥未久| 欧美中文字幕在线二区| 午夜激情福利视频| 亚洲中文字幕久久精品无码一区| 暴力调教一区二区三区| 久久综合伊人 六十路| av在线手机播放| 无码区日韩专区免费系列| 久久九九热视频| 国产成人免费| 91在线播放免费不卡无毒| 毛片免费在线视频| 无码有码中文字幕| 欧美一区国产| 高清亚洲欧美在线看| 日韩精品久久无码中文字幕色欲| AV网站中文| 国产成人免费| 亚洲AV无码不卡无码| 亚洲精品无码抽插日韩| 在线观看国产一区二区三区99|