The AMQ model

Explanation

The OpenAMQ broker provides two basic functions: message routing, i.e. dispatching messages to different destinations based on business criteria, and message storage, i.e. holding messages even if their consumer is not immediately available.

Message routing is done by exchanges. There are several exchange types, each defining a different routing algorithm. For example, the topic exchange type routes messages by matching their routing keys against patterns, the fanout exchange type routes messages blindly without even examining them, and the headers exchange type routes messages on complex criteria expressed in the form of a table. Each exchange type can have as many instances as needed.

The client application sends messages to a particular exchange instance. The exchange instance in turn sends each message to particular destinations (explained below).

So, for example, say you are a news agency: some subscribers want the news, while others want only digests. You can solve the problem by creating two instances of the fanout exchange, one called "news" and the other called "digests". You send news to the "news" exchange instance and digests to the "digests" exchange instance. Subscribers subscribed to the "news" exchange instance will get news only, subscribers subscribed to the "digests" exchange instance will get digests only, those subscribed to both will get both news and digests, and those subscribed to neither will get no messages:

amq1.png

The above diagram is a bit simplified. Imagine that Mr. A logs off or shuts down his computer in the evening, and the next day he logs in and expects to get the news that arrived during the night. What's missing is some storage for the messages. That's what queues are for. Exchanges route messages to queues exclusively, never directly to the consumers. Messages are stored in queues, and once a subscriber application starts up and asks to consume them, they are finally delivered.

In our example, we can create a queue for each subscriber (A, B, C and D respectively). The queues reside in the broker, so the messages won't be lost even if a subscriber logs off, experiences a power outage or network failure, or becomes unavailable for any other reason:

amq2.png
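
The model so far can be sketched in a few lines of plain C. This is a toy in-memory simulation, not OpenAMQ client code: the types queue_t and fanout_exchange_t and the functions exchange_bind, exchange_publish, queue_consume and queue_pending are hypothetical names invented for this illustration. It shows a fanout exchange copying every message to all bound queues, and a queue holding messages until the subscriber asks for them.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_MESSAGES 16   /* queue capacity in this toy model */
#define MAX_BINDINGS 8    /* queues that can be bound to one exchange */

/* A queue lives in the broker and holds messages until they are consumed. */
typedef struct {
    const char *name;
    const char *messages[MAX_MESSAGES];
    int head;    /* index of the next message to deliver */
    int count;   /* number of messages stored so far */
} queue_t;

/* A fanout exchange only needs to know which queues are bound to it. */
typedef struct {
    const char *name;
    queue_t *bound[MAX_BINDINGS];
    int n_bound;
} fanout_exchange_t;

/* Subscribe: bind a queue to the exchange. */
void exchange_bind(fanout_exchange_t *x, queue_t *q)
{
    assert(x->n_bound < MAX_BINDINGS);
    x->bound[x->n_bound++] = q;
}

/* Fanout routing: store the message in every bound queue without
   examining it. Returns the number of queues that received it. */
int exchange_publish(fanout_exchange_t *x, const char *msg)
{
    for (int i = 0; i < x->n_bound; i++) {
        queue_t *q = x->bound[i];
        assert(q->count < MAX_MESSAGES);
        q->messages[q->count++] = msg;
    }
    return x->n_bound;
}

/* Number of messages waiting for the subscriber. */
int queue_pending(const queue_t *q)
{
    return q->count - q->head;
}

/* Consume: return the oldest undelivered message, or NULL if none is left. */
const char *queue_consume(queue_t *q)
{
    return q->head < q->count ? q->messages[q->head++] : NULL;
}
```

For instance, if queue A is bound to an exchange and two messages are published while Mr. A is offline, queue_pending reports 2 for queue A, and the first queue_consume call returns the older of the two messages.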

Now imagine that the news service wants to publish news in different categories, say politics, sport, culture or business, instead of the simple news and digests categories. You could implement such a system using four exchange instances instead of two.

At this point scalability kicks in. What if we wanted to add a new category like science in the future? We would have to rewrite our news publishing application to use five categories instead of four. While that's achievable, what if we want to allow news editors to use any categories they want? We would have to constantly rewrite our application. Wouldn't it be nice if OpenAMQ provided a way to use only a single exchange instance for all the news items, let us tag individual items with categories, and let subscribers specify which categories of news they are interested in?

In fact, OpenAMQ provides exactly that kind of functionality.

Firstly, when publishing a message you can tag it with a string called the routing key. All you have to do is fill in the routing_key parameter when calling the publishing method (amq_client_session_basic_publish).

Secondly, you have to use a single exchange instance called news, of the direct exchange type. By using a direct exchange instead of fanout you instruct the broker to take the routing key into account when routing messages. (The fanout exchange type simply routes all messages to all the bound queues, ignoring the routing key altogether.)

Thirdly, when creating the binding between the exchange instance and a queue (amq_client_session_queue_bind), specify the routing key. That way only messages with the specified routing key will be sent to the bound queue. You can have several bindings with different routing keys between an exchange instance and a queue, so that the subscriber can subscribe to several categories. Have a look at the diagram below:

amq3.png

The following diagram shows what happens when a message is sent. First, the producer (our news publishing application) creates a message, tags it with the appropriate category (sport) and sends it to the news exchange instance. News is of the direct exchange type, so it delivers a message to a queue only when the routing key (category) in the message is the same as the routing key of the binding connecting the exchange with that queue.

This process of selecting the appropriate destinations for a message is called message matching. In our case there are two matches (marked with a star symbol), one with the binding to queue A and another with the binding to queue B. The exchange instance therefore sends one copy of the message to A and one copy to B:

amq4.png
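
Message matching in a direct exchange can be sketched as a plain string comparison over the list of bindings. The code below is an illustrative simulation only, not the broker's implementation and not WireAPI code; binding_t, direct_exchange_t, direct_bind and direct_route are hypothetical names made up for this example.

```c
#include <assert.h>
#include <string.h>

#define MAX_BINDINGS 16   /* arbitrary limit for this toy model */

/* One binding connects the exchange to a queue under a routing key. */
typedef struct {
    const char *queue;         /* name of the destination queue */
    const char *routing_key;   /* category the subscriber asked for */
} binding_t;

typedef struct {
    binding_t bindings[MAX_BINDINGS];
    int n_bindings;
} direct_exchange_t;

/* Add a binding. A queue may be bound several times with different keys. */
void direct_bind(direct_exchange_t *x, const char *queue,
                 const char *routing_key)
{
    assert(x->n_bindings < MAX_BINDINGS);
    x->bindings[x->n_bindings].queue = queue;
    x->bindings[x->n_bindings].routing_key = routing_key;
    x->n_bindings++;
}

/* Message matching: collect the queues whose binding key is identical to
   the routing key of the message. Writes at most max queue names to out
   and returns the number of matches found. */
int direct_route(const direct_exchange_t *x, const char *routing_key,
                 const char **out, int max)
{
    int n = 0;
    for (int i = 0; i < x->n_bindings && n < max; i++) {
        if (strcmp(x->bindings[i].routing_key, routing_key) == 0)
            out[n++] = x->bindings[i].queue;
    }
    return n;
}
```

With queues A and B bound under sport and queue C bound under culture, routing a message tagged sport yields two matches, A and B, while a message tagged science matches nothing and is not stored in any queue.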

To make things even more complex, imagine you would like to define news categories as a hierarchy rather than a plain list of topics. That way the politics category will have subcategories for various parts of the world, like politics.europe and politics.asia, and even those can have subcategories of their own (politics.europe.france, politics.europe.uk etc.)

You would then like subscribers to be able to subscribe to a whole subtree of topics in addition to subscribing to an individual topic. So, for example, you would like Mr. A to be able to subscribe to all political news regardless of region, while Mr. B should still be able to subscribe to UK political news only.

To solve this problem, use a topic exchange. A topic exchange matches the routing key of each message against patterns in which * acts as a wildcard. That way a message tagged with routing key politics.europe.uk would match Mr. B's subscription for UK political news (politics.europe.uk) as well as Mr. A's subscription for all political news (politics.*.*), but it won't match Mr. C's subscription for Asian political events (politics.asia.*):

amq5.png
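
The matching rule used in this example can be sketched as a word-by-word comparison. The sketch assumes that * stands for exactly one dot-separated word, which is the AMQP convention; it does not handle any other wildcard, and the broker's real matcher may be implemented quite differently. The function names topic_match and word_len are hypothetical.

```c
#include <stdbool.h>
#include <string.h>

/* Length of the word starting at s, i.e. up to the next '.' or the end. */
static size_t word_len(const char *s)
{
    return strcspn(s, ".");
}

/* Return true if the routing key matches the pattern. Both are sequences
   of words separated by dots; a pattern word "*" matches any one word. */
bool topic_match(const char *pattern, const char *key)
{
    for (;;) {
        size_t plen = word_len(pattern);
        size_t klen = word_len(key);

        if (plen == 1 && pattern[0] == '*') {
            if (klen == 0)
                return false;       /* the wildcard needs a non-empty word */
        } else if (plen != klen || strncmp(pattern, key, plen) != 0) {
            return false;           /* literal words differ */
        }

        pattern += plen;
        key += klen;

        /* If either side has run out of words, it is a match only when
           both have, i.e. pattern and key have the same depth. */
        if (*pattern == '\0' || *key == '\0')
            return *pattern == '\0' && *key == '\0';

        pattern++;                  /* skip the '.' separators */
        key++;
    }
}
```

Under these assumptions, topic_match("politics.*.*", "politics.europe.uk") and topic_match("politics.europe.uk", "politics.europe.uk") are both true, while topic_match("politics.asia.*", "politics.europe.uk") is false. Note that politics.*.* does not match the two-word key politics.europe, because each * consumes exactly one word.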

To see how to implement such a news publishing application, check our code examples:

Compile the examples:

$ c -l news_publisher.c
Compiling news_publisher...
Linking news_publisher...
$ c -l news_terminal.c
Compiling news_terminal...
Linking news_terminal...
$ c -l subscribe_to_news.c
Compiling subscribe_to_news...
Linking subscribe_to_news...

Start the OpenAMQ broker:

$ amq_server

Start the news publishing application:

$ ./news_publisher localhost

Start the news terminal:

$ ./news_terminal localhost johnm

Now, subscribe to a topic you are interested in:

$ ./subscribe_to_news localhost johnm "politics.*.*"

At this point the news items should start appearing in your news terminal window.

Note the following facts:

  • If you have a subscription set up, news will be queued for you even if the news terminal is not running. When you start the news terminal, all the queued messages will arrive immediately.
  • You can subscribe to several topics.
  • You can subscribe to new topics even while the news publisher and news terminal are running. You don't have to restart the applications for new subscriptions to take effect.
  • There can be several news subscribers. The example above shows only one subscriber (johnm), but you can have any number of them. Each subscriber has its own separate set of subscriptions.

Author

Martin Sustrik
