于金良,朱志祥,梁小江
(1.西安郵電大學,陜西 西安 710061;2.陜西省信息化工程研究院,陜西 西安 710061)
基于Flume的MySQL數據自動收集系統
于金良1,朱志祥1,梁小江2
(1.西安郵電大學,陜西 西安 710061;2.陜西省信息化工程研究院,陜西 西安 710061)
針對分布式系統中、不同數據中心之間的數據收集,同時解決將數據由關系型數據庫交換到非關系型數據庫的問題,提出一種基于Flume的MySQL數據庫數據自動收集系統。為了符合現實中的生產環境,該系統采用的是一種星型拓撲結構。系統可以自動查詢給定的MySQL數據庫表,自動檢測表中的數據更新,實現自動增量傳輸,并對原始數據進行封裝、解析,最終將數據存儲到非關系型數據庫HBase中。在測試中,系統中每臺機器的平均傳輸速度可達到1 111 kb/s,系統總的平均傳輸速度可以達到3 333 kb/s,并且保證了數據的完整性,實現了可靠高效傳輸的目標。
Flume;MySQL數據庫;數據收集;HBase;JDBC
近年來,隨著信息技術的快速發展,帶來的是各種信息、數據的爆發式增長,大數據時代應運而生。2008年8月,首次提出大數據的概念。在大數據時代,TB、PB,甚至EB級的數據已經成為一種常態。為了應對大數據的存儲、處理以及大型計算機成本高的難題,集群化的分布式系統快速發展并取代了單機服務系統[1]。各大公司和開源社區紛紛提出了自己的大數據解決方案,其中開源的Hadoop生態系統最為熱門[2]。
Hadoop是一個并行處理大規模數據的分布式計算和存儲系統,可以將分布式系統部署在廉價機器上[3]。要使用Hadoop來存儲、處理數據,首先要解決的問題是如何將數據收集到Hadoop平臺上。而將原始關系型數據庫中的數據導入到Hadoop中的非關系型數據庫HBase中顯得尤為重要。文中系統使用Hadoop的子項目Flume收集分布式系統中各個計算機中數據庫的數據,并最終存儲在非關系型數據庫HBase中[4]。在Hadoop生態系統中,Sqoop可以實現Hadoop集群與關系型數據之間的數據交換,但是由于它底層使用的是MapReduce計算框架,故依賴于Hadoop的集群環境,這是一大缺陷。而文中系統則可以脫離Hadoop環境,在構建大數據平臺時更加靈活。
Flume最早是Cloudera開源的一個日志收集系統,設計它的初衷是在分布式系統中提供可靠而有效的大規模日志收集服務[5]。2011年10月,Cloudera完成了Flume-728,對Flume進行了里程碑式的改動,重構了核心組件、核心配置以及代碼結構,重構后的版本統稱為Flume-ng。同時Cloldera將Flume貢獻給了Apache基金會,成為了頂級項目Hadoop中的一個子項目。它的架構非常簡單靈活,尤其是當前的Flume-ng版本,它只是一個純粹的數據傳輸工具,將數據的讀入和寫出分為兩個獨立的部分,實現了二者的異步性。一個傳輸通道只是一個代理,各個代理之間又是相互獨立的。每個代理包括Source(數據源)、Channel(中間傳輸通道)、Sink(數據接收器)三個部分。
其中Source是將數據從數據源讀入,封裝為一個事件發送到Channel;Channel的作用是臨時緩存這些事件,為了保證數據的可靠性,當事件被Sink接收時才將其刪除,否則一直緩存;Sink負責接收事件后將數據存儲到指定的目的端,完成一次數據傳輸。
Flume作為日志收集系統,支持多種數據源,如Exec、Spooling、Kafka、NetCat和用戶自定義的源等;擁有多種接收器,如HDFS、File Roll、HBase、Kafka、數據倉庫Hive[6]和用戶自定義的接收器等;包括多種Channel,如Memory、File等,其中File Channel具有很高的故障恢復能力。
它的使用也非常簡單,用戶根據自己的需求編寫配置文件,啟動代理服務,即可完成數據收集。
因為Flume是一個開源項目,所以用戶可以在原有的架構上自己定制,實現自己的數據收集系統。
2.1 系統介紹
該系統是基于Flume-ng 1.6.0的MySQL數據庫數據收集系統,收集MySQL中的數據,自動監測數據庫中的數據更新,實現實時增量收集,最終將數據存儲到非關系型數據庫HBase中??蓪⒃痉荋adoop集群中的數據導入到集群中,實現單機系統與分布式系統的數據交換??稍诿撾xHadoop環境的前提下將數據導入到Hadoop集群,具有依賴小、量級輕的優點。
系統需要實現的目標:
(1)可以收集MySQL數據庫中的原始數據;
(2)自動檢測數據庫中數據的更新,只收集變化的數據,實現數據的實時增量收集;
(3)將收集到的數據存儲到非關系型數據庫HBase中。
2.2 系統設計
在Flume的運行過程中根據設定會運行一個或者多個代理,由每一個代理完成數據的收集服務。每個代理都是一個進程,它們是相互獨立的,因此可以實現同時對多個數據源進行并行處理,以達到從分布式系統中的不同計算機上收集數據的目的。代理在運行的過程中,并不依賴于Hadoop環境,這使得將非Hadoop集群上的數據交換到集群中變得可行[7]。系統架構如圖1所示。

