Abstract
Sensor network-based applications have gained increasing attention; in these applications, data streams gathered from distributed sensors must be processed and analyzed with timely responses. Distributed complex event processing is an effective technology for handling these data streams by matching incoming events against persistent pattern queries. A well-managed parallel processing scheme is therefore required to improve both system performance and the quality-of-service guarantees of the system. However, the specific properties of pattern operators make parallel processing difficult to implement. To address this issue, a new parallelization model and three parallel processing strategies are proposed for distributed complex event processing systems. The new parallelization model accounts for the effects of temporal constraints, for example, sliding windows, so that the processing load for the overlap between windows of a batch induced by each input event is shared by the downstream machines without omitting events, which could otherwise result in wrong decisions. The proposed parallel strategies keep the complex event processing system working stably and continuously during the elapsed time, regardless of an increased input rate of the stream or an increased time window size of the operator. Finally, the application of our work is demonstrated using experiments on the StreamBase system.
Introduction
Wireless sensor networks (WSNs), composed of geographically distributed autonomous sensor nodes, cooperatively monitor the physical environment. WSNs have been gaining importance in a variety of applications including health-care monitoring, fire detection, environmental monitoring, habitat monitoring, financial services, military surveillance, vehicle tracking systems, and earthquake observation. Distributed sensors in these applications often generate massive data in the form of streams, and processing, analyzing, managing, and detecting patterns in these data streams is unavoidably complex.1 Therefore, extracting valuable information from the data streams with timely responses plays a very important role. There are two main technologies for processing and analyzing data streams. One is data stream processing (DSP).2–7 The other is complex event processing (CEP).8–15
CEP is a method of processing and analyzing data streams that uses patterns over sequential primitive events to detect and report composite events.16 Because of advantages such as an expressive rule language and an efficient event detection model, CEP systems have recently been applied extensively in both academia and industry.17–23 In CEP systems, event streams are processed in or near real time for a variety of applications ranging from wireless sensor networks to financial tickers.24–27 In those fields of application, continuous and highly available event stream processing with high throughput is critical for dealing with real-world events.
In DSP systems, many types of parallel strategies have been developed to handle massive amounts of distributed data streams.28–38 Because of the differences between DSP and CEP systems,16,26 most of these strategies, which focus exclusively on aggregate queries or binary equi-joins in DSP systems, cannot be used directly in CEP systems, which focus on multi-relational non-equi-joins in the time dimension, possibly with temporal ordering constraints, such as sequences (SEQ) and conjunctions (AND). For example, Figure 1 illustrates a concept hierarchy of medical diagnosis and treatment work and eight pattern queries for monitoring patients’ conditions within 24 h. In this example, let us assume that the patients and equipment are tagged with radio-frequency identification (RFID) tags and that the information is collected using these tags. When the information from the RFID tags is received by the RFID readers, the system makes decisions by matching the data streams against the pre-determined patterns. The structure of the query language and the definitions of the operations in the queries are explained in section “Preliminaries.” Some of the detailed descriptions, including the time constraints of the queries, are omitted from Figure 1 for simplicity. Such complex pattern queries involve nests of sequences (SEQ) and conjunctions (AND), which can have negative event type(s), and combinations of them.11,13,15,39 The volume and input rates of the data, however, can become large, similar to event stream processing, especially for big data applications.40,41 Increasing the size of the time window of an operator or the input rate of a stream may cause bottlenecks, which lead to query results of poor quality and the loss of the quality-of-service (QoS) guarantees of the system.

Figure 1. Example of concept hierarchy and pattern queries for medical diagnosis and treatment work.
To address these issues, we propose a new parallelization model and three parallel processing strategies that can be used to keep the CEP system running stably and continuously. Specifically, a CEP system based on the proposed parallelization model splits the input stream into parallel sub-streams so that a continuous pattern query over event streams is executed in a scalable way. The three parallel processing strategies keep the CEP system working stably and continuously during the elapsed time, even when the time window size of the operator and the input rate of the stream increase. Our work is validated through experiments on the StreamBase system.21
The contributions of this study are as follows:
We propose a novel parallelization model that includes imposed temporal constraints, for example, sliding windows, to enable sharing of the processing load for the overlap between windows of a batch induced by each input event by the downstream machines.
Applying the proposed parallelization model, we design three parallel processing strategies for the CEP system, that is,
Using the StreamBase system, 21 we perform empirical studies to validate our work, with increased input rates and larger time windows.
The rest of this article is organized as follows: The related work is discussed in section “Related work.” The background for this study is briefly introduced in section “Preliminaries.” A new parallelization model and three parallel processing strategies are proposed for distributed CEP systems in section “The proposed methods.” The model is validated against the experiments on the StreamBase system and the results are presented in section “Experimental results.” Finally, the conclusions and future work are presented in section “Conclusion and future work.”
Related work
CEP plays a very important role in detecting and integrating events by pattern matching in situation-aware applications from financial trading to health care.8–15 Most of the conventional CEP systems are centralized, running on a single machine. With the need for high performance, distributed CEP systems are developed by distributing the detection logic over a network of operators, where individual operators can be a bottleneck, requiring operator parallelization. 42 As far as we know, only a few recent works focus on parallelization of pattern-matching processing in which pattern matching is processed as a stateful operator in a general-purpose streaming system. Hirzel 30 exploits the partitioning constructs provided by the queries, such as PARTITION BY. However, this approach is sufficient only when queries contain such constructs. Wu et al. 31 proposed a parallelization framework for stateful stream processing operators that split events using a round-robin methodology to replicate different versions of an operator with a shared state. Despite being an effective parallelization framework, their framework is not feasible for pattern matching. Schneider et al. 36 implemented intra-operator parallelism through data-partitioning; they introduced a compiler and a run time system that can automatically extract data parallelism from streaming applications. Brenna et al. 38 proposed a novel approach to non-deterministic finite automata (NFA)-based distributed event processing where the NFA is decomposed into separate states running on different machines. In contrast to other related works, they also focus on running multiple queries in parallel. Balkesen et al. 43 proposed a run-based intra-query parallelism (RIP) technique for scalable pattern matching in event streams. RIP distributes input events that belong to individual run instances of a pattern’s finite state machine (FSM) to different processing units.
In this study, the focus is the parallelization of a pattern operator previously described in the literature.10–12,15 This approach is both orthogonal and complementary to the previous approaches. Our approach can be used to parallelize a pattern operator even when the CEP queries contain no “PARTITION BY” clauses. In addition, our parallelization approach includes imposed temporal constraints, for example, sliding windows, where the overlap between the windows of each batch influences the processing load induced by the following events. With our approach, events generated by matching the incoming input events from different streams are produced on different machines: one input stream is replicated and sent to every downstream machine, and the other input stream is split across the downstream machines. Therefore, the processing load is shared by the downstream machines, while no event is omitted from detection, which could otherwise result in incorrect decisions.
Preliminaries
In this section, we briefly introduce the basic event model, nested pattern query language, and pattern operators based on related studies.10–12,15,39
Event model
An event, which
Definition 1
A primitive event
Definition 2
Based on Definition 1, a composite event is denoted as
Nested pattern query language
The following nested complex event query language is used to specify nested pattern queries:
PATTERN (Event Expression: composite event expressed by the nesting of SEQ and AND, which can have negative event type(s), and their combination operators);
WHERE (Qualification: value constraint);
WITHIN (Window: time constraint).
The composite event expression in the PATTERN clause specifies the nested pattern queries, which support nests of SEQ and AND that can have negative event type(s), and their combination operators, as explained above. Sub-expressions denote inner parts of a pattern query expression. The value constraint in the WHERE clause defines the context for the composite events by imposing predicates on event attributes. The time constraint in the WITHIN clause specifies the time window: the time difference between the first and the last event instances matched by a pattern query must fall within the window.
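The three clauses can be pictured as a small data structure. The following Python sketch is only an illustration of the PATTERN/WHERE/WITHIN shape described above; the class names, the `render` helper, the `!` notation for negative event types, and the event types `A` to `D` are assumptions introduced here and are not part of the query language itself.

```python
from dataclasses import dataclass
from typing import Callable, List, Union


@dataclass
class EventType:
    name: str
    negated: bool = False  # True marks a negative event type


@dataclass
class Seq:
    children: List["Expr"]  # sub-expressions that must occur in order


@dataclass
class And:
    children: List["Expr"]  # sub-expressions that may occur in any order


Expr = Union[EventType, Seq, And]


@dataclass
class PatternQuery:
    pattern: Expr                   # PATTERN clause: nested SEQ/AND expression
    where: Callable[[dict], bool]   # WHERE clause: predicate on event attributes
    within: float                   # WITHIN clause: window length in seconds


def render(expr: Expr) -> str:
    """Print a nested expression in a compact textual form (hypothetical syntax)."""
    if isinstance(expr, EventType):
        return ("!" if expr.negated else "") + expr.name
    op = "SEQ" if isinstance(expr, Seq) else "AND"
    return op + "(" + ", ".join(render(c) for c in expr.children) + ")"


# Hypothetical query: A, then B without C (in any order), then D, within 24 h.
query = PatternQuery(
    pattern=Seq([
        EventType("A"),
        And([EventType("B"), EventType("C", negated=True)]),
        EventType("D"),
    ]),
    where=lambda attrs: attrs.get("patient_id") is not None,
    within=24 * 3600,
)
print(render(query.pattern))
```

The nesting of `Seq` and `And` mirrors the sub-expression structure of the PATTERN clause, while `where` and `within` stay outside the expression tree because they constrain the composite event as a whole.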
Pattern operators
We define the operators that appear in the PATTERN clause of a query. More details can be found in the literature.39
In the following,
Definition 3
An SEQ operator10,15 specifies a particular order, according to the start-time stamp, in which the event must occur to match the pattern and thus form a composite event
Definition 4
An AND operator 15 takes the event types as input, and events occur within a specified time window without a specified time order
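To make the difference between the two operators concrete, the following Python sketch matches two event types under a time window. It is a simplified reading of Definitions 3 and 4, not the formal semantics: events are assumed to be instantaneous `(type, time stamp)` pairs, only two event types are joined, and the function names are hypothetical.

```python
from typing import List, Tuple

Event = Tuple[str, float]  # (event type, time stamp in seconds)


def seq_matches(a_events: List[Event], b_events: List[Event],
                window: float) -> List[Tuple[Event, Event]]:
    """SEQ(A, B): A must occur strictly before B, and both within the window."""
    return [(a, b) for a in a_events for b in b_events
            if 0 < b[1] - a[1] <= window]


def and_matches(a_events: List[Event], b_events: List[Event],
                window: float) -> List[Tuple[Event, Event]]:
    """AND(A, B): A and B occur within the window, in either order."""
    return [(a, b) for a in a_events for b in b_events
            if abs(b[1] - a[1]) <= window]


a_stream = [("A", 1.0), ("A", 5.0)]
b_stream = [("B", 2.0), ("B", 4.0)]

seq_result = seq_matches(a_stream, b_stream, window=2.0)
and_result = and_matches(a_stream, b_stream, window=2.0)
print(seq_result)  # only the pair where A precedes B
print(and_result)  # additionally the pair where B precedes A
```

With a 2 s window, SEQ yields only the pair `(A at 1.0, B at 2.0)`, whereas AND also accepts `(A at 5.0, B at 4.0)` because it imposes no temporal order.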
The proposed methods
In this section, a new parallelization model and three parallel processing strategies are proposed for distributed CEP systems. The proposed strategies comprise a scalable execution of a continuous pattern query of event streams by splitting the input stream into parallel sub-streams.
Parallelization model
We propose a new parallelization model for pattern operators. An example scenario of an application with a pattern operator using our proposed framework for parallelizing the pattern operator is shown in Figure 2. In this example, we assume that two input streams are sent to the CEP system. Because of the specific property of a pattern operator, as described in section “Preliminaries,” we cannot split both of the inputs

Figure 2. An example scenario of (a) a pattern operator and (b) its parallelized implementation.
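The idea of splitting one input stream while replicating the other can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the proposed model itself: it uses a two-input SEQ join over instantaneous events, runs the "machines" sequentially in one process, splits stream A in round-robin fashion, and ignores the batching of overlapping sliding windows. The function names and the choice of which stream to split are hypothetical.

```python
from typing import List, Tuple

Event = Tuple[str, float]  # (event type, time stamp in seconds)
Match = Tuple[Event, Event]


def seq_join(a_events: List[Event], b_events: List[Event],
             window: float) -> List[Match]:
    """Pattern operator on one machine: A before B within the window."""
    return [(a, b) for a in a_events for b in b_events
            if 0 < b[1] - a[1] <= window]


def parallel_seq_join(a_events: List[Event], b_events: List[Event],
                      window: float, n_workers: int) -> List[Match]:
    """Split stream A, replicate stream B, process each part, merge results."""
    # Split: each worker receives a disjoint slice of stream A.
    a_parts = [a_events[i::n_workers] for i in range(n_workers)]
    # Process: every worker sees the full (replicated) stream B.
    partial = [seq_join(part, b_events, window) for part in a_parts]
    # Merge: union of the partial results, ordered for comparison.
    return sorted(m for result in partial for m in result)


a_stream = [("A", float(t)) for t in range(0, 10)]
b_stream = [("B", t + 0.5) for t in range(0, 10)]

central = sorted(seq_join(a_stream, b_stream, window=3.0))
parallel = parallel_seq_join(a_stream, b_stream, window=3.0, n_workers=3)
print(len(central), central == parallel)
```

Because every slice of stream A is matched against the complete stream B, each pair is examined exactly once, so the merged output equals the single-machine output. Splitting both streams would break this property, since an A event and a B event that match could be sent to different workers.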
Split-(process*)-merge
The
Intra-operator parallelization
The analysis in section “Split-(process*)-merge” showed that this method efficiently works with two input streams. If the

A pattern operator with multiple input streams.

The use of the proposed framework for parallelizing the pattern operator with multiple input streams.
Parallel processing strategies
Round-Robin
Algorithm 1 is the implementation of the
For simplicity and without loss of generality, we use an example of the flow chart of

The flow chart of the
The “
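As a generic illustration of round-robin dispatching, the following Python sketch assigns incoming batches to downstream workers in cyclic order. It is not a transcription of Algorithm 1; the function name, the notion of a "batch" as an opaque item, and the worker indices are assumptions made for the example.

```python
from itertools import cycle
from typing import Dict, List


def round_robin_assign(batches: List[str], n_workers: int) -> Dict[int, List[str]]:
    """Assign batches to workers 0..n_workers-1 in strict cyclic order."""
    assignment: Dict[int, List[str]] = {w: [] for w in range(n_workers)}
    next_worker = cycle(range(n_workers))
    for batch in batches:
        assignment[next(next_worker)].append(batch)
    return assignment


rr_result = round_robin_assign(["b0", "b1", "b2", "b3", "b4"], n_workers=3)
print(rr_result)
```

Round-robin needs no feedback from the workers, which keeps the splitter simple, but it does not react when batches differ in cost or workers differ in speed.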
Join-the-Shortest-Queue
Algorithm 2 implements the
For simplicity and without loss of generality, the flow chart for

Flow chart of the
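A generic join-the-shortest-queue dispatcher can be sketched as follows. This is an illustration of the general policy rather than a transcription of Algorithm 2: queue length is assumed to be the number of pending batches, ties are broken by the lowest worker index, and the class and method names are hypothetical.

```python
from collections import deque
from typing import Deque, List


class JsqDispatcher:
    """Send each batch to the worker with the fewest pending batches."""

    def __init__(self, n_workers: int) -> None:
        self.queues: List[Deque[str]] = [deque() for _ in range(n_workers)]

    def dispatch(self, batch: str) -> int:
        # min() returns the first index among ties, i.e. the lowest worker id.
        idx = min(range(len(self.queues)), key=lambda i: len(self.queues[i]))
        self.queues[idx].append(batch)
        return idx

    def complete(self, idx: int) -> str:
        """Called when worker idx finishes its oldest pending batch."""
        return self.queues[idx].popleft()


jsq = JsqDispatcher(n_workers=3)
jsq_picks = [jsq.dispatch("b0"), jsq.dispatch("b1"), jsq.dispatch("b2")]
jsq_done = jsq.complete(1)             # worker 1 finishes first
jsq_picks.append(jsq.dispatch("b3"))   # so the next batch goes to worker 1
print(jsq_picks, jsq_done)
```

Unlike round-robin, this policy needs feedback about completed batches, but it steers new work toward workers that drain their queues faster.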
Least-Loaded-Server-First
The
The
For simplicity and without loss of generality, the flow chart of

Flow chart of the
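A generic least-loaded-server-first dispatcher can be sketched in the same style. The load measure is an assumption made for this example: load is taken to be the total number of pending tuples on a worker, so a single large batch counts for more than several small ones. The class and method names are hypothetical, and the sketch is not a transcription of the algorithm described above.

```python
from typing import List


class LlsfDispatcher:
    """Send each batch to the worker with the smallest pending load."""

    def __init__(self, n_workers: int) -> None:
        self.load: List[int] = [0] * n_workers  # pending tuples per worker

    def dispatch(self, batch_size: int) -> int:
        # Ties are broken by the lowest worker index.
        idx = min(range(len(self.load)), key=lambda i: self.load[i])
        self.load[idx] += batch_size
        return idx

    def complete(self, idx: int, batch_size: int) -> None:
        """Called when worker idx finishes a batch of the given size."""
        self.load[idx] = max(0, self.load[idx] - batch_size)


llsf = LlsfDispatcher(n_workers=3)
llsf_picks = [llsf.dispatch(size) for size in [5, 1, 1, 1, 4]]
print(llsf_picks, llsf.load)
```

In this run, worker 0 receives the first large batch and is then skipped until the other workers catch up, which a policy based only on queue length would not do.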
Experimental results
Experimental setup
Based on the parallelization model in Figure 2, we implemented experiments on the StreamBase 21 system for the following query:
To validate our proposal, we compared the three proposed strategies,
Comparison of the methods under an increased input rate environment
When the time window size is 1 s and the input rate is 400 tuples/s, the data in Figure 8(a) show that the

Figure 8. Comparison of the throughput of each method under an increased input rate environment.

Comparison of
Comparison of the methods under an increased time window size environment
The data in Figure 10(a) and (b) show that the

Figure 10. Comparison of the throughput of each method under an increased time window size environment.

Comparison of
However, no matter when the input rate of the streams increased to 400 tuples/s or the time window size of the operators increased to 16 s, the throughput of the
Conclusion and future work
In the beginning of this article, we identified the general problems of parallel processing with respect to pattern operators. We proposed a new parallelization model for a pattern operator and three parallel processing strategies,
For future work, we plan to design an adaptive mechanism that fully leverages the proposed parallel processing strategies to realize distributed CEP, decide the parallelization degree, and adopt real streaming data in the experiments. Furthermore, as discussed by Flouris et al.,44 distributed monitoring over probabilistic event streams is also an open and interesting issue that should be explored further. Conventional CEP systems inevitably involve various types of intrinsic uncertainties, such as imprecision, fuzziness, and incompleteness, caused by the sensor environment. Mathematical tools such as D-numbers and evidence theory are widely used to handle the uncertainties and the data fusion in these systems.45–48 For these cases, a fuzzy approach over distributed CEP systems is a recommended direction for future research. In the future, we also intend to consider the hardware implementation and energy requirements of distributed CEP systems.
Footnotes
Academic Editor: James Brusey
Declaration of conflicting interests
The author(s) declared no potential conflicts of interest with respect to the research, authorship, and/or publication of this article.
Funding
The author(s) disclosed receipt of the following financial support for the research, authorship, and/or publication of this article: This research is supported by the Fundamental Research Funds for the Central Universities (Grant Nos XDJK2015C107, SWU115008, XDJK2016C043, and SWU115091), the Education Teaching Reform Program of Higher Education (No. 2015JY030), the 1000-Plan of Chongqing by Southwest University (No. SWU116007), the National Natural Science Foundation of China (Nos. 61672435, 61702427, and 61702426), the Project Funded by the Priority Academic Program Development of Jiangsu Higher Education Institutions (PAPD), and the Jiangsu Collaborative Innovation Center on Atmospheric Environment and Equipment Technology (CICAEET).
