999精品在线视频,手机成人午夜在线视频,久久不卡国产精品无码,中日无码在线观看,成人av手机在线观看,日韩精品亚洲一区中文字幕,亚洲av无码人妻,四虎国产在线观看 ?

基于滑動窗口的流數據并行處理方法

2021-02-23 10:04:10張鴻宇李軍懷馮連強王懷軍
重型機械 2021年1期
關鍵詞:數據處理模型

徐 江,張鴻宇,李軍懷,馮連強,王懷軍

(1.中國重型機械研究院股份公司,陜西 西安710032;2.西安理工大學 計算機科學與工程學院,陜西 西安710048)

0 前言

當前,工業生產體系伴隨著數字化技術與移動互聯網的蓬勃發展,掀起了一場萬物互聯智慧化的新興革命,信息技術正在飛快的與工業生產中的基礎設施和管理系統相融合,將傳統的工業體系提升到更高的水平。德國“工業4.0”[1]與我國的“中國制造2025”[2],標志著工業生產制造將從自動化時代全面轉向信息化與智慧化的時代。在智慧化轉型的洪流中,傳統工業利用新一代的信息與工業互聯網技術,采集的巨大結構化的工業時序數據[3],深入挖掘與運用制造企業信息資源,以業務與數據作為雙驅動,逐步轉型為智能制造[4],提供更為全面、及時、科學、智慧的工業服務。物聯網技術的發展速度遠遠超過了對工業企業管理與生產模式的影響速度;物聯網(The Internet of Things, IOT)是工業信息化發展的基礎技術,并依托于無線網絡、移動設備、SOC、傳感器等多種技術的進步,在集成度、靈敏性以及成本控制等方面愈發成熟[5]。利用RFID、工業現場總線、藍牙、紅外、探測器以及多種信號傳感器等采集設備與短距離傳輸技術,采集并傳輸整個制造設備的生產數據,并通過物聯網中的感知層網絡將這些數據傳遞到采集控制柜中,之后通過實時數據庫,上傳到監測云平臺,經由流數據處理平臺進行運算分析。因此,物聯網技術使得許多行業領域產生了以“海量”和“高速”為特征的數據。

在工業生產中,設備的生產數據以及各種傳感器采集到的信息數據會持續生成,且以數據記錄的形式發送。數據序列的量級會隨著時間增長而無限增大,這樣的動態數據集合被稱為流數據[6]。和傳統的靜態數據不同,流數據需要按照記錄或者根據滑動時間窗按順序進行遞增式處理,借助處理的結果,管理人員可以及時的了解業務活動的情況,從而迅速應對新情況做出響應。在流數據處理方面,數據流管理系統(DataStreamManagementSystem,DSMS)是近年來國內外學者的一個研究熱點,流處理引擎(StreamProcessingEngine,SPE)[7,8]是進行實時流數據處理的一種主流方法。傳統的數據處理結構對于流式數據的處理能力較差,對批式數據的分析[9]效果也不佳。尤其對于狀態監測數據類型復雜、業務需求多變以及上層應用集成等方面的需求[10],傳統SPE的解決能力有限。

1 相關工作

1.1 流數據并行處理方法基本流程

設備在制造過程中時刻產生大量的數據,其中一些數據需要進行及時處理分析,實時的將處理過后的數據上傳到監控界面供生產管理人員進行監控。監測流數據以元(指定數量的數據包)為單位,以時間為序列,按照順序抵達處理平臺,可以被定義為

Dflow=[d1,d2,d3,…,dn]T

(1)

式中,Dflow為數據集合矩陣,di表示矩陣中單個維度下的數據元集合。一般情況下,Dflow遵循順序訪問或指定次數內的訪問。滑動窗口模型對環境中的流數據進行處理。對持續性的流動數據施加滑動窗口,有利于更好地適應流數據延續性,實時性的特點。

1.2 滑動窗口

對持續性的流動數據施加滑動窗口,有利于更好地適應流數據延續性,實時性的特點。通常情況下,滑動窗口可分為三種類型,分別是基于數據元組、基于分區以及基于時間[11]。

(2)基于分區的滑動窗口包含整數N與流數據S的一組屬性參數(a1,a2,a3,…,ak)。根據屬性參數的需求,在邏輯上對S劃分為不同的子流,在每個子流上獨立計算大小為N的基于數據元組的滑動窗口,之后將這些窗口的結果進行整合產生輸出。

