john.demic on Wednesday, February 29, 2012

Asynchronous Message Processing with Mule


Processing messages asynchronously is an important technique when developing integration applications. Asynchronous applications are typically easier to scale, allow for the implementation of reliability patterns, and sometimes better reflect real-world use cases. Mule, not surprisingly, offers many ways to process messages asynchronously.

Asynchronous Flows

Setting the exchange-pattern attribute of a message source to “one-way” enables asynchronous processing for a flow. Some transports and connectors, like JMS or the VM transport, are asynchronous by default. Other transports that are inherently synchronous, like HTTP, need their exchange pattern set explicitly. Setting a one-way exchange pattern on these transports lets you simulate asynchronous behavior with protocols that would otherwise not be asynchronous. The following Gist demonstrates how to asynchronously bridge an HTTP request to JMS.
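
A minimal sketch of such a bridge, assuming Mule 3.x XML configuration, might look like the following. The broker URL, port, path, queue name and flow name are placeholders chosen for illustration, not values from the original post.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
        http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd">

    <!-- Assumption: a local ActiveMQ broker; substitute your own JMS provider -->
    <jms:activemq-connector name="jmsConnector" brokerURL="tcp://localhost:61616"/>

    <flow name="httpToJmsBridge">
        <!-- HTTP is request-response by default, so one-way is set explicitly -->
        <http:inbound-endpoint host="localhost" port="8080" path="orders"
                               exchange-pattern="one-way"/>
        <!-- JMS endpoints are one-way unless configured otherwise -->
        <jms:outbound-endpoint queue="orders"/>
    </flow>
</mule>
```

With the inbound endpoint set to one-way, the HTTP caller should not have to wait for the JMS dispatch to complete before receiving a response.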

Message Aggregation

You can process asynchronously dispatched messages in groups by using the collection-aggregator. Message groups are defined by setting the correlationId property of a MuleMessage or by setting the MULE_CORRELATION_ID outbound header. The correlationGroupSize property of the MuleMessage, or the MULE_CORRELATION_GROUP_SIZE header, defines the number of messages in a group.

The following demonstrates how the collection-aggregator can be used to asynchronously wait and collect the contents of a correlation group arriving on a VM inbound-endpoint.
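
As a rough sketch, assuming Mule 3.x and that the senders have already set MULE_CORRELATION_ID and MULE_CORRELATION_GROUP_SIZE on each message, the aggregating flow could be configured like this. The VM paths, the flow name and the timeout value are illustrative assumptions.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">

    <flow name="aggregateGroup">
        <!-- Messages from the same correlation group arrive here one at a time -->
        <vm:inbound-endpoint path="group.in" exchange-pattern="one-way"/>

        <!-- Holds messages until the group is complete.
             Assumption: give up on an incomplete group after 60 seconds -->
        <collection-aggregator timeout="60000" failOnTimeout="true"/>

        <!-- Receives a single message carrying the collected group -->
        <vm:outbound-endpoint path="group.out" exchange-pattern="one-way"/>
    </flow>
</mule>
```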

Message Splitting

Some message payloads, like collections or XML documents, can be split and dispatched asynchronously. Here are some of the message splitters Mule supports:

  • collection-splitter: Splits a List payload into individual messages.
  • splitter: Splits a message using the Mule Expression Language.
  • mulexml:filter-based-splitter: Splits an XML document payload using an XPath expression.

The following Gist demonstrates splitting a java.util.List and routing to a JMS queue.
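
One way this could look, assuming Mule 3.x and a java.util.List payload arriving on a VM queue, is sketched below. The broker URL, VM path, queue name and flow name are hypothetical.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xmlns:jms="http://www.mulesoft.org/schema/mule/jms"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
        http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd">

    <!-- Assumption: a local ActiveMQ broker; substitute your own JMS provider -->
    <jms:activemq-connector name="jmsConnector" brokerURL="tcp://localhost:61616"/>

    <flow name="splitListToJms">
        <!-- The payload is expected to be a java.util.List -->
        <vm:inbound-endpoint path="list.in" exchange-pattern="one-way"/>

        <!-- Emits one message per list element -->
        <collection-splitter/>

        <!-- Each element is dispatched to the queue as its own JMS message -->
        <jms:outbound-endpoint queue="items"/>
    </flow>
</mule>
```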

Message Chunking

Split message payloads can be reassembled by using the message-chunk-aggregator. By default the message-chunk-aggregator uses the correlationId and correlationGroupSize properties of the MuleMessage for reassembly. You can define an optional “correlationIdExpression” to reassemble with a different message property.

The following flow illustrates how to assemble a group of split messages back together.
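
A minimal sketch of the reassembly side, assuming Mule 3.x and that the chunks arrive on a VM queue with their correlation properties already set by whatever split them. The VM paths and flow name are made up for the example, and the default correlation handling is used rather than a custom expression.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">

    <flow name="reassembleChunks">
        <!-- Each incoming message is one piece of a larger payload -->
        <vm:inbound-endpoint path="chunks.in" exchange-pattern="one-way"/>

        <!-- Groups pieces by correlationId and waits for correlationGroupSize of them -->
        <message-chunk-aggregator/>

        <!-- The reassembled message continues from here -->
        <vm:outbound-endpoint path="chunks.out" exchange-pattern="one-way"/>
    </flow>
</mule>
```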

Tuning Asynchronous Flows

Asynchronous processing for a flow can be tuned by defining a queued-asynchronous-processing-strategy. Multiple queued-asynchronous-processing-strategy elements can be defined, and a flow selects one by name using its “processingStrategy” attribute. The following illustrates how to configure a flow to use up to 500 threads to asynchronously process messages arriving on a VM inbound-endpoint.
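
A configuration along these lines, assuming Mule 3.2 or later, is sketched below. The strategy name, flow name, VM path and the logger step are illustrative; only the 500-thread limit comes from the text above.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:vm="http://www.mulesoft.org/schema/mule/vm"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd">

    <!-- A named strategy that flows can refer to; caps the thread pool at 500 -->
    <queued-asynchronous-processing-strategy name="allow500Threads" maxThreads="500"/>

    <!-- The flow opts in to the strategy by name -->
    <flow name="tunedAsyncFlow" processingStrategy="allow500Threads">
        <vm:inbound-endpoint path="work.in" exchange-pattern="one-way"/>
        <!-- Placeholder for the real processing steps -->
        <logger level="INFO" message="Processing a message asynchronously"/>
    </flow>
</mule>
```

Defining the strategy as a named global element means several flows can share one set of limits, while other flows can point at a differently tuned strategy.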

Wrapping Up

Asynchronous message handling is one of the keys to using Mule effectively.  Hopefully this post illustrated some of Mule’s features that make dispatching, sending and tuning asynchronous message flows easy.

2 Responses to “Asynchronous Message Processing with Mule”

MuleWrestler March 1st, 2012, 9:15 am

There’s no “collection-aggregator-router” in Mule 3.2 …

Like most published Mule examples, yours is just a couple of fragments that can’t work together, and nowhere is there a simple, full, working example …

For instance, could you rather post a full config for a working basic example of using the collection-aggregator to collect multiple asynchronous messages ? I’ve been trying to make this work for several days, using request-reply, “all”, “async” and a number of other things, but nothing seems to work.

john.demic March 7th, 2012, 7:37 am

Thanks for the feedback and good catch with collection-aggregator-router! I updated the post to reflect that.

Here’s a complete example that demonstrates accepting a message off a VM queue, routing it asynchronously to 3 flows, aggregating the responses and sending to another queue:

I used a custom header called “requestId” that I propagate to MULE_CORRELATION_ID to control the aggregation. Please let me know if this is more useful and if you have any questions.
