楊 浩
(榆林職業技術學院 質量管理辦公室, 榆林 719000)
MongoDB是介于關系數據庫與非關系數據庫之間的基于BSON(Binary Serialized Document Format,二進制串行化文檔格式)格式的一種分布式文檔存儲系統,是一種NoSQL數據庫,具有二級索引和較強的SOL性能。MongoDB以文檔為單位存儲數據,數據結構由鍵值對(key->value)組成,字段值可以是文檔、數組或文檔數組。
MapReduce是Hadoop的核心部件之一,是一個分布式批處理計算框架。MapReduce框架采用Master/Slave結構,master節點負責資源管理與調度, Slave節點執行分布式計算。MapReduce1.0中的master節點就是JobTracker節點,只有一個,既要進行資源管理又要進行作業調度,任務很重。JobTracker進行資源分配時根據任務數進行,而不是根據任務對資源的實際需要進行分配,容易產生內存溢出。另外,存在資源劃分不合理現象,MapReduce把資源打包為slot,包括Map slot和Reduce slot,前者供Map任務調用,后者只能供Reduce任務調用,即使一方空閑,也不能被另一方調度使用。為了克服上述缺點,MapReduce2.0引入了YARN,把原來由JobTracker執行的任務進行了分解,資源分配交給了ResourceManager,由YARN專門負責,任務調度與監控交給了ApplicationMaster, MapReduce就是運行在YARN上的計算框架。
關于MongoDB和hadoop MapReduce單方面性能改進的研究較多[1-2],也有學者對MongoDB內置的MapReduce性能進行過研究,但研究結果缺乏數字表述[3-4],曾強等人通過優化MongoDB-Connector for Hadoop的內部工作機制,對其性能進行了測試評估,認為,根據輸入、輸出密集程度不同,分別將MongoDB和HDFS靈活配置為Hadoop MapReduce的輸入源或輸出目標可以取得更好的數據交換性能[5-6]。
本文剖析了MongoDB和Hadoop MapReduce的數據交換機制,在原有項目實踐的基礎上,總結形成了改進MongoDB與Hadoop MapReduce數據交換性能的措施,并對應用效果進行了分析論證。
目前MongoDB與MapReduce之間數據傳輸主要采用如圖1所示。