(3)基于時間的滑動窗口在流數據S中加入了時間周期T以及T與指定時間的關系函數R。在內,R()包含了S中所有屬于[t-T,]范圍內的數據元組。借助滑動T實現對數據流的處理。

針對復雜重型裝備監測數據規模大、類型多、實時性要求較高的需求,本文提出了基于滑動窗口的流數據并行處理方法。結合基于分區與基于時間兩種思想,構建滑動窗口模型。在此模型中,數據被劃分為數個子流,按照數據類型對應的處理邏輯進行并行處理操作,最后通過融合子流結果的方式獲得處理結果,為系統實時監控等功能提供高質量的數據管理。

2 基于滑動窗口流數據并行處理方法基本原理

本文結合基于分區與基于時間兩種滑動窗口思想,構建單位時間周期下,融合子流處理結果的滑動窗口模型。在模型中,首先定義基本窗口與滑動窗口。基本窗口可以被定義為在[t-T,]內抵達的流數據所組成的時間窗口。其中,時間跨度為

Wspan=T

(2)

而連續時間內的基本窗口序列可以組建成為一個滑動窗口,可以表示為

Wslide=[Wk-n+i,Wk-n+i,Wk-n+i+1,…,Wk]

(3)

式中,Wk為第k個基本窗口抵達后的時間窗口,n為滑動窗口可容納的基本窗口數量。時間窗口的跨度可表示為

Wslide-span=nT

(4)

滑動窗口處理模型如圖2所示。

圖2 滑動窗口處理

并行實時運算模式是實現實時監控與報警的有效途徑。基于時間角度與不同的業務需求,并行處理滑動窗口模型結構如圖3所示。

圖3 并行處理滑動窗口模型結構

圖3中小方格是模型中的基本窗口,對應一個監測點數據計算單元,默認設置為1 s, 則整個窗口大小為覆蓋基本窗口的長度n。復雜重型裝備監測中的狀態參數監測數據會以數據流的形式陸續通過窗口,n個計算單元并行進行分析計算,每1 s會輸出n個參數的監測分析結果。最后根據需求,按照不同的時間周期將處理后的基本窗口進行合并,統計每秒輸出的處理結果,并將其輸出。

3 基于滑動窗口的并行處理算法

3.1 算法過程

監測數據在處理的過程中需要引入滑動窗口,在基本窗口不變的情況下,根據數據實際需求,改變滑動窗口大小。滑動窗口跨度變化規則為:設置初始窗口大小為Winit,其中包含n個基本窗口Wbase。之后窗口隨數據流方向開始滑動,滑動增量為ε×Wbase,其中ε為整數且0<ε≤n,其具體值可以按照流數據元集合di的實際需求而定。數據進入窗口后,將按照本文步驟進行處理。

(1)每個基本窗口的流數據元會隨機分配到不同的PQueue(并行隊列)數據集合中。該數據集合會對流入的數據按照數據類型切割為數據片PSlice。PSlice通過配置服務獲取配置信息,其需要配置的參數如表1所示。

表1 PSlice配置參數

如圖4所示,流數據元中的數據包括多種數據類型,在單位時間窗口下數據量會彼此不同,因此切割后的PSlice的數據長度不等。每個PSlice均通過配置服務配置信息:數據類別作為當前PSlice的特征值;數據閾值用于甄別該PSlice的數據范圍;該數據類型對應的處理流程接口。

圖4 基本窗口下PQueue中的PSlice

(2)采用輪詢機制,當有PQueue需要被分配時,將工作節點可用的線程數量按照從多到少排序。之后按照自定義的并行度依次從每個工作節點的線程池中調用可用線程,并將PQueue中當前窗口的PSlices分配給它們。如果一輪后當前窗口仍有PQueue存在未分配的PSlice,則從工作節點中再次調用下一個可用的線程,循環反復直到所有的PSlice被分配。這種調度機制不需要進程間通信,所以開銷較少,適合復雜重型裝備監測的實際需求。

