黃必棟
(南京鐵道職業技術學院 智能工程學院 江蘇省南京市 210000)
Python 和R 語言是數據分析中的主流編程語言,由于Python 的Pandas 庫的流行,Python 成為數據分析的熱門編程語言。使用Pandas 進行數據分析適用于單機小規模數據分析場景,無法適應大規模數據處理和計算要求[1][2]。Spark則是大數據處理和迭代計算的主流計算平臺[3],支持Python語言,PySpark 即是Spark API 的Python 語言接口。由于Python 在數據分析中的優勢,基于PySpark 的大數據分析方法逐漸成為大數據分析的熱門方法,在PyPI 上每月有5百萬的下載量,PySpark 是Spark 最新發展中的重點領域[1]。在使用PySpark 進行大數據分析時,由于使用方法靈活、功能多變以及Python 語言和Spark 本身的特性,使用者往往較難評估使用PySpark 的不同方法對大數據處理的性能影響[4],以及高效地利用現有的Python 數據科學生態進行大數據分析。本文將詳細介紹和分析基于PySpark 的大數據分析方法,第1 章介紹了使用PySpark 進行數據分析方法的特點;第2 章分析了PySpark 的基本架構,Python UDF 的演化和性能影響,以及與Pandas 庫的集成方法;第3 章介紹了Spark 的運行方法和最佳實踐;第4 章提出了PySpark 結合Pandas 庫進行時序分析的方法,通過此方法闡述了如何通過PySpark 和Pandas 進行大數據時序分析。
數據處理和分析中的開發與軟件開發的方法和側重點有所不同,數據處理和分析是面向數據的編程,強調開發的敏捷性和高效性,注重數據處理的性能。使用PySpark 進行數據處理和分析編程,具有較高的開發效率,同時PySpark的架構和設計也兼顧數據處理的性能。在Spark 平臺中使用PySpark 接口與使用Java 或Scala 接口進行數據分析的方法區別在于:
使用Java 或Scala 接口進行數據處理,需要將代碼編譯打包為jar 包,再提交至Spark 運行。而使用PySpark,由于Python 語言的特性,無需編譯,直接提交源碼即可運行,開發效率較高。
使用PySpark 可以方便、靈活地進行交互式編程,既可使用Spark 自帶的PySpark shell,也可使用交互性更強的iPython、Jupyter Notebook 等交互式編程工具。而使用Java或Scala 接口,可供使用的交互式編程工具較少,Spark 自帶的spark-shell 工具僅提供了Scala 語言的交互式編程。
UDF 可以用Python 編寫,方便了數據分析師的使用。同時UDF 還能調用現有代碼和Python 庫,使得Python 代碼可以復用。
Spark 使用Scala 語言開發,運行于JVM 之上,比較容易提供Scala 或Java 接口,而Python 程序與JVM 不兼容,較難提供Python 接口。Spark 為了提供Python 接口,同時又能兼顧數據處理性能,設計上進行了多次迭代和改進[5]。PySpark 由Spark 0.7 版本引入,其架構如圖1 所示。Driver端由Python 進程和JVM 進程構成,他們之間通過進程間通訊方法進行接口調用和數據交換。PySpark 基于Spark Java接口實現,通過使用Py4j,使得Python 代碼能夠調用Java代碼。Executor 端主要還是基于JVM,若使用了Python UDF,則通過啟動Python 進程調用UDF。

圖1:PySpark 架構
若未使用Python UDF,由于數據處理主要運行在Executor 端,Executor 不需要啟動Python 進程以進行UDF調用,PySpark 的數據處理的性能并沒有因為Python 的引入而下降。若使用Python UDF,Executor 端通過啟動Python進程調用UDF,JVM 進程和Python 進程的數據交換需經過序列化操作和管道傳輸,加上Python 代碼的解釋執行,PySpark 的性能會有一定的下降。針對使用Python UDF 帶來的性能問題,Spark 在2.3 版本中引入了Pandas UDF[6],通過Apache Arrow 提供的基于內存的列式存儲數據層,減少JVM 進程和Python 進程數據交換過程中的序列化操作,從而提高了性能,其設計如圖2 所示。Pandas UDF 也稱之為Vectorized UDF,即支持向量化處理的UDF,相較于早期的一次UDF 執行處理單條記錄,改為一次UDF 執行處理多條數據,UDF 的執行性能得到進一步提高。Spark 3.0 對Pandas UDF 繼續進行了改進,解決了Pandas UDF 易用性差的問題,利用Python 3.6 提供的Python type hints 功能,使Pandas UDF 定義清晰,易于使用[7]。

