喬 昕,劉 峰,于碧輝
(中國(guó)科學(xué)院大學(xué),北京 100049)
(中國(guó)科學(xué)院 沈陽(yáng)計(jì)算技術(shù)研究所,沈陽(yáng) 110168)
隨著新一代信息技術(shù)的發(fā)展,制造業(yè)與信息技術(shù)的融合應(yīng)用已經(jīng)成為研究的熱點(diǎn).基于DSP與FPGA的數(shù)字信號(hào)處理技術(shù)在實(shí)時(shí)數(shù)字信號(hào)處理方面已經(jīng)較為成熟[1],可以滿足常規(guī)應(yīng)用場(chǎng)景下的數(shù)字信號(hào)處理需求.但是該技術(shù)存在諸多限制,首先其對(duì)于歷史數(shù)據(jù)的分析處理能力不足,其次受到存儲(chǔ)空間的制約,所以需要將工業(yè)設(shè)備采集到的數(shù)據(jù)通過(guò)采樣縮小樣本空間,再使用計(jì)算單元進(jìn)行計(jì)算.但是由于工業(yè)設(shè)備的精度越來(lái)越高、功能越來(lái)越豐富,使得其產(chǎn)生的數(shù)據(jù)呈現(xiàn)了大數(shù)據(jù)特征,如數(shù)據(jù)量大、數(shù)據(jù)類型多、數(shù)據(jù)價(jià)值密度低等.這就需要使用一種新的面向海量歷史信號(hào)數(shù)據(jù)分析的方案解決以上問(wèn)題,作為對(duì)離線數(shù)字信號(hào)處理方案的有效補(bǔ)充.
隨著HDFS等分布式文件系統(tǒng)的出現(xiàn),存儲(chǔ)海量數(shù)據(jù)已經(jīng)成為可能.然而數(shù)字信號(hào)處理算法是迭代式的計(jì)算,在計(jì)算過(guò)程中會(huì)產(chǎn)生大量的中間結(jié)果,并且這些中間結(jié)果還需要參與下一輪計(jì)算,所以中間結(jié)果的存儲(chǔ)就可能變成整個(gè)系統(tǒng)的性能瓶頸.Spark是面向大數(shù)據(jù)處理的并行計(jì)算引擎,在大數(shù)據(jù)處理方面有廣泛的應(yīng)用,并且它是基于內(nèi)存計(jì)算的[2],在計(jì)算過(guò)程中數(shù)據(jù)是存儲(chǔ)在內(nèi)存中的,不存在由于I/O而造成的時(shí)間和磁盤空間的消耗.由于Spark比較適合迭代計(jì)算,因此工業(yè)大數(shù)據(jù)分析平臺(tái)可以采用Spark作為實(shí)時(shí)信號(hào)處理和離線信號(hào)處理加速的計(jì)算引擎,但是目前該分析平臺(tái)仍然缺少適用于分布式并行計(jì)算引擎的數(shù)字信號(hào)處理等數(shù)學(xué)計(jì)算的解決方案.
針對(duì)這些問(wèn)題,本文提出了基于Spark并行計(jì)算引擎的分布式數(shù)字信號(hào)處理算法庫(kù),用于解決大規(guī)模數(shù)據(jù)下的離線式數(shù)字信號(hào)處理問(wèn)題,為面向分析的工業(yè)大數(shù)據(jù)應(yīng)用場(chǎng)景提供支撐.該算法庫(kù)不僅適用于大規(guī)模數(shù)據(jù)的并行處理,在速度和性能方面也有很大的提升.
本文主要分為四個(gè)部分,第一部分介紹了該分布式數(shù)字信號(hào)處理算法庫(kù)的功能組織形式和Spark層面的架構(gòu)設(shè)計(jì).第二部分完成了對(duì)多種常用數(shù)字信號(hào)處理算法的分布式實(shí)現(xiàn),并以FFT算法和DFT算法為例分別介紹了兩類不同算法的分布式實(shí)現(xiàn)思想.第三部分對(duì)本文設(shè)計(jì)的分布式數(shù)字信號(hào)處理算法庫(kù)進(jìn)行了正確性測(cè)試和性能分析.第四部分對(duì)本文進(jìn)行了總結(jié)以及對(duì)未來(lái)工作的展望.
本文所設(shè)計(jì)的分布式數(shù)字信號(hào)處理算法庫(kù)基于RDD技術(shù)實(shí)現(xiàn),部署在Spark上,利用了Spark基于內(nèi)存計(jì)算的特性,更加適用于大規(guī)模數(shù)據(jù)的快速并行計(jì)算[3–5].
該算法庫(kù)的結(jié)構(gòu)主要包含三部分:
第一部分為計(jì)算平臺(tái)接口及數(shù)學(xué)算法封裝,主要包括Spark并行計(jì)算API的封裝以及矩陣、向量計(jì)算的數(shù)學(xué)算法庫(kù)的封裝.這部分主要完成包括矩陣、向量等數(shù)據(jù)結(jié)構(gòu)的定義和計(jì)算.
第二部分為算法工具包,包括優(yōu)化算法、特征提取、算法評(píng)估、外部數(shù)據(jù)讀入等.這部分主要用于數(shù)據(jù)預(yù)處理和應(yīng)用分布式算法計(jì)算過(guò)程中的調(diào)優(yōu)與結(jié)果評(píng)估.
第三部分為分布式算法集合,包含了常用的數(shù)字信號(hào)處理算法的分布式實(shí)現(xiàn).
算法庫(kù)的功能組織圖如圖1所示.圖1中的分布式算法集合是算法庫(kù)結(jié)構(gòu)中的第三部分,即在Spark下以分布式方式實(shí)現(xiàn)的常用的數(shù)字信號(hào)處理算法.工具包是算法庫(kù)結(jié)構(gòu)中的第一部分與第二部分,作為分布式算法集合的輔助工具,主要用于數(shù)據(jù)預(yù)處理、結(jié)果評(píng)估和數(shù)學(xué)計(jì)算.