圖1 系統架構圖
為安裝有MySQL數據源的每臺機器部署一個Flume的代理,將Source配置為每臺機器的MySQL數據源,URL指定為連接MySQL數據庫的連接符,形如jdbc:mysql://IP:port/databasename;Sink配置為要將數據交換到Hadoop集群中的HBase數據庫。啟動每個機器上的代理,向集群發送數據。
代理要完成的主要工作是:
(1)檢測指定計算機上MySQL數據庫表中的數據,實時檢測表中數據的更新,并收集、預處理更新數據,預處理完成后記錄當前數據行數,防止重復處理;
(2)將收集的數據通過HBase客戶端插入到HBase數據庫指定的數據表中。
在代理內部,按數據的流向又可分為Source、Channel、Sink三個組件,分別完成不同的任務。Source組件的任務是查詢MySQL數據庫表中的數據,檢查數據更新,預處理更新的數據,且記錄處理數據的位置信息(即數據在表中所在的行數),將其保存在一個指定的文件中,并將數據封裝為事件發送到Channel中;Channel只負責緩存Source經過處理后發送來的事件,等待Sink抽取事件,抽取完成后,該事件自動刪除;Sink負責抽取Channel中的緩存事件,并進行解析,最終將解析完成的數據存入到HBase數據庫中。該系統代理架構如圖2所示。

圖2 系統代理架構圖
2.3 主要組件的設計與實現
代理內部各組件數據處理流程如圖3所示。
Source使用JDBC技術查詢數據庫表中的數據,執行查詢時采用分頁查詢的方法,所謂分頁查詢就是在數據量過大時分多次對數據庫進行查詢[8]。而MySQL數據庫的分頁查詢是通過調用LIMIT函數實現的[9],在SQL語句的結尾處指定每次查詢的行數。使查詢一次數據庫加載到內存里的數據量減小,從而在總數據量變大時減輕內存的壓力。對數據庫的查詢是循環執行的,以達到實時監控數據庫更新的目的。當檢測到有數據時(第一次查詢原始數據或者更新的數據),將這些數據從數據庫讀入到代理中,通過預處理將其封裝為一個事件,然后更新已處理完成數據的位置信息(即數據庫中數據的行數),并和數據庫連接符拼接在一起保存到指定的記錄文件中。在每次對此數據庫表進行查詢時先讀取此文件,獲得這個位置信息,查詢時從此行開始,之前的數據不再查詢,從而避免了重復查詢處理,也解決了使用內存通道時不能自動故障恢復的缺陷[10]。當系統發生故障,導致代理宕機時,只需要重新啟動代理,從文件中讀取這個行數即可從上次處理的位置繼續處理,使代理具有很好的可恢復性。下面是Source組件的部分源代碼。

