黎文陽
(四川大學計算機學院,成都 610065)
大數據處理模型Apache Spark研究
黎文陽
(四川大學計算機學院,成都610065)
MapReduce計算模型在大規模數據分析領域已取得很大成績,并被很多公司廣泛采用。這些系統都是基于非循環的數據流模型,有很好的容錯性,同時為開發人員提供了高級接口以便于編寫并行程序。目前這些系統能很容易地訪問集群中的計算資源,但是不能充分地利用分布式內存,導致了對那些重用中間結果的應用不是很有效。這些應用的特點是在多個并行操作之間重用數據,例如機器學習中的PageRank算法、K-means聚類算法、邏輯回歸算法等迭代式算法。交互式的數據挖掘算法中也經常重用數據。Spark計算模型剛好解決了這些問題,并且能在Hadoop集群下部署,訪問HDFS文件系統。Spark將分布式內存抽象成彈性分布式數據集(Resilient Distributed Datasets,RDD)[1]。RDD支持基于工作集的應用,同時具有數據流模型的特點:自動容錯、位置感知調度和可伸縮性。RDD允許用戶在執行多個查詢時顯式地將工作集緩存在內存中,以便后續的查詢能夠重用,這極大地提升了查詢速度。
Spark是UC Berkeley AMPLab于2009年發起的,然后被Apache軟件基金會接管的類Hadoop MapReduce通用性并行計算框架,是當前大數據領域最活躍的開源項目之一。Spark是基于MapReduce計算框架實現的分布式計算,擁有Hadoop MapReduce所具有的優點;但不同于MapReduce的是中間輸出和結果可以保存在內存中,從而不再需要讀寫HDFS,因此Spark更適用于數據挖掘與機器學習等需要迭代的算法。如圖1所示,邏輯回歸算法在Hadoop和Spark上的運行時間對比圖,可以看出Spark的效率有很大的提升[3]。
Spark由Scala[4]語言實現的,Scala是一種基于JVM的函數式編程語言,提供了類似DryadLINQ[5]的編程接口。而且Spark還提供了一個修改的Scala語言解釋器,能方便地用于交互式編程,用戶可以定義變量、函數、類以及RDD。
Spark集成了豐富的編程工具,其中Spark SQL用于SQL語言和結構化數據處理,Spark Streaming用于流處理,MLlib用于機器學習算法,GraphX用于圖處理。Spark不但能夠訪問多種數據源,例如 HDFS、Cassandra、HBase、Amazon S3,還提供了Scala、Java和Python三種語言的API接口,以便于編寫并行程序。而且Spark還能部署在已有的Hadoop系統上,由YARN進行集群調度,極大地利用了Hadoop系統。

圖1 邏輯回歸算法在Hadoop和Spark上的運行時間
1.1適用場景
Spark適用于那些在多個并行操作之間重用數據的應用,而MapReduce在這方面效率并不高,因為MapReduce和DAG引擎是基于非循環數據流的,即一個應用被分成一些不同的作業(job),每個作業從磁盤中讀數據,然后再寫到磁盤[2]。Spark不太適合那些異步更新共享狀態的應用,例如并行Web爬行器。Spark的適用場景有:
●迭代式算法:許多機器學習算法都用一個函數對相同的數據進行重復的計算,從而得到最優解。MapReduce計算框架把每次迭代看成是一個MapReduce作業 (job),而每個作業都要從磁盤重新加載數據,這就導致了效率不高,而Spark可以把中間數據緩存到內存中加快計算效率。
●交互式數據分析:用戶經常會用SQL對大數據集合做臨時查詢(Ad-Hoc Query)。Hive把每次查詢都當作一個獨立的MapReduce作業,并且從磁盤加載數據,有很大的延遲,而Spark可以把數據加載到內存中,然后重復的查詢。
●流應用:即需要實時處理的應用,這類應用往往需要低延遲,高效率。
1.2組成部分
目前 Spark由四部分構成:Spark SQL、MLlib、GraphX、Spark Streaming,如圖2所示。