(3)每個線程均對應一個處理邏輯單元PExcutor,該單元專注于處理同一數據類別的數據,因此,PSlices被分配入PExcutor后,會被緩存在一個HashMap中,其Key值為PSlice的數據類別Tdi。PExcutor會根據Tdi獲取到該時間窗口下的所有數據,之后調取處理接口對其進行批量處理。

(4)將當前窗口范圍內的處理結果與上一窗口的數據結果進行統計處理。統計范圍為該數據類型的監測周期Tm=ω(ε×Winit),其中ω可根據實際需求人為定義在配置服務中。

(5)按照滑動量持續滑動窗口,從而實現流數據持續穩定的并行處理流程。

3.2 算法描述

一個流數據元集合d在滑動窗口機制的支持下,主要有三種算法:數據接收、數據處理和滑動窗口。

3.2.1 算法1:數據接收DataAccept(D)

監測數據會以流的形式持續不斷的輸入,選擇多進程機制保證服務器可以穩定及時地接收數據,同時可以提高接收效率。每次接收數據會進行存儲過程,為降低存儲壓力,每個進程均會對數據進行聚合,每隔一段時間后將聚合數據發送至存儲單元。存儲單元需要一邊將數據進行持久化處理,一邊將數據發送至處理單元。

算法1DataAccept(D)源代碼為

輸入:流數據D

aggregatedData = []

for t in timeRange(interval):

# 按照時間周期進行數據聚合

slice = D.get() # 獲取數據流數據

aggregatedData.extend(slice) # 聚合數據

kafka.receive(aggregatedData) # 存儲單元接收

kafka.sendToPQueue(aggregatedData)

# 發送至處理單元

3.2.2 算法2:數據處理DataProcessing(aggregatedData)

數據依照數據類型進入對應的并行處理隊列后,進入等待狀態。狀態激活后,被處理線程調取,進行自定義處理。統計線程負責將處理結果進行合并統計,并輸出結果。

算法2DataProcessing(aggregatedData)源代碼為

輸入:聚合數據對象aggregatedData

輸出:處理結果PResult

for PQueue in PQueues:

PQueue.add(aggregatedData)

# 聚合數據輸入PQueue

PQueue.split(Td, configuration)

# 按照數據類別分割為PSlice并對其初始化

Worknodes.sort() # 對工作節點排序

for node in Worknodes:

thread = node.GetThreadPoolExecutor(degree) # 按照并行度啟動線程池

thread.run(PQueue)

combineResult(Td, interval, threads.result)

# 按照數據類型的指定周期合并處理結果

# thread 處理邏輯過程:

hashmap.add( thread.getSlice(PQueue))

# 將PSlice緩存入map中

for task in PExcutor:

# 對相同數據類型集中處理

task.execute(hashmap.getValue(PSlice.type) .union(),PSlice.processInterface)

3.2.3 算法3:滑動窗口SlideWindow

在實際算法中,若要統計單位時間t內最近n秒的數據,滑動窗口可劃分為n/t個slot,對窗口創建長度為n/t的數組。處理單元會在時間t內通過算法2不停的將流數據進行處理。每隔時間段t會觸發滑動窗口的移動行為,即

Array[Slot3]?Array[Slot2],Array[Slot2]?Array[Slot1]

(5)

算法3SlideWindow源代碼為

Slot currentSlot # 當前激活的slot

LinkedList historySlots # 未被激活的slot

long time = System.currentTimeMillis()

currentSlot = new Slot(time)

elif((time-currentSlot.timstamp )> t):

# 每單位時間t創建一個slot

historySlots.add(currentSlot)

currentSlot = new Slot(time)

DataProcessing() # 執行處理過程

void maintain():

# 維護窗口,刪除已處理slot

if(historySlots.isEmpty()) return

slot = historySlots[0]

long ts = System.currentTimeMillis()

if(ts-slot.timestamp > n):

# 超過滑動距離后刪除

historySlots.remove(0)

3.3 基于Storm的模型拓撲結構

自2010年雅虎公司公開其通用分布式流處理平臺S4[12]起,許多用途相近又各具各色的平臺相繼被提出,如Storm[13],SparkStreaming[14],Samza[15]和MillWheel[16]等。Storm最大的提升在于提供消息處理反饋機制和巧妙的利用異或計算保障記錄被完全處理,平臺采用弱中心化的結構。借助其特性搭建本文提出的基于滑動窗口并行處理方法的實現平臺,PQueue、PSlice、PExcutor均可由Storm中的Blot去實現,其模型拓撲結構圖如圖5所示。