圖1 算法庫(kù)功能組織圖
從能否進(jìn)行并行化計(jì)算優(yōu)化的角度,算法庫(kù)的數(shù)字信號(hào)處理算法大致可以分為兩類,第一類是以離散傅里葉變換(DFT)算法為代表的無(wú)法通過(guò)基于時(shí)間的拆分進(jìn)行優(yōu)化的算法,這一類算法將基于Spark并行計(jì)算架構(gòu)進(jìn)行計(jì)算,在保證計(jì)算正確性的同時(shí),實(shí)現(xiàn)大規(guī)模數(shù)據(jù)支撐與加速計(jì)算.第二類是可以通過(guò)基于時(shí)間的拆分進(jìn)行優(yōu)化的算法,這類算法在基于Spark并行計(jì)算框架實(shí)現(xiàn)大規(guī)模數(shù)據(jù)支撐與加速計(jì)算的基礎(chǔ)上,通過(guò)基于時(shí)間的拆分實(shí)現(xiàn)算法優(yōu)化,達(dá)到二次加速目的,快速傅里葉變換(FFT)算法就是這類算法的典型代表[6–8].
圖2所示為Spark層面的算法庫(kù)的架構(gòu)圖.其中包括兩部分,通過(guò)拆分優(yōu)化加速部分即為對(duì)第二類算法的處理部分,當(dāng)?shù)诙愃惴ㄟM(jìn)入該部分時(shí),首先會(huì)對(duì)數(shù)據(jù)進(jìn)行預(yù)處理,然后根據(jù)具體算法的拆分方式進(jìn)行數(shù)學(xué)拆分.而基于Spark計(jì)算框架優(yōu)化部分即為對(duì)第一類算法的處理部分,由于第一類算法無(wú)法進(jìn)行公式拆分,所以這部分直接對(duì)其數(shù)學(xué)公式進(jìn)行累加計(jì)算,并使用Spark計(jì)算引擎的內(nèi)部特性,如內(nèi)存計(jì)算特性等對(duì)算法的計(jì)算過(guò)程進(jìn)行優(yōu)化加速.