圖2 Spark組成部分
(1)Spark SQL:是Spark處理SQL和結構化數據工具,Spark引入了SchemaRDD的數據抽象,使其能以統一地、高效地訪問和查詢各種不同的數據源,例如Apache Hive表、parquet文件、JSON文件。Spark SQL兼容Apache Hive,能重用Hive的前端和元存儲。Spark SQL API能像查詢RDD一樣查詢結構化的數據,并且Spark SQL還提供了JDBC/ODBC的服務端模式,以便建立JDBC/ODBC數據連接。
(2)MLlib(Machine Learning):是Spark提供的機器學習庫,包含了常見的機器學習算法。Spark擅長于迭代式計算,所以與MapReduce相比,MLlib中的算法效率更高,性能更好。常見的算法有:
①SVM、邏輯回歸(logistic regression)、線性回歸(linear regression)、樸素貝葉斯(naive Bayes)
②K均值算法(K-means)
③奇異值分解(singular value decomposition)
④特征提取與轉換(feature extraction and transformation)
(3)GraphX(Graph Processing):是Spark處理圖(graph)的框架,利用Pregel API可以用RDD有效地轉換(transform)和連接(join)圖,實現圖算法。GraphX在速度上可與最快的專用圖處理系統相媲美。
(4)Spark Streaming:是Spark處理流應用的庫。其結合了批處理查詢與交互式查詢,方便重用批處理的代碼和歷史數據?;驹硎荢park將流數據分成小的時間片斷(幾秒),以類似批處理的方式來處理這小部分數據。Spark Streaming API能像編寫批處理作業一樣構建可擴展的流應用。Spark Streaming也能訪問各種不同的數據源,例如能夠從HDFS、Flume、Kafka、Twitter和ZeroMQ中讀取數據。
1.3部署模式
Spark集群如圖3劃分,主要有驅動程序、集群管理程序,以及各worker節點上的執行程序[3]。
Spark應用的主程序稱為驅動程序 (driver program),其中包含SparkContext對象,用于連接集群管理程序(cluster manager)。集群管理程序用于分配集群中的資源,目前有三種:Spark獨自的集群管理程序、Mesos[6]和 YARN[7]。Spark應用的運行過程是,Spark-Context對象首先連接集群管理程序,然后Spark獲取集群中各個節點上的執行程序(executor),執行程序是用于計算和存儲數據的進程,然后Spark把代碼發送到執行程序,最后運行執行程序上的任務。所以Spark目前支持3種集群部署模式:Standalone模式、Apache Mesos模式、Hadoop YARN模式。

