








摘要:隨著互聯(lián)網(wǎng)技術(shù)在工業(yè)生產(chǎn)中廣泛應(yīng)用,工業(yè)互聯(lián)網(wǎng)的發(fā)展突飛猛進(jìn)。在工業(yè)生產(chǎn)中,企業(yè)為了對(duì)工業(yè)大數(shù)據(jù)進(jìn)行更好的采集、分析和預(yù)處理,利用大數(shù)據(jù)技術(shù)搭建大數(shù)據(jù)集群來(lái)完成各個(gè)生產(chǎn)環(huán)節(jié)。基于Hadoop的高可用分布式框架已經(jīng)成為很多企業(yè)在集群搭建中的首選。文章在基于高可用Hadoop組件基礎(chǔ)上,搭建了Hive、HBase、Spark、Flink、Kafka等大數(shù)據(jù)生態(tài)系統(tǒng)中一些重要組件,用于對(duì)數(shù)據(jù)的存儲(chǔ)、采集、抽取、清洗、預(yù)處理和分析等操作,幫助企業(yè)在生產(chǎn)過(guò)程中完善生產(chǎn)環(huán)節(jié),提高生產(chǎn)效率。
關(guān)鍵詞:工業(yè)大數(shù)據(jù);Hadoop集群搭建;數(shù)據(jù)處理
中圖分類號(hào):TP311" 文獻(xiàn)標(biāo)志碼:A
基金項(xiàng)目:2023年保定市科技計(jì)劃項(xiàng)目;項(xiàng)目名稱:基于高可用集群和隨機(jī)森林算法的工業(yè)大數(shù)據(jù)分析平臺(tái);項(xiàng)目編號(hào):2311ZG018。
作者簡(jiǎn)介:張艷敏(1985— ),女,講師,碩士;研究方向:大數(shù)據(jù)技術(shù),軟件技術(shù)。
0" 引言
在工業(yè)生產(chǎn)過(guò)程中,各個(gè)生產(chǎn)環(huán)節(jié)產(chǎn)生的數(shù)據(jù)越來(lái)越多,這些數(shù)據(jù)大多是非結(jié)構(gòu)化數(shù)據(jù),傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù)已無(wú)法滿足對(duì)這些數(shù)據(jù)的存儲(chǔ)與處理,因此,文章利用大數(shù)據(jù)技術(shù)原理搭建大數(shù)據(jù)高可用集群來(lái)實(shí)現(xiàn)工業(yè)大數(shù)據(jù)的采集與存儲(chǔ)等操作,集群中包含大數(shù)據(jù)生態(tài)系統(tǒng)中一些常用的組件。在已搭建的集群中通過(guò)Spark技術(shù)實(shí)現(xiàn)對(duì)離線數(shù)據(jù)的抽取、清洗和預(yù)處理,利用Flink技術(shù)對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行分析與存儲(chǔ)。整個(gè)生產(chǎn)過(guò)程在大數(shù)據(jù)集群環(huán)境中運(yùn)轉(zhuǎn)流暢,最終達(dá)到了為企業(yè)節(jié)約成本、創(chuàng)造更多有益價(jià)值的目的。
1" 系統(tǒng)整體設(shè)計(jì)
根據(jù)企業(yè)實(shí)際生產(chǎn)場(chǎng)景,本文搭建了基于Hadoop HA高可用的集群[1],集群中包含Hive、HBase、Spark、Flink、Redis和MySQL等組件,實(shí)現(xiàn)對(duì)工業(yè)生產(chǎn)中設(shè)備信息(machine.csv)、設(shè)備狀態(tài)信息(showFactChangeRecordList.csv)、環(huán)境檢測(cè)信息(showFactEnvironmentData.csv)和產(chǎn)品加工信息(showFactProduceRecord.csv)的采集和處理。其中HDFS、Hive和HBase等組件用來(lái)存儲(chǔ)數(shù)據(jù),Spark用來(lái)對(duì)離線數(shù)據(jù)進(jìn)行抽取、清洗和預(yù)處理,F(xiàn)link主要對(duì)實(shí)時(shí)生產(chǎn)數(shù)據(jù)進(jìn)行計(jì)算和分析后存儲(chǔ)到Redis或MySQL數(shù)據(jù)庫(kù)中[2]。集群整體結(jié)構(gòu)如圖1所示。
2" 離線數(shù)據(jù)處理
離線數(shù)據(jù)處理是利用Spark技術(shù)對(duì)已經(jīng)存儲(chǔ)在數(shù)據(jù)庫(kù)中的數(shù)據(jù)進(jìn)行預(yù)處理,一般用Scala語(yǔ)言編寫,通常包括數(shù)據(jù)抽取、清洗和指標(biāo)計(jì)算等操作[3]。
2.1" 數(shù)據(jù)抽取
數(shù)據(jù)抽取包含全量抽取和增量抽取[3]。全量抽取是將源數(shù)據(jù)庫(kù)中的所有數(shù)據(jù)抽取到目標(biāo)數(shù)據(jù)庫(kù)中,增量抽取是將自上次抽取后發(fā)生改變的數(shù)據(jù)從源數(shù)據(jù)庫(kù)抽取到目標(biāo)數(shù)據(jù)庫(kù)中。
2.1.1" 全量抽取
在生產(chǎn)過(guò)程中,研究人員通常會(huì)將數(shù)據(jù)從MySQL中抽取到Hive中,方便數(shù)據(jù)更高效的處理。MySQL中包括數(shù)據(jù)庫(kù)shtd_industry,抽取shtd_industry庫(kù)中ChangeRecord表的全量數(shù)據(jù)進(jìn)入Hive的ods庫(kù),構(gòu)成表changerecord,字段排序、類型不變;同時(shí)添加靜態(tài)分區(qū),分區(qū)字段為etldate,類型為String,值為當(dāng)前日期(如20230702)。ChangeRecord的表結(jié)構(gòu)如表1所示。
抽取操作執(zhí)行完畢后,系統(tǒng)可以通過(guò)hive cli執(zhí)行show partitions ods.changerecord命令查看分區(qū)結(jié)果,如圖2所示。
2.1.2" 增量抽取
在企業(yè)生產(chǎn)過(guò)程中,有些數(shù)據(jù)只保留最新數(shù)據(jù),例如環(huán)境監(jiān)測(cè)表EnvironmentData(表結(jié)構(gòu)如表2所示)中只保留每臺(tái)設(shè)備的最新監(jiān)測(cè)數(shù)據(jù),在數(shù)據(jù)抽取時(shí)抽取MySQL中shtd_industry庫(kù)的EnvironmentData表的增量數(shù)據(jù);將其輸入Hive的ods庫(kù)中構(gòu)成表environmentdata,將ods.environmentdata表中inputtime作為增量字段,僅將新增的數(shù)據(jù)抽入,字段排序、類型不變;同時(shí)添加靜態(tài)分區(qū),分區(qū)字段為etldate,類型為String,值為當(dāng)前日期(如20230702)。
2.2" 數(shù)據(jù)清洗
在生產(chǎn)過(guò)程中,系統(tǒng)會(huì)產(chǎn)生大量的“臟”數(shù)據(jù),數(shù)據(jù)清洗就是去除這些“臟”數(shù)據(jù),通過(guò)篩選、過(guò)濾等操作使數(shù)據(jù)變得更加干凈和準(zhǔn)確[4]。數(shù)據(jù)清洗是數(shù)據(jù)處理過(guò)程中非常重要的一步,可以提高數(shù)據(jù)的質(zhì)量和可信度,為后續(xù)的數(shù)據(jù)處理工作提供更有效安全的數(shù)據(jù),例如對(duì)數(shù)據(jù)進(jìn)行去重整合等操作。
在數(shù)據(jù)抽取中,系統(tǒng)將MySQL中數(shù)據(jù)抽取到Hive的ods庫(kù)中,在數(shù)據(jù)清洗中將ods庫(kù)中的changerecord全量數(shù)據(jù)抽取到dwd庫(kù)表fact_change_record中,在抽取之前須要對(duì)數(shù)據(jù)根據(jù)changeid和changemachineid進(jìn)行聯(lián)合去重處理,并且添加dwd_insert_user、dwd_insert_time、dwd_modify_user、dwd_modify_time這4列。其中dwd_insert_user、dwd_modify_user均填寫“user1”;dwd_insert_time、dwd_modify_time均填寫當(dāng)前操作時(shí)間。執(zhí)行完畢后,系統(tǒng)使用hive cli按照change_machine_id、change_id降序排序,查詢前1條數(shù)據(jù),如圖3所示。
2.3" 指標(biāo)計(jì)算
系統(tǒng)將清洗后的數(shù)據(jù)存入dwd庫(kù),進(jìn)入指標(biāo)計(jì)算。指標(biāo)計(jì)算是企業(yè)根據(jù)業(yè)務(wù)需求對(duì)數(shù)據(jù)進(jìn)行的針對(duì)性查詢[5],通常是將多個(gè)數(shù)據(jù)源的數(shù)據(jù)集成到一個(gè)統(tǒng)一的數(shù)據(jù)存儲(chǔ)庫(kù)中。例如:系統(tǒng)使用Spark根據(jù)dwd層的fact_change_record表和dim_machine表統(tǒng)計(jì),計(jì)算每個(gè)車間設(shè)備的月平均運(yùn)行時(shí)長(zhǎng)與所有設(shè)備的月平均運(yùn)行時(shí)長(zhǎng)對(duì)比結(jié)果(即設(shè)備狀態(tài)為“運(yùn)行”,結(jié)果值為:高/低/相同),計(jì)算結(jié)果存入MySQL數(shù)據(jù)庫(kù)shtd_industry的machine_running_compare表中。dim_machine表、machine_running_compare表結(jié)構(gòu)分別如表3和4所示。
3" 實(shí)時(shí)數(shù)據(jù)采集與處理
在工業(yè)生產(chǎn)過(guò)程中,系統(tǒng)除了對(duì)離線數(shù)據(jù)進(jìn)行處理,還須要對(duì)實(shí)時(shí)數(shù)據(jù)進(jìn)行處理。實(shí)時(shí)數(shù)據(jù)通過(guò)Flume采集后存儲(chǔ)在Kafka消息隊(duì)列中,再通過(guò)Flink讀取Kafka中的流數(shù)據(jù),對(duì)數(shù)據(jù)進(jìn)行實(shí)時(shí)處理與分析,將結(jié)果存儲(chǔ)到數(shù)據(jù)庫(kù)中。
3.1" 實(shí)時(shí)數(shù)據(jù)采集
在主節(jié)點(diǎn)中,系統(tǒng)使用Flume采集/data_log目錄下實(shí)時(shí)日志文件中的數(shù)據(jù),將數(shù)據(jù)存入Kafka的Topic中(Topic名稱為ChangeRecord,分區(qū)數(shù)為4)[6],F(xiàn)lume采集ChangeRecord主題的配置如圖4所示。
3.2" 實(shí)時(shí)數(shù)據(jù)處理
當(dāng)實(shí)時(shí)數(shù)據(jù)采集完畢后,系統(tǒng)使用Flink消費(fèi)Kafka中ChangeRecord主題的數(shù)據(jù)[7],例如每隔1 min輸出最近3 min的預(yù)警次數(shù)最多的設(shè)備,將結(jié)果存入Redis,key值為“warning_last3min_everymin_out”,value值為“窗口結(jié)束時(shí)間,設(shè)備id”。本文使用redis cli以HGETALL key方式獲取warning_last3min_everymin_out值,如圖5所示。
4" 結(jié)語(yǔ)
文章介紹了工業(yè)大數(shù)據(jù)高可用集群搭建的整體架構(gòu),在此基礎(chǔ)上實(shí)現(xiàn)了離線數(shù)據(jù)處理以及實(shí)時(shí)數(shù)據(jù)的采集和處理。系統(tǒng)在離線數(shù)據(jù)處理中采用數(shù)據(jù)抽取、清洗和指標(biāo)計(jì)算;在實(shí)時(shí)數(shù)據(jù)中使用Flume采集數(shù)據(jù)到Kafka中,再通過(guò)Flink技術(shù)進(jìn)行計(jì)算后將結(jié)果存入Redis。整個(gè)流程來(lái)自真實(shí)的企業(yè)生產(chǎn)過(guò)程。本文將大數(shù)據(jù)技術(shù)應(yīng)用到企業(yè)生產(chǎn)中,為企業(yè)生產(chǎn)效率提高、轉(zhuǎn)換提供了有效價(jià)值。
參考文獻(xiàn)
[1]劉曉莉,李滿,熊超,等.基于Hadoop搭建高可用數(shù)據(jù)倉(cāng)庫(kù)的研究和實(shí)現(xiàn)[J].現(xiàn)代信息科技,2023(1):99-101.
[2]黎心怡,夏梓彤,莊嘉濠,等.基于大數(shù)據(jù)技術(shù)的實(shí)時(shí)軌道交通分析預(yù)測(cè)可視化系統(tǒng)的設(shè)計(jì)與實(shí)現(xiàn)[J].電腦知識(shí)與技術(shù),2023(29):71-74.
[3]鄭倩倩.基于Kettle的工業(yè)數(shù)據(jù)集成與應(yīng)用[D].重慶:西南大學(xué),2023.
[4]謝文閣,佟玉軍,賈丹,等.數(shù)據(jù)清洗中重復(fù)記錄清洗算法的研究[J].軟件工程師,2015(9):61-62.
[5]何文韜.基于Spark的工業(yè)大數(shù)據(jù)能效分析平臺(tái)的設(shè)計(jì)與實(shí)現(xiàn)[D].大連:大連理工大學(xué),2018.
[6]林子雨.數(shù)據(jù)采集與預(yù)處理[M].北京:人民郵電出版社,2022.
[7]林子雨,陶繼平.Flink編程基礎(chǔ)[M].北京:清華大學(xué)出版社,2022.
(編輯" 王雪芬)
Design and implementation of industrial big data high availability cluster construction based
on big data technology
ZHANG" Yanmin, MA" Xiaotao, YANG" Bingqian, WU" Weihong, ZHAO" Bin
(Hebei Software Institute, Baoding 071000, China)
Abstract: With the wide application of the Internet in industrial production, the development of industrial Internet is advancing rapidly. In industrial production, in order to assist enterprises in better collecting, analyzing, and preprocessing the industrial big data, it is necessary to build a big data cluster to complete various production processes using big data technology. Hadoop based highly available distributed frameworks have become the preferred choice for many enterprises in cluster construction. In the article, based on highly available Hadoop components,some important components in the big data ecosystem such as Hive, HBase,Spark, Flink,Kafka, etc." are built to store, collect, extract, clean, preprocess, and analyze data, helping enterprises improve production processes and increase production efficiency.
Key words: industrial big data; Hadoop cluster construction; data processing