圖5 Storm下的并行滑動窗口模型

拓撲結構由負責數據來源的Spout與多個不同功能的Bolt組成。數據流由KafkaSpout開始,以直接分組的形式發送到預處理Bolt中,在此執行格式化或類型轉換等操作;預處理Bolt按照字段ID將數據發送到滑動窗口Bolt中;滑動窗口Bolt會依照滑動窗口模型的結構,先將數據流發送到模型Bolt中,利用按照業務需求自定義的模型Bolt對數據進行處理,之后會將處理結果隨機地發送到至統計Bolt中;統計Bolt進行統計處理后將結果轉發到存儲Bolt,由其進行數據視圖的存儲。該拓撲實現可分為5個步驟。

(1)創建拓撲構造器TopologyBuilder;

(2)配置SpoutConfig;

(3)利用拓撲構造其中的setSpout()與setBolt()方法創建KafkaSpout對象,預處理Bolt對象與滑動窗口Bolt對象,其執行器的并發度設為1。可調用withWindow()方法設置基本窗口的時間大小,默認設置為1 s;

(4)按照業務需求構建模型Bolt對象,執行器的并發度設為n;

(5)創建統計Bolt對象與存儲Bolt對象,配置最終存儲方案。

流數據并行處理窗口借助于Storm的拓撲結構構建后,實現包含n個基本窗口的滑動窗口在單位周期內對接收數據的處理。在時間窗口獨立并行下,最終完成指定時間內統計最近ns的監測數據。

4 流數據處理平臺設計

基于Lambda架構[17],設計了流數據處理平臺,用于實時監控的流數據需要經過預處理、閾值判斷或聚合等多種處理流程。將數據處理分成批量處理平臺與增量處理平臺兩部分,以應對數據的多樣化需求,流數據處理平臺結構如圖6所示。

圖6 流數據處理平臺結構

批量處理平臺可用于不同數據集的任意查詢,并實現對數據集的深入分析,例如設備狀態評估模型、能耗分析等業務功能。增量處理平臺則需要對某一數據序列增量式地更新指標、報告和匯總統計結果,以響應每個到達的數據記錄,更適合為設備實時監控和報警響應等功能提供數據支持。

構建混合模式的數據處理平臺,同時維護批量處理與增量處理兩個部分。數據流從實時數據庫開始,通過Flume組件對其進行采集整理,整理后的數據流按照需求標準分為批量數據流與實時數據流兩個部分,分別發送到相應的flume-sink上。批量數據流直接存儲到批量處理平臺中的HDFS上,可利用如MapReduce等多種大數據處理方式對其進行數據分析,生成批計算視圖,數據量較小的視圖存儲到MySQL中,數據量較大的存儲在HBase上。實時數據流對應的sink是Kafka,Kafka作為消息通道,將實時數據流的增量數據發送到Storm中的Spout,之后通過實時計算得到實時視圖,將其存儲在Redis等緩存數據庫中,方便快速訪問。

增量處理平臺用來處理增量的實時數據。增量處理平臺對數據處理后會生成實時處理視圖(Real-Time View)。為了提升處理效率,平臺會在接收到新數據后不斷刷新實時視圖,采用增量計算對數據進行分段處理,因此延遲較小,更適合實時處理。增量處理平臺的數據訪問可以定義為

RealTimeView=function(realtimeview,newdata)

(6)

面臨高度動態的實時數據信息,增量處理平臺選擇使用Strom創建拓撲結構來轉換數據流,持續處理到達的數據。為避免在做實時運算的過程中因為數據擁塞而導致服務器掛掉,本文選擇Kafka[18]作為消息隊列,將不均勻的數據轉化成均勻的數據流,從而與Storm完善結合,實現穩定的流運算。增量處理平臺的框架結構如圖7所示。

圖7 增量處理平臺結構

數據經過Flume分流后,增量數據被發送至Kafka中。Kafka作為專門面向海量數據傳輸而開發的分布式消息中間件,以提供生產消費訂閱,流數據處理等功能被廣泛應用[19]。在增量處理平臺中,通過Kafka創建用于處理增量數據的Topic,之后即可通過配置Flume的Sink數據流向,將RealTimeEvent與Event發送到該topic上。接收到增量數據后,Kafka將其發送到由Storm構建的拓撲模型中。Storm對其進行增量處理,之后生成相應的視圖或結果,發送到指定界面中或者緩存到Redis中供業務服務消費。

