Tung Nguyen, Jingwen Zhang, and Weisong Shi
(Department of Computer Science, Wayne State University, Detroit, MI 48202, USA)
Abstract  Many applications, such as those in genomics, are designed for a single machine. This is not problematic if the input data set is small and can fit into the memory of one powerful machine. However, the application and its algorithms are then limited by the capacity and performance of that machine (the application cannot run in parallel), and a single machine cannot handle very large data sets. In recent research, cloud computing and MapReduce have been used together to store and process big data. There are three main steps in handling data in the cloud: 1) the user uploads the data, 2) the data is processed, and 3) results are returned. When the size of the data reaches a certain scale, transmission time becomes the dominant factor; however, most research to date has focused only on reducing the processing time. It is also generally assumed that the data is already stored in the cloud. This assumption does not hold, because many organizations now store their data locally. In this paper, we propose SPBD (pronounced "speed") to minimize overall user wait time. We abstract overall processing time as an optimization problem and derive the optimal solution. When evaluated on our private cloud platform, SPBD is shown to reduce user wait time by up to 34% for a traditional WordCount application and up to 31% for a metagenomic application.
Keywords  big data; genomics; NGS; MapReduce; cloud
Recent improvements in genomic technology have allowed researchers to cheaply sequence significantly more data than before. However, storing and processing the mass of data derived from next-generation sequencing (NGS) is challenging. In our research facility at Wayne State University, the Illumina HiSeq sequencing system can generate about 250 million reads (about 60 GB) per run [1]. It usually has eight lanes, each of which can handle at most 12 samples. In total, we can have 96 samples (i.e., 96 read files), each of which is 60 GB (paired-end reads), in one run. It takes several days to generate the data, and with the state-of-the-art Novoalign alignment software, it takes two days to process the data [2].
If security is not a strict requirement, a cloud can be used to store and process a large data set. A cloud provides highly available, almost unlimited data storage and elastic computing power. There are also no up-front costs. When dealing with a large data set, a parallel programming model is usually the most effective computing model. Google MapReduce [3] and Microsoft Dryad [4] are among the most widely used frameworks for parallelizing genomic applications [5]-[8]. Using a cloud and parallel computing is not always easy for biologists, who are often not experts in computer science. However, recent attempts have been made to improve the usability of cloud and parallel computing for these scientists [9], [10].
In practice, the place where data is generated is usually different from the place where it is processed and analyzed [11]. Therefore, results of the data analysis are usually obtained by moving the data to the data center; running the analysis software inside the data center, cloud, or local cluster; and downloading the results to the scientist's local machine.
Methods such as the scp shell command, ftp clients, and the web (HTTP) are often used by domain scientists to transfer data to remote sites. Even when the data is large, such methods are still used. Most domain scientists are not aware of other methods, such as GridFTP. One of the most significant issues with transferring a large file over the network is failure. Traditional data transfer methods do not handle failure well.
In this paper, we focus on cloud MapReduce, for example, Amazon Elastic MapReduce (EMR). We use the Hadoop implementation of MapReduce as our framework; therefore, the applications are of the MapReduce-compatible variety only. In this context, the data is usually moved to Amazon S3 by the AWS Management Console, s3cmd, Bucket Explorer, or CloudBerry Explorer. Amazon announced its support for objects larger than 5 GB in November 2010; however, at that time, many S3 clients did not support the new multipart upload feature. This feature was only recently supported in the newest (beta) s3cmd client, which was released in April 2012.
Having narrowed down the context, we now describe how data is moved to HDFS. Traditionally, the most popular method of moving or copying data from a local machine to HDFS is to use the hadoop dfs -put <filename> or distcp shell command. With Amazon EMR, data is first moved to Amazon S3. In this process, command-line interface (CLI) tools, such as s3cmd and S3-Util, are used. Graphical user interface (GUI) tools, such as CloudBerry Explorer and Bucket Explorer, are also used. Then, either the GUI-based AWS Management Console or the Amazon EMR Ruby Client (based on the CLI) is used to create a job flow for running a MapReduce application with the uploaded data.
With mass data, Amazon suggests using their physical import/export service. Users can store up to 4 TB of data in an eSATA or USB 2.0 portable hard drive and have it couriered to the nearest Amazon site. In general, there are many other tools, such as Pentaho [1] and Apache Sqoop, that can be used to transfer data to and from HDFS.
Most current work on MapReduce for the cloud focuses on either improving the processing time t_proc of the software [5], [6], [8] or improving the data transmission time t_trans only [13]. In this paper, we aim to minimize user wait time, also called the response time t_res. This is the time from when the user starts uploading their data to when the results are ready to be returned or downloaded. We do not take into account the time taken to transfer the results from the cloud to the user because, in our context, the size of the results is often much smaller than that of the input data.
Many people argue that we should focus only on t_proc because the data needs to be transferred once at most. Nevertheless, in research on large NGS data, alignment or mapping tools, which deal directly with large data in the pipeline mentioned in [14], are usually run only once or twice because of lengthy runs and high cost. Consequently, it is crucial to minimize t_res.
Movement of data from the user to the data center cannot be avoided; therefore, we propose streamlined processing of the data. Our approach involves splitting the user data into smaller parts that are streamed to the data center. As soon as the first part arrives, the system starts processing it while continuing to receive subsequent parts. After all the parts have been processed, the system merges the results and sends them back to the user.
We implemented a prototype of SPBD and evaluated it with two MapReduce applications: WordCount and a metagenomic application. The results show that SPBD improves the t_res of WordCount by 34% and the t_res of the metagenomic application by 31% for 32 GB of input data.
In this paper, we try to minimize t_res when using MapReduce-style software to analyze a large NGS data set in a cloud with a Hadoop framework. Our proposed system is shown in Fig. 1.

▲Figure 1. Overview of the proposed system.
In section 2, we describe our approach in detail and formulate the problem. In section 3, we describe the design and implementation of SPBD. In section 4, we give the results of our experiments on SPBD. In section 5, we discuss future research directions.
We propose adapting the CPU pipeline processing model to our scenario. In the pipeline model, a CPU instruction corresponds to completely processing one partition of data in our context. This processing is denoted P_i, where i is the number of the partition. In this paper, the terms partition and chunk are used interchangeably. Unlike a CPU instruction, which passes through many stages, each P_i has only two stages: transmission and processing. All stages of a CPU instruction take the same amount of time, that is, one clock cycle; in our proposal, this time is arbitrary.
Instead of waiting until all the data has been moved to the cloud before analyzing it, we start analyzing the chunks as soon as they arrive. While the first chunk is being analyzed, the second chunk is being received. When the first chunk has been processed, the second chunk is ready to be processed. While the second chunk is being processed, the next chunk is received. This continues until the last chunk has been processed. Depending on the application, an additional merge process can be run after the last chunk has been processed or as soon as the first chunk has been processed; the output results can be merged after all chunks have been processed. Although defined by application developers, such a merge process is usually simple in practice (Fig. 2). The time taken to execute the merge process is denoted t_merg.
How large, then, should each chunk be to achieve an optimal t_res? Equivalently, how many partitions should the input data be divided into? To answer these questions, we need to compute t_res. In Fig. 2, all the stages are the same size; however, in practice, this is not necessarily the case. In the overlapping periods, where receiving and processing occur at the same time, the stage that requires more time to execute determines t_res. Consider two possible cases: t_trans > t_proc and t_trans ≤ t_proc. For simplicity, we assume that all partitions are the same size and that the t_trans and t_proc of one partition are the same as those of the other partitions. The merge period contributes to the calculation of t_res only after the last chunk has been processed. Any merge execution that takes place before the last chunk has been processed is masked by either the data transmission stage or the processing stage.
If t_trans > t_proc (Fig. 3), t_res is given by

$t_{res} = N \cdot t_{trans} + t_{proc} + t_{merg}$ ,    (1)

where N is the number of partitions.

▲Figure 2. Streamlined processing.
In this case, t_trans dominates, so the system always finishes processing all the received data and waits for the next chunk.
If t_trans ≤ t_proc (Fig. 4), t_res is given by

$t_{res} = t_{trans} + N \cdot t_{proc} + t_{merg}$ .    (2)
In this case, the data is ready to be processed, but the system has to wait until the previous chunks have been processed because t_proc is longer.
In the cases shown in Fig. 3 and Fig. 4, transmission of the next chunk of data starts as soon as transmission of the previous one has finished. In general, t_res is given by

$t_{res} = \min(t_{trans}, t_{proc}) + N \cdot \max(t_{trans}, t_{proc}) + t_{merg}$ .    (3)
In (3), t_trans, t_proc, and t_merg are functions of N. As N increases, both t_trans and t_proc decrease, but t_merg increases. In sum, t_res is also a function of N. The problem is now one of simple optimization: given fixed input data, find the N for which t_res is at its minimum.
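For example, once per-partition models of t_trans, t_proc, and t_merg are available, the optimal N can be found simply by evaluating (3) over candidate values. The following sketch uses purely illustrative linear models (the real coefficients are learned by SPBD as described in section 3); the class and method names are ours and not part of SPBD.

/**
 * Minimal sketch: evaluates t_res from (3) for candidate partition counts N
 * and returns the N with the smallest t_res. The linear models below are
 * illustrative assumptions only.
 */
public final class ResponseTimeSearch {

    // Illustrative per-partition models, with p = S / N (sizes in MB, times in s).
    static double tTrans(double p) { return 0.09 * p + 2.0; }   // assumed
    static double tProc(double p)  { return 0.05 * p + 8.0; }   // assumed
    static double tMerge(int n)    { return 0.5 * n; }          // assumed

    /** Equation (3): t_res = min(t_trans, t_proc) + N * max(t_trans, t_proc) + t_merg. */
    static double tRes(double s, int n) {
        double p = s / n;
        double tt = tTrans(p), tp = tProc(p);
        return Math.min(tt, tp) + n * Math.max(tt, tp) + tMerge(n);
    }

    public static void main(String[] args) {
        double s = 32 * 1024;          // 32 GB expressed in MB
        int bestN = 1;
        double best = tRes(s, 1);
        for (int n = 2; n <= 256; n++) {
            double t = tRes(s, n);
            if (t < best) { best = t; bestN = n; }
        }
        System.out.printf("best N = %d, t_res ~ %.1f s%n", bestN, best);
    }
}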
In reality, the problem is how to identify all the components of the equation, given that the system only receives data of known size and a data-analysis application.
Here, we discuss the proposed SPBD system for realizing the idea described above. The inputs for our system are the input data, a data-analysis application, and a path to a local folder for storing the output. The data is at the user's site, outside the cloud. Like Amazon EMR, there is a pool of data-analysis applications in the cloud for users to choose from. This makes sense because, in genomics, researchers often have a collection of data-analysis applications to choose from, including Galaxy and BLAST. Moreover, even if the user wishes to use a special application, this is not a problem because the time to upload the new application is negligible, and the application can be reused many times.
Originally, we assumed that the data-analysis software was a partitionable MapReduce application, that is, a MapReduce job that can process input data in parallel with chunks of input that are independent. In fact, all MapReduce-style jobs are like this because all map tasks are independent. Consequently, our method can be used for any MapReduce job.
The output of our system is also the output of the selected applications. The system takes the input data from the user, transfers it to the cloud, runs the selected applications, and returns the results. The user only needs to specify the paths to the local input and output folders, along with the names and other parameters of the applications. The progress of jobs/applications in the cloud can be monitored via the built-in Hadoop website. Once the jobs have finished, the output data is typically written to HDFS or Amazon S3; however, in our system, the output data can be returned to the user if they wish.

▲Figure 3. t_res calculation for t_trans > t_proc.

▲Figure 4. t_res calculation for t_trans ≤ t_proc.
There are two approaches to identifying all the terms in (3): static and dynamic. With the static approach, the functions of the terms in (3) are fixed; if t_trans = 1000/N + 20, it remains the same all the time. With the dynamic approach, this assumption is abandoned, and t_trans is more realistic. The status of the network is not identical at different points in time because of congestion or diverse network traffic. Also, in a virtualized cloud environment, the run time of the same application with the same-sized input may not be similar at different points in time. This dissimilarity occurs because of differences in the statuses of the virtual machines, load, and network, and also because of differences in the data content itself.
Revisiting (3), t_proc, t_trans, and t_merg are not direct functions of N but are, in fact, functions of the size p of a partition. Therefore, we need to change the variables of these terms from p to N (or vice versa). If we assume that all partitions are the same size, and S is the size of the whole input data, the relationship between p and N is given by

$N = S / p$ .    (4)
If (4) is substituted into the terms of (3), t_res becomes a function of S and p.
In practice, t_merg is not easily estimated, so we remove it from (3). In our implementation, the input data is partitioned and processed by a specific data-analysis application; after that, the outputs of all partitions go to the merging process to produce the final result. It is therefore difficult to determine the relationship between t_merg and S because this relationship depends heavily on the output of the data-analysis applications. In section 4, we show that the forms of t_merg for different applications are different.
We assume that t_proc and t_trans are linear with respect to the amount of data. Therefore, without loss of generality, we use the following notation:

$t_{proc} = a \cdot p + b$ ,    (5)

$t_{trans} = c \cdot p + d$ .    (6)
In our context, a and c are always greater than 0 because t_proc and t_trans are monotonically increasing functions of the amount of data. The partition size p is also greater than 0. To obtain the minimum t_res, we need to know whether t_proc is greater than, less than, or equal to t_trans. This allows us to remove the maximum and minimum functions in (3).
We have the following two cases: a > c and a < c. Substituting (4), (5), and (6) into (3) and dropping t_merg, (3) becomes

$t_{res} = \begin{cases} (c \cdot p + d) + \dfrac{S}{p}(a \cdot p + b) = aS + c \cdot p + d + \dfrac{bS}{p}, & a > c, \\[6pt] (a \cdot p + b) + \dfrac{S}{p}(c \cdot p + d) = cS + a \cdot p + b + \dfrac{dS}{p}, & a < c. \end{cases}$    (7)

In the former case, setting the derivative of (7) with respect to p to zero gives the p that minimizes t_res. Similar to the former case, we can derive the minimizing p in the latter case.
After obtaining p for each subcase of (7), we can calculate t_res for each subcase and compare these values to obtain the overall minimum t_res.
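As an illustration, the following sketch computes the partition size implied by (7) with t_merg dropped: differentiating each branch and setting the result to zero gives p = sqrt(bS/c) for a > c and p = sqrt(dS/a) for a < c. The class and method names are ours, not part of SPBD, and the coefficients in main() are illustrative only.

/** Sketch of the closed-form optimum implied by (7); t_merg is dropped as in
 *  the text. Coefficients follow (5) and (6): t_proc = a*p + b, t_trans = c*p + d;
 *  S is the total input size. */
public final class OptimalPartition {

    static double optimalPartitionSize(double a, double b, double c, double d, double s) {
        if (a > c) {
            // t_res = a*S + c*p + d + b*S/p, minimized at p = sqrt(b*S/c)
            return Math.sqrt(b * s / c);
        } else {
            // t_res = c*S + a*p + b + d*S/p, minimized at p = sqrt(d*S/a)
            return Math.sqrt(d * s / a);
        }
    }

    public static void main(String[] args) {
        // Illustrative coefficients only (MB and seconds are assumed units).
        double p = optimalPartitionSize(0.05, 8.0, 0.09, 2.0, 32 * 1024);
        System.out.printf("optimal partition size ~ %.0f MB%n", p);
    }
}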
At this stage, we can fine-tune p by monitoring the network status and analysis progress. For example, the network keeps changing over time, so t_trans for an incoming partition is most likely different from t_trans for previous partitions.
SPBD is implemented in Java using a client-server model. The server is the portal of the cloud, and the client keeps sending data. The server decides when its buffer has enough data (the size of a partition) and begins processing so that an optimal overall t_res is achieved. The problem is deciding how many partitions/chunks the input data needs to be divided into; in other words, when should the system start processing a chunk?
The client provides an interface so that a user can specify their job (application names and parameters). The path to the local input data is included in the parameters. The job specification is similar to that of the Amazon EMR Ruby Client. Other information, such as the IP address of the server, credentials, and access keys, is specified in a configuration file. Before sending the large input file, the client sends the job specification and data size to the server. After receiving an acknowledgement from the server, the client starts transferring the input data using TCP.
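The following is a minimal sketch of this client-side flow. The class name, message framing (a length-prefixed job specification followed by the data size and a one-byte acknowledgement), and buffer size are our assumptions, not SPBD's actual wire format.

import java.io.*;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Minimal sketch of the client-side flow described above: send the job
 *  specification and data size, wait for an acknowledgement, then stream
 *  the input file over TCP. */
public final class SpbdClientSketch {

    public static void submit(String host, int port, String jobSpec, File input)
            throws IOException {
        try (Socket socket = new Socket(host, port);
             DataOutputStream out = new DataOutputStream(
                     new BufferedOutputStream(socket.getOutputStream()));
             DataInputStream in = new DataInputStream(socket.getInputStream());
             InputStream data = new BufferedInputStream(new FileInputStream(input))) {

            // 1) Send the job specification (application name, parameters) and data size.
            byte[] spec = jobSpec.getBytes(StandardCharsets.UTF_8);
            out.writeInt(spec.length);
            out.write(spec);
            out.writeLong(input.length());
            out.flush();

            // 2) Wait for the server's acknowledgement before streaming.
            if (in.readByte() != 1) {
                throw new IOException("server rejected the job");
            }

            // 3) Stream the input data; the server decides partition boundaries.
            byte[] buffer = new byte[64 * 1024];
            int read;
            while ((read = data.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            out.flush();
        }
    }
}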
We implement the server using either a static or a dynamic approach. With the static approach, we run all the applications available in the pool offline. The applications are run with data sets of differing size, and using linear regression, all the functions in (3) are identified. In section 4, we show that the assumption of linearity holds in practice because the R-squared values are very close to 1, which indicates a good-fitting model.
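This offline profiling step can be realized with ordinary least squares. The sketch below is ours rather than the authors' code; it fits t = a*x + b to (size, time) samples and reports the R-squared value used to judge the fit.

/** Sketch of the offline profiling step: fit t = a*x + b by ordinary least
 *  squares from (size, time) samples and report R-squared. */
public final class LinearFit {

    /** Returns {a, b, rSquared} for the model t = a*x + b. */
    static double[] fit(double[] x, double[] t) {
        int n = x.length;
        double sx = 0, st = 0, sxx = 0, sxt = 0;
        for (int i = 0; i < n; i++) {
            sx += x[i]; st += t[i]; sxx += x[i] * x[i]; sxt += x[i] * t[i];
        }
        double a = (n * sxt - sx * st) / (n * sxx - sx * sx);
        double b = (st - a * sx) / n;

        double mean = st / n, ssTot = 0, ssRes = 0;
        for (int i = 0; i < n; i++) {
            double r = t[i] - (a * x[i] + b);
            ssRes += r * r;
            ssTot += (t[i] - mean) * (t[i] - mean);
        }
        return new double[] { a, b, 1.0 - ssRes / ssTot };
    }

    public static void main(String[] args) {
        // Illustrative samples: partition size in MB vs. processing time in s.
        double[] size = { 256, 512, 1024, 2048 };
        double[] time = { 21, 39, 74, 150 };
        double[] f = fit(size, time);
        System.out.printf("a=%.4f b=%.2f R^2=%.3f%n", f[0], f[1], f[2]);
    }
}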

With the dynamic approach, the terms in (3) are identified online while executing the job. At the server side, the input data is divided into two parts of nearly the same size. The first part is used to learn t_proc and t_trans. The second part is processed in a streamlined way using the computed optimal size.
The administrator (at the server side) specifies a number of samples, SAMPLE_NUM, when starting SPBD. This number is used to obtain samples of different sizes for our linear regression process. The size of the first sample partition is denoted init_size and is given by

$init\_size = \frac{S}{2\,(2^{SAMPLE\_NUM} - 1)}$ .
The size of the ith sample partition is double that of the (i-1)th sample partition. Algorithm 1 shows the receiving function at the server. With this approach, it is difficult to learn t_merg because the sizes of the partitions are different.
In addition, we fine-tune SPBD by monitoring t_proc and t_trans after computing the optimal partition size. If the optimal partition size was obtained under the condition t_proc = t_trans but we observe |t_proc - t_trans| > threshold, then we update the optimal size accordingly. For example, if a > c and t_proc is greater than t_trans, we decrease the size of the partition.
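A possible form of this fine-tuning rule is sketched below. The threshold, the step factor, and the use of the slopes a and c from (5) and (6) to choose the adjustment direction are our assumptions, not SPBD's exact rule.

/** Sketch of the fine-tuning rule described above: if the measured t_proc and
 *  t_trans drift apart by more than a threshold, nudge the partition size
 *  back toward the balance point. */
public final class PartitionTuner {

    static long fineTune(long partitionSize, double a, double c,
                         double tProc, double tTrans,
                         double threshold, double step) {
        double gap = tProc - tTrans;
        if (Math.abs(gap) <= threshold) {
            return partitionSize;               // still close to the balance point
        }
        // From (5) and (6), d(t_proc - t_trans)/dp = a - c, so shrink the partition
        // when the gap and the slope difference have the same sign.
        boolean shrink = (gap > 0) == (a > c);
        return (long) (partitionSize * (shrink ? 1.0 - step : 1.0 + step));
    }

    public static void main(String[] args) {
        // Example: a > c and t_proc > t_trans, so the partition shrinks (as in the text).
        System.out.println(fineTune(1024, 0.05, 0.03, 120.0, 90.0, 10.0, 0.1)); // prints 921
    }
}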
In this implementation, SPBD does not send the results back to the user because retrieving them is simple. When using Amazon EMR, the results are stored in S3, and there are many easy-to-use graphical tools that can be used to interact with S3 as if it were a local file system.
We evaluate SPBD on WordCount and a metagenomic application. The former is a simple and typical MapReduce application that is useful for gaining insight into SPBD. It also shows that our approach is not necessarily limited to genomics. The latter is chosen from many genomic applications because it deals with large data sets. In addition, its outputs are fixed-size matrices, which shows that the merge process is very different for different applications.
Our testbed configuration is described in Table 1. The local network speed is 100 Mbps, and the computing cluster runs Hadoop 0.20.203 on the Eucalyptus platform.
To use SPBD, the developer needs to write 76 lines of code for the merge part of WordCount and 152 lines of code for the merge part of the metagenomic application. It is not difficult to write these parts; most likely, they involve only a slight modification of the reduce parts of the original applications.
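As an illustration of why the merge code is small, the following sketch merges the per-partition word counts produced by each chunk's WordCount job. It is a minimal example of ours, not the authors' 76-line implementation.

import java.util.HashMap;
import java.util.Map;

/** Minimal sketch of a WordCount merge step: sum the per-partition word counts
 *  produced by each chunk's MapReduce job. */
public final class WordCountMerge {

    /** Adds the counts of one partition's output into the running total. */
    static void merge(Map<String, Long> total, Map<String, Long> partitionCounts) {
        for (Map.Entry<String, Long> e : partitionCounts.entrySet()) {
            total.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> total = new HashMap<>();
        merge(total, Map.of("cloud", 3L, "data", 5L));   // output of partition 1
        merge(total, Map.of("data", 2L, "hadoop", 1L));  // output of partition 2
        System.out.println(total);  // e.g. {cloud=3, data=7, hadoop=1} (order may vary)
    }
}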

▼Table 1. Testbed configuration

Algorithm 1. SPBD algorithm
Input: size of the input data S, init_size, and SAMPLE_NUM
Output: none

partition_size = init_size;
accumulated_size = 0;
partition_num = 0;
repeat
    bytesRead = read from socket to buffer;
    write buffer to HDFS;
    accumulated_size += bytesRead;
    while accumulated_size >= partition_size do
        record t_trans for the received partition;
        create and submit a new job asynchronously to Hadoop to process the received partition;
        if partition_num < SAMPLE_NUM - 1 then
            partition_size *= 2;
        else if partition_num == SAMPLE_NUM - 1 then
            start a new thread to compute the optimized partition size;
            the optimized size will be used to update partition_size;
        end
        accumulated_size = 0;
        partition_num++;
    end
until bytesRead != BUFFER_SIZE;
We intentionally did more experiments with WordCount because it is more popular than the metagenomic application. WordCount is a MapReduce application that is included in the examples jar file distributed with Hadoop. The application counts the number of occurrences of each word in an input file. The input data used in our experiments is a data dump from Wikimedia [2].
Experiments were performed to show the relationship between p and t_proc and the relationship between p and t_trans (Fig. 5). The same input data is used, and the linear regression functions and R-squared values are shown for t_proc and t_trans. The size of the input data is between 5 GB and 2 GB. Fig. 5 shows that if the same input data (i.e., the same content) is partitioned into different sizes, both t_proc and t_trans of each partition can be modeled with linear regression. At each data point in Fig. 5, the input data is cut into partitions of the same size. The linear functions obtained with particular input data may differ from those obtained with other input data.
Fig. 6 shows t_merg for 1 GB and 2 GB input data and varying N. When S remains the same and N increases, t_merg generally also increases. Fig. 7 shows the relationship between S, t_proc, and t_trans; the content of the input data varies.

▲Figure 5. t_proc, t_trans, and their linear regression functions for S = 2 GB.

▲Figure 6. Linear regression functions of the merge process for S = 1 GB and S = 2 GB.
In Fig. 7, S is measured in gigabytes, whereas in Fig. 5, S is measured in megabytes. Therefore, the linear function parameters are significantly different. Also, the input data content used for Fig. 5 is different from that used for Fig. 7. The values obtained in the experiments shown in Fig. 5 and Fig. 7 are the averages of several runs.
Our experiments confirmed that the relationship between S, t_proc, and t_trans is linear regardless of the content of the data. The relationship between p, t_proc, and t_trans is also linear regardless of the content of the data. However, the t_proc and t_trans functions derived from linear regression differ between runs. Consequently, we should not reuse the same computed optimal p across different runs.
After verifying the linearity assumption, we experimented with our SPBD system and mimicked a real scenario. The input data is stored on the client machine, which acts as the storage server at the client side (Table 1). This machine also contains the SPBD client code. The master node runs the SPBD server code and accepts job submissions. This master node is also the master node of Hadoop. The remaining nodes are computing nodes. In reality, both the master and slaves are in the cloud and are connected to the client through the internet. However, in our experiments, the master, slaves, and client are all on the same local network. This does not affect the applicability of SPBD because SPBD learns the functions through measurement.
Using the static approach, we evaluate SPBD with varying p. In this experiment, we ran WordCount with 32 GB of data. The results show that varying p for the same input data significantly changes t_res (Fig. 8). In addition, as p increases, t_res decreases at first and then increases. This suggests that there is an optimal t_res achieved at an appropriate p.
To study the effect of the bandwidth between the client and the Hadoop master node, we ran WordCount on our local Hadoop testbed and transferred 16 GB of data from different machines in the network. The results are shown in Table 2. Bandwidth does affect the performance of the system, but not very significantly; SPBD outperforms the traditional approach in all cases.

▲Figure 7. Linear regression functions for WordCount when S varies.

▲Figure 8. t_res of WordCount when the static approach is used and p is varied.

▼Table 2. SPBD for different network bandwidths
To determine the benefit of SPBD, we used the dynamic approach with SAMPLE_NUM = 3. The NO-SPBD configuration is computed by making p the same as S, and the SPBD values already include t_merg; in other words, the outputs of the two configurations are the same (Fig. 9). As the size of the input data increases, SPBD provides greater improvement (Fig. 9). For example, when S = 32 GB, t_res with SPBD is only 66% of that of the existing approach.
Finally, we experimented with Amazon EMR. The results show that the assumption of a linear relationship between t_trans and S holds; however, we do not discuss these results here.
Metagenomics is the study of genetic material sampled directly from the habitats of microorganisms [16]. In our experiment, we used a metagenomic application developed by a bioinformatics research group [4]. The input data for this application was generated using MetaSim [18]. The availability of input data was also an important reason for selecting this tool for our experiment.
As with the previous experiments on WordCount, we verify the linear relationship between the input data and t_proc and the linear relationship between the input data and t_trans. We also study the merging process and determine the benefit of SPBD.
Fig. 10 shows the measured t_proc and t_trans when S varies. Linear regression is also applied to the data. As in the WordCount experiments, the assumption of linearity holds. Table 3 shows t_merg for 1 GB of data and varying N. This time, t_merg is almost constant, unlike in the WordCount experiments. This is because the output of the metagenomic application is only a small number of small matrices, and these matrices are independent of S. Therefore, combining the matrices in parallel takes only a small and almost constant amount of time. This experiment also demonstrates that t_merg is difficult to estimate and depends heavily on the particular application.
Last, we ran a modified metagenomic application with SPBD and without SPBD on different data sets. For S = 32 GB, t_res with SPBD improved by 31% compared with t_res for NO-SPBD.
When S is less than 8 GB, SPBD performance is a little worse than that of NO-SPBD (Fig. 11). This indicates that SPBD is only beneficial with big data. If S is less than a certain value, it is better to upload all the input data to the cloud and process it afterward. We determine this value to be 1 GB.

▲Figure 9. Benefit of SPBD in WordCount when the dynamic approach is used.
In this paper, we have discussed state-of-the-art techniques for analyzing large data sets in genomics. These techniques involve combining a cloud with a parallel processing framework, such as MapReduce or Dryad. We proposed, implemented, and evaluated SPBD, a system that automatically transfers and processes large data in the MapReduce cloud. Our experimental results show that SPBD significantly improves t_res when a large data set is analyzed.

▲Figure 10. Measured t_proc and linear functions of the metagenomic application when S varies.

▼Table 3. t_merg for S = 1 GB

▲Figure 11. Benefits of SPBD in the modified metagenomic application when the dynamic approach is used.
We took an empirical approach to deriving the coefficients needed to compute t_res, and we continued refining the parameters after determining the optimal partition size. Our solution is resilient to changes in system parameters, such as load or network traffic. Although we did not experiment with other jobs or interference in the system, SPBD should also benefit a practical multiuser system. In future work, we plan to extend SPBD to handle more than one input file, and we plan to improve the linear model by using, for example, piecewise regression. We also plan to test SPBD with more applications or, better yet, with a benchmark for large data. Instead of TCP, we might also apply GridFTP or other state-of-the-art data transmission protocols to SPBD.