圖2 Spark層面算法庫(kù)架構(gòu)圖
對(duì)于某一具體算法,無(wú)論是適用于拆分優(yōu)化還是計(jì)算框架優(yōu)化,當(dāng)其通過(guò)算法庫(kù)進(jìn)行處理后,便可交由Spark計(jì)算引擎進(jìn)行并行化計(jì)算,從而得到分布式計(jì)算結(jié)果.
算法庫(kù)在設(shè)計(jì)和實(shí)現(xiàn)上充分考慮了對(duì)于數(shù)字信號(hào)處理算法的擴(kuò)展性,所以對(duì)于本文中沒(méi)有涉及的數(shù)字信號(hào)處理算法可以在完成相應(yīng)的分布式實(shí)現(xiàn)后按照功能加載入相應(yīng)的模塊中.
由于傅里葉變換(FFT)算法可以作為大多數(shù)數(shù)字信號(hào)處理算法的實(shí)現(xiàn)基礎(chǔ),所以本文以FFT算法為例,作為第二類算法的實(shí)現(xiàn)代表,詳細(xì)介紹其在Spark并行計(jì)算引擎下基于并行計(jì)算優(yōu)化的數(shù)字信號(hào)處理算法的實(shí)現(xiàn)思想.分布式FFT算法的思想可以作為通用實(shí)現(xiàn)思想,對(duì)算法庫(kù)中的其他同類算法在此基礎(chǔ)上做適應(yīng)性調(diào)整,從而均可完成其他算法的分布式實(shí)現(xiàn).
Spark中的數(shù)據(jù)是通過(guò)RDD進(jìn)行存儲(chǔ)和表示的,使用map和reduce函數(shù)進(jìn)行RDD之間的轉(zhuǎn)換[3–5].通過(guò)文獻(xiàn)[6–8]可知,FFT算法的關(guān)鍵步驟為蝶形運(yùn)算和變址運(yùn)算,所以本部分主要介紹對(duì)蝶形運(yùn)算和變址運(yùn)算中的map和reduce的設(shè)計(jì)思想.除此之外,由于FFT算法可以使用按照時(shí)間拆分的基2方法進(jìn)行優(yōu)化計(jì)算,所以在本部分中還詳細(xì)介紹了數(shù)據(jù)填充部分的步驟,作為對(duì)原始數(shù)據(jù)的預(yù)處理環(huán)節(jié).
2.1.1 數(shù)據(jù)填充
由于FFT算法可以使用按照時(shí)間抽取的基2方法實(shí)現(xiàn),其核心思想是將N點(diǎn)序列按照奇偶序進(jìn)行逐級(jí)迭代拆分,直到最后只剩下兩項(xiàng)做累加計(jì)算時(shí)停止,所以要求原始數(shù)據(jù)量必須是2的冪次方,否則無(wú)法進(jìn)行拆分.這就需要先對(duì)原始數(shù)據(jù)集進(jìn)行數(shù)據(jù)量大小的判斷,若數(shù)據(jù)量為2的冪次方,則無(wú)需進(jìn)行填充,可直接用于計(jì)算;若數(shù)據(jù)量不是2的冪次方,則需要在數(shù)據(jù)末尾進(jìn)行填充至大于原始數(shù)據(jù)量的最小的2的冪次方.這里使用的填充方式為零填充,即在數(shù)據(jù)集末尾補(bǔ)零.具體過(guò)程如下所示:
(1)創(chuàng)建RDD用于存儲(chǔ)原始數(shù)據(jù).
(2)將RDD傳遞給map函數(shù),map函數(shù)首先會(huì)為每條數(shù)據(jù)賦予一個(gè)遞增的key值,然后判斷該數(shù)據(jù)塊是否需要填充,若需要?jiǎng)t進(jìn)行填充,否則不做任何操作.map過(guò)程將會(huì)產(chǎn)生新的RDD.
(3)將第二步的RDD傳遞給reduce函數(shù)進(jìn)行簡(jiǎn)單格式整合,生成這一階段的結(jié)果RDD.
經(jīng)過(guò)這一階段后產(chǎn)生的結(jié)果RDD即為符合長(zhǎng)度要求的數(shù)據(jù)集,存儲(chǔ)在內(nèi)存中,以便進(jìn)行下一階段操作.
2.1.2 變址運(yùn)算
由于按照時(shí)間抽取的基2-FFT算法的結(jié)果序列的存儲(chǔ)下標(biāo)與輸入序列的存儲(chǔ)下標(biāo)不同,而原始序列的存儲(chǔ)下標(biāo)又是按序排列的,與結(jié)果序列相同,所以需要將原始序列變換為輸入序列.本階段的具體步驟如下:
(1)將經(jīng)過(guò)數(shù)據(jù)填充階段的結(jié)果RDD作為該階段的原始序列.
(2)將原始序列傳遞給map函數(shù),map函數(shù)會(huì)將每條記錄的key值轉(zhuǎn)換為二進(jìn)制表示,然后將二進(jìn)制key值進(jìn)行逆序轉(zhuǎn)換,再將逆序key值排序,最后把排序好的結(jié)果存儲(chǔ)在新的RDD中.
(3)將第二步生成的RDD傳遞給reduce函數(shù)進(jìn)行簡(jiǎn)單格式整合,生成新的RDD.該RDD中存儲(chǔ)的數(shù)據(jù)即為符合要求的輸入序列.
2.1.3 蝶形運(yùn)算
本階段是FFT算法的核心.在這一階段中會(huì)涉及復(fù)數(shù)運(yùn)算,由于RDD可以被抽象地理解為一個(gè)大的數(shù)組,它存儲(chǔ)的不是真實(shí)數(shù)據(jù),而只是真實(shí)數(shù)據(jù)的分區(qū)信息和針對(duì)每個(gè)分區(qū)的讀取方法,所以可以直接使用RDD進(jìn)行復(fù)數(shù)的操作[3,4].具體步驟如下所示:
(1)將經(jīng)過(guò)變址運(yùn)算階段的RDD作為輸入序列.
(2)將輸入序列傳遞給map函數(shù),map函數(shù)將每條記錄的key值更換成統(tǒng)一值.
(3)將第二步的結(jié)果傳遞給reduce函數(shù),在reduce函數(shù)中按照傳統(tǒng)的FFT算法中的蝶形運(yùn)算進(jìn)行計(jì)算,并將結(jié)果進(jìn)行格式整合,輸出到文件中.
這一過(guò)程結(jié)束后,生成的結(jié)果文件中存儲(chǔ)的數(shù)據(jù)即為原始信號(hào)數(shù)據(jù)經(jīng)過(guò)FFT之后的計(jì)算結(jié)果,接下來(lái)可以使用其他工具進(jìn)行畫圖分析.
對(duì)于第一類算法,則可以使用Spark集群的并行計(jì)算優(yōu)勢(shì)進(jìn)行加速計(jì)算.由于Spark集群可以利用多臺(tái)處理器的存儲(chǔ)能力和計(jì)算能力,即將任務(wù)分配給多臺(tái)機(jī)器協(xié)同工作,從而使該類算法的累加計(jì)算成為可能,并且還能利用Spark計(jì)算引擎的特性在性能方面實(shí)現(xiàn)優(yōu)化.以離散傅里葉變換(DFT)算法為例,具體實(shí)現(xiàn)步驟如下:
(1)創(chuàng)建RDD用于存儲(chǔ)原始數(shù)據(jù).
(2)將RDD傳遞給map函數(shù),map函數(shù)按照該算法的數(shù)學(xué)公式進(jìn)行逐級(jí)累加,并產(chǎn)生新的RDD用于存儲(chǔ)中間結(jié)果.
(3)將第二步的RDD傳遞給reduce函數(shù)進(jìn)行簡(jiǎn)單格式整合,并將結(jié)果輸出至文件存儲(chǔ).
本部分對(duì)本文所設(shè)計(jì)的算法庫(kù)進(jìn)行正確性測(cè)試,并分析在不同數(shù)據(jù)量的情況下該算法庫(kù)的運(yùn)行時(shí)間的變化趨勢(shì).本次實(shí)驗(yàn)使用由3個(gè)節(jié)點(diǎn)構(gòu)成的Spark集群,3個(gè)節(jié)點(diǎn)的硬件配置相同,均為16 G內(nèi)存,4 GHz主頻,4核CPU.
本實(shí)驗(yàn)用于驗(yàn)證算法庫(kù)中實(shí)現(xiàn)的分布式算法的正確性.所使用的測(cè)試數(shù)據(jù)集為對(duì)六組信號(hào)采集器采集的振動(dòng)信號(hào)使用采樣頻率為256、采樣點(diǎn)數(shù)為256進(jìn)行采樣得到的采樣數(shù)據(jù)集.該數(shù)據(jù)集以文件形式存儲(chǔ),數(shù)據(jù)之間以逗號(hào)分隔.將該數(shù)據(jù)集分別通過(guò)本文設(shè)計(jì)的算法庫(kù)和Matlab中的對(duì)應(yīng)算法進(jìn)行計(jì)算,將計(jì)算結(jié)果存儲(chǔ)在結(jié)果文件中,并對(duì)結(jié)果文件中的數(shù)據(jù)進(jìn)行對(duì)比.
通過(guò)比較計(jì)算結(jié)果可知,該數(shù)據(jù)集通過(guò)本文設(shè)計(jì)的算法庫(kù)進(jìn)行計(jì)算與使用Matlab對(duì)應(yīng)算法進(jìn)行計(jì)算得到的結(jié)果相同,即可以說(shuō)明本算法庫(kù)中實(shí)現(xiàn)的分布式算法均能正確完成數(shù)字信號(hào)處理的功能.
本實(shí)驗(yàn)用于測(cè)試該算法庫(kù)的性能.實(shí)驗(yàn)使用六組輸入信號(hào),均為信號(hào)采集器采集的振動(dòng)信號(hào),以文件形式存儲(chǔ),數(shù)據(jù)之間以逗號(hào)分隔,具體參數(shù)如表1所示.