5 實驗分析

搭建1個主節點和2個從節點組成的Storm+Kafka虛擬集群,實驗數據用模擬的方式,人為輸入模擬數據,將其流入到Kafka中的Spout。考慮到復雜重型裝備監測中的數據實際監控周期,30 s較為接近各個數據類型的單位時間周期,故滑動窗口設置為30 s,基本時間窗口設置為1 s,模型Bolt定義為閾值判斷對模擬數據進行篩選,以測試整個平臺的性能。

表2 實驗平臺

其中一臺虛擬機既作為主節點,又充當從節點,剩余三臺虛擬機為從節點,受主節點調配。

5.1 吞吐量分析

5.1.1 集群與單機對比

設置集群參與工作的節點為4,設置Spout與模型Bolt的并發度分別為1與15,分別觀察在單機與集群下進行數據閾值判斷的吞吐量,結果如圖8所示。

圖8 單機與集群吞吐量對比

從圖8可以看出,單機環境可在單位時間內處理將近6萬條左右的數據,而集群環境的處理能力則大大增加。在數據量低于5萬條左右時,單機處理要比集群性能好,隨著數據流的不斷輸入,集群的高性能便凸顯出來,不過由于涉及到機器之間的數據分發,集群相比于單機在數據處理能力上存在較大的不穩定性。本次對比實驗證明在流數據量級逐漸增大的情況下,通過集群部署的方式可以有效提高數據并行計算的能力。

5.1.2 集群節點數對比

設置全局參與進程數為4,Spout與模型Bolt的并發度仍然為1與15,分別設置集群參與工作的節點個數為2、3、4。在其他變量不變,只修改集群節點個數的前提下逐漸增加數據規模,觀察其進行數據判斷的吞吐量,結果如圖9所示。

圖9 集群節點數吞吐量對比

在數據量級分別為10萬條,30萬條與60萬條的前提下,節點數不變,增加數據量級基本不會改變系統的吞吐量,而吞吐量的細微變化可能與設備配置、任務分發與數據傳輸環境有關。在相同數據規模下,增加節點個數可以明顯看到系統吞吐量出現上升的趨勢。本次對比實驗證明通過增加參與運算的集群節點可以有效提升整個系統的并行計算能力。

5.1.3 參與進程數對比

設置集群參與工作的節點為4,設置Spout與模型Bolt的并發度仍為1與15,改變全局參與工作的進程數量為2、3、4、8,觀察增加數據規模后的系統吞吐情況,結果如圖10所示。

圖10 進程數吞吐量對比

由圖10可知,數據量級不變的情況下,隨著進程個數的增加,系統的吞吐量出現了先上升后下降的情形。例如當數據量為10萬條時,在2進程下單位時間內數據處理量為6 500條,進程個數增加到4為止,吞吐量一直處于上升趨勢,達到單位時間內8 600條,而進程個數進一步增加到8時,吞吐量下降。本次對比實驗說明在一定程度上,增加參與工作進程的個數可以提高集群的并行計算能力,但是這個數量是有限制的,這個閾值與集群的工作節點數量有關。

通過3個對比實驗結果可以得出結論,在集群部署下,通過合理設置集群節點數與工作進程數,可以有效的提升增量處理平臺針對流數據并行處理的計算能力。

5.2 延遲分析

在集群節點與進程數一定的前提下,通過修改模型Bolt的并發度,來驗證模型Bolt對于系統處理數據的延遲影響。本實驗對于延遲定義為數據從Spout出發到其完全處理完成所消耗的時間。對模型Bolt的并發度分別設置為5、15、30后,得到的延遲情況如圖11所示。

圖11 模型Bolt延遲性測試

由圖11可知,當并發度設置較小時,處理模塊無法有效的對當前數據及時處理,從而造成數據堆積現象,最終導致系統延遲急速上漲;當并發度設置為滑動窗口長度的一半時,基本上可以滿足流數據的處理需求;當并發度等于滑動窗口長度時,延遲進一步降低,但是效果提升有限,分析認為可能和設備性能瓶頸與虛擬機環境有關。結合吞吐量的測試實驗結果,說明基于滑動窗口的并行流數據處理方法可以滿足復雜重型裝備實時監測對于數據實時性的需求。

