Contents
- WireAPI - a Standard AMQP API
- Architecture of C/C++ WireAPI
- The Connection Class
- The Session Class
- Basic Content Class
- Queues and Exchanges
Describes a standard API for application developers who wish to use
AMQP for messaging services in their applications. WireAPI is
currently a C API but is intended to evolve into a standard language-neutral
API that gives developers access to the full functionality of the
AMQP protocol.
Architecture of C/C++ WireAPI
The C/C++ WireAPI implementation provided by iMatix for OpenAMQ is a
multithreaded application in two halves. One half runs as a background
thread, processing protocol methods and doing network I/O, so that messages
can be sent and received independently of application work. The second half
runs in the application thread, and provides API-level functionality,
including implementation of the WireAPI objects (connections, sessions, etc.).
This design has several consequences that you should take into account when designing and building your application and/or messaging frameworks:
-
If you build OpenAMQ as a single-threaded application, background processing of messages will be unreliable. Concretely, the background thread will only process when the application calls the amq_client_session_wait () method.
-
WireAPI is safe to use in multithreaded applications, but you must not share WireAPI classes (connection, session, etc.) between application threads.
The Connection Class
AMQP is a multi-channel protocol, meaning that one network connection can
carry an arbitrary number of parallel, independent virtual connections,
which AMQP calls "channels". In WireAPI these are called "sessions" for
compatibility with other middleware APIs. NOTE: OpenAMQ supports exactly
ONE session per connection.
Before calling any iCL method including amq_client_connection_new,
you must have called icl_system_initialise (), or your application will
fail with an abort.
amq_client_connection_new
Creates a new connection to the server.
amq_client_connection_t
*connection = NULL; // Current connection
icl_longstr_t
*auth_data; // Authentication data
auth_data = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
"localhost", "/", auth_data, "test", 0, 30000);
icl_longstr_destroy (&auth_data);
if (connection)
icl_console_print ("I: connected to %s/%s - %s - %s",
connection->server_product,
connection->server_version,
connection->server_platform,
connection->server_information);
else {
icl_console_print ("E: could not connect to server");
return (1);
}
-
The host_name argument specifies a server name or IP address, optionally ending in ':' plus a port number.
-
The virtual_host name specifies the virtual host to which the connection will connect. The default virtual host is "/".
-
The auth_data provides an authentication block, used to login to the server. To create a plain-text authentication block, use the auth_plain method. The new method does not take ownership of the auth_data block: the caller destroys it with icl_longstr_destroy once the connection has been created, as the example above does.
-
The instance argument sets the client instance name, which can be used to identify a specific client in the management console or server log.
-
The trace argument sets the trace level for WireAPI.
-
The timeout argument governs all synchronous exchanges with the server - if the server does not respond within this time, the connection treats it as a fatal error. The value is given in milliseconds, so the 30000 passed in the example above is 30 seconds. A timeout of zero means "infinite". A good value for fast networks is five to ten seconds; for a slower network, a value of 30 seconds or more is reasonable.
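The host_name format described above (a server name or IP address, optionally followed by ':' and a port number) can be assembled with a small helper. The following is a minimal sketch: format_host_name is a hypothetical helper, not part of WireAPI, and it assumes that a port of zero means "no explicit port".

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*  Hypothetical helper, not part of WireAPI: builds a host_name string
    of the form "host" or "host:port" into dest.  A port of 0 is taken
    to mean "no explicit port" (an assumption of this sketch).  Returns
    0 on success, or -1 if dest is too small to hold the result.        */
static int
format_host_name (char *dest, size_t dest_size, const char *host, unsigned port)
{
    int
        length;                         //  Length snprintf wanted to write

    assert (dest && host);
    if (port)
        length = snprintf (dest, dest_size, "%s:%u", host, port);
    else
        length = snprintf (dest, dest_size, "%s", host);

    //  snprintf reports the untruncated length, so compare with the size
    return (length < 0 || (size_t) length >= dest_size)? -1: 0;
}
```

The resulting string would then be passed as the first argument of amq_client_connection_new.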
amq_client_connection_destroy
Closes an open connection, doing a clean shut-down. Applications should
use this to end a connection (rather than just exiting):
amq_client_connection_destroy (&connection);
amq_client_connection_auth_plain
Returns an authentication block for a plain login:
icl_longstr_t
*auth_data; // Authentication data
auth_data = amq_client_connection_auth_plain ("guest", "guest");
Connection Properties
These are the properties of a connection object:
-
alive (Boolean) - FALSE when connection has had an error
-
silent (Boolean) - set this TRUE to suppress error reporting
-
error_text (string) - error string reported by the API
-
reply_text (string) - error string reported by server
-
reply_code (integer) - error value reported by server
-
version_major (integer) - server protocol version major
-
version_minor (integer) - server protocol version minor
-
server_product (string) - product name reported by server
-
server_version (string) - product version reported by server
-
server_platform (string) - operating system platform reported by server
-
server_copyright (string) - copyright notice reported by server
-
server_information (string) - other information reported by server
The Session Class
A session corresponds to an AMQP channel, and is a virtual connection to
an AMQ server. You must create at least one session in order to talk
with an AMQ server. While AMQP offers multiplexing in theory, OpenAMQ no
longer implements this, mainly because there are no proven performance
advantages. So you should create a single session per connection, no more.
NOTE: future versions of WireAPI may merge the session and connection classes.
amq_client_session_new ()
Creates a new session:
amq_client_session_t
*session = NULL; // Current session
session = amq_client_session_new (connection);
if (!session) {
icl_console_print ("E: could not open session to server");
return (1);
}
amq_client_session_destroy ()
Closes an open session, doing a clean shut-down. Applications should
call this method when closing a session. Closing the connection is
a valid way of closing all open sessions for that connection:
amq_client_session_destroy (&session);
amq_client_session_wait ()
Waits for content to arrive from the server. You must call this method
in order to get content. Returns zero if the wait completed normally
(zero or more contents arrived), or -1 if the timeout expired or the
session died; check session->alive to tell these two cases apart:
if (amq_client_session_wait (session, timeout)) {
    if (session->alive) {
        // timeout expired
    }
    else {
        // session died
    }
}
else {
    // zero or more contents arrived
}
Content Access Methods
For each content class (Basic, File, Stream), WireAPI provides a set of
methods to access arrived and returned content:
amq_client_session_[classname]_[arrived|returned] ()
amq_client_session_[classname]_[arrived|returned]_count ()
The first method returns the oldest content waiting to be processed;
the second method returns the number of contents waiting. For example:
amq_content_basic_t
*content;
amq_client_session_wait (session, timeout);
if (amq_client_session_basic_arrived_count (session)) {
icl_console_print ("I: have messages to process...");
content = amq_client_session_basic_arrived (session);
while (content) {
// process content
content = amq_client_session_basic_arrived (session);
}
}
When processing arrived or returned content the application must not
assume that a single content arrived. It should assume that zero or
more contents arrived or returned, and process each of them, and wait
again if it needs to.
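The "zero or more contents" rule can be modelled without a server. The sketch below uses stand-in types and functions (demo_session_t, demo_arrived, demo_drain), all hypothetical and not part of WireAPI. It only illustrates the shape of the loop: keep asking for the oldest content until none is left, and treat an empty result as normal.

```c
#include <assert.h>
#include <stddef.h>

/*  Stand-in for a session: a fixed array of pending contents.  This is
    NOT the WireAPI session type, only a model for the loop below.       */
typedef struct {
    const char **pending;               //  Contents waiting to be processed
    size_t count;                       //  Number of contents in pending
    size_t next;                        //  Index of the oldest unread content
} demo_session_t;

/*  Models an _arrived () accessor: returns the oldest waiting content,
    or NULL when nothing is left.                                        */
static const char *
demo_arrived (demo_session_t *session)
{
    assert (session);
    if (session->next < session->count)
        return session->pending [session->next++];
    return NULL;
}

/*  Processes every waiting content, however many there are (possibly
    none), and returns the number processed.                             */
static size_t
demo_drain (demo_session_t *session)
{
    size_t processed = 0;
    const char *content;

    while ((content = demo_arrived (session)) != NULL) {
        //  A real application would handle the content here
        processed++;
    }
    return processed;
}
```

In a real application the call to the drain step would follow each return from amq_client_session_wait, and a result of zero would simply lead to another wait.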
Session Properties
These are the properties of a session object:
-
alive (Boolean) - FALSE when connection has had an error
-
silent (Boolean) - set this TRUE to suppress error reporting
-
error_text (string) - error string reported by the API
-
ticket (integer) - access ticket granted by server
-
queue (string) - queue name assigned by server
-
exchange (string) - exchange name from last method
-
message_count (integer) - number of messages in queue
-
consumer_count (integer) - number of consumers
-
active (Boolean) - session is paused or active
-
reply_text (string) - error string reported by server
-
reply_code (integer) - error value reported by server
-
consumer_tag (integer) - server-assigned consumer tag
-
routing_key (string) - original message routing key
-
scope (string) - queue name scope
-
delivery_tag (integer) - server-assigned delivery tag
-
redelivered (Boolean) - message is being redelivered
Note that all of these except alive, silent, and error_text are the result
of methods sent from the server to the client. For detailed descriptions
of these properties, read the AMQP specifications. All incoming method
arguments are stored as session properties. Thus the "message-count"
argument of an incoming Basic.Browse-Ok method will be stored in the
message_count property.
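The naming rule in the example above ("message-count" becomes message_count) can be written as a small conversion. This is a sketch only: property_name is a hypothetical helper, and it assumes the same hyphen-to-underscore pattern holds for the other method arguments.

```c
#include <assert.h>
#include <string.h>

/*  Hypothetical helper: converts an AMQP method argument name such as
    "message-count" into the matching session property name by replacing
    each '-' with '_'.  Returns 0 on success, or -1 if dest is too small
    to hold the name and its terminator.                                 */
static int
property_name (char *dest, size_t dest_size, const char *argument)
{
    size_t index;
    size_t length;

    assert (dest && argument);
    length = strlen (argument);
    if (length >= dest_size)
        return -1;

    for (index = 0; index < length; index++)
        dest [index] = (argument [index] == '-')? '_': argument [index];
    dest [length] = '\0';
    return 0;
}
```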
Basic Content Class
AMQP uses the term "content" to mean an application message (the term
"message" means different things at the application, protocol, and
internal technical levels, so is confusing).
WireAPI provides an iCL class that lets you create and manipulate basic contents:
amq_content_basic_t
*content;
content = amq_content_basic_new ();
amq_content_basic_set_body (content, "0123456789", 10, NULL);
amq_content_basic_set_message_id (content, "ID001");
amq_content_basic_destroy (&content);
To create a new content, use the 'new' method. To destroy a content,
use the 'destroy' method.
Publishing Content
The amq_client_session_basic_publish method sends a content to a specified
exchange:
rc = amq_client_session_basic_publish (
session, // Session reference
basic_content, // Content reference
0, // Access ticket (unused)
exchange_name, // Exchange name
"", // Routing key
0, // If 1, must be routable
0); // If 1, must be deliverable
Basic Content Properties
All contents have these properties, which you can inspect directly
using content->propertyname (e.g. 'content->routing_key'):
-
body_size (integer) - the body size of the content.
-
exchange (string) - the exchange to which the content was published.
-
routing_key (string) - the original routing_key specified by the publisher.
-
content_type (string) - MIME content type.
-
content_encoding (string) - MIME content encoding.
-
headers (field table) - message header field table.
-
delivery_mode (integer) - non-persistent or persistent.
-
priority (integer) - message priority, 0 to 9.
-
correlation_id (string) - application correlation identifier.
-
reply_to (string) - the destination to reply to.
-
expiration (string) - expiration specification.
-
message_id (string) - the application message identifier.
-
timestamp (integer) - message timestamp.
-
type (string) - message type name.
-
user_id (string) - creating user id.
-
app_id (string) - creating application id.
To set any of a basic content's properties, DO NOT modify the property
directly but use the method:
amq_content_basic_set_[propertyname] (content, newvalue)
Content Body Data
To set a content's body, use this method:
amq_content_basic_set_body (content, byte *data, size_t size, free_fn)
Where the free_fn is a function of type 'icl_mem_free_fn *' (compatible
with the standard library free() function). If free_fn is not null, it
is called when the data needs to be destroyed (when the content is
destroyed, or if you call _set_body() again).
To get a content's body, use this method:
amq_content_basic_get_body (content, byte *buffer, size_t limit)
Where the buffer is at least as large as content->body_size. This method
returns the number of bytes copied, or -1 if the buffer was too small.
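The ownership and sizing rules described above can be modelled in a few lines. The sketch below uses stand-in names (demo_content_t, demo_set_body, demo_get_body, demo_release_body) that are hypothetical and not the WireAPI implementation. It shows two points from the text: a non-null free_fn is called when the body is replaced or released, and a get with too small a buffer returns -1 instead of copying.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef void (demo_free_fn) (void *data);

/*  Stand-in for a content object, NOT the WireAPI type.                 */
typedef struct {
    unsigned char *body;                //  Body data
    size_t body_size;                   //  Size of body in bytes
    demo_free_fn *free_fn;              //  Called to release body, or NULL
} demo_content_t;

/*  Releases the current body through free_fn, if one was supplied.      */
static void
demo_release_body (demo_content_t *content)
{
    assert (content);
    if (content->body && content->free_fn)
        content->free_fn (content->body);
    content->body = NULL;
    content->body_size = 0;
    content->free_fn = NULL;
}

/*  Models _set_body (): any previous body is released first.            */
static void
demo_set_body (demo_content_t *content, void *data, size_t size,
    demo_free_fn *free_fn)
{
    demo_release_body (content);
    content->body = data;
    content->body_size = size;
    content->free_fn = free_fn;
}

/*  Models _get_body (): copies the body into buffer and returns the
    number of bytes copied, or -1 if limit is smaller than the body.     */
static long
demo_get_body (const demo_content_t *content, unsigned char *buffer,
    size_t limit)
{
    assert (content && buffer);
    if (limit < content->body_size)
        return -1;
    if (content->body_size)
        memcpy (buffer, content->body, content->body_size);
    return (long) content->body_size;
}
```

Passing free as the free_fn hands a heap buffer over to the content, while passing NULL leaves the caller responsible for the data, which suits static buffers such as the string literal in the earlier example.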
Advanced Content Manipulation
To work with large contents - which do not fit into memory - you must
use a more complex API to read and write contents. For details of this
please read the amq_content_basic_class.icl and look at the test case,
which demonstrates how to read and write content bodies in frames rather
than as single buffers.
Queues and Exchanges
The Exchange Class
Exchanges match and distribute messages across queues. Exchanges can be
configured in the server or created at runtime.
amq_exchange_declare ()
This method creates an exchange if it does not already exist, and if the
exchange exists, verifies that it is of the correct and expected class:
asl_field_list_t
*field_list = NULL;
icl_longstr_t
*arguments_table = NULL;
// Build arguments field as necessary
field_list = asl_field_list_new (NULL);
arguments_table = asl_field_list_flatten (field_list);
rc = amq_client_session_exchange_declare (
session, // Session reference
0, // Access ticket (unused)
exchange_name, // Exchange name
"direct", // Exchange type
0, // If 1, don't actually create
0, // Durable (unused)
0, // If 1, auto-delete when unused
0, // Internal exchange (unused)
arguments_table); // Arguments for declaration
asl_field_list_unlink (&field_list);
icl_longstr_destroy (&arguments_table);
The Exchange.Declare method has the following specific fields:
-
ticket (short) - access ticket granted by server. Not implemented: use zero.
-
exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
-
type (shortstr) - exchange type. Each exchange belongs to one of a set of exchange types implemented by the server. The exchange types define the functionality of the exchange - i.e. how messages are routed through it. It is not valid or meaningful to attempt to change the type of an existing exchange. OpenAMQ supports "fanout", "direct", "topic", and "header" exchanges.
-
passive (bit) - do not create exchange. If set, the server will not create the exchange. The client can use this to check whether an exchange exists without modifying the server state.
-
durable (bit) - request a durable exchange. Not implemented: use zero.
-
auto_delete (bit) - auto-delete when unused. If set, the exchange is deleted when all queues have finished using it.
-
internal (bit) - create internal exchange. Not implemented: use zero.
-
arguments (table) - arguments for declaration. A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. This field is ignored if passive is 1.
Guidelines for implementors:
-
The 'exchange' field MUST match the regular expression /^[a-zA-Z0-9-_.:/]+$/.
-
Exchange names starting with "amq." are reserved for predeclared and standardised exchanges. If the client attempts to create an exchange starting with "amq.", the server MUST raise a channel exception with reply code 403 (access refused).
-
The 'type' field MUST match the regular expression /^[a-zA-Z0-9-_.:/]+$/.
-
If the exchange already exists with a different type, the server MUST raise a connection exception with a reply code 507 (not allowed).
-
If the server does not support the requested exchange type it MUST raise a connection exception with a reply code 503 (command invalid).
-
The server MUST support both durable and transient exchanges.
-
The server MUST ignore the durable field if the exchange already exists.
-
The server SHOULD allow for a reasonable delay between the point when it determines that an exchange is not being used (or no longer used), and the point when it deletes the exchange. At the least it must allow a client to create an exchange and then bind a queue to it, with a small but non-zero delay between these two actions.
-
The server MUST ignore the auto-delete field if the exchange already exists.
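A client can check an exchange name against the rules above before sending Exchange.Declare. The following is a minimal sketch with hypothetical helper names (exchange_name_is_valid, exchange_name_is_reserved). It reads the hyphen in the character class /^[a-zA-Z0-9-_.:/]+$/ as a literal '-', and it is a client-side convenience only: the server remains the authority.

```c
#include <assert.h>
#include <string.h>

/*  Returns 1 if name is non-empty and every character is a letter, a
    digit, or one of "-_.:/", mirroring /^[a-zA-Z0-9-_.:/]+$/; else 0.   */
static int
exchange_name_is_valid (const char *name)
{
    static const char
        allowed [] = "abcdefghijklmnopqrstuvwxyz"
                     "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
                     "0123456789-_.:/";

    assert (name);
    //  strspn gives the length of the leading run of allowed characters
    return *name != '\0' && strspn (name, allowed) == strlen (name);
}

/*  Returns 1 if name starts with the reserved "amq." prefix, else 0.    */
static int
exchange_name_is_reserved (const char *name)
{
    assert (name);
    return strncmp (name, "amq.", 4) == 0;
}
```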
amq_exchange_delete ()
This method deletes an exchange. When an exchange is deleted all queue
bindings on the exchange are cancelled.
rc = amq_client_session_exchange_delete (
session, // Session reference
0, // Access ticket (unused)
exchange_name, // Exchange name
0); // Unimplemented, always 0
The Exchange.Delete method has the following specific fields:
-
ticket (short) - access ticket granted by server. Not implemented: use zero.
-
exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
-
if_unused (bit) - delete only if unused. If set, the server will only delete the exchange if it has no queue bindings. If the exchange has queue bindings the server does not delete it but raises a channel exception instead. Not implemented: always use zero.
Guidelines for implementors:
-
The 'exchange' field MUST NOT be empty.
-
The exchange MUST exist. Attempting to delete a non-existing exchange causes a channel exception.
The Queue Class
Queues store and forward messages. Queues can be configured in the server
or created at runtime. Queues must be attached to at least one exchange
in order to receive messages from publishers.
amq_queue_declare ()
This method creates or checks a queue. When creating a new queue the
client can specify various properties that control the durability of the
queue and its contents, and the level of sharing for the queue.
asl_field_list_t
*field_list = NULL;
icl_longstr_t
*arguments_table = NULL;
// Build arguments field as necessary
field_list = asl_field_list_new (NULL);
arguments_table = asl_field_list_flatten (field_list);
rc = amq_client_session_queue_declare (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Queue name, null means auto-assign
0, // If 1, don't actually create
0, // Durable (unused)
0, // If 1, request exclusive queue
0, // If 1, auto-delete when unused
arguments_table); // Arguments for declaration
asl_field_list_unlink (&field_list);
icl_longstr_destroy (&arguments_table);
The Queue.Declare method has the following specific fields:
-
ticket (short) - access ticket granted by server. Not implemented: use zero.
-
queue (shortstr) - queue name. Queue names may consist of any mixture of digits, letters, and underscores. May be specified, or may be empty (NULL). If the queue name is null, the server creates and names a queue and returns this. You can access the last created queue from session->queue; if you want to create many queues, copy the returned name somewhere safe.
-
passive (bit) - do not create queue. If set, the server will not create the queue. The client can use this to check whether a queue exists without modifying the server state.
-
durable (bit) - request a durable queue. Not implemented: use zero.
-
exclusive (bit) - request an exclusive queue. Exclusive queues may only be consumed from by the current connection. Setting the 'exclusive' flag always implies 'auto-delete'.
-
auto_delete (bit) - auto-delete queue when unused. If set, the queue is deleted when all consumers have finished using it. Last consumer can be cancelled either explicitly or because its channel is closed. If there was no consumer ever on the queue, it won't be deleted.
-
arguments (table) - arguments for declaration. A set of arguments for the declaration. The syntax and semantics of these arguments depends on the server implementation. This field is ignored if passive is 1.
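The interaction between the exclusive and auto_delete flags and the consumer count can be modelled as a small state record. This is a sketch of the rules as described in the field list above, using hypothetical names (demo_queue_t and its functions). It is not how the OpenAMQ server is implemented.

```c
#include <assert.h>

/*  Stand-in for the state the rules above depend on.                    */
typedef struct {
    int auto_delete;                    //  Non-zero if the queue auto-deletes
    unsigned consumers;                 //  Consumers currently on the queue
    int had_consumer;                   //  Non-zero once any consumer existed
} demo_queue_t;

/*  Declares a queue; 'exclusive' always implies 'auto-delete'.          */
static demo_queue_t
demo_queue_declare (int exclusive, int auto_delete)
{
    demo_queue_t queue;

    queue.auto_delete = (exclusive || auto_delete);
    queue.consumers = 0;
    queue.had_consumer = 0;
    return queue;
}

/*  Registers one consumer on the queue.                                 */
static void
demo_queue_consume (demo_queue_t *queue)
{
    assert (queue);
    queue->consumers++;
    queue->had_consumer = 1;
}

/*  Cancels one consumer, explicitly or because its channel closed.      */
static void
demo_queue_cancel (demo_queue_t *queue)
{
    assert (queue);
    if (queue->consumers > 0)
        queue->consumers--;
}

/*  Returns 1 only for an auto-delete queue whose last consumer has
    gone; a queue that never had a consumer is never deleted this way.   */
static int
demo_queue_should_delete (const demo_queue_t *queue)
{
    assert (queue);
    return queue->auto_delete && queue->had_consumer && queue->consumers == 0;
}
```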
Guidelines for implementors:
-
The server MUST create a default binding for a newly-created queue to the default exchange, which is an exchange of type 'direct'.
-
The server SHOULD support a minimum of 256 queues per virtual host and ideally, impose no limit except as defined by available resources.
-
The 'queue' field MUST match the regular expression /^[a-zA-Z0-9-_.:/]*$/.
-
The queue name MAY be empty, in which case the server MUST create a new queue with a unique generated name and return this to the client in the Declare-Ok method.
-
Queue names starting with "amq." are reserved for predeclared and standardised server queues. If the queue name starts with "amq." and the passive option is zero, the server MUST raise a connection exception with reply code 403 (access refused).
-
If the passive flag is set, and the queue does not already exist, the server MUST respond with a reply code 404 (not found) and raise a channel exception.
-
The server MUST support both exclusive (private) and non-exclusive (shared) queues.
-
The server MUST raise a channel exception if 'exclusive' is specified and the queue already exists and is owned by a different connection.
-
The server SHOULD allow for a reasonable delay between the point when it determines that a queue is not being used (or no longer used), and the point when it deletes the queue. At the least it must allow a client to create a queue and then create a consumer to read from it, with a small but non-zero delay between these two actions. The server should equally allow for clients that may be disconnected prematurely, and wish to re-consume from the same queue without losing messages. We would recommend a configurable timeout, with a suitable default value being one minute.
-
The server MUST ignore the auto-delete field if the queue already exists.
amq_queue_bind ()
This method binds a queue to an exchange. Until a queue is bound it will
not receive any messages. In a classic messaging model, store-and-forward
queues are bound to a dest exchange and subscription queues are bound to
a dest_wild exchange.
rc = amq_client_session_queue_bind (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Name of queue to bind
exchange_name, // Name of exchange to bind to
routing_key, // Message routing key
arguments_table); // Arguments for bind
The Queue.Bind method has the following specific fields:
-
ticket (short) - access ticket granted by server. Not implemented: use zero.
-
queue (shortstr) - queue name. Specifies the name of the queue to bind. If the queue name is empty (NULL), refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
-
exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
-
routing_key (shortstr) - message routing key. Specifies the routing key for the binding. The routing key is used for routing messages depending on the exchange configuration. Not all exchanges use a routing key - refer to the specific exchange documentation. If the routing key is empty and the queue name is empty, the routing key will be the current queue for the channel, which is the last declared queue.
-
arguments (table) - arguments for binding. A set of arguments for the binding. The syntax and semantics of these arguments depends on the exchange class.
Guidelines for implementors:
-
A server MUST allow and ignore duplicate bindings - that is, two or more bind methods for a specific queue, with identical arguments - without treating these as an error.
-
If a bind fails, the server MUST raise a connection exception.
-
The server MUST NOT allow a durable queue to bind to a transient exchange. If the client attempts this the server MUST raise a channel exception.
-
Bindings for durable queues are automatically durable and the server SHOULD restore such bindings after a server restart.
-
If the client attempts to bind to an exchange that was declared as internal, the server MUST raise a connection exception with reply code 530 (not allowed).
-
The server SHOULD support at least 4 bindings per queue, and ideally, impose no limit except as defined by available resources.
-
If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).
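The duplicate-binding guideline above means that binding is idempotent: repeating an identical bind succeeds and changes nothing. The sketch below models this with a hypothetical in-memory table (demo_bindings_t, demo_bind). For brevity it compares only the exchange name and routing key and leaves out the arguments table, and the table capacity is an arbitrary choice for illustration.

```c
#include <assert.h>
#include <string.h>

#define DEMO_MAX_BINDINGS 8

/*  Stand-in for the bindings held on one queue; not an OpenAMQ type.    */
typedef struct {
    char exchange [DEMO_MAX_BINDINGS][64];
    char routing_key [DEMO_MAX_BINDINGS][64];
    size_t count;                       //  Number of bindings in use
} demo_bindings_t;

/*  Adds a binding unless an identical one exists.  Returns 0 if the
    binding is present afterwards (new or duplicate), or -1 if the table
    is full or a name is too long to store.                              */
static int
demo_bind (demo_bindings_t *bindings, const char *exchange,
    const char *routing_key)
{
    size_t index;

    assert (bindings && exchange && routing_key);
    for (index = 0; index < bindings->count; index++)
        if (strcmp (bindings->exchange [index], exchange) == 0
        &&  strcmp (bindings->routing_key [index], routing_key) == 0)
            return 0;                   //  Duplicate: ignored, not an error

    if (bindings->count == DEMO_MAX_BINDINGS
    ||  strlen (exchange) >= sizeof (bindings->exchange [0])
    ||  strlen (routing_key) >= sizeof (bindings->routing_key [0]))
        return -1;

    strcpy (bindings->exchange [bindings->count], exchange);
    strcpy (bindings->routing_key [bindings->count], routing_key);
    bindings->count++;
    return 0;
}
```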
amq_queue_purge ()
This method removes all messages from a queue. It does not cancel
consumers. Purged messages are deleted without any formal "undo"
mechanism.
rc = amq_client_session_queue_purge (
session, // Session reference
0, // Access ticket (unused)
queue_name); // Name of queue to purge
The Queue.Purge method has the following specific fields:
-
ticket (short) - access ticket granted by server. Not implemented: use zero.
-
queue (shortstr) - queue name. Specifies the name of the queue to purge. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
Guidelines for implementors:
-
A call to purge MUST result in an empty queue.
-
On transacted channels the server MUST NOT purge messages that have already been sent to a client but not yet acknowledged.
-
The server MAY implement a purge queue or log that allows system administrators to recover accidentally-purged messages. The server SHOULD NOT keep purged messages in the same storage spaces as the live messages since the volumes of purged messages may get very large.
-
If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).
-
The queue must exist. Attempting to purge a non-existing queue causes a channel exception.
amq_queue_delete ()
This method deletes a queue. When a queue is deleted any pending messages
are sent to a dead-letter queue if this is defined in the server
configuration, and all consumers on the queue are cancelled.
rc = amq_client_session_queue_delete (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Name of queue to delete
0, // Delete only if unused
0); // Delete only if empty
The Queue.Delete method has the following specific fields:
-
ticket (short) - access ticket granted by server. Not implemented: use zero.
-
queue (shortstr) - queue name. Specifies the name of the queue to delete. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
-
if_unused (bit) - delete only if unused. If set, the server will only delete the queue if it has no consumers. If the queue has consumers the server does not delete it but raises a channel exception instead.
-
if_empty (bit) - delete only if empty. If set, the server will only delete the queue if it has no messages. If the queue is not empty the server raises a channel exception.
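The two conditional flags combine as follows. This is a sketch of the decision as described in the field list above, using a hypothetical function name (queue_delete_allowed). A result of 0 corresponds to the case where the server raises a channel exception instead of deleting the queue.

```c
#include <assert.h>

/*  Models the decision for Queue.Delete: returns 1 if the queue may be
    deleted, or 0 if the if_unused or if_empty condition forbids it.     */
static int
queue_delete_allowed (unsigned consumers, unsigned messages,
    int if_unused, int if_empty)
{
    if (if_unused && consumers > 0)
        return 0;                       //  Queue still has consumers
    if (if_empty && messages > 0)
        return 0;                       //  Queue still holds messages
    return 1;
}
```

With both flags at zero, as in the example call above, the queue is deleted regardless of its consumers or pending messages.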
Guidelines for implementors:
-
If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).
-
The queue must exist. Attempting to delete a non-existing queue causes a channel exception.
-
The server MUST respect the if-unused flag when deleting a queue.
amq_queue_unbind ()
This method unbinds a queue from an exchange.
rc = amq_client_session_queue_unbind (
session, // Session reference
0, // Access ticket (unused)
queue_name, // Name of queue
exchange_name, // Name of exchange
routing_key, // Message routing key
arguments_table); // Arguments for unbind
The Queue.Unbind method has the following specific fields:
-
ticket (short) - access ticket granted by server. Not implemented: use zero.
-
queue (shortstr) - queue name. Specifies the name of the queue to unbind. If the queue name is empty, refers to the current queue for the channel, which is the last declared queue. Queue names may consist of any mixture of digits, letters, and underscores.
-
exchange (shortstr) - exchange name. The exchange name is a client-selected string that identifies the exchange for publish methods. Exchange names may consist of any mixture of digits, letters, and underscores. Exchange names are scoped by the virtual host.
-
routing_key (shortstr) - message routing key. Specifies the routing key of the binding to unbind.
-
arguments (table) - arguments for binding. A set of arguments of the binding to unbind.
Guidelines for implementors:
-
If the client did not previously declare a queue, and the queue name in this method is empty, the server MUST raise a connection exception with reply code 530 (not allowed).
-
If the queue does not exist the server MUST raise a channel exception with reply code 404 (not found).
-
The 'exchange' field gives the name of the exchange to unbind from. If the exchange does not exist the server MUST raise a channel exception with reply code 404 (not found).