圖2:基于Apache Arrow 的Pandas UDF
Pandas 庫是Python 生態中數據分析的主要工具,已成為數據分析工具的標準。Pandas 庫和Python 數據科學生態中的numpy、matplotlib、scikit-learn 等庫緊密集成,能適應多種數據處理和分析場景,包括:基本的統計分析、數據補全、時序分析等。數據分析師通常在單機上使用Pandas庫,受制于內存和CPU 的限制,處理的數據量和性能都有限。Spark 可以運行于易擴展的分布式系統中,能充分利用系統的計算和內存資源,適合處理海量數據并具有較好的性能。但是PySpark 和Pandas 庫還存在一些差異,功能和生態都不及Pandas 庫,數據分析師較難遷移到PySpark。融合Pandas 庫是PySpark 目前發展的主要方向,使PySpark 既有類似Pandas 的API,同時又能高性能地處理海量數據。目前在PySpark 中使用Pandas API 有兩種方法:
2.3.1 PySpark 和Pandas 庫混合使用
PySpark 支持Spark DataFrame 和Pandas DataFrame 之間的相互轉換,如下代碼所示:

實際使用中混合方法一般先使用Spark 進行大數據量的處理,把大規模數據轉成小規模數據,再轉換為Pandas DataFrame, 從而進一步使用Pandas API 進行分析。toPandas()方法實際上是一種Spark collect action,數據處理結果先收集到Driver 端,再進行轉換。處理結果由Driver端的JVM 進程傳至Python 進程,若處理結果比較大,使用本地網絡通訊方式傳遞數據的性能不佳。與Pandas UDF的性能改進方法類似,Spark 使用Apache Arrow 解決Spark DataFrame 和Pandas DataFrame 的轉換中跨進程數據交換性能問題,其設計如圖3 所示。此功能在Spark 3.0 中默認是關閉的,需要設置spark.sql.execution.arrow.pyspark.enabled為true 打開。

圖3:使用Apache Arrow 進行Spark DataFrame 和Pandas DataFrame 的轉換
2.3.2 使用Koalas API
Koalas API 的目標是在Spark 上提供類似Pandas API,方便數據分析師學習和使用PySpark。Spark 3.0 中提供了Koalas 1.0 版本,已實現了60%~80% 的Pandas 功能[8]。Koalas 包的下載非常活躍,從2020 年1 月至2020 年6 月,每天有2~3 萬的下載量[8]。未來Koalas 將繼續擴大Pandas功能的覆蓋,增強數據可視化庫、機器學習庫以及文檔的完善。
混合使用方法和Koalas 方法目前都還不完善,不能完全解決Pandas 的融合問題。混合使用方法對使用者要求較高,使用者既要熟悉PySpark,又要熟悉Pandas。Spark DataFrame 與Pandas DataFrame 的轉換有一定的局限性,Spark DataFrame 若規模過大,則可能由于Driver 端的內存不足而導致轉換失敗。Koalas API 由于設計上的差異,不存在內存限制問題,但還不能完全兼容Pandas API。Koalas 的發展方向較好,潛力較大,未來可能是一種較優的方法。
PySpark 的安裝既可以通過下載Spark 安裝包,也可以使用Python 的包管理器pip 或conda 安裝。使用包管理器較為方便,包管理器會下載好完整的Spark 程序,安裝和升級非常方便。PySpark 的運行有交互式運行和提交代碼執行兩種方式,前者適用于數據分析場景,后者適用于數據批處理場景。Spark 自帶的pyspark shell 為交互式編程工具,sparksubmit 腳本提供了提交代碼執行的方法。由于Notebook 編程方式基于Web,可以更方便、靈活地進行代碼編輯和數據可視化,實際使用中交互式分析大多選擇Notebook。在Notebook 中啟動Spark 的方式與spark-submit 腳本不同,Spark 的配置參數并不是通過設置命令行參數,而是通過代碼指定。以運行PySpark 于Yarn 集群為例,以下代碼提供了一個Spark 啟動函數,該函數返回SparkSession 對象。

