user_loser - 7 months ago
Java Question

Aggregation Program

I am trying to learn more about Apache Camel. I found the documentation somewhat helpful, but it leaves a lot of guesswork for beginner Camel riders who do not know how small code segments fit into a fully functioning program. Hopefully most people know what I mean. I have been lost in the same way with programming books that show segments of code outside the context of a complete, running program.

Anyway, here is my program, which for some reason does not aggregate messages. I expected it to aggregate all my messages, but when I run it the output is an empty file.

package laser.helmet.camel.friend;

import org.apache.camel.builder.RouteBuilder;

public class AggregatingMessagesRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("direct:start")
                .aggregate().constant(true)
                .completionTimeout(100L)
                .groupExchanges()
                .to("file:target/this_folder/result?allowNullBody=true");
    }

}


Then I call this program from the class below, which has the main method of course.

package laser.helmet.camel.friend;

import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.ProducerTemplate;

public class Main {

    public static void main(String[] args) throws Exception {
        CamelContext c = new DefaultCamelContext();
        c.addRoutes(new AggregatingMessagesRoute());
        ProducerTemplate pt = c.createProducerTemplate();

        c.start();

        pt.sendBody("direct:start", "1");
        pt.sendBody("direct:start", "2");
        Thread.sleep(5000);
        c.stop();
    }

}


I was expecting the bodies of the two messages I create with the ProducerTemplate to both be in the file after the route finishes, but it is just a blank file. I had to add allowNullBody=true to the file endpoint because for some reason the body is null when running this program.

Also, in case you are a beginner and wondering: I am bringing in the dependencies with Maven instead of putting the Camel JARs on my Java classpath.

Thank you for reading this, everyone. :D

So how can I start aggregating messages, Stack Overflow? 0_o

Peace,

user_loser

Answer

This is an easy one since you provided the code snippet, nice one! Remove .groupExchanges(), since it is being deprecated and is hardly adequate here. You need an AggregationStrategy to have fine-grained control over how your exchanges are aggregated. So, add the following class:

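import org.apache.camel.Exchange;
// Camel 2.x package; in Camel 3 and later the interface is org.apache.camel.AggregationStrategy
import org.apache.camel.processor.aggregate.AggregationStrategy;

class StringAggregationStrategy implements AggregationStrategy {

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First exchange in the group: there is nothing to merge with yet.
        if (oldExchange == null) {
            return newExchange;
        }

        // Append the new body to the accumulated one, separated by "+".
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + "+" + newBody);
        return oldExchange;
    }
}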

And then reference it in your route; so it becomes something like this:

import org.apache.camel.builder.RouteBuilder;

public class AggregatingMessagesRoute extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        from("direct:start")
                .aggregate().constant(true)
                .completionTimeout(100L)
                .aggregationStrategy(new StringAggregationStrategy())
                .to("file:target/this_folder/result?allowNullBody=true");
    }

}
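If it helps to see what the strategy does with your two messages, here is a minimal plain-Java sketch of the fold, with no Camel involved. It assumes the aggregator calls aggregate once per incoming exchange and passes null as the old exchange for the first message of a group. The class AggregationFoldSketch and its fold method are hypothetical names made up for this illustration, and plain String bodies stand in for Exchange objects.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java stand-in for the aggregator: no Camel classes are involved.
public class AggregationFoldSketch {

    // Mirrors the logic of StringAggregationStrategy.aggregate, with String
    // bodies standing in for Exchange objects (an assumption for brevity).
    static String aggregate(String oldBody, String newBody) {
        if (oldBody == null) {
            // First message of the group: nothing to merge with yet.
            return newBody;
        }
        return oldBody + "+" + newBody;
    }

    // Feeds each incoming body through aggregate, carrying the result forward.
    static String fold(List<String> bodies) {
        String current = null;
        for (String body : bodies) {
            current = aggregate(current, body);
        }
        return current;
    }

    public static void main(String[] args) {
        System.out.println(fold(Arrays.asList("1", "2"))); // prints 1+2
    }
}
```

So with the route above, and provided both messages arrive before the completion timeout fires, the file should end up containing 1+2 rather than being empty.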

Hope this helps!

R.