圖3 Spark集群組成部分
(1)Standalone模式:即獨立模式,使用Spark自帶的集群管理程序,好處是不需要額外的軟件就能運行,配置簡單。
(2)Apache Mesos模式:需要安裝Mesos資源管理程序,Spark的集群管理就交給Mesos處理了。使用Mesos的好處是,可以在Spark與其他框架之間或多個Spark實例之間動態地劃分。
(3)Hadoop YARN模式:利用已有的Hadoop集群,讓Spark在Hadoop集群中運行,訪問HDFS文件系統,使用YARN資源調度程序。Spark on YARN是Spark 0.6.0版本加入的,該模式需要額外的配置參數。
Spark既可以在單機上運行,也可以在Amazon EC2上運行。Spark提供了相應的配置文件、啟動腳本、結束腳本用于配置、啟動、結束Spark集群中的master 和slave。也提供了相應的Web監控系統與日志系統,方便地監控與調試程序。而且還可以配置Zookeeper以保證高可用性。
Spark最主要的抽象就是彈性分布式數據集(Resilient Distributed Datasets,RDD)以及對RDD的并行操作(例如map、filter、groupBy、join)。而且,Spark還支持兩種受限的共享變量 (shared variables):廣播變量(broadcast variables)和累加變量(accumulators)。
2.1RDD
RDD是只讀的對象集合,RDD分區分布在集群的節點中。如果某個節點失效,或者某部分數據丟失,RDD都能重新構建。Spark將創建RDD的一系列轉換記錄下來,以便恢復丟失的分區,這稱為血系(lineage)。每次對RDD數據集的操作之后的結果,都可以緩存到內存中,下一個操作可以直接從內存中輸入,省去了MapReduce大量的磁盤操作。RDD只支持粗粒度轉換,即在大量記錄上執行的單個操作。雖然只支持粗粒度轉換限制了編程模型,但RDD仍然可以很好地適用于很多應用,特別是支持數據并行的批量分析應用,包括數據挖掘、機器學習、圖算法等,因為這些程序通常都會在很多記錄上執行相同的操作。
使用RDD的好處有:
●RDD只能從持久存儲或通過轉換(transformation)操作產生,相比于分布式共享內存(DSM)可以更高效地實現容錯,對于丟失部分數據分區只需根據它的血系就可重新計算出來,而不需要做特定的檢查點(checkpoint)。
●RDD的不變性,可以實現類MapReduce的預測式執行。
●RDD的數據分區特性,可以通過數據的本地性來提高性能,這與MapReduce是一樣的。
●RDD是可序列化的,當內存不足時可自動改為磁盤存儲,把RDD存儲于磁盤上,此時性能會有大的下降但不會差于現有的MapReduce。
在Spark中,RDD是一個Scala對象,對RDD的并行操作即是調用對象上的方法。有四種方法創建一個RDD:
(1)通過一個文件系統中的文件創建,例如常見的HDFS文件。
(2)通過并行化Scala集合,即把一個集合切分成很多片,然后發送到各種節點。
(3)通過對已有的RDD執行轉換操作,可以得到一個新的RDD。例如通過flatMap可以把類型1的RDD轉換成類型2的RDD。
(4)通過把RDD持久化。RDD默認是惰性的,即只有當RDD在執行并行操作時,RDD才被物化,執行完后即被釋放。用戶可以通過顯式的cache或save操作使RDD持久化。
2.2并行操作
作用在RDD上的并行操作有兩種:轉換(transformation)和動作(action),轉換返回一個新的RDD,動作返回一個值或把RDD寫到文件系統中。轉換是惰性的,即從一個RDD轉換成另一個RDD不是馬上執行的,Spark只是記錄這樣的操作,并不執行,等到有動作操作時才會啟動計算過程。常見的轉換(transformation)如表1所示,常見的動作(action)如表2所示。

表1 常見的轉換(transformation)操作
注:有些操作只對key有效,例如join、groupByKey, reduceByKey。除了這些操作以外,用戶還可以請求將RDD緩存起來。而且,用戶還可以通過Partitioner類獲取RDD的分區順序,然后將另一個RDD按照同樣的方式分區。
2.3共享變量
通常情況下,Spark中的map、filter、reduce等函數的參數是一個函數(閉包),運行時這些函數參數被復制到各個worker節點上,互不干擾。Spark還提供了共享變量用于其他用途,常見的有兩種:
(1)廣播變量(broadcast variables):對于大量的只讀數據,當有多個并行操作時,最好只復制一次而不是每執行一次函數就復制一次到各個worker節點。廣播變量就是用于這種情況,它只是包裝了一下原有的數據,然后只復制一次到各worker節點。
(2)累加變量(accumulators):累加變量只能用于關聯操作,并且只有驅動程序才能讀取。只要某個類型有“add”操作和“0”值都可以是累加變量。累加變量經常用于實現MapReduce的計數器,而且由于是只加性的,所以很容易實現容錯性。

表2 常見的動作(action)操作
Spark提供了 Scala、Java和 Python三種語言的API。而且Spark程序既可以通過交互式Spark shell運行 (Scala或Python語言),又能以獨立的程序運行(Scala、Java或Python語言)。下面這些編程示例使用Scala語言,在Spark shell下執行[8]。Scala是一種基于JVM的函數式編程語言。
Spark的首要抽象便是彈性分布式數據集RDD,所以編程的主要任務就是編寫驅動程序,創建RDD,然后對RDD執行并行操作。RDD既能從外部文件中創建(例如HDFS文件),又能從對其他RDD執行轉換操作(transformation)得到。對RDD有兩種操作:一種是轉換操作,產生新的RDD;另一種是動作操作(action),開始一個作業并返回值。首先配置好Spark運行環境,然后啟動Spark集群,在此不再細述。
3.1文本搜索
本示例搜索某個HDFS日志文件的ERROR信息:


Spark還能顯示的緩存RDD,只需執行cache操作:

3.2單詞統計
本示例統計某個HDFS文件的各個單詞出現的次數,并將結果保存到HDFS文件:

3.3估算PI值
Spark也用于計算密集型任務,本示例使用“扔飛鏢法”估算PI值,在(0,0)-(1,1)的正方形中,隨機生成坐標(x,y),統計落在圓內的點數,那么落在圓內的點數/總點數等于PI/4:

本文大致介紹了Spark系統的基本概念與核心思想,并給出了編程示例。Spark最重要的抽象就是RDD,一種有效的、通用的分布式內存抽象,它解決了集群環境下并行處理大數據的效率問題,比Hadoop MapReduce的效率高,特別適用于機器學習中的迭代式算法和交互式數據分析等特殊的應用場景。目前,Spark是非常流行的內存計算框架,一直在發布新版本,還處于比較活躍的開發階段。當前遇到的技術挑戰有:①資源調度程序如何為Spark作業確定合適的資源需求;②Spark如何更好地與YARN集群管理程序配合,使系統最優;③提供更多的編程API供用戶使用。
[1]Zaharia M,Chowdhury M,Das T,et al.Resilient Distributed Datasets:A Fault-Tolerant Abstraction for in-Memory Cluster Computing [C].Proceedings of the 9th USENIX Conference on Networked Systems Design and Implementation.USENIX Association,2012:2-2
[2]Zaharia M,Chowdhury M,Franklin M J,et al.Spark:Cluster Computing with Working Sets[C].Proceedings of the 2nd USENIX Conference on Hot Topics in Cloud Computing,2010:10~10
[3]Spark[EB/OL].http://spark.apache.org
[4]Scala[EB/OL].https://www.scala-lang.org
[5]Yu Y,Isard M,Fetterly D,et al.DryadLINQ:A System for General-Purpose Distributed Data-Parallel Computing Using a High-Level Language[C].OSDI.2008,8:1·14
[6]Hadoop MapReduce Tutorial[EB/OL].http://hadoop.apache.org/docs/r1.2.1/mapred_tutorial.html
[7]Apache Mesos.http://mesos.apache.org
[8]Spark Programming Guides[EB/OL].http://spark.apache.org/docs/1.1.0/quick-start.html
Spark;Hadoop;MapReduce;Big Data;Data Analysis
Research on Apache Spark for Big Data Processing
LI Wen-yang
(College of Computer Science,Sichuan University,Chengdu 610065)
1007-1423(2015)08-0055-06
10.3969/j.issn.1007-1423.2015.08.013
黎文陽(1990-),男,河南信陽人,碩士研究生,研究方向為分布式與數據庫
2015-02-10
2015-02-28
Apache Spark是當前流行的大數據處理模型,具有快速、通用、簡單等特點。Spark是針對MapReduce在迭代式機器學習算法和交互式數據挖掘等應用方面的低效率,而提出的新的內存計算框架,既保留了MapReduce的可擴展性、容錯性、兼容性,又彌補了MapReduce在這些應用上的不足。由于采用基于內存的集群計算,所以Spark在這些應用上比MapReduce快100倍。介紹Spark的基本概念、組成部分、部署模式,分析Spark的核心內容與編程模型,給出相關的編程示例。
Spark;Hadoop;MapReduce;大數據;數據分析
Apache Spark is a popular model for large scale data processing at present,which is fast,general and easy.Compared with the MapReduce computing framework,Spark is efficient in iterative machine learning algorithms and interactive data mining applications while retaining the compatibility,scalability and fault-tolerance of MapReduce.With its in-memory computing,Spark is up to 100x faster than Hadoop MapReduce in memory.Presents the basic conception,component and the deploying mode of Spark,introduces the internal abstraction and the programming model,gives the programming examples.