6 結束語

針對復雜重型裝備制造過程中設備的監測,本文提出了一種基于滑動窗口的流數據并行處理方法。融合了基于分區與基于時間兩種滑動窗口思想,有利于更好地適應流數據延續性和實時性;并基于時間角度與不同的業務需求,提出了并行處理滑動窗口模型,設計了并行處理算法;借助于Storm的拓撲結構,實現滑動窗口在單位周期內對接收數據的處理,基于Storm拓撲結構和Lambda架構設計流數據處理平臺,實現流數據的預處理、閾值判斷或聚合等多種處理。最后對其進行了吞吐量與延遲的測試實驗,驗證了方法與平臺的可用性。

猜你喜歡
數據處理模型
一半模型
認知診斷缺失數據處理方法的比較:零替換、多重插補與極大似然估計法*
心理學報(2022年4期)2022-04-12 07:38:02
ILWT-EEMD數據處理的ELM滾動軸承故障診斷
水泵技術(2021年3期)2021-08-14 02:09:20
重要模型『一線三等角』
重尾非線性自回歸模型自加權M-估計的漸近分布
3D打印中的模型分割與打包
MATLAB在化學工程與工藝實驗數據處理中的應用
FLUKA幾何模型到CAD幾何模型轉換方法初步研究
Matlab在密立根油滴實驗數據處理中的應用
基于POS AV610與PPP的車輛導航數據處理
主站蜘蛛池模板: 国产人人乐人人爱| 亚洲欧美成人综合| 老司机久久99久久精品播放| 国产欧美专区在线观看| 网友自拍视频精品区| 日本精品一在线观看视频| 91在线精品麻豆欧美在线| 在线观看免费人成视频色快速| 国产在线第二页| 亚洲精品无码抽插日韩| 精品视频在线观看你懂的一区| 亚洲精品国产精品乱码不卞| 国产成人综合欧美精品久久| 亚洲一本大道在线| 色综合狠狠操| 国产亚洲欧美在线中文bt天堂| 国产主播在线一区| 看看一级毛片| 国产永久在线视频| 免费又黄又爽又猛大片午夜| 国产97区一区二区三区无码| 狠狠色综合网| 中文字幕亚洲综久久2021| 国产一区三区二区中文在线| 亚洲第一成年网| 亚洲视屏在线观看| A级全黄试看30分钟小视频| 欧美精品1区2区| 在线观看免费黄色网址| 另类综合视频| 国产一级小视频| 国产成人精品第一区二区| 欧美h在线观看| 91丨九色丨首页在线播放| 亚洲国产系列| AV网站中文| 久久国产精品夜色| 国产女人在线视频| 中文字幕丝袜一区二区| 亚洲第一国产综合| 日韩天堂在线观看| 色综合久久久久8天国| 国产永久在线观看| 99热国产在线精品99| 国产真实乱子伦视频播放| 亚洲视频一区| 国产一级裸网站| 亚洲精选无码久久久| 欧美午夜小视频| 日本91在线| 亚洲综合经典在线一区二区| 狠狠亚洲五月天| 伊人激情综合网| 国产电话自拍伊人| 99热这里只有免费国产精品| 97青草最新免费精品视频| 精品一区二区三区自慰喷水| 亚洲天堂久久| 91啪在线| 亚洲人妖在线| 日韩a在线观看免费观看| 亚洲av无码牛牛影视在线二区| 国产特级毛片| 欧美精品高清| 久久精品国产一区二区小说| 亚洲综合网在线观看| 久久精品无码国产一区二区三区| 日本成人福利视频| 欧美成人A视频| 亚洲综合片| 久久精品免费看一| 好吊日免费视频| 国产精品19p| 夜色爽爽影院18禁妓女影院| 精品天海翼一区二区| 永久免费精品视频| 久爱午夜精品免费视频| 国产经典三级在线| 99热国产在线精品99| 亚洲中文字幕97久久精品少妇| 亚洲免费黄色网| 国产欧美日韩视频一区二区三区|