Designing and Implementing our Camel-based mgm Cosmo Router

We recently finished a subproject to integrate our mgm Cosmo insurance software with an external CRM system. Both systems had to exchange XML documents in a reliable and robust manner in order to keep their data in sync. We used Apache Camel as the middleware to handle all the transfers between the Java and .NET based systems. This blog series discusses our solution and shares our experiences with Apache Camel.

mgm Cosmo is a standardized software solution that allows insurance companies to produce and manage commercial and industrial business. It accompanies the insurance sales process starting with a submission as a sales lead. A submission may produce one or more offers. If an offer is accepted by both the customer and all the other parties, it becomes an insurance policy. This policy can later be extended, changed or cancelled in the form of an endorsement.

Technically, all five of these main business entities are managed as self-contained XML documents, which could easily be stored in a document-oriented NoSQL database. In the following, they are simply referred to as XML documents.

Exemplary insurance process with all involved parties and its five main business entities.

Project Goals

The business goal for the system integration was twofold: first, to make all XML documents from Cosmo available for viewing in an external CRM system, and second, to receive all corresponding partner and account information from the CRM.

We chose Apache Camel as the middleware to handle all the Enterprise Application Integration (EAI) tasks, specifically the SOAP transfers between our Java backend application and the external .NET services. This Camel-driven application was simply named “Cosmo Router”.

Our main goal for the Cosmo Router was to be as reliable, automated, robust and fail-safe as possible. For example, we strove for a system that can handle connection failures and timeout errors with adaptable retry strategies such as increasing waiting periods. As you can imagine, it didn’t take us long to find even more sophisticated failover scenarios. However, retry strategies and failover behavior were only one small topic we encountered during this project.
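
To make the idea of "increasing waiting periods" concrete, here is a minimal sketch in plain Java. It is not the Cosmo Router's actual retry logic; the class name RetryBackoff, the doubling factor and the cap are assumptions chosen purely for illustration.

```java
import java.time.Duration;

/** Sketch of a retry policy whose waiting period grows with each failed attempt. */
final class RetryBackoff {
    private final Duration initialDelay;
    private final Duration maxDelay;

    RetryBackoff(Duration initialDelay, Duration maxDelay) {
        this.initialDelay = initialDelay;
        this.maxDelay = maxDelay;
    }

    /** Delay before the given retry attempt (1-based): initial, 2x, 4x, ... capped at maxDelay. */
    Duration delayBefore(int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // limit the shift so the factor stays well within the range of a long
        long factor = 1L << Math.min(attempt - 1, 30);
        Duration delay = initialDelay.multipliedBy(factor);
        return delay.compareTo(maxDelay) > 0 ? maxDelay : delay;
    }

    public static void main(String[] args) {
        RetryBackoff backoff = new RetryBackoff(Duration.ofSeconds(5), Duration.ofMinutes(10));
        for (int attempt = 1; attempt <= 10; attempt++) {
            System.out.println("retry " + attempt + " after " + backoff.delayBefore(attempt));
        }
    }
}
```

The cap matters for long outages: without it, the waiting period would keep doubling and a transfer could stay idle long after the external service is back.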

This blog article is accompanied by several source code examples but assumes a basic understanding of Apache Camel. In case you have never heard of Apache Camel, you might want to take a look at these additional resources and examples:

Architecture Overview

Pretty early on, our customer defined SOAP as the mandatory messaging technology. The task at hand was both to transfer data to newly created web services and to receive data from them. In this article we focus only on the sending part. To keep them apart, the target web services are called the “external” web services. They were developed by a third party in .NET, more precisely with Windows Communication Foundation 4 (WCF4).

One of our main objectives for our architecture was to decouple the transfer process from the Cosmo Web Application’s workflow in order to process all transfers asynchronously (from the web application’s point of view). This decision was possible because the application workflow did not depend on the transfer results. It was also driven by previous negative experiences with another SOAP web service and its limited availability. In practical words, we wanted a Cosmo user to be able to continue working with the application instead of being blocked by a “Transfer in progress, please wait” popup.

As it turned out, this decision came in very handy: during the deployment of the final release, an initial migration of all existing XML documents was planned. In numbers, tens of thousands of transfers needed to be processed. Thanks to asynchronous transferring, the Cosmo Web Application only needed to be unavailable for a few hours to safely prepare all transfers. This was a great improvement over the two full days needed for the actual transferring. When the initial transfer ended up taking a whole week due to bugs, we could relax with a “no problem” smile and happy users.

Driven by the idea of asynchronism we finally ended up with the following architecture:

This diagram shows the sending process from the Cosmo Web Application to the external CRM web service.

The whole process is triggered by specific user interactions in the web application as indicated by the big grey arrow. We won’t go into details there but instead focus on the “Transfer Table” and particularly the Cosmo Router in the middle lane.

Transfer Processing

Let’s take a closer look at each step in the diagram shown above. Please note that, for the sake of simplicity, none of the following examples contains log statements or error and exception handling.

Step 1: Getting new Transfers from the Database

The Cosmo Router periodically checks the database’s “Transfer Table” for new transfers. At first, we wanted to do the database polling with Camel’s SQL component which seemed to be a perfect fit. However, it doesn’t support being used as a “Camel Consumer” (i.e. as a starting point of a Camel route or in other words as a “from” endpoint). So we decided to simply start the routing process with the Camel Timer component as described next.

Starting Routes with Timers

A timer as a “from” endpoint triggers the processing and subsequently the polling of the database for new transfers. In Camel’s Spring XML DSL this looks as follows:

<!-- Note: the timer must be explicitly defined as an endpoint
           due to the timer's period being configurable -->
<endpoint id="newOffersTimer" uri="timer://newOffersTimer?period=${router.timer.offer.period}"/>

<route id="Send.Start.Timer.NewOfferTransfers" autoStartup="true" startupOrder="901">
  <from uri="ref:newOffersTimer"/>
  ...
  <setProperty propertyName="CurTransferType">
    <simple>${properties:TRANSFER_TYPE_OFFER}</simple>
  </setProperty>
  ...
  <to uri="direct:Send.GetAndProcessNewTransfers"/>
</route>

The route references the timer as a “from” endpoint. The timer fires as soon as the route is started and does nothing more than creating an (almost) empty Camel Exchange object which is all we want at that moment. Afterward, the route just sets a Camel property and delegates the processing to another route called “Send.GetAndProcessNewTransfers”. As you might have guessed already, this is for re-using the route several times (more precisely once for each transferable XML document).

Using multiple, separated timer routes brings some benefits:

  • Each timer route can be disabled without affecting the other timer routes. This is very useful when there is a bug in the processing of one specific XML document type and you want to “pause” exactly that type. It keeps your log free of repeated retry and failure messages, not to mention the savings in bandwidth, memory and CPU usage (if these matter at all).
  • All the timer routes can run concurrently which greatly speeds things up but keeps the routing logic simple, too. Or at least simpler than using one asynchronously firing Timer for all the types together.
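
The two benefits listed above can be illustrated without Camel. The following plain-Java sketch runs one periodic poller per transfer type and lets a single type be paused while the others keep running. The class PerTypePollers and its methods are hypothetical and only mimic what separate Camel timer routes provide.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Sketch: one independent periodic poller per transfer type (plain Java, not Camel). */
final class PerTypePollers {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
    private final Map<String, ScheduledFuture<?>> running = new ConcurrentHashMap<>();

    /** Starts polling for one type; the task fires immediately and then once per period. */
    void start(String transferType, Runnable pollTask, long periodMillis) {
        // note: if pollTask throws, the executor suppresses all further runs of that task
        running.computeIfAbsent(transferType, type ->
                scheduler.scheduleAtFixedRate(pollTask, 0, periodMillis, TimeUnit.MILLISECONDS));
    }

    /** Pauses exactly one type; all other pollers keep running. */
    void pause(String transferType) {
        ScheduledFuture<?> future = running.remove(transferType);
        if (future != null) {
            future.cancel(false); // let a run that is already in progress finish
        }
    }

    boolean isRunning(String transferType) {
        return running.containsKey(transferType);
    }

    void shutdown() {
        running.clear();
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        PerTypePollers pollers = new PerTypePollers();
        pollers.start("OFFER", () -> System.out.println("polling OFFER transfers"), 200);
        pollers.start("POLICY", () -> System.out.println("polling POLICY transfers"), 200);
        Thread.sleep(500);
        pollers.pause("OFFER"); // from here on only POLICY is polled
        Thread.sleep(500);
        pollers.shutdown();
    }
}
```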

Fetching new XML Documents from the Transfer Table

As mentioned before, all the timer routes delegate the processing to another Camel route named “Send.GetAndProcessNewTransfers”. Here it is:

<route id="Send.GetAndProcessNewTransfers">
  <from uri="direct:Send.GetAndProcessNewTransfers"/>
  ...

  <!-- check for new transfers -->
  <to uri="bean:sqlExecutor?method=selectNewTransfers"/>

  <!-- delegate the processing of the body with all the selected transfers -->
  <to uri="direct:Send.ProcessSelectedTransfers"/>
</route>

The obvious thing to recognize is the usage of a custom bean named “sqlExecutor”. We found this approach to be much easier than working with Camel’s SQL component – particularly when using dynamic SQL placeholders. We will come back to this topic later on (in the chapter “Struggling with the SQL Component”) but for now here is the custom bean:

public class SqlExecutor {
  ...

  public List<Map<String, Object>> selectNewTransfers(String body, Exchange exchange) {
    final String transferType = exchange.getProperty(Constants.TRANSFER_TYPE, String.class);
    List<Map<String, Object>> newTransfers =
        jdbcTemplate.queryForList(
            "SELECT * FROM TRANSFER WHERE TRANSFER_TYPE = ? AND STATUS = ?",
            transferType,
            TransferStatus.NEW.name() );

    if (newTransfers.isEmpty()) {
      log.debug("No '{}' {} transfers - STOP", TransferStatus.NEW, transferType);
      // STOP the routing by appropriately marking the Exchange (equals "<stop/>" in XML DSL)
      exchange.setProperty(Exchange.ROUTE_STOP, Boolean.TRUE);
      return null;
    }
    log.info("Found {} '{}' {} transfers", new Object[]{ newTransfers.size(), TransferStatus.NEW, transferType });
    return newTransfers;
  }
}

The code should be pretty straightforward. The bean uses an underlying Spring JdbcTemplate, and all new transfers are returned as a List of Maps (with the SQL column names as keys). So far, this is the same behavior as the Camel SQL component. But then one of our processing rules comes into play: don’t continue with the routing if there are no new transfers (see the “if” condition). In contrast, the SQL component would continue with an empty result, which would have to be checked and reacted upon in a similar way in the Camel XML DSL.

Step 2: Preprocessing

After having all new transfers as a List in the Camel body, the route “Send.ProcessSelectedTransfers” is called. Within this route several great Camel features are used: the Splitter (see line 8) and the Validate DSL (see lines 5 and 13-14).

<route id="Send.ProcessSelectedTransfers">
  <from uri="direct:Send.ProcessSelectedTransfers"/>

  <!-- some validations -->
  <validate><simple>${body} is 'java.util.List'</simple></validate>

  <!-- Split the List with all the Transfers and process each Transfer -->
  <split stopOnException="true" parallelProcessing="false">
    <!-- choose what to split (here: the body which is a List) -->
    <simple>${body}</simple>

    <!-- again some validations -->
    <validate><simple>${body} is 'java.util.Map'</simple></validate>
    <validate><simple>${body["REQUEST_XML"]} != null</simple></validate>

    <!-- remember some properties for later use -->
    <setProperty propertyName="TransferId">
      <simple>${body["ID"]}</simple>
    </setProperty>

    <!-- set body to XML payload -->
    <setBody>
      <simple>${body["REQUEST_XML"]}</simple>
    </setBody>

    <!-- route continues with Step 3 -->

Aside from the validations, the first thing to happen is the splitting of the List which the previously described “sqlExecutor” returned. Next, the ID is saved to a Camel property allowing the later correlation of the SOAP reply. Finally, the body is set to the payload XML as required by the (upcoming) CXF component.

Note that the Splitter was deliberately configured for sequential processing and for stopping immediately on exceptions (see line 8). We decided against Camel’s asynchronous processing to keep the routing as simple as possible. For example, consider three transfers: A1, B1 and A2, with A2 being an update of A1. Now A1 fails due to a temporary network failure, but B1 and A2 succeed. Furthermore, let us assume that all failures should be retried. Then either A1 must be discarded or A2 must be resent after the retry, in order to prevent the older A1 from overwriting the newer A2. Discarding A1 might become a problem if the receiving system requires every state of A to be transferred. Resending all dependent states might make the routing logic more complicated. If, however, A2 is not sent unless A1 was successful, no additional routing logic is needed at all. Another example: A1 and A2 are sent shortly after each other, but A1 is delayed, e.g. by network traffic, and arrives a millisecond after A2. The receiving system would then have to recognize the outdated A1 and handle it appropriately. In the end, we felt that the price of a more complex failover behavior was too high for the small performance benefit of parallel processing.
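
The A1/B1/A2 reasoning can be replayed with a few lines of plain Java. This is only a sketch of the "sequential, stop on first failure" idea and not the Camel Splitter itself; the class SequentialTransfers and the send predicate standing in for the SOAP call are assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Sketch of "sequential processing, stop on first failure" (not the Camel Splitter). */
final class SequentialTransfers {

    /**
     * Sends the transfers in order and stops at the first failure.
     * Returns the transfers that were sent successfully. The failed transfer and
     * everything after it stay untouched and are picked up by the next polling run.
     */
    static List<String> sendInOrder(List<String> transfers, Predicate<String> send) {
        List<String> sent = new ArrayList<>();
        for (String transfer : transfers) {
            if (!send.test(transfer)) {
                break; // a newer version must never overtake a failed older one
            }
            sent.add(transfer);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> queue = Arrays.asList("A1", "B1", "A2");
        // first run: A1 hits a simulated temporary network failure, nothing is sent
        System.out.println(sendInOrder(queue, transfer -> !transfer.equals("A1")));
        // second run: the network is back and the original order is preserved
        System.out.println(sendInOrder(queue, transfer -> true));
    }
}
```

The trade-off is visible in the first run: B1 is held back although it is unrelated to A1. That is the throughput given up in exchange for routing logic that never has to reason about ordering.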

Step 3: Sending via SOAP

Before the request is sent via SOAP, the body containing the request XML undergoes a final XML schema validation:

    <!-- validate before sending -->
    <to uri="validator:classpath:com/mgmtp/OfferTransfer.xsd?useDom=false"/>

This validation has the benefit of yielding meaningful validation errors directly, instead of generic SOAP faults from an external web service over which you might have little or no control.
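
To show what kind of error message such a local validation yields, here is a small sketch using the JDK's javax.xml.validation API directly, outside of Camel. The toy schema with its offer and id elements is a hypothetical stand-in; it is not the OfferTransfer.xsd mentioned above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
import org.xml.sax.SAXException;

/** Sketch: validating a payload against an XSD before it is sent anywhere. */
final class PayloadValidation {
    // hypothetical toy schema: an <offer> with exactly one positive integer <id>
    static final String XSD =
          "<xs:schema xmlns:xs=\"http://www.w3.org/2001/XMLSchema\">"
        + "<xs:element name=\"offer\">"
        + "<xs:complexType><xs:sequence>"
        + "<xs:element name=\"id\" type=\"xs:positiveInteger\"/>"
        + "</xs:sequence></xs:complexType>"
        + "</xs:element>"
        + "</xs:schema>";

    /** Returns null if the XML is valid, otherwise the validator's error message. */
    static String validate(String xml) {
        try {
            SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
            Schema schema = factory.newSchema(new StreamSource(new StringReader(XSD)));
            Validator validator = schema.newValidator();
            // without a custom ErrorHandler, the first validation error is thrown
            validator.validate(new StreamSource(new StringReader(xml)));
            return null;
        } catch (SAXException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(validate("<offer><id>42</id></offer>"));
        System.out.println(validate("<offer><id>abc</id></offer>"));
    }
}
```

The message returned for the second document names the offending value and the expected type, which is typically far easier to act on than a generic fault returned by a remote service.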

After the validation, the request XML is sent to the external web service with the help of Apache CXF which is a really simple one-liner:

    <!-- the SOAP enveloping and the sending happen here -->
    <to uri="cxf:bean:offerWebService"/>

    <!-- route continues with Step 4 -->

Note that we configured CXF to build the SOAP envelope automatically, so we only have to bother with the important things, namely the payload XML. Here is the complete configuration of the CXF bean (belongs outside of the “<camelContext/>” element):

<cxf:cxfEndpoint id="offerWebService"
                 address="${offer.webservice.url}"
                 wsdlURL="com/mgmtp/transfer/Offer.wsdl"
                 serviceName="wsdlns:Offer" endpointName="wsdlns:OfferSoap12"
                 xmlns:wsdlns="http://www.external-ws.com/Offer/v1">
    <cxf:properties>
      <entry key="dataFormat" value="PAYLOAD"/>

      <!-- prevents DOM parsing of huge messages -->
      <entry key="allowStreaming" value="true"/>

      <!-- enables CXF Logging Feature which writes inbound and outbound SOAP messages to log -->
      <entry key="loggingFeatureEnabled" value="true"/>
    </cxf:properties>
</cxf:cxfEndpoint>

Step 4: Result Processing

The previous CXF call from Step 3 returns the response XML payload from the external web service as Camel body. In our scenario, the response XML document contains a result code from the external web service like “INTERNAL_ERROR” or “SUCCEEDED”. In the final routing, this response XML is evaluated and saved to the database:

    <!-- evaluate the SOAP result -->
    <to uri="bean:responseEvaluator?method=evaluateResponse"/>

    <!-- save the response XML -->
    <to uri="bean:sqlExecutor?method=insertResponseXml( ${body} )"/>

    <!-- other things like notify a service of a new response -->

    <!-- save the final transfer result -->
    <to uri="bean:sqlExecutor?method=insertTransferResult( ${property.TransferResult} )"/>
  </split>
</route>

The custom bean “responseEvaluator” inspects the response XML and then decides what the ultimate transfer result should be. For example, we could ignore an “INTERNAL_ERROR” result and just send the transfer again and again, because it is most likely a bug in the external system which will be fixed sooner or later. Such a strategy would not need manual interaction to resume or resend a failed transfer and thus save effort. But regardless of the evaluation’s outcome, the responseEvaluator always sets a Camel property named TransferResult which will be used by the sqlExecutor bean.

The “sqlExecutor” saves both the response XML and the TransferResult in the database. The saving is once more executed with a Spring JdbcTemplate, similar to Step 1:

public class SqlExecutor {
  ... 

  public void insertTransferResult(final String transferResult, final Exchange exchange) {
    final String transferId = exchange.getProperty(Constants.TRANSFER_ID, String.class);
    Validate.notEmpty(transferId, "Camel Property '" + Constants.TRANSFER_ID + "' must not be empty!");
    Validate.notEmpty(transferResult, "Parameter transferResult must not be empty!");

    int affectedRows = jdbcTemplate.update(
        "UPDATE TRANSFER SET STATUS = ? WHERE ID = ?", transferResult, transferId
    );

    Validate.isTrue(affectedRows == 1, "Not more or less than exactly one row should have been updated.");
  }
}

In case you wonder why we decided to use a bean as a response evaluator instead of a Camel “choice” statement: one of the main reasons was the beneficial syntax-checking of the Java compiler. The WSDL contains an XML schema and that schema is integrated with JAXB into the Cosmo Router by using JAXB’s XJC compiler to generate Java classes from the XML schema. As a result, XML schema changes like return-code renames (being an enumeration in the XSD) will immediately bring up compiler errors in the Java code. Another reason was readability: many “choice/when/otherwise” evaluations in the Camel XML DSL are more difficult to read than in Java.
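
The compiler argument can be sketched as follows. In the real project the result-code enum would be generated by XJC from the XSD; here it is written by hand. Only the codes SUCCEEDED and INTERNAL_ERROR and the status NEW appear in this article. VALIDATION_ERROR, DONE, FAILED and the class names are hypothetical.

```java
/** Result codes as they might be generated from the XSD enumeration (partly hypothetical). */
enum ResultCode { SUCCEEDED, INTERNAL_ERROR, VALIDATION_ERROR }

/** Transfer states in the transfer table (DONE and FAILED are hypothetical). */
enum TransferState { NEW, DONE, FAILED }

/** Sketch of a response evaluator that maps a result code to the final transfer state. */
final class ResponseEvaluatorSketch {

    static TransferState evaluate(ResultCode code) {
        switch (code) {
            case SUCCEEDED:
                return TransferState.DONE;
            case INTERNAL_ERROR:
                // most likely a bug on the remote side: keep the transfer NEW so it is retried
                return TransferState.NEW;
            case VALIDATION_ERROR:
                // resending the same payload would not help, so give up
                return TransferState.FAILED;
            default:
                throw new IllegalStateException("Unhandled result code: " + code);
        }
    }

    public static void main(String[] args) {
        for (ResultCode code : ResultCode.values()) {
            System.out.println(code + " -> " + evaluate(code));
        }
    }
}
```

If a constant such as INTERNAL_ERROR is renamed in the schema and the enum is regenerated, the corresponding case label no longer compiles. A string comparison inside a “choice/when” expression would only fail at runtime.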

This concludes the most important steps in the transfer processing. Of course, we were only able to define this behavior because we had the luxury that neither the prevention of duplicate transfers nor high throughput was very important. Nevertheless, we achieved reliability, robustness, simplicity and, last but not least, project delivery on time.

Struggling with the SQL Component

As mentioned before in “Step 1”, Camel’s SQL component requires the data for dynamic placeholders to be delivered in the Camel body. Although this may make sense for integrating the SQL component into Camel’s way of processing, we felt the mandatory changing of the body to be cumbersome. This is fine if you don’t have a Camel body yet, e.g. when checking the database for new transfers. But it can become a headache when trying to save the response.

Imagine your Camel body contains a SOAP response and you want to save it together with a final result code like “SUCCESS” or “ERROR” in the database. You could use a Camel SQL statement like this:

UPDATE TRANSFER
  SET RESPONSE_XML = #, RESULT_CODE = #
  WHERE ID = #

But for multiple placeholders, the Camel SQL component requires the body to be an iterable array. Thus you need to convert the body with the SOAP response into an iterable array, supplemented with the result code and the ID to update.

Surprisingly, we found no way to achieve this directly with the standard Camel XML DSL. So one idea was to write a custom Camel Type Converter which would parse a semicolon-separated body and split it into an iterable array:

<route>
  ...

  <setBody>
    <simple>"${body}" ; "${property.TransferResult}" ; "${property.TransferId}"</simple>
  </setBody>
  <convertBodyTo type="com.mgmtp.IterableList"/>
  <to uri="sql:UPDATE TRANSFER SET RESPONSE_XML = #, RESULT_CODE = # WHERE ID = #"/>

  ...
</route>

Although this idea would work, it is of course more difficult to read and introduces new logic to test and new potential failure points like empty strings, null values, exceeded string lengths or unescaped quotation marks. Additionally, accessing the SOAP response once more after having it saved is difficult, too. Not only did you change the body to an iterable array but also the SQL component’s behavior is to “overwrite” the body with the number of updated rows! You could of course save the body to a Camel property before the SQL statement is executed. Or you could use Camel’s Multicast DSL to clone the original body with the SOAP response. But why do all this bending in the first place if you just want to execute a simple dynamic SQL statement with multiple placeholders? Well, our answer was simple: avoid the SQL component and just use a custom bean like the previously described “sqlExecutor” bean.
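
One of the failure points of the delimiter-based idea can be reproduced in plain Java. The sketch below simplifies the body format to a bare semicolon-separated string (without the surrounding quotation marks), and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch: why squeezing several placeholder values into one delimited string is fragile. */
final class PlaceholderValues {

    /** Naive approach: join the values into one string and split it again later. */
    static List<String> joinThenSplit(String responseXml, String result, String id) {
        String body = responseXml + ";" + result + ";" + id;
        return Arrays.asList(body.split(";"));
    }

    /** Passing the values as a list keeps every value intact, whatever it contains. */
    static List<String> asValues(String responseXml, String result, String id) {
        return Arrays.asList(responseXml, result, id);
    }

    public static void main(String[] args) {
        // the XML entity "&amp;" contains the delimiter itself
        String responseXml = "<msg>Smith &amp; Sons</msg>";
        System.out.println(joinThenSplit(responseXml, "SUCCESS", "42").size());
        System.out.println(asValues(responseXml, "SUCCESS", "42").size());
    }
}
```

Any XML entity such as &amp; already contains a semicolon, so the split produces four values instead of three. An empty trailing value has the opposite effect, because String.split drops trailing empty strings. A bean method that receives the values as separate arguments avoids both problems.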

Fighting Doubts with Prototypes

The previously described architecture did not appear out of thin air but underwent several iterations and prototypes. We especially wanted to gain early experience in some crucial areas, like using Apache CXF as the JAX-WS implementation (we previously used Metro) and evaluating how well it integrates with Apache Camel.

Through early prototypes we quickly gained enough confidence in the chosen architecture. Additionally, we were able to start at once with the definition of most failure scenarios and consequently the optimal failover behavior we wanted the Cosmo Router to have. In fact, every time an unexpected failure occurred it was instantly checked against all previously defined failure scenarios and added, if necessary. The corresponding failover logic was implemented at once, too. Following this agile and pragmatic approach we soon achieved a robust messaging router which became better and better the more the implementation and testing went on.

If you are curious how we defined our failure scenarios and the required failover behavior of the Cosmo Router, then watch out for the next part of this blog series.

Experiences with ActiveMQ

Our first architectural draft was based on the idea to use ActiveMQ for exchanging messages between the Cosmo Web Application and the Cosmo Router. The idea was that the “TransferCreator” would insert transferable XML documents as JMS messages into the “Process Queue”. Thus we drafted the architecture shown in the following diagram:

Initial experimental architecture with ActiveMQ.

We built some prototypes around this idea but soon stumbled upon several issues. For example, we wanted both the Cosmo Router and the Cosmo Web Application to support non-availability. However, that would have been problematic in the above described architecture because the TransferCreator is attached to a user request. This user request should not have to wait if the Cosmo Router’s “Process Queue” is not available for several minutes or even hours.

So we came up with the solution of having two queues, one in the Cosmo Web Application and one in the Cosmo Router, with both queues communicating with each other. In ActiveMQ terminology this is called a “store and forward network of brokers”:

Improved experimental architecture with ActiveMQ using a "store and forward network of brokers" approach.

But again we had concerns with this idea as explained in the next chapter.

Why we didn’t choose ActiveMQ

Please consider once more our final architecture which favored a simple database over ActiveMQ:

Our final architecture for comparison as previously described at the beginning of the article.

We saw several benefits in using the transfer table approach as depicted above instead of ActiveMQ:

  • We didn’t require additional time to gain sufficient ActiveMQ knowledge. Although we were very interested in ActiveMQ, the project timeline was very tight and time was of the essence.
  • We didn’t have to introduce yet another framework. This was particularly important because the development process used was Scrum, and by process definition everyone should be able to implement everything everywhere. Consequently, the more frameworks are used, the steeper the learning curve becomes.
  • We didn’t have to worry about transaction support. Unfortunately, we didn’t achieve quick and solid results with transaction support. For example, it remained unclear how difficult it would be to integrate both ActiveMQ brokers into transactions while also keeping the Cosmo Router a lightweight stand-alone application.

Thus we finally decided that using a relational database would be absolutely sufficient for our requirements and much easier to set up, maintain and understand, although the use of ActiveMQ might be quite reasonable in more complex scenarios.

Conclusion

In this article we described how Camel can be used to transfer XML documents between SOAP web services. We have shown that the use of a transfer database and the use of synchronous transfer processing allowed us to keep the routing relatively simple and easy to understand. Yet different types can be transferred concurrently which increases the overall throughput and sufficiently reduces bottlenecks. Furthermore the Cosmo Web Application can create and dispatch transfers asynchronously and therefore will not block its users from working with it.

All this is achieved in a transactional way, meaning that as long as a transfer is not completely processed, it stays in a “NEW” state and is therefore sent again and again. This adaptable retry behavior not only greatly reduces the need for manual investigation and interaction. It also has the benefit that the Cosmo Router can resume from any state, even after a sudden cold restart.

Last but not least, our pragmatic approach enabled us to deliver the application integration of mgm Cosmo with the external CRM system on time. Moreover, the asynchronous transfer creation and the highly automated retry strategies saved us effort when going live. We only required a short downtime, and we needed no further interaction when several XML document transfers kept failing for a whole week.

2 Responses to “Designing and Implementing our Camel-based mgm Cosmo Router”

  1. Claus Ibsen says:

    I really enjoyed reading this blog. It's well written and goes into depth, and also details why you did it this way, as well as telling about the pitfalls and problems you had.

    I took the liberty of adding a link to this blog entry from the Camel articles page (our link collection, takes a while to sync the change).
    http://camel.apache.org/articles

    From Camel 2.11 onwards the SQL component now also supports consumer side, so you don’t need to trigger this with the timer component anymore.

    Though a good idea with the “noop” so you can execute the SQL but don't change the message body. I will log a ticket to get this implemented in the SQL component.

    I agree about the parameter mappings. It's been improved in 2.11 to allow using named parameters. But so far it only grabs from body/headers. Not sure if there is a good way to define a syntax that doesn't get too verbose in the DSL.

    Some people use a template language to do the mapping and build dynamic queries. For example with freemarker / velocity etc.

    PS: I personally like the Spring JDBC template for SQL stuff. It's very flexible and you can do it all in the Java code.

    Another alternative is MyBatis when you have more extensive mapping needs as its excellent for that.

    Looking forward to the next blogs.

    /Claus Ibsen
    Apache Camel committer

    • Michael Frieß says:

      Thank you for your comment. We appreciate the positive feedback and the reference on the Camel website!

      I’m looking forward to a noop option in future Camel releases which could further reduce the need for custom logic or “workarounds” like the multicast EIP.

      The new SQL component options in the upcoming Camel 2.11 sound indeed like an interesting alternative to the Timer endpoints and the SqlExecutor#selectNewTransfers method as described in the article. The new option “useIterator” looks like an appropriate alternative to the “<split/>” usage, too.

      The named parameters are interesting as well. Wouldn’t it be even nicer if their resolving process would consider the Exchange’s properties, too?
      Here is a simple (untested) example of what I mean:

      <route>
        <setProperty propertyName="prop1"><simple>${property.TransferId}</simple></setProperty>
        <!-- ${body}: contains the SOAP message -->
        <to uri="sql:UPDATE TRANSFER SET RESPONSE_XML = # WHERE ID = :prop1"/>
      </route>