Complex Event Processing and Correlation with Drools Fusion 6.x

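Complex event processing (CEP) handles large streams of information and can be used for real-time event monitoring or correlation. Events can be processed in one of two modes: ‘stream’ or ‘cloud’. In Drools Fusion, cloud mode treats all facts as an unordered set with no notion of the flow of time, whereas stream mode expects time-ordered events and supports time-based sliding windows. The following image illustrates the differences between the two modes: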

The continuous flow of information or events can be classified into one of these brackets (or even both) for analysis or correlation. The cloud mode would be useful in the following circumstances: user behavior, market data and activity monitoring. The stream mode could be most useful in applications such as: real-time event monitoring, event correlation, and sensor networks.

The most useful end-applications are threat detection, anomaly detection, airport security, market prediction, forecasting profits, and automating algorithmic trading decisions, among a host of other applications.

Sliding windows and batch windows need some explanation in any discussion of complex event processing. For most architects and engineers who are reading about this for the first time, they will come across as a novel way of analyzing information. The Batch Window illustration below demonstrates that the information window is processed in a discrete or fixed slot or block of events.

The Sliding Window illustration below demonstrates that the information window is processed in a continuous or moving slot or block of events.
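The difference between the two window types can be sketched in plain Java, independent of Drools. This is a minimal illustration only: `WindowSketch`, `batchCounts`, and `slidingCount` are hypothetical names that do not appear in Sherlock or in the Drools API, and the timestamps are invented sample data.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: WindowSketch and its methods are hypothetical names,
// not part of Drools or Sherlock.
class WindowSketch {

    // Batch window: split the timeline into fixed, non-overlapping slots of
    // windowMillis and count the events that fall into each slot.
    static List<Integer> batchCounts(long[] timestamps, long windowMillis, long endMillis) {
        List<Integer> counts = new ArrayList<>();
        for (long start = 0; start < endMillis; start += windowMillis) {
            int count = 0;
            for (long t : timestamps) {
                if (t >= start && t < start + windowMillis) {
                    count++;
                }
            }
            counts.add(count);
        }
        return counts;
    }

    // Sliding window: for a given "now", count the events whose timestamp
    // lies in the interval (now - windowMillis, now].
    static int slidingCount(long[] timestamps, long windowMillis, long now) {
        int count = 0;
        for (long t : timestamps) {
            if (t > now - windowMillis && t <= now) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // A burst of six events that straddles the 10-second boundary.
        long[] events = {7_000, 8_000, 9_000, 11_000, 12_000, 13_000};
        // Two batch windows of 10 seconds each see three events apiece: [3, 3]
        System.out.println("batch counts: " + batchCounts(events, 10_000, 20_000));
        // A 10-second sliding window evaluated at t = 13s sees all six: 6
        System.out.println("sliding count: " + slidingCount(events, 10_000, 13_000));
    }
}
```

With this sample data, neither batch window ever holds more than three events, while the sliding window sees the whole burst of six. That is why threshold-style detection is usually expressed over a sliding window.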

Introducing Sherlock! event correlation app

Sherlock! is an event correlation application that demonstrates the complex event processing concepts described above. I built it with three other engineers for anomaly and threat detection in the banking domain (download the entire Sherlock! code, data loader and basic user interface as an Eclipse project, and here’s a GitHub link for Sherlock). I will analyze the following use cases, which the SANS Institute has listed among the top threats in the banking sector.

  • Detect more than ten port or IP scan attempts from the same IP address (and port) within any 10-second window
  • Detect more than five repeated login attempts from the same IP address within any 30-second window
  • Detect a sudden spike in traffic on a port x, where any 30-second window has more than five accesses
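Before turning to Drools, the first use case can be expressed as a small plain-Java sketch to make the detection logic concrete. It is not how Sherlock implements the rule. `ScanDetectorSketch` and `record` are hypothetical names, and the sketch assumes that timestamps for a given source arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Sherlock: flags a source once it produces
// more than `threshold` events inside a sliding time window.
class ScanDetectorSketch {
    private final long windowMillis;
    private final int threshold;
    // Per-source queue of the timestamps that are still inside the window.
    private final Map<String, Deque<Long>> recent = new HashMap<>();

    ScanDetectorSketch(long windowMillis, int threshold) {
        this.windowMillis = windowMillis;
        this.threshold = threshold;
    }

    // Records one scan attempt and returns true if this source now exceeds
    // the threshold. Assumes non-decreasing timestamps per source.
    boolean record(String sourceIp, String sourcePort, long timestampMillis) {
        String key = sourceIp + ":" + sourcePort;
        Deque<Long> times = recent.computeIfAbsent(key, k -> new ArrayDeque<>());
        times.addLast(timestampMillis);
        // Evict timestamps that have slid out of the window.
        while (!times.isEmpty() && times.peekFirst() <= timestampMillis - windowMillis) {
            times.removeFirst();
        }
        return times.size() > threshold;
    }

    public static void main(String[] args) {
        ScanDetectorSketch detector = new ScanDetectorSketch(10_000, 10);
        boolean flagged = false;
        // Eleven scan attempts, 500 ms apart, from the same address and port.
        for (int i = 0; i < 11; i++) {
            flagged = detector.record("10.0.0.7", "4444", i * 500L);
        }
        System.out.println("flagged: " + flagged);
    }
}
```

A rules engine such as Drools Fusion replaces this hand-written bookkeeping with declarative window constraints, which is the approach the rest of the article follows.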

I will demonstrate only the first use case in this article, including how to run the “Intelligent Data Loader” and possibly hook it up to a user interface, so that you can understand the anomaly detection and the complex event processing behind it. You may need to do the following before you can download and understand Sherlock:

  • Download Drools 6.1.0 distribution (include in Classpath)
  • Download the Eclipse plugin for Drools (include in Classpath)
  • Use JDK 1.8.0 and Java EE 7 libraries if required (include in Classpath)
  • Read up on MVEL Dialect and Drools Fusion

1. Create the SherlockEvent (and SherlockEventCorrelation) Java Object


 package com.bw2015.sherlock.biz.vo;  
 /**  
  * @author spuri  
  *  
  */  
 public class SherlockEvent {  
      private int eventId;  
      private String eventType;  
      private String eventDescription;  
      private String eventSourceIp;  
      private String eventDestinationIp;  
      private String eventSourcePort;  
      private String eventDestinationPort;  
      private String eventSourceCountry;  
      private String eventDestinationCountry;  
      private String eventSourceUsername;  
      private String eventDestinationUsername;  
      private String eventRemarks;  
      private long eventSourceTime;  
      private long eventDestinationTimestamp;  ... // Refer Bundled Code
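The companion SherlockEventCorrelation class is not listed in this article. The sketch below guesses at its shape from the setters and properties that the rule file uses (setEventSourceIp, setEventDestinationIp, setEventSourcePort, setEventDestinationPort, setEventCorrelation, and eventDestinationTimestamp). The field types are assumptions, and the class in the bundled code may differ.

```java
// Assumed shape only: inferred from the properties the rules reference.
// The real class in the bundled Sherlock code may differ.
class SherlockEventCorrelation {
    private String eventSourceIp;
    private String eventDestinationIp;
    private String eventSourcePort;
    private String eventDestinationPort;
    private int eventCorrelation;            // running count of matching events
    private long eventDestinationTimestamp;  // used by @timestamp in the rule file

    public String getEventSourceIp() { return eventSourceIp; }
    public void setEventSourceIp(String v) { this.eventSourceIp = v; }

    public String getEventDestinationIp() { return eventDestinationIp; }
    public void setEventDestinationIp(String v) { this.eventDestinationIp = v; }

    public String getEventSourcePort() { return eventSourcePort; }
    public void setEventSourcePort(String v) { this.eventSourcePort = v; }

    public String getEventDestinationPort() { return eventDestinationPort; }
    public void setEventDestinationPort(String v) { this.eventDestinationPort = v; }

    public int getEventCorrelation() { return eventCorrelation; }
    public void setEventCorrelation(int v) { this.eventCorrelation = v; }

    public long getEventDestinationTimestamp() { return eventDestinationTimestamp; }
    public void setEventDestinationTimestamp(long v) { this.eventDestinationTimestamp = v; }

    public static void main(String[] args) {
        SherlockEventCorrelation c = new SherlockEventCorrelation();
        c.setEventSourceIp("10.0.0.7");
        c.setEventCorrelation(c.getEventCorrelation() + 1);
        System.out.println(c.getEventSourceIp() + " -> " + c.getEventCorrelation());
    }
}
```

Plain getters and setters matter here because the MVEL dialect resolves property expressions such as `ce.eventCorrelation` through JavaBean accessors.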

2. Code the ‘Rule/Condition’ using Drools ‘MVEL’ Dialect (Use-Case 01)


 package com.bw2015.sherlock.biz.cep  
 // list any import classes here.  
 import com.bw2015.sherlock.biz.vo.SherlockEvent;  
 import com.bw2015.sherlock.biz.vo.SherlockEventCorrelation;  
 
 declare SherlockEvent  
      @role(event)  
      @expires(20s)  
      @timestamp (eventDestinationTimestamp)  
 end  
 
 declare SherlockEventCorrelation  
      @role(event)  
      @expires(20s)  
      @timestamp (eventDestinationTimestamp)  
 end  
 
 global Long startTime;  
 global Long startMemory;  
 global Long totalFactCount;  
 global java.util.HashMap threatMap;   
 
 // use case 01  
 // detect if there are more than ten port or ip scan attempts from the same ip address (and port)   
 // to the destination ip address (and multiple ports) in the given window  
 
 rule "Port and IP Scan Event Processing Initial"  
 dialect "mvel"  
 no-loop  
  when  
   e1: SherlockEvent(eventType == "port and ip scan") over window:time(10s)   
   not SherlockEventCorrelation(eventSourceIp == e1.eventSourceIp, eventDestinationIp == e1.eventDestinationIp, eventSourcePort == e1.eventSourcePort)   
  then  
       SherlockEventCorrelation plec = new SherlockEventCorrelation();  
       plec.setEventSourceIp(e1.eventSourceIp);  
       plec.setEventDestinationIp(e1.eventDestinationIp);  
       plec.setEventSourcePort(e1.eventSourcePort);  
       plec.setEventDestinationPort(e1.eventDestinationPort);  
       plec.setEventCorrelation(0);  
       insert(plec);  
 end  
 
 rule "Port and IP Scan Event Processing Correlation"  
 dialect "mvel"  
 no-loop  
  when  
   e1: SherlockEvent(eventType == "port and ip scan") over window:time(10s)   
   ce: SherlockEventCorrelation(eventSourceIp == e1.eventSourceIp, eventDestinationIp == e1.eventDestinationIp, eventSourcePort == e1.eventSourcePort, $eventCorr : eventCorrelation >= 0)  
  then  
       // count this event against the matching correlation fact  
       $eventCorr++;  
       ce.eventCorrelation = $eventCorr;  
       // report once the count reaches the threshold of ten  
       if (ce.eventCorrelation >= 10) {  
            System.out.println("");  
            System.out.println("+++++++++++++++ USE CASE 01 +++++++++++++++");  
            System.out.println("SOURCE INET: " + ce.eventSourceIp);  
            System.out.println("SOURCE PORT: " + ce.eventSourcePort);  
            System.out.println("DESTIN INET: " + ce.eventDestinationIp);  
            System.out.println("EVENT ACTN: " + "port and ip scan");  
            System.out.println("TIMESTAMP : " + new java.util.Date(ce.eventDestinationTimestamp));  
            System.out.println("OCCURRENCES : " + $eventCorr);  
            System.out.println("+++++++++++++++++++++++++++++++++++++++++++");  
            System.out.println("");  
       }  
       update( ce );  
       // record the correlation so the Java runtime can read it from the global map  
       threatMap.put(new java.util.Date(), ce);  
 end  

3. Configure Drools Fusion

[Image: the kmodule.xml configuration for Drools Fusion]

The above file named ‘kmodule.xml’ is included in the META-INF of your project. Make sure you make it available in the classpath of your main class.
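The original listing of this file is not reproduced here, so the following is only a sketch of what such a kmodule.xml might look like. The ksession name "sherlock-event" matches the name that the Java runtime passes to newKieSession, and the package is the one declared in the rule file. The kbase name "sherlock-kbase" is a hypothetical placeholder, and the stream mode and realtime clock attributes are assumptions that mirror the options set in the Java code.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Sketch only: the kbase name is hypothetical; the attributes are assumed
     to mirror the STREAM mode and realtime clock used by the Java runtime. -->
<kmodule xmlns="http://jboss.org/kie/6.0.0/kmodule">
  <kbase name="sherlock-kbase"
         packages="com.bw2015.sherlock.biz.cep"
         eventProcessingMode="stream">
    <ksession name="sherlock-event" type="stateful" clockType="realtime"/>
  </kbase>
</kmodule>
```

If the session name in this file and the name used in the Java code differ, the container cannot resolve the session, so it is worth checking that the two match.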

4. Code the Drools Java Runtime to Send or Process Events

The SherlockComplexEventProcessing class contains the Java code for the Drools Fusion runtime. The following are the most important activities performed by this runtime.


X. Declare Drools Java Runtime Variables
 package com.bw2015.sherlock.biz.cep;  

 import java.util.Date;  
 import java.util.HashMap;  
 import java.util.LinkedList;  
 import org.kie.api.KieBaseConfiguration;  
 import org.kie.api.KieServices;  
 import org.kie.api.conf.EventProcessingOption;  
 import org.kie.api.runtime.KieContainer;  
 import org.kie.api.runtime.KieSession;  
 import org.kie.api.runtime.KieSessionConfiguration;  
 import org.kie.api.runtime.conf.ClockTypeOption;  
 import org.kie.internal.KnowledgeBase;  
 import org.kie.internal.KnowledgeBaseFactory;  
 import org.kie.internal.builder.KnowledgeBuilder;  
 import org.kie.internal.builder.KnowledgeBuilderFactory;  
 import com.bw2015.sherlock.biz.vo.SherlockEvent;  
 import com.bw2015.sherlock.biz.vo.SherlockEventCorrelation;  

 /**  
  * @author spuri  
  *   
  * SherlockComplexEventProcessing is a Sherlock service that provides the most  
  * essential part of feeding data to the Knowledge Is Everything API of Drools.  
  * It will provide data that is in order or even out-of-order. In essence, it  
  * provides the core of the Sherlock Intellect.  
  *  
  */  
 public class SherlockComplexEventProcessing {  

      private static SherlockComplexEventProcessing cepService = null;  
      // Drools Fusion Runtime Configuration  
      private KieBaseConfiguration kieConfiguration;  
      private KnowledgeBase kieBase;  
      private KieServices ks;  
      private KieContainer kContainer;  
      private KieSession kSession;  
      private KnowledgeBuilder kbuilder;  ... // Refer Bundled Code


 


Y. Initialize and Instantiate Drools Variables
 public void init() {  
      try {  
           System.out.println("initializing kie runtime for drools fusion...");  
           kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();  
           // kbuilder.add(ResourceFactory.newClassPathResource("event.drl"),  
           // ResourceType.DRL);  
           if (kbuilder.hasErrors()) {  
                System.out.println(kbuilder.getErrors().toString());  
           }  
           kieConfiguration = KieServices.Factory.get().newKieBaseConfiguration();  
           kieConfiguration.setProperty("drools.dialect.mvel.strict", "false");  
           kieConfiguration.setProperty("org.kie.demo", "false");  
           kieConfiguration.setOption(EventProcessingOption.STREAM);  
           ks = KieServices.Factory.get();  
           kContainer = ks.getKieClasspathContainer();  
           kieBase = KnowledgeBaseFactory.newKnowledgeBase(kieConfiguration);  
           kieBase.addKnowledgePackages(kbuilder.getKnowledgePackages());  
           // clock type for the session  
           KieSessionConfiguration sessionConfiguration = KnowledgeBaseFactory.newKnowledgeSessionConfiguration();  
           sessionConfiguration.setOption(ClockTypeOption.get("realtime"));  
           kSession = kContainer.newKieSession("sherlock-event", sessionConfiguration);  
           kSession.setGlobal("threatMap", new HashMap<Long,SherlockEventCorrelation>());  
           kSession.setGlobal("startTime", new Date().getTime());  
           kSession.setGlobal("startMemory", Runtime.getRuntime().freeMemory());  
           kSession.setGlobal("totalFactCount", totalFactCount);  
           System.out.println("initialized the kie runtime for drools fusion...");  
      } catch (Exception e) {  
           e.printStackTrace();  
      }  
 }  


 


Z. Send Event by Event for Complex Event Processing to Drools Runtime
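 public void execute(SherlockEvent event) {  
      // do anything else with the event object here  
      // publish the running fact count; the post-increment passes the value  
      // held before this event is counted  
      kSession.setGlobal("totalFactCount", totalFactCount++);  
      // insert the event and evaluate the rules against it  
      kSession.insert(event);  
      kSession.fireAllRules();  

      // collect the correlations that the rules recorded in the threatMap global  
      HashMap threatM = (HashMap) kSession.getGlobal("threatMap");  

      LinkedList list = new LinkedList();  
      list.addAll(threatM.values());  
      threats.pushAll(list);  

      if (prevTime == 0) prevTime = Long.parseLong(kSession.getGlobal("startTime").toString());  
      currTime = new Date().getTime();  
 }  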
}

5. Setup the Data Loader (Asynchronous Preferred – Think of JMS Extension)

Now run the SherlockDataLoaderDriver, which in turn starts the SherlockDataLoaderThread to intelligently load random data and ‘Inject Positive Cases’ into the large stream of information. We have limited this data load to create only 100 random records and then wait for 10 seconds. You can change this for your demo or POC purposes to suit a larger data stream and a shorter or longer wait time.
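The idea of mixing random records with injected positive cases can be sketched as follows. This is not the bundled SherlockDataLoaderThread: `DataLoaderSketch`, its nested `Event` class, the noise event types, and the sample IP addresses are all hypothetical stand-ins chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of one loader batch; the real SherlockDataLoaderThread
// in the bundled code may work differently.
class DataLoaderSketch {

    // Minimal stand-in for SherlockEvent with only the fields used here.
    static class Event {
        final String type;
        final String sourceIp;
        final long timestamp;

        Event(String type, String sourceIp, long timestamp) {
            this.type = type;
            this.sourceIp = sourceIp;
            this.timestamp = timestamp;
        }
    }

    // Invented event types that should not trigger use case 01.
    static final String[] NOISE_TYPES = {"login", "file access", "port access"};

    // Builds one batch: randomCount noise events from random addresses,
    // followed by positiveCount "port and ip scan" events from one address.
    static List<Event> buildBatch(Random random, int randomCount, int positiveCount, long startMillis) {
        List<Event> batch = new ArrayList<>();
        for (int i = 0; i < randomCount; i++) {
            String ip = "192.168.1." + (1 + random.nextInt(254));
            String type = NOISE_TYPES[random.nextInt(NOISE_TYPES.length)];
            batch.add(new Event(type, ip, startMillis + i));
        }
        for (int i = 0; i < positiveCount; i++) {
            batch.add(new Event("port and ip scan", "10.0.0.7", startMillis + randomCount + i));
        }
        return batch;
    }

    public static void main(String[] args) {
        List<Event> batch = buildBatch(new Random(), 100, 11, System.currentTimeMillis());
        System.out.println("events in batch: " + batch.size());
        // A real loader would pass each event to the CEP runtime and then
        // wait (for example, 10 seconds) before building the next batch.
    }
}
```

Keeping batch construction separate from the waiting and sending makes it easy to change the batch size or the number of injected cases for a demo.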

  

You may use the Sherlock code for a demo, a hack, or a proof of concept of Drools Expert or Drools Fusion, or simply to explore complex event processing.

Sherlock! was my hackathon creation for the Societe Generale Brainwaves 2015 (along with three other team members, team name: The_Big_Billionaire $). It was also based on my previous experience as a Java Senior Technical Architect at an information security company. To understand Sherlock! better, you may additionally refer to the Sherlock! presentation slide that I created for the hackathon. We also created a basic user interface, which can be used on a PC and adapted to mobile as well. I have not explained how to integrate it, but if you go through the code, there is Java Servlet code to get you started.
