Thursday, September 17, 2015

Introduction to Complex Event Processing using Drools Fusion

Complex Event Processing (CEP) is used to process large streams of information and supports real-time event monitoring and correlation. Events can be processed in one of two modes: 'stream' mode or 'cloud' mode. The following image illustrates the difference between the two modes:

Stream Mode v/s Cloud Mode

[Image Available From SlideShare]

The continuous flow of information or events can be classified into one of these brackets (or even both) for analysis or correlation. Cloud mode is useful for analyzing user behavior, market data and activity monitoring. Stream mode is most useful in applications such as real-time event monitoring, event correlation and sensor networks.

The most useful end-applications include Threat Detection, Anomaly Detection, Airport Security, Market Prediction, Profit Forecasting and Automated Algorithmic Trading Decisions, among a host of others.

Sliding Windows and Batch Windows need some explanation before any further discussion of Complex Event Processing. For most Architects and Engineers reading about this for the first time, they will come across as a novel way of analyzing information:

The Batch Window illustration given below shows the information being processed in discrete, fixed blocks of events.

 [Image Available From Oracle]

The Sliding Window illustration given below shows the information being processed in a continuously moving block of events.

[Image Available From Oracle]
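
To make the difference concrete, here is a minimal plain-Java sketch that does not use Drools at all. The names WindowDemo, batchCounts and slidingCounts are hypothetical, and the window boundaries are a simplification, not the exact semantics of Drools Fusion:

```java
import java.util.ArrayList;
import java.util.List;

class WindowDemo {

    // Batch window: every event falls into exactly one fixed slot
    // [start, start + size). Returns the event count per slot, in slot order.
    static List<Integer> batchCounts(long[] timestamps, long size, long end) {
        List<Integer> counts = new ArrayList<>();
        for (long start = 0; start < end; start += size) {
            int n = 0;
            for (long t : timestamps) {
                if (t >= start && t < start + size) n++;
            }
            counts.add(n);
        }
        return counts;
    }

    // Sliding window: for each event at time t, count the events that fall
    // in (t - size, t]. Consecutive windows overlap.
    static List<Integer> slidingCounts(long[] timestamps, long size) {
        List<Integer> counts = new ArrayList<>();
        for (long t : timestamps) {
            int n = 0;
            for (long u : timestamps) {
                if (u > t - size && u <= t) n++;
            }
            counts.add(n);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {8, 9, 10, 11};  // a burst that straddles the 10-second mark
        System.out.println("batch:   " + batchCounts(ts, 10, 20));
        System.out.println("sliding: " + slidingCounts(ts, 10));
    }
}
```

With this input, the batch windows split the burst into two slots of two events each, while the sliding window ending at the last event sees all four. This is why sliding windows suit "more than N events in the last T seconds" style detection.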

 [Publicly Available Image from Google Image Search]

Introducing Sherlock! (Mystery That is Data), an event correlation application that demonstrates the above concepts of Complex Event Processing. It is built for the banking domain, for anomaly and threat detection. It analyzes the following use-cases, taken from the 'Top Threats, Especially in Banking Sector' listed by the SANS Institute. The SANS Institute is a Co-operative Research and Training Institute for Information Security.

1. Detect if there are more than Ten Port or IP Scan Attempts from the Same IP Address (and Port)  in any of the last 10 seconds [Port Scan and IP Scan By SANS Institute]

2. Detect if there are more than Five Repeated Login Attempts from the Same IP Address in any of the last 30 seconds [Database Login or Intrusion Attempts by SANS Institute]

3. Detect if the traffic on a Port x has Suddenly Spiked compared to its History - Any of the last 30 seconds had more than five accesses [Repeated Port Access by SANS Institute]
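
All three use-cases share one shape: count events per key inside a sliding time window and alert when the count crosses a threshold. The following is a minimal plain-Java sketch of that shape, independent of Drools. The class SlidingThresholdDetector, its method record and the sample IP address are hypothetical, and the sketch assumes events for a key arrive in timestamp order:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class SlidingThresholdDetector {
    private final int threshold;      // alert when the count exceeds this value
    private final long windowMillis;  // length of the sliding window
    private final Map<String, Deque<Long>> recent = new HashMap<>();

    SlidingThresholdDetector(int threshold, long windowMillis) {
        this.threshold = threshold;
        this.windowMillis = windowMillis;
    }

    // Records one event for the key and returns true if the key now has more
    // than 'threshold' events within the last 'windowMillis' milliseconds.
    boolean record(String key, long timestampMillis) {
        Deque<Long> times = recent.computeIfAbsent(key, k -> new ArrayDeque<>());
        times.addLast(timestampMillis);
        // Evict timestamps that have slid out of the window.
        while (!times.isEmpty() && times.peekFirst() <= timestampMillis - windowMillis) {
            times.removeFirst();
        }
        return times.size() > threshold;
    }

    public static void main(String[] args) {
        // Use-case 2: more than five login attempts from one IP in 30 seconds.
        SlidingThresholdDetector logins = new SlidingThresholdDetector(5, 30_000);
        boolean alert = false;
        for (int i = 0; i < 6; i++) {
            alert = logins.record("10.0.0.7", i * 1_000L);  // one attempt per second
        }
        System.out.println("alert after six attempts in six seconds: " + alert);
    }
}
```

A rules engine such as Drools Fusion expresses the same idea declaratively, so the bookkeeping of the deque and the eviction loop is handled by the engine.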

I will demonstrate only the first use-case in this blog (including how to run the 'Intelligent Data Loader' and Possibly Hookup with a 'User Interface') to explain anomaly detection and complex event processing. You need to do the following before you can download and understand Sherlock!:

A. Download Drools 6.1.0 Distribution (Include in Classpath)
B. Download the Eclipse Plugin for Drools (Include in Classpath)
C. Use JDK 1.8.0 and Java EE 7 Libraries (If Required) (Include in Classpath)
D. Brief Read on MVEL Dialect and Drools Fusion (Above/Official)

1. Create the SherlockEvent (and SherlockEventCorrelation) Java Object
 /**  
  * @author spuri  
  */  
 public class SherlockEvent {  
      private int eventId;  
      private String eventType;  
      private String eventDescription;  
      private String eventSourceIp;  
      private String eventDestinationIp;  
      private String eventSourcePort;  
      private String eventDestinationPort;  
      private String eventSourceCountry;  
      private String eventDestinationCountry;  
      private String eventSourceUsername;  
      private String eventDestinationUsername;  
      private String eventRemarks;  
      private long eventSourceTime;  
      private long eventDestinationTimestamp;  ... // Refer Bundled Code

2. Code the 'Rule/Condition' using Drools 'MVEL' Dialect (Use-Case 01)
 // list any import classes here.  
 declare SherlockEvent  
      @role( event )  // sliding windows only apply to facts declared as events  
      @timestamp (eventDestinationTimestamp)  
 end  
 declare SherlockEventCorrelation  
      @role( event )  
      @timestamp (eventDestinationTimestamp)  
 end  
 global Long startTime;  
 global Long startMemory;  
 global Long totalFactCount;  
 global java.util.HashMap threatMap;   
 // use case 01  
 // detect if there are more than ten port or ip scan attempts from the same ip address (and port)   
 // to the destination ip address (and multiple ports) in the given window  
 rule "Port and IP Scan Event Processing Initial"  
 dialect "mvel"  
 when  
   e1: SherlockEvent(eventType == "port and ip scan") over window:time(10s)   
   not SherlockEventCorrelation(eventSourceIp == e1.eventSourceIp, eventDestinationIp == e1.eventDestinationIp, eventSourcePort == e1.eventSourcePort)   
 then  
       SherlockEventCorrelation plec = new SherlockEventCorrelation();  
       ... // Refer Bundled Code  
 end  
 rule "Port and IP Scan Event Processing Correlation"  
 dialect "mvel"  
 when  
   e1: SherlockEvent(eventType == "port and ip scan") over window:time(10s)   
   ce: SherlockEventCorrelation(eventSourceIp == e1.eventSourceIp, eventDestinationIp == e1.eventDestinationIp, eventSourcePort == e1.eventSourcePort, $eventCorr : eventCorrelation >= 0)  
 then  
   if(ce.eventCorrelation >= 10) {  
           System.out.println("+++++++++++++++ USE CASE 01 +++++++++++++++");  
           System.out.println("SOURCE INET: " + ce.eventSourceIp);  
           System.out.println("SOURCE PORT: " + ce.eventSourcePort);  
           System.out.println("DESTIN INET: " + ce.eventDestinationIp);  
           System.out.println("EVENT ACTN: " + "port and ip scan");  
           System.out.println("TIMESTAMP : " + new java.util.Date(ce.eventDestinationTimestamp));  
           System.out.println("OCCURENCES : " + $eventCorr);  
   }  
   update( ce );  
   threatMap.put(new java.util.Date(), ce);  
 end  
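
The two rules above work as a pair: the first creates a correlation record when none exists yet for a (source IP, destination IP, source port) combination, and the second reports a threat once the record's count reaches 10. The plain-Java sketch below mirrors that pattern without Drools and without the time window. The names CorrelationSketch, Key and onScanEvent are hypothetical, and the step that increments the count is an assumption on my part, since it is not visible in the listing:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

class CorrelationSketch {

    // Hypothetical key mirroring the three fields that the rules join on.
    static final class Key {
        final String sourceIp, destinationIp, sourcePort;

        Key(String sourceIp, String destinationIp, String sourcePort) {
            this.sourceIp = sourceIp;
            this.destinationIp = destinationIp;
            this.sourcePort = sourcePort;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return sourceIp.equals(k.sourceIp)
                    && destinationIp.equals(k.destinationIp)
                    && sourcePort.equals(k.sourcePort);
        }

        @Override public int hashCode() {
            return Objects.hash(sourceIp, destinationIp, sourcePort);
        }
    }

    private final Map<Key, Integer> correlations = new HashMap<>();
    private final int reportAt;  // the rule listing uses 10

    CorrelationSketch(int reportAt) {
        this.reportAt = reportAt;
    }

    // Handles one "port and ip scan" event. Returns true when the count for
    // its key has reached 'reportAt', i.e. when a threat would be reported.
    boolean onScanEvent(String sourceIp, String destinationIp, String sourcePort) {
        Key key = new Key(sourceIp, destinationIp, sourcePort);
        // "Initial" rule: no record yet, so create one.
        // "Correlation" rule: a record exists, so bump its count (assumed step).
        int count = correlations.merge(key, 1, Integer::sum);
        return count >= reportAt;
    }

    public static void main(String[] args) {
        CorrelationSketch sketch = new CorrelationSketch(10);
        for (int i = 1; i <= 10; i++) {
            if (sketch.onScanEvent("10.0.0.7", "10.0.0.1", "4444")) {
                System.out.println("threat reported at occurrence " + i);
            }
        }
    }
}
```

In the real application the working memory of the rules engine plays the role of the map, and the 10-second window limits which events can still match.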

3. Configure Drools Fusion 
 <?xml version="1.0" encoding="UTF-8"?>  
 <kmodule xmlns="">  
   <kbase name="event" packages="event" eventProcessingMode="stream">  
        <ksession name="sherlock-event"/>  
   </kbase>  
 </kmodule>  

The above file, named 'kmodule.xml', is included in the META-INF directory of your project. Make sure it is available on the classpath of your main class.

4. Code the Drools Java Runtime to Send or Process Events
The SherlockComplexEventProcessing class contains the Java code for the Drools Fusion runtime. The following are the most important activities performed by this runtime.

X. Declare Drools Java Runtime Variables

 import java.util.Date;  
 import java.util.HashMap;  
 import java.util.LinkedList;  
 import org.kie.api.KieBaseConfiguration;  
 import org.kie.api.KieServices;  
 import org.kie.api.conf.EventProcessingOption;  
 import org.kie.api.runtime.KieContainer;  
 import org.kie.api.runtime.KieSession;  
 import org.kie.api.runtime.KieSessionConfiguration;  
 import org.kie.api.runtime.conf.ClockTypeOption;  
 import org.kie.internal.KnowledgeBase;  
 import org.kie.internal.KnowledgeBaseFactory;  
 import org.kie.internal.builder.KnowledgeBuilder;  
 import org.kie.internal.builder.KnowledgeBuilderFactory;  
 /**  
  * @author spuri  
  * SherlockComplexEventProcessing is a Sherlock service that provides the most  
  * essential part of feeding data to the Knowledge Is Everything API of Drools.  
  * It will provide data that is in order or even out-of-order. In essence, it  
  * provides the core of the Sherlock Intellect.  
  */  
 public class SherlockComplexEventProcessing {  
      private static SherlockComplexEventProcessing cepService = null;  
      // Drools Fusion Runtime Configuration  
      private KieBaseConfiguration kieConfiguration;  
      private KnowledgeBase kieBase;  
      private KieServices ks;  
      private KieContainer kContainer;  
      private KieSession kSession;  
      private KnowledgeBuilder kbuilder;  ... // Refer Bundled Code

Y. Initialize and Instantiate Drools Variables
 public void init() {  
      try {  
           System.out.println("initializing kie runtime for drools fusion...");  
           kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();  
           // kbuilder.add(ResourceFactory.newClassPathResource("event.drl"),  
           // ResourceType.DRL);  
           if (kbuilder.hasErrors()) {  
                // report rule compilation errors, if any  
                System.err.println(kbuilder.getErrors());  
           }  
           kieConfiguration = KieServices.Factory.get().newKieBaseConfiguration();  
           kieConfiguration.setProperty("drools.dialect.mvel.strict", "false");  
           kieConfiguration.setProperty("org.kie.demo", "false");  
           ks = KieServices.Factory.get();  
           kContainer = ks.getKieClasspathContainer();  
           kieBase = KnowledgeBaseFactory.newKnowledgeBase(kieConfiguration);  
           // clock type for the session  
           KieSessionConfiguration sessionConfiguration = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();  
           // "sherlock-event" is the ksession name declared in kmodule.xml  
           kSession = kContainer.newKieSession("sherlock-event", sessionConfiguration);  
           kSession.setGlobal("threatMap", new HashMap<Long,SherlockEventCorrelation>());  
           kSession.setGlobal("startTime", new Date().getTime());  
           kSession.setGlobal("startMemory", Runtime.getRuntime().freeMemory());  
           kSession.setGlobal("totalFactCount", totalFactCount);  
           System.out.println("initialized the kie runtime for drools fusion...");  
      } catch (Exception e) {  
           e.printStackTrace();  
      }  
 }  

Z. Send Event by Event for Complex Event Processing to Drools Runtime
 public void execute(SherlockEvent event) {  
      // try {  
           // anything to do with the event object  
           kSession.setGlobal("totalFactCount", totalFactCount++);  

           HashMap threatM=(HashMap) kSession.getGlobal("threatMap");  

           LinkedList list=new LinkedList();  

           if(prevTime==0) prevTime=Long.parseLong(kSession.getGlobal("startTime").toString());  
           currTime=new Date().getTime();  ... // Refer Bundled Code

5. Setup the Data Loader (Asynchronous is Preferred Mode - Think of JMS Extension)
Now run the SherlockDataLoaderDriver, which in turn starts the SherlockDataLoaderThread to intelligently load random data and 'Inject Positive Cases' into the large stream of information. The data load is controlled to create only 100 random records and then wait for 10 seconds. You can change this for your demo or PoC purposes to suit a larger data stream and a shorter or longer wait time.
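
The loading idea can be sketched in plain Java as follows. This is not the bundled SherlockDataLoaderThread. The names DataLoaderSketch, Event and nextBatch, the event types other than "port and ip scan", and the IP addresses are all hypothetical, and the 10-second pause is shortened so that the demo finishes quickly:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class DataLoaderSketch {

    // Minimal stand-in for the event object, with only the fields used here.
    static final class Event {
        final String type;
        final String sourceIp;
        final long timestamp;

        Event(String type, String sourceIp, long timestamp) {
            this.type = type;
            this.sourceIp = sourceIp;
            this.timestamp = timestamp;
        }
    }

    // Only "port and ip scan" appears in the rules; the others are made up.
    static final String[] TYPES = {"port and ip scan", "login attempt", "port access"};

    // Builds one batch: 'size' random events followed by 'positives' scan
    // events that share one source IP, so a detection rule has a case to find.
    static List<Event> nextBatch(Random rnd, int size, int positives, long now) {
        List<Event> batch = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            String ip = "10.0." + rnd.nextInt(256) + "." + rnd.nextInt(256);
            batch.add(new Event(TYPES[rnd.nextInt(TYPES.length)], ip, now + i));
        }
        for (int i = 0; i < positives; i++) {
            batch.add(new Event("port and ip scan", "192.0.2.1", now + size + i));
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        Random rnd = new Random();
        for (int round = 0; round < 2; round++) {
            List<Event> batch = nextBatch(rnd, 100, 11, System.currentTimeMillis());
            System.out.println("loaded " + batch.size() + " events");
            // In Sherlock, each event would be handed to the CEP service here.
            Thread.sleep(100);  // stands in for the 10-second wait between loads
        }
    }
}
```

Injecting a known number of positive cases into random noise makes it easy to check that the rules fire exactly when they should.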

6. Output or Outcome, of 'Anomaly/Threat Detection', from Sherlock (Use-Case 01)

You may use the Sherlock! code for a demo, hack or proof-of-concept of Drools Expert, Drools Fusion or simply Complex Event Processing.

Download entire Sherlock! Code, Data Loader and Basic User Interface as an Eclipse Project.

[Sherlock! was my Hackathon creation for the Societe Generale Brainwaves 2015 (along with three other team members, Team: The_Big_Billionaire $). It was also based on my previous experience at an Information Security Company as a Java Senior Technical Architect. You may additionally refer to the Sherlock! Presentation Slide, which I had created for my Hackathon, to understand Sherlock! better. We had also created a Basic User Interface which can be used on PC and be adapted to Mobile as well. I have not explained how to integrate it, but if you go through the code, there is Java Servlet code to get you started on the same.]

1 comment:

Sumith Kumar Puri said...

I request people from all over the world to leave their valuable comments, whether you want me to demonstrate some specific feature or have queries on CEP, Drools Fusion or the attached code. You may also choose to ask questions on Sherlock! (example code).

Also, refer to Drools Expert, including a simple example to demonstrate the same:

Wishing all the best to the curious minds in every part of the world who want to get deep into the world of data science, complex event processing or rules engines. Waiting for your comments.