◎王正迅
基于傳統關系型數據庫的穩定性,目前還有很多企業將數據存儲在關系型數據庫中,但是關系型數據庫的數據模型較簡單,不適合表達復雜的數據關系,在處理大量數據、半結構化和非結構化數據,以及系統容錯和系統擴展性方面受到了一定的限制,Hadoop 下的系列工具則有較大優勢,早期由于工具的缺乏,Hadoop 集群與傳統數據庫之間的數據傳輸非常困難。基于這些方面的考慮,需要一個能在傳統關系型數據庫和Hadoop 之間進行數據遷移的工具,Sqoop 應運而生,Apache 提供的Sqoop 工具,能實現自動化數據遷移,依托于數據庫相關的schema 描述信息,遷移的過程則使用MapReduce(后面都簡寫為MR)來進行。Sqoop 作為一個跨平臺抽取和輸出數據的工具,在關系型數據庫(MySQL、O-ralce 等)和大數據平臺(HDFS、Hive、HBase)之間常用。作為ETL 過程中重要的一環,加載作業的性能也是需要關注和優化的。本文將主要闡明如何在異構環境中使用Sqoop 方法進行數據遷移。
Sqoop 是一款用于在Hadoop 和關系型數據庫之間高效遷移大批量數據的開源工具,類似于其他ETL 工具,Sqoop 使用元數據模型來判斷數據類型,并在數據從數據源轉移到Hadoop 時確保傳輸安全的數據處理,專為大數據批量傳輸設計,能夠分割數據集并創建Maptask 任務來處理每個區塊。以RDBMS 和HDFS 之間數據傳輸為例,Sqoop 借助于MR 導入和導出數據,用戶可以輕松地以命令行模式從RDBMS 如MySQL 或 Oracle 中導入數據到 HDFS 中,通過 Hadoop 的MR 模型計算完之后,將結果導回RDBMS,Sqoop 能夠自動完成整個過程中的大部分,并提供容錯和并行化操作。
Sqoop 本質就是遷移數據,用戶在使用Sqoop 在異構環境間遷移數據時,Sqoop Client 提供了CLI 和瀏覽器兩種方式提交請求,然后Sqoop Server 收到請求后,授權MR 執行。這個過程它高度依賴Hadoop 并行導入數據,充分利用了MR 的并行特點,以批處理的方式加快數據的傳輸,同時也借助MR 實現了容錯。
Sqoop 把關系型數據庫(以mysql 為例)的數據導人到HDFS 中,主要分為兩步:一是得到元數據(mysql 數據庫中的數據),二是提交Map。在這個過程中,sqoop 會通過jdbc來獲取需要的數據庫的元數據信息,例如:導入的表的列名,數據類型。這些數據庫的數據類型會被映射成為java 的數據類型,根據這些信息,sqoop 會生成一個與表名相同的類,用來完成序列化工作,最后使用Java 類進行反序列化,MR并行寫數據到Hadoop 中,從而保存表中的每一行記錄。在導入數據時,如果不想取出全部數據,可以通過類似于where 的語句進行限制。

圖1 Sqoop 數據導入機制
Sqoop 的導出通常是將 HDFS、HBase、Hive 中的數據導出到關系型數據庫中,關系型數據庫中的表必須提前創建好。底層方面,同樣是通過jdbc 讀取HDFS/HBase/Hive 數據,生成Java 類(這個類主要作用是解析文本中的數據),用于序列化,最后export 程序啟動,通過Java 類反序列化,同時啟動多個Map 將相應值插入表中。

圖2 Sqoop 數據導出機制
數據導入分全量導入和增量導入。
(1)全量導入數據。全量數據導入就是一次性將所有需要導入的數據,從關系型數據庫一次性地導入到HDFS 中(也可以是HBase、Hive 等)。全量導入形式使用場景為一次性離線分析場景。用sqoop import 命令,具體如下:

(2)導入數據庫中的部分數據。導入部分數據可以在行與列的選取上添加參數完成,列選取上添加一個--columns參數,指定數據庫中需要導入的列,如添加--columns id,name,age,sex;行選取上添加 --where 參數,增加 where 條件篩選滿足條件的行,如--where "age >= 20" ;還可以使--query 參數查詢篩選需要導入的數據,同時實現行、列的選取,如 --query"select id,name,age,sex from t_user_info where age>=20 and$CONDITIONS"。
(3)增量導入數據。在實際生產環境中,系統可能會定期從與業務相關的關系型數據庫向Hadoop 導入數據,導入數據倉庫后進行后續離線分析。數據量比較大,有的前期數據已經被用于項目分析了,我們此時不可能再將所有數據重新導一遍,此時我們就需要增量數據導入這一模式了。增量數據導入分兩種,一是基于遞增列的增量數據導入(Append方式)。二是基于時間列的增量數據導入(LastModified 方式)。在--incremental 參數后通過指定Append 方式或LastModified 方式。
export 是HDFS 里的文件導出到關系型數據庫的工具,不能直接從hive、hbase 導出數據。如果要把hive 表數據導出到關系型數據庫,需先把hive 表通過查詢寫入到一個暫存表,臨時用文本格式,然后再從該暫存表目錄里導出數據。
執行數據導出前,數據庫中必須已經存在要導入的目標表,默認操作是從將文件中的數據使用INSERT 語句插入到表中,也可選擇更新模式(Sqoop 將生成UPDATE 替換數據庫中現有記錄的語句)或調用模式(Sqoop 將為每條記錄創建一個存儲過程調用)。
默認情況下,sqoop export 將每行輸入記錄轉換成一條INSERT 語句,添加到目標數據庫表中。如果數據庫中的表具有約束條件(例如,其值必須唯一的主鍵列)并且已有數據存在,則必須注意避免插入違反這些約束條件的記錄。如果INSERT 語句失敗,導出過程將失敗。此模式主要用于將記錄導出到可以接收這些結果的空表中。通常用于全表數據導出。使用如下命令可完成:

更新模式導出,僅僅更新已存在的數據記錄,不會插入新記錄,該模式用于更新源表與目標表中數據的不一致,即在不一致時,將源表中的數據遷移至目標表(如MySQL、Oracle 等的目標表中),這種不一致是指一條記錄中存在的不一致,比如HDFS 表和MySQL 中都有一個id=1 的記錄,但是其中一個字段的取值不同,則該模式會將這種忽視差異。對于“你有我無”的記錄則不做處理,通過指定update-key 并在—update-mode 參數后指定是updateonly 模式。調用模式導出,會更新已存在的數據記錄,同時插入一個新記錄,實質上是插入一個update+insert 的操作,同樣是通過指定update-key 并在—update-mode 參數后指定是allowinsert模式。
Sqoop 作為一種重要的數據遷移工具,在使用過程中需要遵守數據庫約束、數據庫連接機制,考慮空值、并行度、分隔符等原因導致的傳輸列數和表的列數不一致等問題。
空值問題常見于Hive 與MySQL 數據遷移過程中發生。Hive 中的 Null 在底層是以“N”來存儲,而 MySQL 中的 Null在底層就是Null,這就導致了兩邊進行數據遷移時存儲不一致問題,Sqoop 要求在數據遷移的時候嚴格保證兩端的數據格式、數據類型一致,否則會帶來異常。
為了保證數據兩端的一致性,數據遷移的過程中遇到null-string,null-non-string 數據都轉化成指定的類型,通常指定成"N"。依賴自身參數在導入數據時采用--null-string“\N”和--null-non-string“\N”,在導出數據時采用--input-null-string“\N”和 --input-null-non-string“\N”兩個參數,在使用這些參數過程中,需要正確地將值N 轉義到\N。