圖1 MongoDB與MapReduce整合模式
圖1主要由三部分組成,依次是MongoDB Cluster(存儲節點)、MongoDB-Connector for Hadoop(連接器)以及MapReduce Cluster(計算節點)。其中,MongoDB Cluster 負責對海量數據進行分片存儲,MapReduce Cluster承擔并行計算,MongoDB-Connector for Hadoop是數據存取的核心組件,由MongoDB公司開發,該組件為MongoDB替代HDFS作為Hadoop計算框架MapReduce的數據存儲節點提供了接口,為計算節點快速讀寫MongoDB集群的數據提供了保證。
MongoDB-Connector for Hadoop連接器還包括對Pig和Hive的支持,這使得非常復雜的MapReduce工作流可以通過編寫更簡單的腳本來執行。通過Hive訪問MongoDB的方法有兩種,一種是直接建立hive表和mongodb表的映射關系,通過對hive表執行HSQL查詢來實現對mongodb表的查詢,然后將結果返回,這種操作會導致兩種問題,需要引起注意:一是如果權限設置不合理,則在Hive中刪除表時,同時刪除MongoDB中的對應表;二是如果MongoDB的表數據量很大,則執行HSQL操作是會變得很慢。另一種訪問MongoDB的方法是采用BSON文件 。先從mongodb中導出BSON文件(mongodump),再將BSON文件寫入Hive,然后用HSQL對Hive進行操作,在這種方法中序列號和反序列化操作會消耗較大的系統開銷,特別是當MongoDB文檔過大時,會造成網絡堵塞。另外,在與Hadoop的MapReduce組件進行數據交換時,在數據分片等方面與HBASE相比需要額外開銷,影響數據交換效率。
chunk的拆分與遷移發生在數據更改和插入操作時,頻繁的splitting和balancing會消耗shard server的很多IO資源。chunk size的設置對數據遷移時的資源開銷具有較大的影響。chunk size過小,shard server之間數據分布會得到很好的平衡,數據遷移速度就快,但chunk分裂頻繁,會消耗掉較多的router process資源,chunk size過大,shard server之間數據分布就有可能失去平衡,chunk分裂就少,一旦有數據遷移,則量較大,會集中消耗掉更多IO資源[7,.8,9]。
chunk size默認值64M,在實際應用中,要根據具體的業務類型,設置合適的chunk size值 ,以減少系統資源開銷、獲得更大性能。
sharded cluster中,分片的方式有兩種,一種是基于范圍的分片,將集合中的數據按片值范圍進行分類,另一種是基于哈希索引的分片。前者適合基于片鍵的范圍查詢,后者適合數據的隨機讀取與寫入,所以要結合業務特征靈活選擇分片方式。
MongoDB 的Sharded Cluster通過將數據分散存儲到多個Shard上來實現性能擴展。元數據服務器Config Server 存儲著Sharded Cluster的元數據,mongos進程為應用訪問接口,mongos進程檢測到讀寫請求時,從 Config Server 讀取所訪問數據的路由信息,并將讀寫請求轉發到相應的 Shard 上。
集群中的mongos可任意擴展,所有的 mongos 是對等關系,用戶可通過任意一個或多個mongos訪問分片集群。如果將用戶讀取請求均勻的分散到多個 mongos 上,同時將數據分散存儲到Sharded Cluster的所有shard上,可以有效實現負載均衡[10-11]。
下面是連接分片集群的代碼:
MongoClientURI myconnectionString = new MongoClientURI("mongodb://:123456 @ 192.168. 250.2:3717,192.168.250.3:3717,192.168.250.4:3717/admin");
MongoClient myclient = new MongoClient (myconectionString);
MongoDatabase mydatabase = client.getData base("test");
MongoCollection
這樣,系統就會自動將用戶請求分散到集群中的所有 mongos 上,以實現負載均衡;另外,當某個mongos 故障時,系統自動進行故障轉移,將應用請求轉發到正常的 mongos 上。
config.shards組件存儲著集群中所有Shard的信息,通過操作config.shards可動態的向Sharded 集群中增加或移除shard。
所以,將config.shards中分片shard的地址替換為myconnectionString中IP地址,就可以使mongos集群系統中的機器直接訪問shard上的數據,從而實現數據的并發處理。
CAP是NoSQL的三大理論基石之一,C(即Consistency)指的是一致性,A(即Arailability)指的是可用性,P(即Partition Tolerance)指的是分區容忍性,在同一個NoSOL系統中往往很難同時獲得三個方面的最大性能,必須以犧牲一方或兩方的性能來獲得另一方面的最大性能。分區容忍性是基本要求,必須要有,在一般系統中很少要求強一致性,而對可用性要求較高[12,13]。鑒于此,在對MongoDB進行讀寫操作時,要針對具體業務,進行靈活的性能優化設置,如在讀取數據時設為為了獲得最大的可用性(響應時間)可以犧牲一致性等。為了提高性能,在分布式系統中往往設置為一個數據節點負責數據寫入,然后把寫入的數據更新至其他備份節點,而讀操作往往在其它備份節點完成,但在主節點寫數據成功后,在同步到其它備份節點的時候,經常由于網絡故障原因出現延遲寫入,這時如果要保證相關節點的數據一致性,就必須等到網絡故障恢復,才能完成節點之間的數據一致性,其代價是犧牲了系統的可用性。
MapReduce在處理數據時,先將需要的數據一次從MongonDB中讀取到內存進行處理,同時把中間結果暫存在內存,待數據處理完后,將結果一次性存入內存,以節約IO開銷。
但是會存在以下問題:服務器內存有限,遇到大文檔數據時,內存難以一次存儲所有數據;內存為半導體介質的存儲器,一旦掉電或重啟,內存數據全部丟失;如果有新的節點加入,無法立即對新加入的計算機(服務器)空間進行尋址。
為了避免主機掉電或系統重啟帶來的數據丟失問題,可以設置讀數據操作在內存數據庫(Memory DB)中進行,當內存數據庫中找不到時,再去訪問磁盤數據庫(Disk DB),寫數據時則直接寫入磁盤數據庫中,不會影響內存數據庫訪問速度,內存數據庫定期與磁盤數據庫進行數據同步,從磁盤數據庫中把新寫入的數據讀入內存,或把內存計算結果寫入磁盤數據庫,這樣既保證了讀寫速度,又不會丟失數據。
混合分區的每個分區由一個內存數據庫和一個My SQL關系數據庫構成,形成水平方向多個分區,垂直方向二級數據庫分區(內存數據庫和MySQL數據庫),解決水平擴展性差的問題[14]。
另外,可以借鑒Spark計算框架的持久化技術,將經常要訪問的數據緩存到內存(rdd.cache()或rdd.persist(MEMORY_ONLY)),這樣當遇到動作類型的操作時,就免去了從硬盤調用數據的開銷,當數據使用完畢后將內存清空(rdd.dispersist())即可。在map shuffle過程中,系統啟動combin(合并)操作的閾值是3(即3個溢寫文件),即當溢寫文件大于等于3時啟動combin操作,否則不啟動combin操作。在實際應用過程中,應根據實際需要調整閾值,以提高性能。
在機器學習、圖計算等迭代算法中,前一階段的計算結果往往會作為下一階段的計算輸入,但在MapReduce框架中,都是把中間結果寫入到存儲系統,然后再根據需要調入內容,這樣會造成大量的數據復制、磁盤IO和序列化開銷[15]。可以參照RDD(Resilient Distributed Datasets,彈性分布式數據集)設計理念,構建GDA(Directed Acyclic Graph,有向無環圖),以管道化的方式把前一個操作的處理結果轉發給下一個操作作為其輸入,從而避免中間結果的存儲,降低數據復制、磁盤IO和序列化開銷。
移動計算要比移動數據的開銷要小的得多,所以將計算代碼移動到存儲數據的節點進行計算。在同一個集群中同時部署hadoop和MongoDB,使數據節點和計算機節點部署在同一個節點上,實現“計算向數據靠攏”,在數據節點上進行計算,減小數據傳輸與序列化開銷。同時,設置MapReduce的分片(split)大小最好與shard中的chunk size值相同,以避免Map函數跨節點處理分片。
當得到數據的計算節點的資源,正被其它任務占用時,是把當前節點的數據移動到其他空閑節點上進行計算,還是等待其它任務釋放資源呢?需要進行計算比較,如果當前節點結束任務的時間比移動數據所花費的時間還小,則不必移動數據,等待釋放資源;如果當前節點所需計算資源不能夠及時釋放,且數據量不大,移動數據不會帶來網絡堵塞,則可以移動到其它空閑節點上進行計算。
上述MongoDB與Hadoop MapReduce數據交換及性能調優的改進措施是在某校學生網絡輿情分析與某超市顧客消費行為分析兩個項目的研究中,經過反復實踐與探索形成的。學校輿情分析項目中數據集以大約8000條/天的速度增加,超市顧客消費記錄以大約1萬條/天的速度增加,在分析當天或近幾天的數據時,由于數據量小,參數調整對系統整體性能影響不大,但是在對歷史數據進行比較分析時,不同的Chunk size值、不同的MongoDB Sharded Cluster部署、不同的CAP設計等對系統性能產生了明顯的影響。
在對學生年度輿情分析及超市季節性消費行為分析時,數據集均達到了300萬條以上。這時將chunk size設為32MB、同時將MapReduce的split大小也設為32MB時,系統性能明顯優于其它值。另外MongoDB集群要比單MongoDB性能優越,本地計算比移動數據也更優越。
系統研發過程中,對程序編寫思路也進行了改進,讓程序按有向無環圖方式執行,以避免中間結果的存儲,降低數據復制、磁盤IO和序列化開銷,其效果也是非常明顯的。
在進行混合分區與持久性技術實驗時,系統性能并沒有明顯的變化,其原因是數據集不夠大,因為從理論上分析,這兩項改進措施對處理GB級數據時效果較明顯。
以超市全年顧客季節性消費行為大數據分析為例,對此MongoDB與Hadoop MapReduce集成數據分析系統進行性能優化的前后對比關系如表1所示。

