Esper primer

by Dhruba Bandopadhyay on Wed, 02 Sep 2009 22:38:58 +0000

Recently I felt the need for more complex (no pun intended) event processing than a simple enterprise integration patterns implementation such as Spring Integration provides, so I started looking at Esper for more insight into complex event processing (CEP) and event stream processing (ESP) from an open source perspective.

Here I reproduce an example from the Esper documentation. It is the exact example from the Esper tutorial, with javadoc also taken from the Esper documentation, for which I take no credit. I reproduce it here as a complete and coherent example for my own reference and anyone else’s, with one or two fixes to mistakes in the Esper documentation.

Create an event using a Java POJO class.

package esper.example;

import java.text.MessageFormat;

/**
 * Java classes are a good choice for representing events, however Map-based or
 * XML event representations can also be good choices depending on your
 * architectural requirements.
 * 
 * A sample Java class that represents an order event is shown below. A simple
 * plain-old Java class that provides getter-methods for access to event
 * properties works best:
 */
public class OrderEvent {

    private String itemName;
    private double price;

    public OrderEvent(String itemName, double price) {
        this.itemName = itemName;
        this.price = price;
    }

    public String getItemName() {
        return itemName;
    }

    public double getPrice() {
        return price;
    }

    @Override
    public String toString() {
        return MessageFormat.format("OrderEvent [itemName={0}, price={1}]", itemName,
                Double.valueOf(price));
    }

}

Create a main method that will invoke the application by creating a statement, assigning a listener to that statement and publishing some events.

package esper.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPRuntime;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;

public class Main {

    final static Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {

        /*
         * Esper runs out of the box and no configuration is required. However
         * configuration can help make statements more readable and provides the
         * opportunity to plug-in extensions and to configure relational
         * database access.
         * 
         * One useful configuration item specifies Java package names from which
         * to take event classes.
         * 
         * This snippet of using the configuration API makes the Java package of
         * the OrderEvent class known to an engine instance:
         */
        Configuration config = new Configuration();
        config.addEventTypeAutoName(OrderEvent.class.getPackage().getName());

        /*
         * A statement is a continuous query registered with an Esper engine
         * instance that provides results to listeners as new data arrives, in
         * real-time, or by demand via the iterator (pull) API.
         * 
         * The next code snippet obtains an engine instance and registers a
         * continuous query. The query returns the average price over all
         * OrderEvent events that arrived in the last 30 seconds:
         */
        EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
        String expression = "select itemName, avg(price) from OrderEvent.win:time(30 sec)";
        EPStatement statement = epService.getEPAdministrator().createEPL(expression);

        /*
         * By attaching the listener to the statement the engine provides the
         * statement's results to the listener:
         */
        MyListener listener = new MyListener();
        statement.addListener(listener);

        /*
         * The runtime API accepts events for processing. As a statement's
         * results change, the engine indicates the new results to listeners
         * right when the events are processed by the engine.
         * 
         * Sending events is straightforward as well:
         */
        OrderEvent event1 = new OrderEvent("a", 1);
        OrderEvent event2 = new OrderEvent("b", 2);
        OrderEvent event3 = new OrderEvent("c", 3);
        OrderEvent event4 = new OrderEvent("d", 4);

        EPRuntime epRuntime = epService.getEPRuntime();
        epRuntime.sendEvent(event1);
        epRuntime.sendEvent(event2);
        epRuntime.sendEvent(event3);
        epRuntime.sendEvent(event4);

    }

}

Create a listener that will receive events in real time.

package esper.example;

import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.espertech.esper.client.EventBean;
import com.espertech.esper.client.UpdateListener;

/**
 * Listeners are invoked by the engine in response to one or more events that
 * change a statement's result set. Listeners implement the UpdateListener
 * interface and act on EventBean instances as the next code snippet outlines:
 */
public class MyListener implements UpdateListener {

    final static Logger logger = LoggerFactory.getLogger(UpdateListener.class);

    public void update(EventBean[] newEvents, EventBean[] oldEvents) {
        EventBean event = newEvents[0];
        Object average = event.get("avg(price)");
        Object itemName = ((Map) event.getUnderlying()).get("itemName");
        logger.info("upon arrival of {} the average stood at {}", itemName, average);
    }

}

This produces the output below as expected.

22:34:21.296 [main] INFO  c.e.esper.client.UpdateListener - upon arrival of a the average stood at 1.0
22:34:21.300 [main] INFO  c.e.esper.client.UpdateListener - upon arrival of b the average stood at 1.5
22:34:21.301 [main] INFO  c.e.esper.client.UpdateListener - upon arrival of c the average stood at 2.0
22:34:21.301 [main] INFO  c.e.esper.client.UpdateListener - upon arrival of d the average stood at 2.5
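
To see where those four averages come from, here is a minimal plain-Java sketch of a time-windowed running average. It does not use Esper at all: the TimeWindowAverage class, its add method and the exact eviction boundary are assumptions made purely for illustration, and Esper's own window handling may differ in detail.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical stand-in for "avg(price) over the last 30 seconds".
 * This is NOT Esper code; it only mimics the arithmetic of the window.
 */
class TimeWindowAverage {

    /** One price together with the time at which it arrived. */
    private static final class Entry {
        final long timestampMillis;
        final double price;

        Entry(long timestampMillis, double price) {
            this.timestampMillis = timestampMillis;
            this.price = price;
        }
    }

    private final long windowMillis;
    private final Deque<Entry> window = new ArrayDeque<>();
    private double sum;

    TimeWindowAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Adds a price seen at nowMillis and returns the average of the prices still in the window. */
    double add(long nowMillis, double price) {
        // Assumption: an entry leaves the window once it is windowMillis old or older.
        while (!window.isEmpty()
                && nowMillis - window.peekFirst().timestampMillis >= windowMillis) {
            sum -= window.removeFirst().price;
        }
        window.addLast(new Entry(nowMillis, price));
        sum += price;
        return sum / window.size();
    }

    public static void main(String[] args) {
        TimeWindowAverage average = new TimeWindowAverage(30000L);
        double[] prices = { 1, 2, 3, 4 };
        long now = 0L;
        for (double price : prices) {
            System.out.println("the average stood at " + average.add(now, price));
            now += 1L; // the four events arrive within a few milliseconds of each other
        }
    }
}
```

Because all four events arrive well inside the 30 second window, nothing is evicted and each result is simply the mean of the prices seen so far: 1.0, 1.5, 2.0 and 2.5.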

As a bonus, if you want to load events from a CSV file, that can be done as follows.

Create a CSV file with column headings and call it, let’s say, simulation.csv.

itemName,price
a,1
b,2
c,3
d,4
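
The header row is what ties each column to an event property. As a rough illustration of that idea only, the plain-Java sketch below turns the same CSV text into one property map per row. It is not how the Esper CSV adapter is implemented: the CsvRowsToMaps class and its parse method are hypothetical, and treating price as the only numeric column is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Esper: header names become map keys. */
class CsvRowsToMaps {

    /** Turns CSV text with a header row into one property map per data row. */
    static List<Map<String, Object>> parse(String csv) {
        String[] lines = csv.trim().split("\\r?\\n");
        String[] header = lines[0].split(",");
        List<Map<String, Object>> events = new ArrayList<>();
        for (int i = 1; i < lines.length; i++) {
            String[] cells = lines[i].split(",");
            Map<String, Object> event = new LinkedHashMap<>();
            for (int c = 0; c < header.length; c++) {
                String name = header[c].trim();
                String value = cells[c].trim();
                // Assumption: "price" is the only numeric column in this file.
                Object typed = "price".equals(name) ? (Object) Double.valueOf(value) : value;
                event.put(name, typed);
            }
            events.add(event);
        }
        return events;
    }

    public static void main(String[] args) {
        String csv = "itemName,price\na,1\nb,2\nc,3\nd,4\n";
        for (Map<String, Object> event : parse(csv)) {
            System.out.println(event);
        }
    }
}
```

Each resulting map has an itemName string and a price double, which is the same shape as the OrderEvent properties used in the query.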

Load and publish events directly from the CSV file. Awesome.

package esper.example;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.espertech.esper.client.Configuration;
import com.espertech.esper.client.EPServiceProvider;
import com.espertech.esper.client.EPServiceProviderManager;
import com.espertech.esper.client.EPStatement;
import com.espertech.esperio.AdapterInputSource;
import com.espertech.esperio.InputAdapter;
import com.espertech.esperio.csv.CSVInputAdapter;
import com.espertech.esperio.csv.CSVInputAdapterSpec;

public class Main {

    final static Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {

        Configuration config = new Configuration();
        config.addEventTypeAutoName("esper.example");

        /*
         * declare event type map
         */
        Map typeMap = new HashMap();
        typeMap.put("itemName", String.class);
        typeMap.put("price", double.class);
        config.addEventType("OrderEvent", typeMap);

        EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider(config);
        String expression = "select itemName, avg(price) from OrderEvent.win:time(30 sec)";
        EPStatement statement = epService.getEPAdministrator().createEPL(expression);

        MyListener listener = new MyListener();
        statement.addListener(listener);

        /*
         * publish events from csv file
         */
        AdapterInputSource adapterInputSource = new AdapterInputSource("simulation.csv");
        (new CSVInputAdapter(epService, adapterInputSource, "OrderEvent")).start();

    }

}
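
A note on the file lookup, since several readers in the comments report a "not found" error at this step. Judging by the stack traces they posted, the String constructor of AdapterInputSource appears to resolve its argument as a classpath resource rather than as a file system path; that reading is an assumption and has not been checked against the Esper source. The plain-Java sketch below uses no Esper classes (ResourceVersusFile and its method names are made up for illustration) and only shows that the two kinds of lookup are different questions.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical demo class: classpath lookup versus file system lookup. */
class ResourceVersusFile {

    /** True if the name can be opened as a resource through the class loader. */
    static boolean foundOnClasspath(String name) {
        ClassLoader loader = ResourceVersusFile.class.getClassLoader();
        InputStream in = loader.getResourceAsStream(name);
        if (in == null) {
            return false;
        }
        try {
            in.close();
        } catch (IOException ignored) {
            // Closing failed, but the resource was found, which is all we report.
        }
        return true;
    }

    /** True if the path names an existing regular file. */
    static boolean foundOnFileSystem(String path) {
        return new File(path).isFile();
    }

    public static void main(String[] args) throws IOException {
        // A file that exists on disk but was not placed on the classpath on purpose.
        File csv = File.createTempFile("simulation", ".csv");
        csv.deleteOnExit();
        String path = csv.getAbsolutePath();
        System.out.println("classpath lookup: " + foundOnClasspath(path));
        System.out.println("file lookup:      " + foundOnFileSystem(path));
    }
}
```

If that reading is right, either putting simulation.csv on the classpath or passing a java.io.File to AdapterInputSource, as suggested in the comments, should avoid the error.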

This gets a lot more sophisticated with looping and throttling of the adapter source.

On first impressions, Esper seems like a very powerful tool for event processing, with its event processing language (EPL) modelled on SQL. It is also very lightweight in that it is simply a jar that can be deployed within your application without any further infrastructure. Being new to CEP and Esper, I’m very excited by the prospects it opens up for me, and I definitely intend to continue looking further into CEP and ESP academically using Esper to see what else is possible.

What will also be interesting going forward is the experience of gradually porting the more complex use cases I had for Spring Integration to Esper, although I do understand that they are not an exact match: one is a CEP/ESP solution and the other is an EIP/inversion of concurrency solution.

19 comments

Md. Abdus Salam November 22, 2009 at 06:53

Hi dhruba! Thank you for your examples on Esper. I tried your example; the first one worked fine. But when I use the “simulation.csv” file I get a run-time exception like this:

Exception in thread "main" com.espertech.esper.client.EPException: C:\Users\lincoln\workspace\csvInput\bin\esper\simulation.csv not found
	at com.espertech.esperio.AdapterInputSource.resolvePathAsStream(AdapterInputSource.java:198)
	at com.espertech.esperio.AdapterInputSource.getAsStream(AdapterInputSource.java:157)
	at com.espertech.esperio.csv.CSVSource.<init>(CSVSource.java:36)
	at com.espertech.esperio.csv.CSVReader.<init>(CSVReader.java:53)
	at com.espertech.esperio.csv.CSVInputAdapter.finishInitialization(CSVInputAdapter.java:213)
	at com.espertech.esperio.csv.CSVInputAdapter.<init>(CSVInputAdapter.java:64)
	at com.espertech.esperio.csv.CSVInputAdapter.<init>(CSVInputAdapter.java:76)
	at esper.Lincoln.main(Lincoln.java:120)

I tried changing the path name and used an absolute path name for the file, but it still doesn’t work. It would be very nice if you could give me any suggestions regarding this.

adnan RASHID December 9, 2009 at 05:38

Dear dhruba and Md. Abdus Salam

I am having issues running the first example. I tried to run it from Eclipse but it just gets terminated.

Can u pls assist me here.
Many thanks &
Regards
Adnan

Md. Abdus Salam December 14, 2009 at 15:59

Hi adnan! The first example worked well for me. I used Eclipse. I just copied and pasted the code into files named after each class. What sort of problem are u facing?

Adnan December 17, 2009 at 14:23

Dear Abdus Salam

Thanks for your kind reply. Yes, my first one is working fine now. Did your program work when u tried to load from the CSV file?

Regards
Adnan

adnan December 18, 2009 at 12:51

Hi
I am getting the same file not found error. I tried a few options, including giving an absolute path, but I still get the same error. Any suggestion would be greatly appreciated.

log4j:WARN Please initialize the log4j system properly.
Exception in thread "main" com.espertech.esper.client.EPException: test.csv not found
	at com.espertech.esperio.AdapterInputSource.resolvePathAsStream(AdapterInputSource.java:198)
	at com.espertech.esperio.AdapterInputSource.getAsStream(AdapterInputSource.java:157)
	at com.espertech.esperio.csv.CSVSource.<init>(CSVSource.java:36)
	at com.espertech.esperio.csv.CSVReader.<init>(CSVReader.java:53)
	at com.espertech.esperio.csv.CSVInputAdapter.finishInitialization(CSVInputAdapter.java:213)
	at com.espertech.esperio.csv.CSVInputAdapter.<init>(CSVInputAdapter.java:64)
	at com.espertech.esperio.csv.CSVInputAdapter.<init>(CSVInputAdapter.java:76)

Md. Abdus Salam December 19, 2009 at 04:19

hi!

I did the following to overcome this problem

File inFile = new File("test.csv");
AdapterInputSource adapterInputSource = new AdapterInputSource(inFile);

Try it and I’m sure u will get the output of the second example.

adnan rashid December 19, 2009 at 23:16

Thanks Abdus Salam. It worked. I tried using an absolute path from the root and also your suggestion, and it works in both cases now.

Once again appreciate your kind reply.

adnan January 13, 2010 at 03:52

Hi Abdus salam

I hope u r fine. I am just wondering if u have tried to save the events in a CSV file. Any suggestion how we can do that?

Regards
Adnan

ajmal February 25, 2011 at 23:06

Dear adnan,
I need some help configuring Esper. Please send your email address to majmalazad@hotmail.com.
Thanking you in advance.

Sudhakar Reddy March 4, 2010 at 04:44

Hi Dhruba Bandopadhyay,

I am using Esper for CEP. I have done R&D and am able to load a CSV file in the adapter. I have written multiple queries and they are firing against the data from the CSV through polling.

The problem is that Esper doesn’t give examples of writing blocks of EPL queries using case statements, or EPL blocks similar to a PL/SQL block. I need to update multiple streams based on data in one particular stream. I am using custom utility classes in queries to simplify business logic. Is there any way that I can pass a stream to the custom classes so that I can modify the stream in the utility classes themselves?

Please reply.

Regards
Sudhakar Reddy.

Anshul May 27, 2010 at 08:54

Hello
I am new to Esper and Eclipse.
Can you tell me how to run these programs, whether in Eclipse or with Esper?
When I simply compile your programs using java in the command prompt it gives an error because it can’t find espertech.esper.client*
So please throw some light on how to start running these programs from scratch.
I will be highly obliged if you help me out.

bhavana February 4, 2013 at 11:10

Hi Anshul,
I have the same problem of how to run Esper programs. I tried in java too, but I get the same error as it can’t find espertech.esper.client.*
It has been two years since you posted this. If you found a solution to that problem, kindly help me in running those programs. Waiting for your valuable reply.

Pradeep June 11, 2010 at 16:57

Thanks, good one.
Can you provide a similar event processing example to send JMS events to the Esper engine?

roozbeh July 22, 2010 at 12:05

Where can I find org.slf4j.Logger?

Turco August 24, 2010 at 14:05

@Anshul, to overcome this problem (“espertech.esper.client*”) add Esper’s libs and .jar files …

Regards…

Anitha April 28, 2011 at 07:22

Hello
I am new to Esper and Eclipse.
Can you tell me how to run these programs in Eclipse?
So please throw some light on how to start running these programs from scratch.
I will be highly obliged if you help me out.

Alex October 12, 2012 at 16:06

Hi! I’m trying to run the first part but I’m getting this type of error. Could you please help me?

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory
at com.espertech.esper.client.Configuration.(Configuration.java:39)
at Main.main(Main.java:16)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.logging.LogFactory
at java.net.URLClassLoader$1.run(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
… 2 more

sushma February 4, 2013 at 11:28

Hello alex,
I am a beginner with Esper. I don’t know how to run Esper programs in java. I simply ran them in the command prompt using java, but I am getting the same errors that you got. Kindly help me in running Esper programs pls. Anyone who knows the solution to this problem, mail me at sushma.030894@gmail.com. I will be highly obliged if anyone helps me with this.

Alex October 12, 2012 at 16:18

I had to import commons-logging.jar and now I have another error :(
Exception in thread "main" java.lang.NullPointerException
at Main.main(Main.java:29)
