Hello all,
today I want to write something new from my ongoing work with Apache Camel. This time it's not as big a story as last time, but it's quite useful. Up to this point (actually a little while ago now) I thought that when building routes with Apache Camel in an org.apache.camel.builder.RouteBuilder, you have to write everything you want to configure in the respective configure() method, and that's it.
Well, that's not true…
You can just as well call any method you like, e.g. one from a class your RouteBuilder inherits from, and Camel interprets everything together as one configuration.
Before anyone asks why that should be useful: you could, for example, define some error handlers in that superclass and make them available to every Camel route in which the error handling should be active. That way you define them only once but can use those error handling definitions in more than one RouteBuilder.
public class InboundRouter extends ErrorHandlingRouteBuilder {
    @Override
    public void configure() throws Exception {
        super.inheritConfigure();
        from(fromEndpoint)
            .noAutoStartup()
            .transacted()
            .policy(required)
            .routeId(BasicRouteBuilder.stdfromRouteId(this.componentBaseName))
            .process(neededHeadersProcessor)
            .to(nextstepfromRoute);
    }
}
And in ErrorHandlingRouteBuilder:
protected void inheritConfigure() {
    errHdlRoute = "direct:errorfwd." + cpBase;
    // Default for all routes: redeliver with exponential back-off, then hand over to the error route.
    errorHandler(deadLetterChannel(getContext().getEndpoint(errHdlRoute))
        .maximumRedeliveries(errHandlingParameters.getMaxRedeliver())
        .useOriginalMessage()
        .logStackTrace(false)
        .maximumRedeliveryDelay(errHandlingParameters.getMaxRedeliverDelay())
        .redeliveryDelay(errHandlingParameters.getRedeliverDelay())
        .backOffMultiplier(errHandlingParameters.getBackoffMultiplier())
        .useExponentialBackOff());
    // Permanent errors are not retried; they go straight to the error route.
    onException(PermanentError.class)
        .maximumRedeliveries(0)
        .useOriginalMessage()
        .handled(true)
        .to(errHdlRoute);
    // The error route itself: enrich, convert and persist the failed message.
    from(errHdlRoute)
        .routeId("errorforwarder")
        .process(new ErrorhandlingHeadersProcessor())
        .process(new ConvertErrorToJPAProcesor())
        .to("jpa:com.innoq.errhandling.SaveMessageStore?persistenceUnit=...");
}
Ahem, don't read too much into the error handling itself; the configurations shown are excerpts from my most recent project.
I promise I will come back with another post to explain my solution for error handling.
I only wanted to show that every Camel configuration aspect in the above example is also picked up by Camel in its configuration process.
As already mentioned, you could build a big call stack in inherited classes or in others. The only important thing is that the call stack starts inside the configure() method of a RouteBuilder that gets loaded when the CamelContext starts.
Now, if you are thinking “Hello, Mr. Obvious”: sorry that I wasted your precious time.
From reading the available online docs and the book “Camel in Action” (which is, by the way, a very good one and a great way to start working with Apache Camel), this wasn't so obvious to me.
Cheers…
Martin
I've been working with Apache Camel for a year now and I am still only scratching the surface of all the possibilities you have with Camel.
In this post I want to show something that might be helpful when Camel is already in use and you want to keep the statistics that your routes generate in a database for some sort of long-term evaluation.
I think it's quite easy to do and actually not limited to handling statistics of Camel routes.
If you have absolutely no idea what I could possibly mean by routes and statistics, I recommend some reading:
the Camel website and, for the JMX part itself, Camel JMX.
The Entity
Because this example is built around Camel routes, you'll see the statistics kept by an org.apache.camel.management.mbean.ManagedPerformanceCounter.
I want to persist those stats in a database using JPA, so I need an @Entity for this:
@Entity(name = "CAMELSTATENTRY")
@Table(name = "MyStats")
public class CamelStatEntry {
    private Long id;
    private String nameIdentifier;
    private Date statisticTimeStamp;
    private Long totalExchanges;
    private Long exchangesCompleted;
    private Long exchangesFailed;
    private Long minProcessingTime;
    private Long maxProcessingTime;
    private Long totalProcessingTime;
    private Long lastProcessingTime;
    private Long meanProcessingTime;
    private Date firstExchangeCompletedTimestamp;
    private Date firstExchangeFailureTimestamp;
    private Date lastExchangeCompletedTimestamp;
    private Date lastExchangeFailureTimestamp;

    @Id
    @Column(name = "ID")
    public Long getId() {
        return id;
    }
    ...
}
I think it's not too hard to see what the values in the CamelStatEntry entity stand for, and looking at this entity you can see which statistics Camel provides out of the box.
The Collecting Route
Camel collects those values while exchanges "run through" your Camel routes.
The trick now is to gather those statistics at a fixed time interval. The idea behind the interval is to be able to see how your routes "behave" over the course of a day, week, month and so on.
For this interval we're going to use another route, defined in the StatCollectionRouter RouteBuilder.
This route gets triggered at defined points in time by the quartz component (in my example every full quarter of an hour).
So here's the route:
public class StatCollectionRouter extends RouteBuilder implements InitializingBean {
    /**
     * Spring injected bean that does all the collection work, and keeps the
     * information which routes have to be checked.
     */
    private StatCollectorBean statCollBean;

    @Override
    public void configure() throws Exception {
        // Quartz cron: fire at second 0 of minutes 0, 15, 30 and 45 of every hour.
        from("quartz://camelstats/statctr?cron=0+0/15+*+*+*+?+*")
            .routeId("StatisticalRoute")
            .bean(statCollBean, "collect")
            .split(body())
            .to("jpa:?persistenceUnit=statsdb");
    }
    ...
}
After the route is triggered, it calls a bean that does all the collection work.
This bean returns a list of all CamelStatEntry objects. The split() divides the single trigger message into as many exchanges as there are CamelStatEntry objects in the list returned by the collect method of statCollBean. Every CamelStatEntry then gets stored when it is sent to the jpa component. Of course, the "statsdb" persistence unit is defined in a persistence.xml file.
The last thing you need to know is how to collect the stats from the MBeans of the Camel routes.
The following assumes that you have a JMX connector started with your CamelContext and know a little bit about connecting to it and requesting MBeans. Explaining all that would be too much at this point. Look at the recommended reading above to see how this is done.
The Bean I: How to select the Objectnames to check.
I need to connect to the MBeanServer of my "DefaultDomain". My JMX agent is configured in the Spring config like this:
<camel:camelContext id="myCamelContext" >
<camel:jmxAgent id="com.innoq.jmxagent" createConnector="true"
mbeanObjectDomainName="com.innoq" mbeanServerDefaultDomain="WebSphere"
xmlns:camel="http://camel.apache.org/schema/spring" connectorPort="39099"
onlyRegisterProcessorWithCustomId="true" />
...
</camel:camelContext>
The object domain configured above (mbeanObjectDomainName) is "com.innoq".
You need to know this domain, because when you search for your MBeans by ObjectName, you have to set the right domain.
The following is part of the initialization of the StatCollectorBean. I search for route-type MBeans in my MBeanServer. Note that I have an extra class, "MyJMXHelper", that finds the right MBeanServer for me; I left it out for space reasons.
Also, allow me to say that this is just an example of building "allOjn2Check", the set of ObjectNames for which I want to check my statistics.
Feel free to do this more elegantly ;-).
What you need to know is how the search string s is built up, and that you will probably want to keep a collection of all MBeans that should be checked.
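public class StatCollectorBean implements JMXConstants {
    /**
     * Result set of all ObjectNames that have to be statistically checked.
     */
    private Set<ObjectName> allOjn2Check;
    ...
    // Assumption: the parameter name "routeIds" and the queryNames() step are
    // reconstructed; the original listing is incomplete at this point.
    @SuppressWarnings("unchecked")
    private void addRoutes2CheckSet(List<String> routeIds) {
        MBeanServer mbs = MyJMXHelper.findDefaultDomainMBS();
        Iterator<String> it = routeIds.iterator();
        while (it.hasNext()) {
            String routeIdName = it.next();
            try {
                // No spaces between key properties: whitespace is significant in an ObjectName.
                String s = "com.innoq:type=routes,name=\"" + routeIdName + "\",*";
                ObjectName ojnsearch = new ObjectName(s);
                allOjn2Check.addAll(mbs.queryNames(ojnsearch, null));
            } catch (MalformedObjectNameException e) {
                e.printStackTrace();
            }
        }
    }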
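To make the build-up of the search string a bit more tangible, here is a minimal, JDK-only sketch. It assumes the route MBeans live in the domain "com.innoq" and carry the keys type=routes and a quoted name, as in the search string above. The class RouteObjectNames, the method routePattern and the route ids "inbound" and "other" are made up for illustration; ObjectName.quote() and ObjectName.apply() are standard javax.management calls. The sketch only checks which names the pattern would match; it does not talk to a real MBeanServer.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Sketch: building an ObjectName pattern for one route and testing what it matches. */
final class RouteObjectNames {

    /**
     * Builds a property-list pattern for the given route id.
     * The trailing ",*" lets the pattern match names that carry additional keys
     * (for example a context key). Key properties are separated by commas without spaces.
     */
    static ObjectName routePattern(String domain, String routeId)
            throws MalformedObjectNameException {
        // ObjectName.quote() wraps the id in double quotes and escapes special characters.
        return new ObjectName(domain + ":type=routes,name=" + ObjectName.quote(routeId) + ",*");
    }

    public static void main(String[] args) throws Exception {
        ObjectName pattern = routePattern("com.innoq", "inbound");

        // Hypothetical names, shaped like the ones the search string above targets.
        ObjectName inbound = new ObjectName(
                "com.innoq:context=myCamelContext,type=routes,name=\"inbound\"");
        ObjectName other = new ObjectName(
                "com.innoq:context=myCamelContext,type=routes,name=\"other\"");

        System.out.println(pattern.isPattern());    // true
        System.out.println(pattern.apply(inbound)); // true
        System.out.println(pattern.apply(other));   // false
    }
}
```

Against a live server, the same pattern would be handed to MBeanServer.queryNames(pattern, null), which returns the set of matching ObjectNames.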
The Bean II (Collecting)
The last piece of code I want to show is the collect() method of the StatCollectorBean. It works with the set "allOjn2Check" that was initialized in the addRoutes2CheckSet method:
    public List<CamelStatEntry> collect() throws RuntimeException {
        List<CamelStatEntry> list = new ArrayList<CamelStatEntry>(allOjn2Check.size());
        try {
            for (ObjectName ojn : allOjn2Check) {
                list.add(createCSEfromMBean(ojn, new Date()));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return list;
    }
    private CamelStatEntry createCSEfromMBean(ObjectName ojn, Date statisticTimeStamp)
            throws Exception {
        final String[] emptySig = new String[] {};
        final Object[] emptyParms = new Object[] {};
        CamelStatEntry cse = new CamelStatEntry();
        MBeanServer mbs = MyJMXHelper.findDefaultDomainMBS();
        String nameId = (String) mbs.invoke(ojn, "getRouteId", emptyParms, emptySig);
        cse.setNameIdentifier(nameId);
        cse.setExchangesCompleted((Long) mbs.invoke(ojn,
                "getExchangesCompleted",
                emptyParms, emptySig));
        cse.setExchangesFailed((Long) mbs.invoke(ojn,
                "getExchangesFailed",
                emptyParms, emptySig));
        cse.setFirstExchangeCompletedTimestamp((Date) mbs.invoke(ojn,
                "getFirstExchangeCompletedTimestamp",
                emptyParms, emptySig));
        cse.setFirstExchangeFailureTimestamp((Date) mbs.invoke(ojn,
                "getFirstExchangeFailureTimestamp",
                emptyParms, emptySig));
        cse.setLastExchangeCompletedTimestamp((Date) mbs.invoke(ojn,
                "getLastExchangeCompletedTimestamp",
                emptyParms, emptySig));
        cse.setLastExchangeFailureTimestamp((Date) mbs.invoke(ojn,
                "getLastExchangeFailureTimestamp",
                emptyParms, emptySig));
        cse.setLastProcessingTime((Long) mbs.invoke(ojn,
                "getLastProcessingTime",
                emptyParms, emptySig));
        cse.setMaxProcessingTime((Long) mbs.invoke(ojn,
                "getMaxProcessingTime",
                emptyParms, emptySig));
        cse.setMinProcessingTime((Long) mbs.invoke(ojn,
                "getMinProcessingTime",
                emptyParms, emptySig));
        cse.setMeanProcessingTime((Long) mbs.invoke(ojn,
                "getMeanProcessingTime",
                emptyParms, emptySig));
        cse.setStatisticTimeStamp(statisticTimeStamp);
        cse.setTotalExchanges((Long) mbs.invoke(ojn,
                "getExchangesTotal",
                emptyParms, emptySig));
        cse.setId(statisticTimeStamp.getTime() + nameId.hashCode() + servername.hashCode());
        cse.setTotalProcessingTime((Long) mbs.invoke(ojn,
                "getTotalProcessingTime",
                emptyParms, emptySig));
        mbs.invoke(ojn, "reset", emptyParms, emptySig);
        return cse;
    }
}
Note that it is important to reset the statistics after reading them, so that I get a fresh set of statistics for every time interval.
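The read-then-reset idea can be tried out without Camel. The following is only a sketch under stated assumptions: CounterMBean and Counter are hypothetical stand-ins for a route MBean, registered in a throwaway MBeanServer. Unlike the code above, the value is read with the standard MBeanServer.getAttribute() call instead of invoking the getter as an operation; whether that fits your Camel MBeans as well is something you would have to check for your version.

```java
import java.util.concurrent.atomic.AtomicLong;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import javax.management.ObjectName;

/** Sketch: take a per-interval snapshot of a counter MBean and reset it afterwards. */
final class ReadAndResetSketch {

    /** Hypothetical management interface, standing in for a route MBean. */
    public interface CounterMBean {
        long getExchangesCompleted();
        void reset();
    }

    /** Hypothetical implementation that only counts completed exchanges. */
    public static class Counter implements CounterMBean {
        private final AtomicLong completed = new AtomicLong();

        public void exchangeDone() { completed.incrementAndGet(); }

        @Override public long getExchangesCompleted() { return completed.get(); }

        @Override public void reset() { completed.set(0); }
    }

    /** Reads the counter for the current interval, then resets it for the next one. */
    static long snapshotAndReset(MBeanServer mbs, ObjectName name) throws Exception {
        // The getter getExchangesCompleted() is exposed as the attribute "ExchangesCompleted".
        long value = (Long) mbs.getAttribute(name, "ExchangesCompleted");
        mbs.invoke(name, "reset", new Object[0], new String[0]);
        return value;
    }

    public static void main(String[] args) throws Exception {
        MBeanServer mbs = MBeanServerFactory.newMBeanServer();
        ObjectName name = new ObjectName("com.innoq:type=routes,name=\"demo\"");
        Counter counter = new Counter();
        mbs.registerMBean(counter, name);

        counter.exchangeDone();
        counter.exchangeDone();
        System.out.println(snapshotAndReset(mbs, name)); // 2 (first interval)

        counter.exchangeDone();
        System.out.println(snapshotAndReset(mbs, name)); // 1 (second interval starts from zero)
    }
}
```

Without the reset, the second snapshot would report 3, the running total, and the per-interval numbers would have to be computed as differences afterwards.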
So that's it.
Hope you'll be able to use some of this.
Cheers and keep on riding,
Martin
Welcome to my random thoughts that I find worth writing down. As I am about to start this, blogs have actually been twittered out of fashion, but I like them. You can learn a lot by reading the blogs of others. I won't presume you will necessarily do the same while reading mine, but I'll give it my best shot.
Mostly I am into riding the Apache Camel. Up to now I have mostly ridden the WebSphere Application Server in my professional Java life. In order to keep up steady progress ;-), I am trying to combine both in my most recent project, which I am glad to be part of.
Next up: learning a new language. I am targeting the likes of Erlang, Scala or Ruby.
Hopefully you'll enjoy some of my posts.