Being able to publish and subscribe to event streams is a powerful enabler for business activities. As business rules change and systems evolve, the low coupling that is inherent to this integration pattern allows an IT landscape to evolve gracefully.

Imagine, for example, that you need to perform several independent actions whenever a user signs up on your site (such as creating an account, subscribing the user to a marketing mailing list, warming up caches…). A good design would have these actions performed by different systems, each one acting on marching orders received from a central place where “new user sign-up” events are published.

In the enterprise world, publish/subscribe design is generally achieved by using a messaging platform that presents a bias towards a particular technology like JMS or MSMQ. Though perfectly fine, these solutions inter-operate poorly with systems running on heterogeneous technologies.

AMQP, the Advanced Message Queuing Protocol, was introduced to address this problem. Because it is a protocol specification (unlike JMS, which only defines an API), it truly fosters interoperability.

In this article, we will look into a use case that illustrates how AMQP and Mule can be leveraged to put into motion a truly inter-operable publish/subscribe architecture.

What about PubSubHubbub?
Per its own definition, PubSubHubbub (aka PuSH) is “A simple, open, server-to-server web-hook-based pubsub (publish/subscribe) protocol as an extension to Atom and RSS”. As such, PuSH is a great candidate technology for what we want to achieve here. AMQP offers interesting extra capacities, like key-driven message routing, which we think make it slightly more compelling for enterprise integration. This said, should a publish/subscribe architecture be web-scaled, PuSH would be the architecture of choice.

Use Case: Sharing financial data updates

Companies dealing with financial transactions in multiple currencies need up-to-date exchange data in order to stay on top of monetary fluctuations and adapt their pricing policies or projections on the fly. Applications can easily access readily available financial feeds from commercial entities that provide this data for a fee. It is common for such feeds to come with strict rate limits and a restricted number of allowed consumers.

In a corporate landscape where multiple systems need to access such feeds, the best strategy is to centralize all access to the external feeds in a single location and, from there, distribute their data to the interested applications. The publish/subscribe model is a natural fit for this kind of distribution.

Feed Poller Overview

As illustrated above, we’ve unsurprisingly opted to use AMQP and Mule as the supporting technologies for this scenario. Here is why:

  • AMQP, as an inter-operable publish/subscribe protocol, is the best technical choice a company can make for this scenario: it allows disparate applications to subscribe directly to the financial data updates with a standalone client library in the language of their choice.
  • Mule, as an integration broker and thanks to its support for AMQP, is the perfect platform for polling the external feeds and publishing their content to AMQP.

AMQP & Mule

Let’s now delve into the implementation details. The following diagram illustrates how this will be implemented:

Feed Poller Details

Mule will poll the currency exchange rate feed on an hourly basis and will publish it to a topic exchange named “financial” with the “currency.rates” routing key. We chose this approach over a fanout exchange (with no routing key) because we will later add a stock market poller that publishes to the same exchange with a different routing key, allowing subscribers to filter out the feed data they’re not interested in.
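To make the routing-key filtering concrete, here is a minimal Python sketch of how a topic exchange decides whether a binding pattern matches a routing key. It follows the wildcard rules of AMQP 0-9-1 topic exchanges (`*` stands for exactly one dot-separated word, `#` for zero or more words). The function name `topic_matches` is ours, chosen for illustration; it is not part of any broker or client API, and a real broker performs this matching for you.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Illustrative only: this mimics the matching a topic exchange performs.
    '*' matches exactly one word, '#' matches zero or more words.
    """
    pattern_words = pattern.split(".") if pattern else []
    key_words = routing_key.split(".") if routing_key else []

    def match(i: int, j: int) -> bool:
        # i indexes pattern_words, j indexes key_words.
        if i == len(pattern_words):
            # Pattern exhausted: it matches only if the key is exhausted too.
            return j == len(key_words)
        if pattern_words[i] == "#":
            # '#' may absorb any number of remaining words, including none.
            return any(match(i + 1, n) for n in range(j, len(key_words) + 1))
        if j == len(key_words):
            # Pattern still expects a word but the key has none left.
            return False
        word_ok = pattern_words[i] == "*" or pattern_words[i] == key_words[j]
        return word_ok and match(i + 1, j + 1)

    return match(0, 0)
```

Under these rules, a subscriber bound with “currency.rates” never sees messages published with another routing key, while a subscriber bound with a pattern such as “stockmarket.#” would receive everything published under that prefix.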

Mule will also subscribe to the exchange and write an audit trail of all the messages that have been dispatched to it, for compliance and traceability reasons.

Here is the complete configuration for Mule 3.1:

Notice how we’ve created a global endpoint: it will be used to share the exchange configuration between all the outbound endpoints that publish data to the “financial” exchange, as we expect our solution to grow over time. Note also that we’ve redefined the exchange on the inbound endpoint alongside the queue configuration: it is a very common AMQP pattern for subscribers to re-declare an exchange (an idempotent operation), which avoids any issue if the exchange doesn’t already exist.

The audit trail simply writes each message received on the exchange to its own file. The Mule expression language is used to create file names that reflect the nature of the received message (via its routing key) and a time-stamp.
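The naming idea behind the audit trail can be sketched outside of Mule in a few lines of Python. This is not the Mule expression used in the configuration; the function names, the time-stamp format and the `.txt` extension below are assumptions made purely for illustration.

```python
from datetime import datetime
from pathlib import Path


def audit_file_name(routing_key: str, received_at: datetime) -> str:
    """Build a file name from the routing key and a time-stamp.

    The layout (key, dash, compact time-stamp, .txt) is a hypothetical choice.
    """
    stamp = received_at.strftime("%Y%m%dT%H%M%S%f")
    return f"{routing_key}-{stamp}.txt"


def write_audit_record(directory: Path, routing_key: str, body: bytes,
                       received_at: datetime) -> Path:
    """Write one received message to its own file and return the file path."""
    directory.mkdir(parents=True, exist_ok=True)
    target = directory / audit_file_name(routing_key, received_at)
    target.write_bytes(body)
    return target
```

Including microseconds in the time-stamp keeps file names distinct for messages that arrive in quick succession under the same routing key, although it does not strictly guarantee uniqueness.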

So is this solution truly inter-operable?

A simple Python consumer

Let’s introduce some heterogeneity and create a simple Python consumer, using the amqplib client library. Here is its code:

Notice how the subscriber binds its queue using the “currency.rates” routing key. This guarantees that, whatever happens to be published to the “financial” exchange, only the currency exchange rates will be received by this client.
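For readers who want something to experiment with, here is a minimal sketch of such a subscriber. It is not the article’s original listing: it uses the pika client library rather than amqplib, assumes a broker reachable on localhost with default credentials, and assumes the “financial” exchange was declared as a non-durable topic exchange (re-declaring an exchange with different attributes is rejected by the broker, so adjust the flags to match your setup). The helper names `describe_update` and `consume_currency_rates` are ours.

```python
EXCHANGE = "financial"
ROUTING_KEY = "currency.rates"


def describe_update(routing_key: str, body: bytes) -> str:
    """Format a received message for display."""
    return f"[{routing_key}] {body.decode('utf-8', errors='replace')}"


def consume_currency_rates(host: str = "localhost") -> None:
    """Subscribe to currency rate updates and print them until interrupted."""
    # pika is a third-party library; it is imported here so that the helper
    # above stays usable even where pika is not installed.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()

    # Re-declare the exchange (idempotent if the attributes match; the
    # non-durable default used here is an assumption).
    channel.exchange_declare(exchange=EXCHANGE, exchange_type="topic")

    # Let the broker name a private queue that disappears with the connection.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue

    # Only messages published with the "currency.rates" key reach this queue.
    channel.queue_bind(exchange=EXCHANGE, queue=queue_name,
                       routing_key=ROUTING_KEY)

    def on_message(ch, method, properties, body):
        print(describe_update(method.routing_key, body))

    channel.basic_consume(queue=queue_name, on_message_callback=on_message,
                          auto_ack=True)
    try:
        channel.start_consuming()  # blocks until interrupted
    finally:
        connection.close()
```

Calling `consume_currency_rates()` blocks and prints each update as it arrives. The queue is exclusive and broker-named, so every running subscriber gets its own copy of each message, which is the publish/subscribe behavior we are after.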

As discussed in the introduction, one of the advantages of a publish-subscribe architecture is its extensibility. Let’s add a new feature to our solution.

Open for extension

With the previous Mule configuration in place, adding a new feed poller and publisher is very easy, as shown in the following example:

This new addition takes care of polling the Nasdaq Composite Index once per day and publishing it to the “financial” exchange with this specific routing key: “stockmarket.nasdaq.composite”. Subscribers that have only subscribed to the “currency.rates” routing key will not receive any of these updates.

Conclusion

AMQP opens the door to a truly inter-operable publish/subscribe architecture. With Mule’s integration capabilities, it becomes easy to publish messages to an AMQP broker or to subscribe to it and consume messages. Companies opting for these technologies will discover new opportunities to evolve their software systems and easily roll out new business processes that tap into events.



7 Responses to “Inter-operable Publish/Subscribe With AMQP”

Varun Narula March 27th, 2011, 3:53 am

Hi Ross,

I have been trying to work a simple AMQP example with Mule 3.1.1 (precompiled version) and IDE 2.1.1, but it seems some libraries are missing. Part of the error which suggests this could be a reason is reproduced below –

INFO 2011-03-27 15:46:30,491 [main] org.mule.config.spring.MuleApplicationContext: Refreshing org.mule.config.spring.MuleApplicationContext@30d82d: startup date [Sun Mar 27 15:46:30 IST 2011]; root of context hierarchy
ERROR 2011-03-27 15:46:37,099 [main] org.mule.config.spring.SpringXmlConfigurationBuilder: Configuration with “org.mule.config.spring.SpringXmlConfigurationBuilder” failed.
org.mule.api.lifecycle.InitialisationException: Configuration problem: Unable to locate NamespaceHandler for namespace [http://www.mulesoft.org/schema/mule/amqp]
Offending resource: URL [file:/C:/development/UID/eclipse/workspace/mule_proj01/src/main/app/stdio-rabbitmq-config.xml]

Are there any Spring specific jars ( Spring-AMQP maybe) which need to be on the classpath ? Or am I missing something else ?

Regards,
Varun

Varun Narula March 27th, 2011, 4:24 am

Ahh… on another blog I found the link to the AMQP transport

https://github.com/downloads/mulesoft/mule-transport-amqp/mule-transport-amqp-3.1.0.zip

Just missed it before the earlier post :-)

From the Mule’s Mouth » Make the move to Mule 3 followup Q&A March 28th, 2011, 9:08 am

[...] Q:  Is there an example of polling A:  There are several examples on the blog. Here is one using AMQP. [...]

Ben July 8th, 2011, 10:55 pm

My messages add custom headers, e.g. IMEI="1234567890123456".
Can you provide examples that demonstrate the related expression in the header evaluator?

Bill October 6th, 2011, 4:32 am

Hi Ross, I have a problem working with AMQP in Mule. I want to make a simple example and I always get 'amqpReplyTargetService.stage1' phase 'start' does not support phase 'dispose'
and
Message : Failed to invoke lifecycle phase “start” on object: Flow{amqpReplyTargetService}
Code : MULE_ERROR-70228

my example is like this

Amar February 15th, 2013, 10:55 am

Hi all, could anyone please explain: when sending a message to the MOM (Rabbit) through amqp:outbound, no queue property is used, but when retrieving from the MOM we do use a queue property. How does the MOM know, with the help of the routing key, that the message has to be fetched from the queue declared on the amqp:inbound endpoint?

David Dossot February 16th, 2013, 8:28 am

@Amar: In pre-1.0 AMQP, messages are dispatched to exchanges, not queues. This is why the outbound endpoint has no notion of a queue. Consumers bind queues to exchanges, but the producer is completely unaware of that.