圖3 代理內部各組件數據處理流程圖
public class MySQLSource extends AbstractSource implements Configurable, PollableSource[11]{
private JdbcHelper jdbcHelper; //通過JDBC查詢數據庫的輔助類
@Override
public Status process() throwsEventDeliveryException {
try{
sqlSourceCounter.startProcess();
//取得查詢數據得到的結果
List> result=jdbcHelper.executeQuery();
//判斷是否取得數據
if(!result.isEmpty()) {
//將數據發到ChannelcsvWriter.writeAll(sqlSourceHelper.getAllRows(result));
//處理完后,更新查詢行數sqlSourceCounter.incrementEventCount(result.size());
//更新記錄文件的數據即當前查詢數據表中的行數
sqlSourceHelper.updateStatusFile();
}
sqlSourceCounter.endProcess(result.size());
if(result.size() { Thread.sleep(sqlSourceHelper.getRunQueryDelay()); } //返回READY,執行下一次查詢 return Status.READY; } 使用JDBC輔助查詢數據庫的部分代碼: public classJdbcHelper{ public List List //對數據庫實行分頁查詢,減小內存的壓力,提高查詢效率 resultSet=connection.createStatement().executeQuery( (sqlSourceHelper.getQuery())+"LIMIT"+sqlSourceHelper.getCurrentIndex()+","+sqlSourceHelper.getMaxRows()));); //將查詢的數據緩存到rowsList中 while(resultSet.next()){ List for(int i=0;i } rowsList.add(list); } //更新新的查詢起始位置 sqlSourceHelper.setCurrentIndex(sqlSourceHelper.getCurrentIndex()+rowsList.size()); //返回查詢結果 returnrowsList; } } Channel的主要作用是緩存Source預處理的數據,即事件。只有當事件被Sink抽取后才會從當前Channel中刪除。這種機制保證了數據傳遞的可靠性。系統采用內存通道,將數據直接緩存在內存中,優點是數據傳輸速度快,減少數據由讀入端到寫出端的時延。 Sink主要負責從Channel抽取事件,由于事件包括一個事件頭和事件體,事件體中存儲的才是真正從MySQL中讀取的數據,故先將事件體讀出,再跟據在配置文件中設定好的規則解析這些數據,將數據分解為與原始字段和值一一對應的形式,最后把這些數據通過調用HBase的Thrift客戶端接口插入到指定的HBase表中[12]。下面是插入到HBase中部分代碼: public class HBaseSink extends AbstractSink implements Configurable{ //開啟HBaseSink public void start(){ try{ //先建立通過Thrift連接到Hbase服務器[13] client=privilegedExecutor.execute(new PrivilegedExceptionAction @Override public Hbase.Client run() throws Exception { socket=new TSocket(thriftQum,Integer.parseInt(port)); TProtocol protocol=new TBinaryProtocol(socket, true, true); client=new Hbase.Client(protocol); //打開Thrift連接 socket.open(); return client; } }); }catch(Exception e){ //TODO Auto-generated catch block e.printStackTrace(); sinkCounter.incrementConnectionFailedCount(); logger.error("Could not connect hbase"+thriftQum,e); } //實現接口的方法 @Override public Status process() throws EventDeliveryException { //初始化解析數據對象 serializer.initialize(event,columnFamily); //拿到解析后的數據,并存放在actions中 actions.add(serializer.getActions()); //調用數據傳遞給插入Hbase的方法 putEventsAndCommit(actions,txn); } //真正將數據插入到HBase中的方法 private void putEventsAndCommit(final List privilegedExecutor.execute(new PrivilegedExceptionAction @Override public Void run() throws Exception { Map //將數據插入到Hbase中名為tableName的表中 client.mutateRows(wrap(tableName),actions,attributes); return null; } } 下面是Sink解析數據的部分代碼: public class MyHbaseEventSerializer implements HbaseEventSerializer { //配置文件中的一些屬性: //分隔數據的字符,默認為空格 public static final String SPLIT_CONFIG="splitChar"; public static final String SPLIT_DEFAULT=""; //Hbase表中字段的名字 public static final String COL_NAME_CONFIG="colNames"; //數據的編碼方式,默認為UTF-8 public static final String CHARSET_CONFIG ="charset"; public static final String CHARSET_DEFAULT="UTF-8"; //插入Hbase表中的列族 protected byte[]cf; public void configure(Context context) { //獲得配置文件中的屬性值 String splitChar=context.getString(SPLIT_CONFIG,SPLIT_DEFAULT); inputPattern=Pattern.compile(splitChar); charset=Charset.forName(context.getString(CHARSET_CONFIG,CHARSET_DEFAULT)); String colNameStr=context.getString(COL_NAME_CONFIG,COLUMN_NAME_DEFAULT); String[]columnNames=colNameStr.split(","); for(String s:columnNames) { colNames.add(s.getBytes(charset)); } } //從Channel中緩存的事件對象中獲得MySQL中的數據 @Override public void initialize(Event event,byte[] columnFamily) { //從事件(包括header和數據)中得到其中的數據 this.payload=event.getBody(); //獲取列族 this.cf=columnFamily; } public BatchMutation getActions() throws FlumeException { BatchMutation bm=null; List //按約定的規則分隔數據 String[] data=inputPattern.split(new String(payload,charset)); //以系統當前時間的納秒數為行鍵 String rowKey=String.valueOf(System.nanoTime()); Map try{ fieldName=new HashMap for(int i=0;i String column=cf+":" +colNames.get(i); //將數據插入到HBase中 mutations.add(new Mutation(false,wrap(column),wrap(data[i]),false)); } bm=new BatchMutation(wrap(rowKey), mutations); } catch (Exception e) {throw newFlumeException("Could not get row key!",e); } //返回要插入的數據 return bm; } Flume配置文件中信息: #配置Sources agent.sources=s agent.sources.s.type=org.xy.flume.source.MySQLSource agent.sources.s.connection.url=jdbc:mysql://IP:port/database agent.sources.s.user=username agent.sources.s.password=password agent.sources.s.table=tablename agent.sources.s.jdbc.connection.driver_class=com.mysql.jdbc.Driver agent.sources.s.custom.query=SELECT * FROM tablename agent.sources.s.channels=c #配置Channel agent.channels=c agent.channels.c.type=memory agent.channels.c.capacity=10 000 agent.channels.c.transactionCapacity=1 000 #配置sinks agent.sinks.k.type=hbase agent.sinks.k.channel=c agent.sinks.k.table=hbaseTablename agent.sinks.k.columnFamily=columnFamilyName agent.sinks.k.serializer=org.apache.flume.sink.hbase.MyHbaseEventSerializer agent.sinks.k.serializer.colNames=columns 2.4 測試與結果分析 測試系統使用的環境由四臺機器組成的集群,其中安裝MySQL數據庫作為數據源的四臺相同配置,HBase集群中的HBase Master是一臺服務器。測試中使用的HBase版本為0.98,Flume版本為1.6.0。 為了最大限度地接近實際生產環境,使用的測試數據為1 700 000余條,總大小為320 M,分別在每個數據庫中存一份。三個Flume代理運行在服務器上,從三臺機器的數據庫中收集數據,測試結果見表1。 表1 測試數據 由表1得到測試每個代理收集數據的平均時間為318 s,平均速度為1 007 kb/s,系統總速度為3 021 kb/s,且不會丟失數據,在網絡暢通的情況下數據可靠性強。后期研究發現在通過JDBC查詢數據庫時,對于大量數據,使用分頁查詢可以提高查詢效率,并且解決了隨著數據庫數據量增大而引起的Java虛擬機內存溢出的問題。改變分頁大小,測試系統得到測試結果如圖4所示。 由圖4可以看出,當分頁為10 000(這個值與具體的數據總量有關)時,傳輸的速度最快。此系統可在5 min左右將510 000 000多萬條(大約9 600 MB)的數據收集完畢,并且沒有數據丟失,兼顧了數據傳輸的效率和可靠性,達到了預期的目標。 圖4 傳輸速度隨分頁大小變化的曲線 系統通過使用開源的工具Flume-ng 1.6.0實現了MySQL數據庫數據收集系統,主要實現了其中的Source和Sink組件,完成了基于JDBC的自動查詢、檢測數據庫更新和解析數據寫入HBase的功能。豐富了Flume的功能,使得Flume可以在脫離Hadoop環境下進行數據傳輸。測試結果表明,系統可以高效可靠地收集數據庫中的數據。 [1] 孫韓林.一種基于云計算的網絡流量分析系統結構[J].西安郵電大學學報,2013,18(4):75-79. [2] 李 芬,朱志祥,劉盛輝.大數據發展現狀及面臨的問題[J].西安郵電大學學報,2013,18(5):100-103. [3] Apache Hadoop[EB/OL]. 2015. http://hadoop.apache.org/. [4] 詹 玲,馬 駿,陳伯江,等.分布式I/O日志收集系統的設計與實現[J].計算機工程與應用,2010,46(36):88-90. [5] Apache Flume[EB/OL].2015.http://flume.apache.org/. [6] 王春梅.基于數據倉庫的數據挖掘技術[J].西安郵電學院學報,2006,11(5):99-102. [7] 王 博,陳莉君.Hadoop遠程過程調用機制的分析和應用[J].西安郵電學院學報,2012,17(6):74-77. [8] 孫 輝.MySQL查詢優化的研究和改進[D].武漢:華中科技大學,2007. [9] 李現艷,趙書俊,初元萍.基于MySQL的數據庫服務器性能測試[J].核電子學與探測技術,2011,31(1):48-52. [10] 謝曉燕,張靜雯.一種基于Linux集群技術的負載均衡方法[J].西安郵電大學學報,2014,19(3):64-68. [11] Apache HBase[EB/OL].2015.http://hbase.apache.org/. [12] 楊寒冰,趙 龍,賈金原.HBase數據庫遷移工具的設計與實現[J].計算機科學與探索,2013,7(3):236-246. [13] Carstoiu D,Lepadatu E,Gaspar M.Hbase-non SQL database,performances evaluation[J].International Journal of Advancements in Computing Technology,2010,2(5):42-52. Automatic Collection System for MySQL Data Based on Flume YU Jin-liang1,ZHU Zhi-xiang1,LIANG Xiao-jiang2 (1.Xi’an University of Posts and Telecommunications,Xi’an 710061,China;2.Shaanxi Information Engineering Research Institute,Xi’an 710061,China) For data collecting in distributed systems or between different data centers,while addressing the issue of exchange data from a relational database to non-relational databases,an automatic collecting system for MySQL data based on Flume is put forword.In order to meet the real-world production environment,this system uses a star topology.It can automatically query a given MySQL database table,automatic delection of the data updating of the table,automatic incremental transfer,packaging and parsing to the original data,finally storing data into a database of HBase.In test,the average speed of transmission of every machine in the system can reach 1 111 kb/s,and the total speed of transmission can reach 3 333 kb/s,which ensure data integrity and achieve the goal of reliable and efficient transport. Flume;MySQL database;data collecting;HBase;JDBC 2016-01-11 2016-05-05 時間:2016-11-21 2015年工信部通信軟科學研究項目(2015-R-19);2015陜西省信息化技術研究項目(2015-002) 于金良(1991-),男,碩士研究生,研究方向為大數據分析處理;朱志祥,教授,研究方向為計算機網絡、信息化應用和網絡安全。 http://www.cnki.net/kcms/detail/61.1450.TP.20161121.1641.022.html TP274.2 A 1673-629X(2016)12-0137-05 10.3969/j.issn.1673-629X.2016.12.030>executeQuery() throws SQLException{
> rowsList=new ArrayList
>();


3 結束語