表1 優化前后的性能對比
性能改進效果通過在相同環境下執行同一任務所消耗的時間來判斷,該時間為執行任務的結束時間與開始時間之差。每項優化措施在前一項優化措施的基礎上進行。
項目實踐時,首先將單MongoDB擴展為MongoDB集群,程序運行時間減少了2.891秒,在此基礎上同步優化chunk size值和split值,程序運行減少了1.022秒,繼續將移動數據改為移動計算,運行時間又減少了1.220秒,將程序按有向無環圖的思想編寫,運行時間減少了0.562秒,但是在利用混合分區與持久化技術進行性能優化時,未得到明顯效果。程序運行節省的總時間為5.695秒。所以所采取的性能優化措施是有效的。
對MongoDB與Hadoop MapReduce集成系統的性能改進研究具有很大的空間,本文根據項目研究的實踐與探索經驗,從Chunk size設置與分配方式、MongoDB分片集群部署、平衡CAP、混合分區與持久化技術、計算本地化、構建有向無環圖、設置預測機制等方面進行了論述,并對這些改進措施在實踐中的應用情況進行了分析。實踐證明:
1) 設置chunk size值和MapReduce的split大小對系統整體性能的改進具有明顯的作用。
2) 計算本地化與有向無環圖技術在,數據量較大且網絡性能不佳的情況下,能獲得較明顯效果。
3) 優化MongoDB分片集群部署、平衡CAP、混合分區在本研究中未取得較明顯效果,在理論上當數據量達到GB級別時,能取得明顯效果,需要后期深入研究。
但是對MongoDB與Hadoop MapReduce集成系統的性能改進,沒有固定的模式,需要根據數據量的大小和計算類型采取合適的措施才能達到性能改進的目的。