Taming the Apache Camel

Apache Camel is an open source integration framework from the Apache Software Foundation. It provides connectors (components) for technologies such as HTTP, Kafka, CXF (SOAP), JMS and MINA. If a connector is not available, Camel is extensible enough to accept a custom-built component or data format. Using these connectors, one can easily build rule-based routes that follow the Enterprise Integration Patterns. In short, Camel helps you build solutions from message-based enterprise patterns.

It had been a long time since I last used Camel. During one of my recent projects, we discussed using Camel for our routing, so I thought I would write a quick blog post showing how to implement a simple workflow that helps the team understand the basics.

In this project we will use the Java DSL to write the routes. The same routes could also be configured in XML files.

Create a Spring Project

The first step is to create a Spring Boot project and include the dependencies for Camel.

Parent POM
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>3.4.3</version>
  <relativePath/> <!-- lookup parent from repository -->
</parent>
Dependency POM
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.apache.camel.springboot</groupId>
  <artifactId>camel-spring-boot-starter</artifactId>
  <version>4.10.2</version>
</dependency>

We have included two dependencies here. The first one is the Spring Boot web starter, so that we can create our REST services. The second one is the Camel starter for Spring Boot. The project also depends on lombok and slf4j-simple, which I have not shown here.

We will start with a simple route that only greets the person calling the service.

Greet Service

First, let’s create a simple service.

@Slf4j
public class GreetEmployeeService {
    
  public GenResponseBean greetEmployeeService(final String name) {

    log.info("Greetings, {}", name);
    final GenResponseBean resp = new GenResponseBean();
    resp.setResponseText("Hello, " + name);
    resp.setStatus(true);
    return resp;
  }
}
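GenResponseBean is not listed in this post. Judging from the setters used above and from the sample response shown later, it carries status, responseText and a responseCode. The sketch below is a guess at its shape: it assumes responseCode defaults to a random UUID (the service never sets it), and it spells out the accessors that the real class presumably gets from Lombok.

```java
import java.util.UUID;

// Hypothetical shape of the response bean, inferred from how it is used in this post.
class GenResponseBean {
    private boolean status;
    private String responseText;
    // Assumption: generated once per response, since the service never sets it explicitly.
    private String responseCode = UUID.randomUUID().toString();

    public boolean isStatus() { return status; }
    public void setStatus(boolean status) { this.status = status; }

    public String getResponseText() { return responseText; }
    public void setResponseText(String responseText) { this.responseText = responseText; }

    public String getResponseCode() { return responseCode; }
    public void setResponseCode(String responseCode) { this.responseCode = responseCode; }
}
```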

Having defined this service, one way of using it is to reference it directly from the Spring controller that we create. However, Camel, which promotes a message-driven architecture, gives us a way to call this service from the controller using an internal message.

In this case the controller looks as follows:

@Slf4j
@CrossOrigin
@RestController
@RequestMapping("/employee")
public class EmployeeController {
    
    @Autowired
    private CamelContext camelContext;

    @Autowired
    private ProducerTemplate producerTemplate;

    @GetMapping(path = "/hellocamel", produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<CamelContext> getCamelContext() {

        return ResponseEntity.ok().body(camelContext);
    }

    @GetMapping(path = "/greet/{name}", produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<GenResponseBean> getBasicResponse(@PathVariable("name") final String name) {

        /**
         * Controller --> Service appending Hello, <name>
         */
        producerTemplate.start();
        final GenResponseBean resp = producerTemplate.requestBody(RouteNames.DIRECT_GREET_EMPLOYEE, name, GenResponseBean.class);
        producerTemplate.stop();

        return ResponseEntity.ok().body(resp);
    }
}

The call into Camel happens inside getBasicResponse(), starting at line #25 of the listing: we start the producerTemplate and, on line #26, send the name as the message body with requestBody(), which waits for the reply.

But this is incomplete until Camel knows how to route this message to the service. So, in the next section, let’s talk about Camel routes.
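The RouteNames class referenced by the controller is not listed in this post either. A minimal sketch, assuming the endpoint URIs are exactly the ones given in the inline comments next to each route in the later snippets, could look like this:

```java
// Hypothetical reconstruction: constant names come from the snippets in this post,
// values from the inline comments next to each route.
final class RouteNames {

    private RouteNames() {
        // constants only, no instances
    }

    static final String DIRECT_GREET_EMPLOYEE = "direct:greetEmployee";
    static final String DIRECT_GET_EMPLOYEES = "direct:getEmployees";
    static final String DIRECT_UPDATE_EMPLOYEE = "direct:updateEmployee";
    static final String DIRECT_DEL_EMPLOYEE = "direct:delEmployee";
    static final String SEDA_DEL_EMPLOYEE = "seda:deleteEmployee?multipleConsumers=true";
    static final String DIRECT_UPD_SALARY = "direct:updateSalary";
    static final String SEDA_UPD_SALARY = "seda:updateSalary";
    static final String SEDA_UPD_SALARY_AC = "seda:updateSalaryAc";
    static final String SEDA_UPD_SALARY_HR = "seda:updateSalaryHr";
    static final String TIMER_CLEAN_ARCHIVE = "timer:cleanEmployee?fixedRate=true&period=60000";
}
```

Keeping the URIs in one place means the controller and the route builder cannot drift apart on an endpoint name.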

Camel Routes

Routes are the primary building blocks of Camel. A route is a set of instructions that tells Camel which processing steps to apply as a message travels from source to destination; it defines how two discrete systems are integrated. Let’s take an example:

from("ftp:hostname/folder")
  .to("jms:queue:name")

Camel lets us define routes in the Java DSL, XML or YAML. The example above is a very simple Camel route written in the Java DSL. Even to someone with no knowledge of Camel it sends a clear message: take whatever is in that FTP directory and put it on the JMS queue identified by name.

Camel can also filter messages, enrich or transform them before forwarding them to the destination, and even build an entirely new message.

Within Camel we can pass messages in different ways. The following transfer mechanisms can be used.

  • direct: stays within a single Camel context and is used to send messages within the same application. The producer blocks until the consumer has finished (synchronous).
  • seda: also stays within a single Camel context. Here, however, messages are queued, so they do not block the producer and can be processed in parallel.
  • vm: works across multiple Camel contexts in the same JVM and is typically used to send messages between applications deployed together. vm is also non-blocking.

With that background, we will start implementing our routes. We will only use direct and seda, as we are staying within a single application.
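To make the blocking versus non-blocking distinction concrete, here is a JDK-only analogy. It is not Camel code, and every name in it is made up for illustration: the "direct" style is an ordinary call on the caller's thread, while the "seda" style drops the message on an in-memory queue that a separate consumer thread drains.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// JDK-only analogy of the two transports; none of this is Camel API.
final class TransportAnalogy {

    // "direct" style: the handler runs on the caller's thread and the caller waits for the result.
    static String sendDirect(Function<String, String> handler, String body) {
        return handler.apply(body);
    }

    // "seda" style: the message is queued and the caller returns immediately;
    // a separate consumer thread picks it up later.
    static void sendSeda(BlockingQueue<String> queue, String body) {
        queue.add(body);
    }

    public static void main(String[] args) throws InterruptedException {
        String reply = sendDirect(
                name -> "Hello, " + name + " (handled on " + Thread.currentThread().getName() + ")",
                "john");
        System.out.println(reply);

        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        CountDownLatch consumed = new CountDownLatch(1);
        Thread consumer = new Thread(() -> {
            try {
                String body = queue.take(); // blocks until a message arrives
                System.out.println("Consumed " + body + " on " + Thread.currentThread().getName());
                consumed.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "seda-consumer");
        consumer.start();

        sendSeda(queue, "john");
        System.out.println("Caller continues without waiting for the consumer");
        consumed.await(5, TimeUnit.SECONDS); // only so the demo does not exit early
    }
}
```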

@Slf4j
@Component
public class GreetEmployeeRoute extends RouteBuilder {
    
    @Override
    public void configure() throws Exception {
        log.info("Loading {} route....", RouteNames.DIRECT_GREET_EMPLOYEE);
        from(RouteNames.DIRECT_GREET_EMPLOYEE) // direct:greetEmployee
            .routeId("direct-greetEmployee")
            .bean(GreetEmployeeService.class, "greetEmployeeService(${body})")
            .end();
    }
}

This route listens on direct:greetEmployee, which is where the controller we defined earlier sends its message, and forwards the request to the service (see bean). The service's return value is then sent back to the calling controller. Remember that we chose the direct transport, which means the call is blocking.

We pass the entire message body to the bean using ${body}; the body is the name sent by the controller.

When we call this service as /greet/john, the following response is returned:

{
    "status": true,
    "responseText": "Hello, john",
    "responseCode": "72bd6ba3-f20a-41b8-837e-e707f43b6152"
}

That was easy. Next we will look at some more examples.

We will work with our own custom employee repository. Let’s see how it is defined.

Dummy Employee Repository

Here is the full code for the repository. It is just a singleton defined using an enum, with two data stores: one keeps employees and the other keeps archives.

/**
 * Dummy Employee Repository
 */
@Slf4j
public enum EmployeeRepo {

    INSTANCE;

    Map<String, EmployeeBean> mapEmployees;
    Map<String, List<ArchiveBean>> mapArchived;

    EmployeeRepo() {
        mapEmployees = new HashMap<>();
        mapArchived = new HashMap<>();
    }

    // Employee functions start here
    /**
     * Add an Employee
     * @param id
     * @param bean
     */
    public void addEmployee(final String id, final EmployeeBean bean) {
        log.info("Adding Employee {} with ID: {}", bean.getName(), id);
        mapEmployees.put(id, bean);
    }

    /**
     * Delete an Employee
     * @param id
     */
    public void delEmployee(final String id) {
        log.info("Removing Employee {}", id);
        mapEmployees.remove(id);
    }

    /**
     * Get an Employee
     * @param id
     * @return
     */
    public EmployeeBean getEmployee(final String id) {
        log.info("Fetch Employee {}", id);
        return mapEmployees.get(id);
    }

    /**
     * Get all Employees
     * @return
     */
    public Map<String, EmployeeBean> getEmployees() {
        return mapEmployees;
    }

    // Archive functions start here
    /**
     * Add an Archive
     * @param id
     * @param bean
     */
    public void addArchive(final String id, final ArchiveBean bean) {
        log.info("Archiving {}", bean.getName());
        if (mapArchived.containsKey(id)) {
            mapArchived.get(id).add(bean);
        } else {
            final List<ArchiveBean> lstArchive = new ArrayList<>();
            lstArchive.add(bean);
            mapArchived.put(id, lstArchive);
        }
    }

    /**
     * Get all Archives
     * @return
     */
    public Map<String, List<ArchiveBean>> getArchives() {
        return mapArchived;
    }

    /**
     * Delete an Archive
     * @param id
     * @param genId
     */
    public void deleteArchive(final String id, final String genId) {
        if (mapArchived.containsKey(id)) {
            // Compare IDs with equals(); == on Strings compares references.
            // removeIf is also safe when no archive matches genId.
            mapArchived.get(id).removeIf(a -> genId.equals(a.getGenId()));

            // Remove the key once its list is empty
            if (mapArchived.get(id).isEmpty()) {
                mapArchived.remove(id);
            }
        }
    }
}

This is fairly self-explanatory, so I will not spend time explaining it; the one-line comments are there just in case.

Using this, we will now explore more examples.
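One detail of archive deletion deserves care: generated IDs are strings, so they need to be matched by content rather than by reference, and an ID that is not found should not raise an exception. Here is a standalone sketch of that removal logic. It uses plain strings in place of ArchiveBean, which is an assumption made only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the archive store: employee ID -> list of generated archive IDs.
final class ArchiveRemovalSketch {

    static void deleteArchive(Map<String, List<String>> archives, String id, String genId) {
        List<String> entries = archives.get(id);
        if (entries == null) {
            return; // nothing archived for this employee
        }
        // equals() compares content; removeIf does nothing when no entry matches.
        entries.removeIf(entry -> entry.equals(genId));
        if (entries.isEmpty()) {
            archives.remove(id); // drop the key once its list is empty
        }
    }

    public static void main(String[] args) {
        Map<String, List<String>> archives = new HashMap<>();
        archives.put("e1", new ArrayList<>(List.of("g1", "g2")));

        deleteArchive(archives, "e1", new String("g1")); // distinct String object, same content
        System.out.println(archives);

        deleteArchive(archives, "e1", "missing");        // no match, no exception
        deleteArchive(archives, "e1", "g2");             // removing the last entry drops the key
        System.out.println(archives);
    }
}
```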

More Examples

Direct Message – Getting All Employees

Let’s start with a simple routing first.

// Get all Employees
log.info("Loading {} route....", RouteNames.DIRECT_GET_EMPLOYEES);
from(RouteNames.DIRECT_GET_EMPLOYEES)   // direct:getEmployees
  .routeId("direct-getEmployees")
  .bean(ManageEmployeeService.class, "getAllEmployees()")
  .end();

This creates a direct route to the getAllEmployees() method of the ManageEmployeeService bean. The response is whatever the service returns. I have left out the service method, as it does nothing but return all employees. Let’s see what the controller looks like:

@Slf4j
@CrossOrigin
@RestController
@RequestMapping("/employee")
public class EmployeeController {
    
    @Autowired
    private ProducerTemplate producerTemplate;
  
    @GetMapping(path = "/get", produces = MediaType.APPLICATION_JSON_VALUE)
    public ResponseEntity<Map<String, EmployeeBean>> getEmployee() {

        /**
         * Controller --> Add to a Direct Queue - Get Employee
         */
        producerTemplate.start();
        final Map<String, EmployeeBean> resp = producerTemplate.requestBody(
            RouteNames.DIRECT_GET_EMPLOYEES, null,
            Map.class);
        producerTemplate.stop();
        return ResponseEntity.ok(resp);
    }
  
    // Other controller methods
}

Add Header and Choice – Update Employee

Let’s go to the next example. Here we check whether an employee exists; if so, we update the employee, otherwise we insert it. This may look like a dumb use case, as there are better ways of doing it, but for the sake of demonstrating Camel, bear with the dumb example :).

This time we will start with the controller.

@PostMapping(path = "/update/{id}",
        consumes = MediaType.APPLICATION_JSON_VALUE,
        produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<AddResponseBean> updateEmployee(@PathVariable("id") String id, @RequestBody EmployeeBean emp) {

  /**
    * Controller --> Add to a Direct Queue   - Choice
    *                                              if present - add to Update Direct Queue
    *                                              if absent - add to Add Direct Queue
    */
  producerTemplate.start();
  final EmployeeBean resp = producerTemplate.requestBodyAndHeader(
    RouteNames.DIRECT_UPDATE_EMPLOYEE, emp, "id", id, 
    EmployeeBean.class);
  producerTemplate.stop();
  final AddResponseBean arb = new AddResponseBean();
  arb.setId(id);
  arb.setEmpBean(resp);
  return ResponseEntity.ok().body(arb);
}

When we call the route, we send the employee ID as a header. Headers can be accessed in the route with the Simple expression ${header.<name>}, for example ${header.id}. Next, let’s look at the Java DSL for the route.

// Update or Add an Employee
log.info("Loading {} route....", RouteNames.DIRECT_UPDATE_EMPLOYEE);
from(RouteNames.DIRECT_UPDATE_EMPLOYEE) // direct:updateEmployee
  .routeId("direct-updateEmployee")
  .setHeader("present")
  .method(ManageEmployeeService.class, "isEmployeePresent(${header.id})")
  .log("${header.present}")
  .choice()
  	.when(header("present").isEqualTo(false))
  		.bean(ManageEmployeeService.class, "updateEmployee(true, ${header.id}, ${body})")
  	.otherwise()
  		.bean(ManageEmployeeService.class, "updateEmployee(false, ${header.id}, ${body})")
  		.bean(ManageEmployeeService.class, "addToArchive('UPD', ${header.id}, ${body})")
  .endChoice()
  .end();

OK, this is a bit more complicated. On line #5 and line #6 (setHeader followed by method), we set a header to true or false based on whether the employee already exists. For this we use the isEmployeePresent method from the ManageEmployeeService class.

On line #8 we have a choice(). If the header is false, we know the employee does not exist, so we insert it on line #10. If the header is true, the employee exists; in that case we update the employee and send the old record to an archive for later retrieval.
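ManageEmployeeService is not shown in this post, so here is a plain-Java sketch of what the route amounts to, with the header check written as an if/else. The map-based store, the upsert method name and the use of String for the employee record are all assumptions made for illustration; only the branch structure mirrors the route.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a stand-in for the service logic behind the update route.
final class UpdateOrInsertSketch {

    final Map<String, String> employees = new HashMap<>();
    final List<String> archive = new ArrayList<>();

    // Mirrors setHeader("present") + choice(): insert when absent, otherwise update and archive.
    String upsert(String id, String employee) {
        boolean present = employees.containsKey(id);   // role of isEmployeePresent(${header.id})
        if (!present) {
            employees.put(id, employee);               // when(present == false): insert
            return "inserted";
        }
        String previous = employees.put(id, employee); // otherwise: update ...
        archive.add("UPD:" + id + ":" + previous);     // ... and archive the old record
        return "updated";
    }

    public static void main(String[] args) {
        UpdateOrInsertSketch sketch = new UpdateOrInsertSketch();
        System.out.println(sketch.upsert("1", "john"));
        System.out.println(sketch.upsert("1", "john smith"));
        System.out.println(sketch.archive);
    }
}
```

In the Camel version the same decision is carried by a message header rather than a local variable, which is what lets later steps in the route, such as the log step, see it as well.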

Publish Subscribe messaging – Delete Employee

OK, enough with direct messaging. Let’s try some seda. As discussed earlier, seda is asynchronous: the message is placed on a queue and consumed on a separate thread. With seda we need to define two parts, the publisher and the consumer of the message. Think of publish-subscribe messaging. Again, let’s just go with the Camel use case and not complain about how ridiculous this example is :).

Let’s again start with the controller code. It’s very simple.

@DeleteMapping(path = "/delete/{id}", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<EmployeeBean> deleteEmployee(@PathVariable("id") String id) {

  /**
    * Controller --> Add to a Seda Queue   - Get it
    *                                      - Delete from repository
    *                                      - Add to Archive
    */
  producerTemplate.start();
  final EmployeeBean resp = producerTemplate.requestBodyAndHeader(
    RouteNames.DIRECT_DEL_EMPLOYEE, null, "id", id, 
    EmployeeBean.class);
  producerTemplate.stop();
  return ResponseEntity.ok().body(resp);
}

Nothing striking here that needs an explanation. We call a direct route that returns a response, which in turn is sent back to the client. OK, on to the routing part.

// Delete an Employee -> Publisher
log.info("Loading {} route....", RouteNames.DIRECT_DEL_EMPLOYEE);
from(RouteNames.DIRECT_DEL_EMPLOYEE)    // direct:delEmployee
  .routeId("direct-delEmployee")
  .bean(ManageEmployeeService.class, "getEmployee(${header.id})")
  .to(RouteNames.SEDA_DEL_EMPLOYEE)   // seda:deleteEmployee?multipleConsumers=true
  .end();

// Delete an Employee -> Consumer 1
from(RouteNames.SEDA_DEL_EMPLOYEE)      // seda:deleteEmployee?multipleConsumers=true
  .routeId("seda-deleteEmployee")
  .log("Deleting... ${header.id}")
  .bean(ManageEmployeeService.class, "delEmployee(${header.id}, ${body})")
  .end();

// Delete an Employee -> Consumer 2
from(RouteNames.SEDA_DEL_EMPLOYEE)      // seda:deleteEmployee?multipleConsumers=true
  .routeId("seda-addDelEmplArchive")
  .log("Archiving Deleted... ${header.id}")
  .bean(ManageEmployeeService.class, "addToArchive('DEL', ${header.id}, ${body})")
  .end();

Again, a lot is going on in that block of code. The publisher fetches the details of the employee for the given ID and then puts a message on the seda queue. It then returns the employee details it already fetched to the caller.

The two consumers defined on line #10 and line #17 pick up this message asynchronously. The first one does the real work of deleting the employee. The second one adds it to the archive. Like I said, this is not an ideal design: if the delete fails, we do not send an appropriate message, and we still add the employee to the archive. It simply shows two consumers independently consuming the same message.

Parallel Messaging – Sending to multiple consumers at the same time

Now let’s assume a scenario where we want to send the same message to several different consumers at the same time. We can write that as follows:

from(RouteNames.SEDA_UPD_SALARY)        // seda:updateSalary
  .routeId("seda-updateSalary")
  .process(new UpdateSalaryProcessor())
  .end();
from(RouteNames.SEDA_UPD_SALARY_AC)     // seda:updateSalaryAc
  .routeId("seda-updateSalaryAc")
  .process(new SendToAccountingProcessor())
  .end();
from(RouteNames.SEDA_UPD_SALARY_HR)     // seda:updateSalaryHr
  .routeId("seda-updateSalaryHr")
  .process(new SendToHRProcessor())
  .end();

from(RouteNames.DIRECT_UPD_SALARY)      // direct:updateSalary
  .routeId("direct-updateSalary")
  .multicast().parallelProcessing()
  .to(RouteNames.SEDA_UPD_SALARY, RouteNames.SEDA_UPD_SALARY_AC, RouteNames.SEDA_UPD_SALARY_HR)
  .transform(simple("Sent to multiple routes..."))
  .end();

Here we have three different consumers, defined on line #1, line #5 and line #9. On line #17, we send the message in parallel to all three queues. This is useful when we want to broadcast a message to different systems for processing.
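For intuition, parallel multicast behaves roughly like handing the same message to several handlers at once. The following JDK-only sketch is an analogy rather than Camel code; the handler lambdas and the emp-42 message are made up for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// JDK-only analogy of a parallel multicast; none of this is Camel API.
final class MulticastAnalogy {

    // Hand the same message to every handler concurrently and collect the results in handler order.
    static List<String> multicast(String message, List<Function<String, String>> handlers) {
        List<CompletableFuture<String>> futures = handlers.stream()
                .map(handler -> CompletableFuture.supplyAsync(() -> handler.apply(message)))
                .collect(Collectors.toList());
        // join() waits for each handler; all of them were already started above.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Function<String, String>> handlers = List.of(
                m -> "salary updated: " + m,
                m -> "accounting notified: " + m,
                m -> "HR notified: " + m);
        multicast("emp-42", handlers).forEach(System.out::println);
    }
}
```

Each handler receives the same input, which matches the idea of every multicast destination getting its own copy of the message.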

Timer – Delete Archive

As a final example, we will take a look at the timer. The route is triggered every time the timer fires. We will build a simple timer in this case; Camel also supports schedules defined in crontab format.

// Timer to clean
log.info("Loading {} route....", RouteNames.TIMER_CLEAN_ARCHIVE);
from(RouteNames.TIMER_CLEAN_ARCHIVE)    // timer:cleanEmployee?fixedRate=true&period=60000
  .routeId("timer-cleanArchive")
  .bean(ManageArchiveService.class, "runOnTimer()")
  .end();

This is a very simple timer that fires at a fixed rate of once every minute. We could also have set the delay option so that the first run starts at a later time.

Conclusion

In this blog, we have touched on some of the routing mechanisms that Camel supports. This is only the tip of the iceberg, and Camel supports many more patterns. We used Camel with Spring Boot because that is the use case my team is looking at, but you can always use Camel without the Spring integration. Hope that helps some of you. Ciao for now!