第4-6 行代碼設置了Hadoop 配置文件目錄環境變量,用于尋找Yarn 的配置文件;第7-11 行代碼設置了Executor數量、核數、內存等運行參數;第12-13 行代碼設置了所依賴的jar 包;第14 行代碼指定了以Yarn 模式運行。調用該函數返回SparkSession 對象后,即可調用PySpark 接口。
原生Spark 在時序分析方面提供了較少的時序分析功能和算法,雖然第三方庫spark-timeseries[9]提供了基于Spark時間序列分析算法庫,但是開源項目目前活躍度不高[10],從功能和生態上看也不及Pandas 庫的時間序列分析模塊。而Pandas 庫只適用于單機小規模數據量的處理,無法運行于分布式系統環境中以處理大規模數據。由于在PySpark 編程中能同時使用Spark API 和Pandas 庫,并且Spark DataFrame和Pandas DataFrame 可以進行雙向轉換,PySpark 和Pandas庫可以相互配合,完成時序分析的不同階段的不同功能,最終達到既能處理大規模數據又能應用豐富的時序分析算法的目的。PySpark 配合Pandas 庫進行時序分析的設計如圖4 所示,其基本思路是使用Spark 對大規模數據進行過濾、聚合等變換操作,通過重采樣(降采樣)方法把大規模時序數據轉化為小規模均勻等距時序數據,再使用PySpark 的toPandas 方法轉化為Pandas DataFrame,最后使用Pandas 庫提供的時序分析算法進行時序分析。

圖4:PySpark 和Pandas 庫配合進行時序分析
在Spark 中對大規模時序數據降采樣是整個過程的關鍵,下面通過實例闡述其具體步驟和方法。待處理時序數據為事件記錄信息,其Schema 如表1 所示。其中timestamp 字段為事件發生的時間戳,單位為秒;event 為事件名稱,為枚舉類型;value 字段記錄了事件中對應的值(為了簡化問題,只定義一個值為代表)。

表1:待處理時序數據的Schema
現需要對數據降采樣為:事件A 在某個時間范圍(t_start,t_end)內的時序數據[tm,value],tm 為整小時點,value 為單位小時內的事件值的均值。步驟如下:


上述方法中步驟1 和2 降低了數據量,步驟1 通過事件名和時間范圍過濾篩選數據,步驟2 按粗粒度的時間單位進行聚合再次降低了數據量。步驟3 和4 用于對缺失的時間點進行補全,最后在步驟5 中調用toPands()函數把Spark DataFrame 轉至Pandas DataFrame,進一步轉化為Pandas time series。
PySpark 從設計上充分考慮了用戶使用習慣、數據處理性能以及生態擴展,不僅僅只是Spark 的Python 語言接口,更是成為大數據分析與傳統數據分析的橋梁,使得數據分析師能快速的學習和掌握Spark 從而進行大數據分析,同時把Python 的數據科學生態引入至Spark,擴展和完善了基于Spark 的大數據分析生態,推動了Spark 的流行和發展。本文介紹的PySpark 結合Pandas 進行時序分析的方法,在實際應用中具有較好的運行效率和性能。但還存在一定的局限性,Pandas 庫還只能應用于小規模數據的處理。Koalas 是對PySpark 和Pandas 庫融合的另外一種方法,從設計目標看能夠把現有的數據科學算法庫高效地應用于大規模數據。但是Koalas 目前正在活躍發展中,還未成熟穩定,待需要進一步的深入研究。