(1)任務失敗導致數據不一致。由于Sqoop 將導出過程分解為多個事務,因此失敗的導出作業可能會導致將部分數據提交到數據庫。在某些情況下,這可能會導致后續作業由于插入沖突而失敗,或者在其他情況下導致重復數據。如這樣一個場景:export 到 Mysql 時,使用 6個 Map 任務,過程中有3個任務失敗,那此時MySQL 中存儲了另外三個Map任務導入的數據,此時會生成一個不完整的報表數據。繼續調試問題并最終將全部數據正確的導入MySQL,會再次生成一個報表數據,而這個報表數據與之前的報表數據是不一致,這在生產環境是不允許的。這種情況下,可以通過--staging-table 參數指定一個staging 表來克服這個問題,指定的這個staging 表在單個事務中,暫存數據,等到事務完全處理完畢再移動到目標表。為了使用暫存功能,必須在運行導出作業之前創建暫存表,該表必須在結構上與目標表相同,這個表應該在導出作業運行之前為空,所以需要--clear-staging-table 這個參數配合起來使用。

(2)分隔符問題導致數據不一致。Sqoop 默認字段與字段之間是用“,”分隔開,Hive 默認的列分隔符是 ^A(001),行與行之間的分隔符是“
”,當然,在創建這些表(包括MySQL表)的時候也可以自己指定分隔符。在數據遷移過程中,由于分隔符的不一致會導致數據遷移失敗,由于導入的數據中有'
',hive 會認為一行已經結束,后面的數據被分割成下一行,也會導致數據不一致。這時可以使用--lines-terminated-by和--fields-terminated-by 這兩個參數來自定義行分隔符和列分隔符進行解決。但是hive 只支持'
'作為行分隔符,所以在關系型數據庫與Hive 進行數據遷移時,還需要加上--hive-delims-replacement
--hive-delims-replacement
--hive-drop-import-delims:將分隔符中的/0x01 和/r/n去掉
在生產環境中,由于數據量巨大,數據結構復雜,Sqoop導入數據報內存溢出以及抽數時間過長,日志顯示有個別的reduce 執行時間過長,卡在99%那個位置,例如有25個Map 中有24個Map 是在20 秒內執行完成,只有1個Map用了6 分多鐘,這種Map 分布不均勻,就是數據傾斜現象。導致數據傾斜的原因有可能是數據本身就不均勻,或是分隔符問題,或是數據類型不一致等。這時需要設置--split-by、--num-Mappers 和--split-Mappers 這三個參數。
在import 時,指定--split-by 參數,Sqoop 根據不同的split-by 參數值來進行切分,然后將切分出來的區域分配到不同Map 中。每個Map 中再處理數據庫中獲取的一行一行的值,寫入到HDFS 中。split-by 根據不同的參數類型有不同的切分方法,最好使用較簡單的int 型。
通過設置Map 的個數來提高吞吐量,-num-Mappers后面設置的Maptask 數目大于1 的話,那么-split-by 后面必須跟字段,因為-num-Mappers 后面要是1 的話,那么-split-Mappers 后面跟不跟字段都沒有意義,因為,他只會啟動一個Maptask 進行數據處理。一般來說數據量與Map 的數量是相關的,一般建議在500w 以下使4個Map 即可,如果數據量在500w 以上可以使用8個Map,Map 數量太多會對數據庫增加運壓力,造成其他場景使?性能降低。在使用并行度的時候需要了解主鍵的分布情況是否是有必要的。
隨著大數據、云計算、物聯網的不斷發展,信息系統產生的數據規模與日俱增,以Hadoop 平臺為代表的海量數據處理平臺通過對海量數據進行并行處理成為一種有效的解決方案,基于Sqoop 實現的在關系型數據庫與Hadoop 平臺之間進行數據遷移,它可以高效、可靠地完成數據傳輸任務,是數據分析處理及挖掘前的重要一環。本文從Sqoop 工作機制、遷移方法介紹、Sqoop 常見問題及解決辦法等方面進行分析,解決了Sqoop 使用過程中的簡單問題,在實際使用過程中,還需要結合項目實際需求對Sqoop 做更進一步的優化。