表1 信號(hào)文件參數(shù)列表
經(jīng)過(guò)本文設(shè)計(jì)的算法庫(kù)中各算法計(jì)算后,得到每個(gè)文件中數(shù)據(jù)的計(jì)算結(jié)果,并存儲(chǔ)在結(jié)果文件中.
以上六個(gè)文件的數(shù)據(jù)量與運(yùn)行時(shí)間的關(guān)系圖如圖3所示.
由上圖可知,當(dāng)數(shù)據(jù)集較小時(shí),數(shù)據(jù)量與時(shí)間基本成線性關(guān)系,隨著數(shù)據(jù)集的增大,計(jì)算時(shí)間呈現(xiàn)非線性增長(zhǎng)趨勢(shì).出現(xiàn)此趨勢(shì)的原因是集群的存儲(chǔ)能力和計(jì)算能力受節(jié)點(diǎn)數(shù)量的影響.

圖3 數(shù)據(jù)量-運(yùn)行時(shí)間關(guān)系圖
本文基于大規(guī)模數(shù)據(jù)下的數(shù)字信號(hào)處理及離線分析的需求,設(shè)計(jì)了基于Spark并行計(jì)算引擎的分布式數(shù)字信號(hào)處理算法庫(kù).詳細(xì)介紹了該算法庫(kù)的架構(gòu)設(shè)計(jì),并以FFT算法和DFT算法為例介紹了兩類數(shù)字信號(hào)處理算法的并行化計(jì)算實(shí)現(xiàn)及基于Spark計(jì)算引擎的優(yōu)化方法,最后對(duì)該算法庫(kù)進(jìn)行了正確性測(cè)試和性能分析.根據(jù)測(cè)試結(jié)果可以看出,本文設(shè)計(jì)的分布式數(shù)字信號(hào)處理算法庫(kù)能夠正確完成數(shù)字信號(hào)處理的功能,并且能夠處理大規(guī)模數(shù)據(jù)集,可以滿足工業(yè)大數(shù)據(jù)分析平臺(tái)對(duì)于大規(guī)模數(shù)據(jù)集進(jìn)行數(shù)字信號(hào)處理的需求.
下一階段的工作內(nèi)容主要是完成對(duì)本文實(shí)現(xiàn)的分布式數(shù)字信號(hào)處理算法庫(kù)的性能評(píng)估與調(diào)優(yōu)和基于Spark計(jì)算引擎的其他分布式數(shù)字信號(hào)處理算法的優(yōu)化方法研究.