Design and Implementation of a Distributed Complex-Event Processing Engine
Ping Lu, Yuming Qian, and Kezhi Zhu
(Cloud Computing & IT Institute, ZTE Corporation, Nanjing 210012, China)
A high-performance, distributed, complex-event processing engine with improved scalability is proposed. In this new engine, stateless processing nodes are combined with distributed storage so that scale and performance can be expanded linearly. This design avoids a single point of failure and makes the system highly reliable.
CEP; stream processing; M2M; sliding window; real-time processing
The development of the mobile internet and the internet of things (IoT) has caused a data explosion that has driven continuous improvements in data processing. Ten years ago, diversified enterprise systems were isolated information islands. Since then, service-oriented architectures (SOA) have been widely researched as a means of acquiring information and connecting information islands. However, quickly extracting valuable content from mass data, and preventing useless information from swamping that content, remains a problem.
In telecommunication applications, user signaling and behavior are monitored in real time. Any internet access request, call, short message, or location change requires real-time data collection and processing. At the same time, relevant behavior models need to be built. The volume of data becomes so large that, in a city of two million people, signaling traffic reaches gigabytes per second. This cannot be handled with traditional offline processing.
There are two main solutions for processing mass real-time data. The first is online data collection with offline data processing (using a method similar to MapReduce). The second is event streaming, in which data is calculated and processed in memory. Micro-MapReduce is an example of the former for an event system [1]. Micro-MapReduce reduces the granularity and shortens the period of MapReduce. It also properly addresses scalability (although its near-real-time latency ranges from five minutes to an hour). Event streaming includes current open-source stream-processing frameworks, e.g. S4. Oracle CEP is an available commercial product of this type [2]. With event streaming, relevant data can be loaded and calculated in memory. Therefore, this method provides high-performance standalone processing but is weak in terms of scalability and fault and disaster tolerance. Data therefore needs to be distributed at the front end and aggregated at the back end.
Storm is a good distributed solution [3]. A Storm cluster comprises a master node and multiple worker nodes. The master and workers interact through ZooKeeper. Storm is essentially a reliable distributed event-processing engine that ensures every event is processed. However, the Storm Nimbus (master) node is a single point of failure, so a high-availability (HA) arrangement is required. Moreover, without a time window, Storm cannot sufficiently support either event windows or multi-event collaboration.
With complex event processing (CEP), a user-friendly interface is essential. Event Processing Language (EPL), which resembles Structured Query Language (SQL), is usually used to define event-processing logic. Cayuga [4] and Borealis [5] are effective systems for EPL processing and for guaranteeing event QoS.
To reliably process mass real-time data, we propose a new high-performance distributed CEP engine called ZX-CEP. This engine has the following features:
·stream computing for complex event data
·high concurrency (a single server supports more than 100,000 messages per second)
·high linear scalability
·simple orchestration of EPL message processing and graphic processing flow
·distributed computing
·linear scalability of system capacity and processing
·sliding event window.
A distributed stream-processing system can be regarded as a black box. Large numbers of continuous data streams enter the black box, where they are processed and converted into specific event streams for output or for further processing in other systems. If the system detects an alarm condition during stream processing, it generates an alarm event and requests an automatic maintenance program to fix the fault, or else it stores the event in a persistent storage engine for future analysis and processing.
In a stream-processing system, the data stream directions conform to a directed acyclic graph. If data needs to be processed more than once, the output of one flow can be used as the input of another flow. This is serialized processing of multiple flows.
Fig. 1 shows the architecture of the distributed complex-message processing engine, which comprises a data-preprocessing module, a complex-message processing module, an output adapter, and a task scheduling management module.
2.1 Data-Preprocessing Module
Continuous event streams are sequences of events that have not undergone structured processing. These original events are filtered, combined, and distributed by the event-preprocessing module (Fig. 2).
The preprocessing module is responsible for input data adaptation and preprocessing.
The input data adapter receives original events and converts them to structured events. It also preserves the order in which events occurred and puts the converted events into a local message queue, where they await preprocessing. An input data adapter is usually customized according to the input contents. After input data adaptation, original events are converted into formatted messages that include a message source ID, a generation time stamp, and a content K/V object. Messages that cannot be quantized require metadata management, which involves quantization mapping of message content.
The following atoms for event preprocessing have been implemented: a field filtering atom, a field filling atom, an event filtering atom, an event combination atom, and an event split atom. The task manager customizes rules for, and dynamically loads, basic atom operations that can be instantiated into multiple operators. In data preprocessing, these operators are connected according to defined rules. Operators can be connected using a graphic editing tool or through EPL condition resolution. Connecting operators in a pipeline manner is beneficial because complex data-processing logic can be generated at any time by connecting basic operators in series and in parallel, and complicated code does not have to be written.
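The pipeline idea described above can be illustrated with a minimal sketch. It is not the ZX-CEP implementation: the `Event` fields mirror the formatted message described in the text (source ID, time stamp, K/V content), while the function names, the `pipeline` helper, and the sample data are assumptions made for illustration. Only series connection is shown, and the event combination atom is omitted for brevity.

```python
from dataclasses import dataclass, field
from typing import Callable, Iterable


@dataclass
class Event:
    source_id: str                      # message source ID
    timestamp: float                    # generation time stamp
    content: dict = field(default_factory=dict)  # content K/V object


# An operator maps one event to zero or more events.
Operator = Callable[[Event], Iterable[Event]]


def field_filter(keep: set) -> Operator:
    """Field filtering atom: keep only the named content fields."""
    def op(ev):
        kept = {k: v for k, v in ev.content.items() if k in keep}
        yield Event(ev.source_id, ev.timestamp, kept)
    return op


def field_fill(defaults: dict) -> Operator:
    """Field filling atom: supply defaults for missing content fields."""
    def op(ev):
        yield Event(ev.source_id, ev.timestamp, {**defaults, **ev.content})
    return op


def event_filter(predicate: Callable[[Event], bool]) -> Operator:
    """Event filtering atom: drop events that fail the predicate."""
    def op(ev):
        if predicate(ev):
            yield ev
    return op


def event_split(key: str) -> Operator:
    """Event split atom: emit one event per element of a list-valued field."""
    def op(ev):
        for item in ev.content.get(key, []):
            yield Event(ev.source_id, ev.timestamp, {**ev.content, key: item})
    return op


def pipeline(*ops: Operator) -> Callable[[Event], list]:
    """Connect operators in series; each stage feeds the next."""
    def run(ev):
        batch = [ev]
        for op in ops:
            batch = [out for e in batch for out in op(e)]
        return batch
    return run


# Hypothetical flow: fill a unit, split readings, drop weak ones, trim fields.
flow = pipeline(
    field_fill({"unit": "dBm"}),
    event_split("readings"),
    event_filter(lambda e: e.content["readings"] > -90),
    field_filter({"readings", "unit"}),
)
raw = Event("cell-17", 1000.0, {"readings": [-95, -70], "noise": 1})
result = flow(raw)
```

Because every atom has the same signature, a new processing flow is only a new argument list to `pipeline`, which is the property the text attributes to operator chaining.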
During event processing, an input signal may contain noise above the normal amplitude. Filtering removes the noise so that the signal is normal [6].
2.2 Complex Message-Processing Module

▲Figure 1. Architecture of the distributed complex-message processing engine.

▲Figure 2. Architecture of the data-preprocessing module.
Multiple event-processing modules monitor one or more window-change event queues. Idle event-processing modules automatically acquire events from the queue or queues. Because the event-processing modules are stateless, they can be added or removed at any time according to service conditions without affecting system operation.
In distributed message processing, a distributed message queue ensures that events are processed in sequence. This can be achieved with the DCache K/V system [7] or with other high-performance distributed message queues. The sequencing and distributed processing of messages is ensured by maintaining a consistent message queue in the distributed storage (Fig. 3). In a distributed K/V system, there is also a unified time window that is maintained by the elected master node. This avoids processing errors caused by inconsistency between the clocks of different nodes.
2.3 Output Adapter
The output adapter converts system processing results into specific output actions or data streams. The output includes message output and periodic sampling output. If the output is a data stream, it can act as an input stream that is later processed according to another group of rules. In this case, analysis should be performed in different dimensions. For example, the output adapter observes data stream conditions at a fine granularity of five minutes, and the integrated results form a data stream with a coarser granularity. Analysis is then done over a longer period, such as one day, and the outputs are files, database tables, and message queues. The output adapter is generally customized according to service requirements, and it also supports time travel. After a rule condition has been triggered, the output adapter can record relevant system information from before and after the event occurred. The records are put into mirror persistent storage for later replay and analysis.
2.4 Task Scheduling Management Module
The task scheduling management module includes rule generation and rule scheduling and execution (Fig. 4).
Rule generation involves dynamic generation of customized task graphs from EPL event-processing statements. It also involves the generation of event-processing logic in GUI mode with a rule editor.
Rule scheduling and execution involves dynamic rule loading while the service is running. That is, the rule loading procedure must not affect the normal processing procedure. EPL is suitable for simple rule scenarios, and rule graph editing is suitable for complex rule scenarios. A graphic rule editor has been developed to increase efficiency. It converts rule graphs into corresponding code and dynamically loads the code.
When a new rule is applied to a designated sliding window, old data (generated before the new rule took effect) still has to be matched. We propose two strategies for addressing this problem. First, after the new rule has been deployed, the corresponding window data is cleared. However, this may cause data interruption. Second, the time at which the new rule takes effect can be recorded, and the old and new rules can be computed simultaneously for the recorded period. Results computed with the new rule are not used until all data generated before the new rule took effect has been removed from the stack. Until then, results computed with the old rule remain in use. A possible effect of this is that the new rule can take effect only after a time T = T_w + 1, where T_w is the time window length.
The sliding event window can be implemented with a high-performance distributed message queue. This window is defined as a unique event sequence with a fixed time length or a fixed number of messages kept in the system. As time passes, all messages kept in the event sequence stay within the specified time or length range. There are therefore sliding time windows and sliding space windows. In a sliding time window, all events are maintained within a specific time interval. In a sliding space window, event capacity is defined in advance, and events exceeding this capacity are automatically removed from the stack.
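The two window types can be sketched in a few lines. This is a single-process illustration under stated assumptions, not the distributed implementation: the class names are hypothetical, events are assumed to arrive in time-stamp order, and a time window of length `length_s` is taken to cover the half-open interval ending at the newest event.

```python
from collections import deque


class SlidingTimeWindow:
    """Keeps events whose time stamps fall within the last `length_s` seconds."""

    def __init__(self, length_s):
        self.length_s = length_s
        self.events = deque()  # (timestamp, payload), oldest first

    def push(self, timestamp, payload):
        """Add an event and return the events that expire as a result."""
        self.events.append((timestamp, payload))
        expired = []
        # Assumes in-order arrival, so the oldest event is always on the left.
        while self.events and self.events[0][0] <= timestamp - self.length_s:
            expired.append(self.events.popleft())
        return expired


class SlidingSpaceWindow:
    """Keeps at most `capacity` events; the oldest are removed first."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = deque()

    def push(self, timestamp, payload):
        """Add an event and return the events pushed out by the capacity limit."""
        self.events.append((timestamp, payload))
        expired = []
        while len(self.events) > self.capacity:
            expired.append(self.events.popleft())
        return expired


time_window = SlidingTimeWindow(length_s=300)
time_window.push(0, "a")
time_window.push(100, "b")
expired_by_time = time_window.push(300, "c")   # "a" is now 300 s old

space_window = SlidingSpaceWindow(capacity=2)
space_window.push(0, "a")
space_window.push(1, "b")
expired_by_space = space_window.push(2, "c")   # third event exceeds capacity
```

Returning the expired events from `push` makes both entries and exits visible to the caller, which is what a downstream processing module needs in order to react to window changes.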
The distributed K/V engine maintains the sliding event window so that storage of events is guaranteed and consistent.
Each element of the event sequence, the stack push pointer, and the stack pop pointer are kept in the distributed K/V engine as key-value pairs. In this way, the distributed sliding event window is stored (Fig. 5). The stack push pointer and stack pop pointer are accessed in a specific synchronous operation mode, which ensures data consistency in a distributed environment.
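The key-value layout described above can be sketched as follows. Everything here is an assumption for illustration: `KVStore` is an in-process stand-in for a distributed K/V engine (it is not the DCache API), a local lock stands in for the synchronous operation mode, and the key naming scheme is hypothetical.

```python
import threading


class KVStore:
    """In-process stand-in for a distributed K/V engine (hypothetical API)."""

    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()

    def get(self, key, default=None):
        return self._data.get(key, default)

    def put(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)

    def synchronized(self):
        """Stand-in for the synchronous access mode used for the pointers."""
        return self._lock


class KVWindow:
    """Sliding space window whose elements and pointers live in the K/V store."""

    def __init__(self, kv, name, capacity):
        self.kv, self.name, self.capacity = kv, name, capacity

    def _key(self, suffix):
        return f"{self.name}:{suffix}"

    def push(self, event):
        """Store the event, advance the push pointer, and evict overflow."""
        with self.kv.synchronized():
            push_ptr = self.kv.get(self._key("push"), 0)
            pop_ptr = self.kv.get(self._key("pop"), 0)
            self.kv.put(self._key(push_ptr), event)
            push_ptr += 1
            evicted = []
            while push_ptr - pop_ptr > self.capacity:
                evicted.append(self.kv.get(self._key(pop_ptr)))
                self.kv.delete(self._key(pop_ptr))
                pop_ptr += 1
            self.kv.put(self._key("push"), push_ptr)
            self.kv.put(self._key("pop"), pop_ptr)
            return evicted

    def contents(self):
        """Read the live window between the pop and push pointers."""
        with self.kv.synchronized():
            push_ptr = self.kv.get(self._key("push"), 0)
            pop_ptr = self.kv.get(self._key("pop"), 0)
            return [self.kv.get(self._key(i)) for i in range(pop_ptr, push_ptr)]


window = KVWindow(KVStore(), "w", capacity=2)
first = window.push("a")
second = window.push("b")
third = window.push("c")    # evicts "a"
snapshot = window.contents()
```

Because the window state is entirely in the store, any stateless processing node holding a reference to the same store and window name would see the same contents.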

▲Figure 3. Distributed processing of the same change queue by multiple event-processing modules.

▲Figure 4. Rule generation and loading.

▲Figure 5. Distributed sliding event window.
A message-processing action is triggered by either the entry of an event into the event window or the exit of an event from the event window. Such entries and exits are detected by periodic scanning, and each one triggers the complex message-processing module. The message queue mechanism guarantees distributed processing and implements message processing in pull mode. When an event enters or exits the sliding event window, a change message is generated. This message enters another message queue and waits for the complex message-processing module to respond and handle it.
We build a data model similar to Aurora [9] and convert the event sequence of the time window into an incremental event sequence. There are three types of incremental events:
1) insertion event, given by (+, t), where t is an event object newly added to the event window
2) deletion event, (-, t), where t is an event object that exits the event window
3) replacement event, (^, t1, t2), where t1 is a replaced event object, and t2 is a new event object.
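The three incremental event types listed above can be applied to a local replica of the window without rescanning it. The sketch below is an illustration only: the tuple encoding follows the notation in the list, while the function names and the use of a `Counter` as the window replica are assumptions.

```python
from collections import Counter


def _remove(window, item):
    """Remove one occurrence of item, failing loudly if it is absent."""
    if window[item] <= 0:
        raise KeyError(f"{item!r} is not in the window")
    window[item] -= 1
    if window[item] == 0:
        del window[item]


def apply_incremental(window, event):
    """Apply one incremental event to a multiset replica of the window.

    ("+", t)       inserts t
    ("-", t)       deletes t
    ("^", t1, t2)  replaces t1 with t2
    """
    kind = event[0]
    if kind == "+":
        window[event[1]] += 1
    elif kind == "-":
        _remove(window, event[1])
    elif kind == "^":
        _, old, new = event
        _remove(window, old)
        window[new] += 1
    else:
        raise ValueError(f"unknown incremental event type: {kind!r}")


replica = Counter()
for change in [("+", "a"), ("+", "b"), ("^", "a", "c"), ("-", "b")]:
    apply_incremental(replica, change)
```

Each change touches only the affected entries, so the cost of keeping the replica current is independent of the window size.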
By handling incremental events, the system avoids frequent global scanning of the event window, and this greatly accelerates processing. In addition, QoS marks are added to event information, and events are sent to queues with different priorities. In this way, the system prioritizes the processing of events with a high priority. Distributed K/V storage maintains the event state machine and global counter and simplifies the data-processing logic in the event-processing procedure. Here, we discuss computing the average value of all the events in the event window as an example. Typically, the average of all events in the event window is recalculated using

After incremental events have been introduced, the following computation is performed:
If the sampling period is 1 s and the event window is 5 min, the amount of computation required using (2) is only 1/300th of that required using (1).
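One plausible form of the two computations, consistent with the 1/300 ratio stated above, is sketched here. The symbols are assumptions chosen for illustration: n is the number of events in the window, x_i are their values, and x_in and x_out are the values carried by an insertion event and a deletion event for a full window.

```latex
% (1) Full recomputation: every update touches all n events in the window.
\bar{x} = \frac{1}{n} \sum_{i=1}^{n} x_i

% (2) Incremental update: one event enters and one exits, so only the
%     difference is applied to the previous average.
\bar{x}_{\text{new}} = \bar{x}_{\text{old}} + \frac{x_{\text{in}} - x_{\text{out}}}{n}

% With a 1 s sampling period and a 5 min window,
%   n = \frac{5 \times 60\ \text{s}}{1\ \text{s}} = 300,
% so (1) processes 300 terms per update where (2) processes one change.
```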
However, distributed computing creates a problem: because multiple event sequences may be involved, events generated by multiple event queues may not be processed by the same event processor. Therefore, we introduce a taint data propagation model for distributed computing. This model ensures that an update to any basic event triggers further processing at all necessary subsequent nodes within the required timeframe.
If a rule needs to use data in two or more sliding windows, the event streams generated by those sliding windows may not be analyzed and processed on the same node. Therefore, we propose a distributed taint data propagation mechanism that ensures all the processing nodes of a rule tree correctly update the final results, even if computation is not done on the same node [5]. In Fig. 6, gray parts represent dirty data, and dirty data marks are transferred through the data-propagation mechanism. This guarantees that all data is updated in a timely manner.
The entire rule tree can be regarded as a directed acyclic graph. After going through an operator, changed data may or may not affect subsequent processing nodes. Dynamic analysis can be done to search out all affected nodes, but static analysis only confirms the trusted boundary. Data outside the trusted boundary may be confirmed as tainted or suspected of being tainted. Data inside the trusted boundary is guaranteed to be clean.
The taint propagation algorithm recognizes the affected nodes, and then subsequent data of the affected nodes is recomputed by operators. Data that is not affected does not need to be recomputed. In a distributed system, data and operators may be generated or kept on various physical nodes.
Taint propagation analysis can be static or dynamic. The proposed distributed stream computing system does basic static analysis. When source data changes, the system analyzes all affected nodes in subsequent parts of the directed graph and marks all affected data. If affected data needs to be acquired for computing, backward-pass computing can be done according to taint marks. In this way, computation is minimized for the entire system.
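The mark-then-recompute behavior described above can be sketched on a small rule graph. This is a single-process illustration under stated assumptions, not the distributed mechanism: the `Node` class, its method names, and the example graph are hypothetical, and the `evaluations` counter exists only to show which operators were actually rerun.

```python
class Node:
    """A node in the rule DAG: a source (no func) or an operator over inputs."""

    def __init__(self, name, func=None, inputs=()):
        self.name = name
        self.func = func
        self.inputs = list(inputs)
        self.outputs = []
        self.value = None
        self.dirty = func is not None   # operators start out uncomputed
        self.evaluations = 0
        for parent in self.inputs:
            parent.outputs.append(self)

    def set(self, value):
        """Update a source node and taint everything downstream of it."""
        self.value = value
        for child in self.outputs:
            child._taint()

    def _taint(self):
        if self.dirty:
            return  # already marked, so its descendants are marked too
        self.dirty = True
        for child in self.outputs:
            child._taint()

    def get(self):
        """Backward pass: recompute only if this node carries a taint mark."""
        if self.dirty:
            self.value = self.func(*(p.get() for p in self.inputs))
            self.dirty = False
            self.evaluations += 1
        return self.value


a, b = Node("a"), Node("b")
a.set(1)
b.set(2)
total = Node("sum", lambda x, y: x + y, [a, b])
doubled = Node("double", lambda x: 2 * x, [total])
unrelated = Node("unrelated", lambda y: y + 100, [b])

before = (doubled.get(), unrelated.get())
a.set(5)                      # taints "sum" and "double" only
tainted = [n.name for n in (total, doubled, unrelated) if n.dirty]
after = (doubled.get(), unrelated.get())
```

Changing `a` marks only the nodes that depend on it, and nothing is recomputed until a result is requested, so the node that depends solely on `b` is never rerun.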
The most important part of a taint-check strategy is analyzing the trusted boundary. The taint-check strategy is represented by a tetrad comprising entity type (type), vulnerability description (vul), program operation (op), and operand location (loc): [type, vul, op, loc] | type ∈ ROLES, vul ∈ VUL_TYPES, op ∈ ACTS, loc ∈ {N ∪ any}.

▲Figure 6. Dynamic data operation of taint propagation.
Each computing node is taint-checked for each input variable, and this helps derive the taint propagation matrix for the entire system (Table 1).
In an environment with multiple input variables, confirmed and suspected tainted nodes are combinations of those for single input variables. The taint propagation algorithm allows the system to update nodes with dirty data only when data output is needed. It is unnecessary to update all data nodes all the time. In this way, system computation is greatly reduced.

▼Table 1. System taint propagation matrix
Messages with different priorities are put into different incremental event message queues according to their QoS marks. In this way, high-priority events are processed first.
The event-processing module prioritizes the acquisition of change messages from the high-priority queue. When there are no waiting messages in this queue, the module acquires messages from the low-priority queue.
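The pull order described above can be sketched with two local queues. This is an illustration only: the class and method names are hypothetical, and Python's in-process `queue.Queue` stands in for the distributed incremental event message queues.

```python
import queue


class PriorityChangeQueues:
    """Two change-message queues; the high-priority one is always drained first."""

    def __init__(self):
        self.high = queue.Queue()
        self.low = queue.Queue()

    def publish(self, message, high_priority=False):
        """Route a change message according to its QoS mark."""
        target = self.high if high_priority else self.low
        target.put(message)

    def pull(self):
        """Return the next message, or None if both queues are empty."""
        for source in (self.high, self.low):
            try:
                return source.get_nowait()
            except queue.Empty:
                continue
        return None


queues = PriorityChangeQueues()
queues.publish("low-1")
queues.publish("high-1", high_priority=True)
queues.publish("low-2")
pulled = [queues.pull() for _ in range(4)]
```

A worker that calls `pull` in a loop never handles a low-priority message while a high-priority one is waiting, at the cost of possible starvation of the low-priority queue under sustained high-priority load.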
Computing results of the event-processing module are temporarily stored in DCache or are transferred to the output queue. A sampling tool periodically samples the computing results temporarily stored in DCache and transfers them to the output queue. For example, it may be necessary to acquire the average of a sensor group in the window over the past hour, give a report every five minutes, and send an immediate alarm if the average is too high at a certain time. In this case, the event-processing module modifies the average value object maintained in DCache for each event that enters or exits the window. The sampling program samples this average from DCache at five-minute intervals and transfers the data to the output queue. If the data gives ultra-high readings, the program immediately generates an alarm and puts it in the output queue.
We propose a distributed complex-message processing engine called ZX-CEP. This high-performance engine can process a complex data stream in real time. The architecture is based on the principle that data should be separated from logic. The data is stored in the cloud, and multiple copies are kept. Data processing nodes are stateless and can be expanded in a distributed, dynamic manner. The engine ensures that storage is scalable and data is secure in a mass-data environment. It also ensures that processing is scalable in the case of parallel processing. Failure of any node does not affect normal processing or stream processing in the system.
This architecture depends on distributed K/V storage and a distributed message queue built on it. A sliding time window is also implemented by using the distributed message queue.
This paper describes how to use EPL to customize and load data-processing logic in real time. EPL is used to complete complex logic orchestration graphs based on basic operators. Distributed data processing and storage involve complex distributed logic computation. A dirty data propagation mechanism is introduced so that data itself drives processing logic.
In the future, the dynamic logic-processing mechanism for this system will be improved so that logical decisions are more flexible and more complex logic computation is supported. Moreover, data storage and calculation nodes will also be adjusted to ensure automatic fault discovery and recovery.
[1] T. Condie, P. Alvaro, J. M. Hellerstein, et al., “MapReduce online,” Univ. of California, Berkeley, CA, Rep. UCB/EECS-2009-136, 2009.
[2] Qiang Huang and Qingkai Zeng, “Taint propagation analysis and dynamic verification with information flow policy,” Journal of Software, vol. 22, no. 9, pp. 2036-2048, 2011.
[3] Xinyu Li and Zhongdong Huang, “Persistent event processing solution based on CEP,” Computer Applications and Software, vol. 27, no. 12, pp. 151-153, 2010. doi: CNKI:SUN:JYRJ.0.2010-12-049.
[4] M. Cherniack, H. Balakrishnan, M. Balazinska, et al., “Scalable distributed stream processing,” in 1st Biennial Conf. Innovative Data Systems Research, Asilomar, CA, 2003, p. 12. doi: 10.1.1.63.3117.
[5] D. J. Abadi, Y. Ahmad, M. Balazinska, et al., “The design of the Borealis stream processing engine,” in 2nd Biennial Conf. Innovative Data Systems Research, Asilomar, CA, 2005, p. 13. doi: 10.1.1.118.7039.
[6] D. C. Luckham and B. Frasca, “Complex event processing in distributed systems,” Stanford University, Stanford, CA, Rep. CSL-TR-98-754, 1998. doi: 10.1.1.56.876.
[7] L. Brenna, A. Demers, J. Gehrke, et al., “Cayuga: A high-performance event processing engine,” in ACM SIGMOD Int. Conf. Management of Data, Beijing, China, 2007, pp. 1100-1102. doi: 10.1145/1247480.1247620.
[8] Oracle. (Dec 17, 2008). Oracle CEP White Paper [Online]. Available: https://blogs.oracle.com/CEP/resource/cep_coherence.doc
[9] Hong Gao, “Distributed message queue based on P2P network,” Programmer, no. 6, pp. 102-106, 2012.
Manuscript received: August 10, 2013
Biographies
Ping Lu (lu.ping@zte.com.cn) graduated from Southeast University, China. He heads the ZTE Cloud Computing & IT Institute and is in charge of R&D of service software, multimedia terminals, signaling detection, and ICT products. He is also responsible for researching new internet, cloud computing, and home network services. He has led several national key research projects and has published 10 papers. He is a member of the China Computer Federation and CCF TCSC.
Yuming Qian (qian.yuming@zte.com.cn) graduated from Southeast University, China. He is a chief engineer at ZTE Cloud Computing & IT Institute and is responsible for researching system architecture and new technologies for mass-data processing, cloud computing, and mobile internet. He has led several national key research projects and has published 16 papers. He was awarded the prestigious title Innovation & Entrepreneurship Talent of Jiangsu Province.
Zhike Zhu (zhu.zhuke@zte.com.cn) graduated from Southeast University, China. He is a product manager at ZTE Cloud Computing & IT Institute and is responsible for R&D and management of mobile internet and mass-data processing products. He has a deep understanding of mass-data processing, search engines, parallel calculation, and mobile terminal management.