john.demic on Wednesday, February 29, 2012

Asynchronous Message Processing with Mule


Processing messages asynchronously is an important technique when developing integration applications.   Asynchronous applications are typically easier to scale, allow for the implementation of reliability patterns and sometimes better reflect use cases in the real world.  Mule, not surprisingly, offers a wealth of opportunities to process messages asynchronously.

Asynchronous Flows

Setting exchange-pattern of a message source to “one-way” enables asynchronous processing for a flow. Some transports and connectors, like JMS or the VM transport, are asynchronous by default. Other transports which are inherently synchronous, like HTTP, need there exchange pattern explicitly set. Setting one-way exchange patterns on these transports allows you to simulate asynchronous behavior with protocols that would otherwise  not be asynchronous.    The following Gist demonstrates how to asynchronously bridge an HTTP request to JMS.

Message Aggregation

You can process asynchronously dispatched messages in groups by using the collection- aggregator. Message groups are defined by setting the correlationId property of a MuleMessage or by setting the MULE_CORRELATION_ID outbound header. The correlationGroupSize property of MuleMessage, or the MULE_CORRELATION_GROUP_SIZE header, define the amount of messages in a group.

The following demonstrates how the collection-aggregator can be used to asynchronously wait and collect the contents of a correlation group arriving on a VM inbound-endpoint.

2 Responses to “Asynchronous Message Processing with Mule”

MuleWrestler March 1st, 2012, 9:15 am

There’s no “collection-aggregator-router” in Mule 3.2 …

As most published Mule examples, your’s is just a couple of fragments that can’t work together, and there’s nowhere a simple-full-working example …

For instance, could you rather post a full config for a working basic example of using the collection-aggregator to collect multiple asynchronous messages ? I’ve been trying to make this work for several days, using request-reply, “all”, “async” and a number of other things, but nothing seems to work.

john.demic March 7th, 2012, 7:37 am

Thanks for the feedback and good catch with collection-aggregator-router! I updated the post to reflect that.

Here’s a complete example that demonstrates accepting a message off a VM queue, routing it asynchronously to 3 flows, aggregating the responses and sending to another queue:

I used a custom header called “requestId” that I propagate to MULE_CORRELATION_ID to control the aggregation. Please let me know if this is more useful and if you have any questions.

Leave a Comment