Recent Forum Posts


I wrote a simple publisher based on im_sender.c, using pthreads, which simply sends messages out. In this code the connection request times out, although the same logic works properly in a single-threaded environment (im_sender.c).

Can anyone suggest what I am doing wrong here?


// im_sender.c - Sends messages to the chatroom
// By iMatix Corporation, April 2008. Code released into the public domain.
//
// Name: im_sender
// Usage: im_sender <broker-address> <chatroom> <your-name>
// Example: im_sender "OpenAMQ discussion"
// "Baron Bartholomaeus von Saburg-Fridetzki"
// Sends messages from stdio to the chat room
// To receive messages from the chat room, use im_receiver application

/* Open amq */
#include "base.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

#include <pthread.h>
#include <stdio.h>
#include <string.h>

/* Publisher stuff. */
char *publisher = "mt_sender";
char open_am_host[] = "";
char routing_key[] = "ava-80";
amq_client_connection_t *amq_connection;
amq_client_session_t *amq_session;

/* NULL-terminated so the sender loops can detect the end of the list */
char *amq_messages[] = {
    "Hello World\n",
    "I am a publisher\n",
    "I can send messages to interested listeners\n",
    "Let me know what you are interested in\n",
    "Thank you and happy holidays\n",
    NULL
};
amq_client_session_t *
amq_client_session_t *session;
amq_client_connection_t *connection;
icl_longstr_t *auth_data;

// Initialize system
icl_system_initialise(1, &publisher);

// Open a connection
auth_data = amq_client_connection_auth_plain ("guest", "guest");
connection = amq_client_connection_new (
open_am_host, "/", auth_data, publisher, 0, 30000);
amq_connection = connection;
icl_longstr_destroy (&auth_data);
if (connection) {
icl_console_print("I: connected to %s/%s - %s - %s",
} else {
icl_console_print("E: could not connect to server");

// Open a channel
session = amq_client_session_new (connection);
assert (session);

return (session);

// Close the channel
amq_client_session_destroy (&amq_session);
// Close the connection
amq_client_connection_destroy (&amq_connection);
// Uninitialise system
icl_system_terminate ();

send_message(amq_client_session_t *sess, char *message)
/* Room for publisher + ": " + message + '\n' + terminating NUL */
char *msg = malloc(strlen(publisher) + 2 + strlen(message) + 2);
amq_content_basic_t *content;

sprintf(msg, "%s: %s\n", publisher, message);
content = amq_content_basic_new();
amq_content_basic_set_body(content, msg, strlen(msg), free);

routing_key, FALSE, FALSE);

return (1);

/* Thread stuff. */
int cur_msg;

pthread_mutex_t th_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t th_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t tm_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t tm_cond = PTHREAD_COND_INITIALIZER;

void *
msg_thread(void *unused)
char *msg;
int msg_num = 10;

printf("%s:Opening an amq connection\n", __func__);
amq_session = create_ampq_conn_session();
while (1) {
pthread_cond_wait(&tm_cond, &tm_mutex);

printf("%s: Picking up msg: %d\n", __func__, cur_msg);
msg = amq_messages[cur_msg];
if (msg) {
send_message(amq_session, msg);
} else {
return NULL;

int sent_all_msgs;
void *
send_thread(void *unused)
cur_msg = 0;
while (amq_messages[cur_msg]) {

printf("%s: Send message: %d\n", __func__, cur_msg);
pthread_cond_wait(&tm_cond, &tm_mutex);
sent_all_msgs = 1;
printf("%s: exiting\n", __func__);
return (NULL);

#define NTHREADS 2
pthread_t thread_id[NTHREADS];
int rv;
int tid;

if ((rv = pthread_create(&thread_id[0], NULL, &msg_thread, NULL))) {
return rv;
if ((rv = pthread_create(&thread_id[1], NULL, &send_thread, NULL))) {
return rv;
sleep (5);

rv = pthread_mutex_lock(&tm_mutex);
if (rv) {
return rv;
rv = pthread_cond_broadcast(&tm_cond);
if (rv) {
return rv;
rv = pthread_mutex_unlock(&tm_mutex);
if (rv) {
return rv;

return rv;

int main (int argc, char *argv [])
int i;
if (app_init()) {
perror("Received error\n");

while (!sent_all_msgs);
for (i = 0; i < NTHREADS; ++i) {
pthread_join(thread_id[i], NULL);
return 0;


-bash-3.1# ./mt_sender2
msg_thread:Opening an amq connection
send_thread: Send message: 0
send_thread: Send message: 1
15:23:05: E: could not connect to server

multi-threaded publisher client by Srinik, 16 Nov 2012 23:30

Check the 'c' script used for compilation; it has explanations at the start of how to do this.

Re: linker flags by pieterh, 19 Oct 2012 09:46

Is there a way to link extra libraries into an app compiled with the OpenAMQ WireAPI library? I couldn't find a reference to it. Thanks a lot for your help.

-Wall -Werror -I. -I/sources/sto/apps/evpubd/lib/ I/sources/apps/include -I/sources/common/include -I/sources/appsibase/include -l test_sender.c
Compiling test_sender…
Linking test_sender…
test_sender.o: In function ‘get_status_by_index’:
/sources/sto/apps/evpubd/ev_publisher.c:1819: undefined reference to ‘get_send_ctx’
test_sender.o: In function ‘send_it’:
test_sender.c:2515: undefined reference to ‘get_send_ctx’
collect2: ld returned 1 exit status
make: *** [test_sender] Error 1

linker flags by Srinik, 17 Oct 2012 19:07

The session properties are listed as:

reply_code (integer) - error value reported by server
consumer_tag (integer) - server-assigned consumer tag
routing_key (string) - original message routing key
scope (string) - queue name scope
delivery_tag (integer) - server-assigned delivery tag
redelivered (Boolean) - message is being redelivered

While compiling on Red Hat Linux, I found from a printf that:
The consumer_tag property requires a char*, not an integer (%s, not %d).
The scope property doesn't exist, but queue does.

Keven Miller


I've been looking over the source code and thinking that it might make sense to create a lightweight type server/federation configuration that can be used as a peer to peer service and/or as a Message Cache.

This would make it far more Internet friendly. It would be easier to secure, and to deploy. By deploying many lightweight caching servers across the Internet, messages would potentially have multiple routes to get where they're going, thus mitigating the problem of failure in any one server.

A caching AMQP server? by Highjinks, 19 Jun 2010 19:54

The document above says the following:

When you use a synchronous method (e.g. Basic.Consume), WireAPI waits for the server to respond with a synchronous reply, and it processes this reply.

I think you meant that Basic.Get is synchronous, not Basic.Consume.


The doc for apr_vsnprintf says, "It will return the number of characters inserted into the buffer, not including the terminating NUL."

Where does the code call snprintf_flush?

Re: Analysis by pieterh, 26 Apr 2010 12:54

icl_longstr_cat_v expects apr_vsnprintf to return the length of the resulting string, but it returns -1, because snprintf_flush says so. I'm really perplexed about how this could work…

Analysis by merunka, 26 Apr 2010 12:08

Unfortunately, I was able to reproduce the bug on Linux.

Reproduced on Linux by merunka, 26 Apr 2010 08:21

Thanks for reporting this.

by pieterh, 23 Mar 2010 07:18

Replace amq_content_basic_destroy() with amq_content_basic_unlink() in "client.c".

The examples all run as described when running them on a single machine, using localhost as the host address. However, when I run the broker and servers on one machine and the client on a second machine on the same subnet, the client connects to the broker but an error occurs when attempting to send across the request.

I start the applications as follows:

Machine #1

./amq_server --listen
./server_time timeserver

Machine #2

./client client

When I type 'cur_time', the client quits with

11:18:53: E: buffer size is zero in smt_socket_request_write_bucket
11:18:53: E: SMT error: Success

What am I doing wrong?

by pieterh, 06 Jan 2010 09:54

The implementation of exchanges was improved in June 2008 to do internal queueing, replay, and automatic reconnection.

I'm confused. This web page says:

We assume that the connection between partitions is perhaps slow, but reliable. That is, we do not attempt to queue and forward messages in case of a network failure - messages will simply be dropped. Applications that require reliable message transfer must implement end-to-end reliability.

Then, in the comments, Pieter says:

communication between exchanges is robust against dropped VPN connections; it will queue messages and forward them when the link comes back up.

This seems directly contradictory. Which is it?

Federated message transfer reliable or not? by Jeff Laughlin (guest), 13 Nov 2009 23:26
Johnathan (guest) 11 Nov 2009 18:49
in discussion Hidden / Per page discussions » Advanced use of OpenAMQ

Pieter, thanks again for your prompt answer. I'll certainly set things up and try out the federation of two exchanges over the Internet. I have to connect several apps in different data centers, and if it works well, it would be an ideal solution for me.


by Johnathan (guest), 11 Nov 2009 18:49

Jonathan, communication between exchanges is robust against dropped VPN connections; it will queue messages and forward them when the link comes back up.

However I'd still advise end-to-end reliability if you are doing request-response patterns across federation, e.g. to access remote services and get answers back. That's simply because a failure in a broker anywhere in the chain can cause messages to be lost.

by pieterh, 08 Nov 2009 12:35
Johnathan (guest) 07 Nov 2009 19:59
in discussion Hidden / Per page discussions » Advanced use of OpenAMQ

Sorry, slow thinker, took me a while to digest your answer - have a clarification question:

From your answer I'm not sure whether it means that communication between exchanges already provides reliability and it's only a matter of securing the communication with VPN? Would exchanges between themselves take care of retransmissions if VPN connection breaks? Or is there no inherent possibility to federate exchanges reliably over the Internet?


by Johnathan (guest), 07 Nov 2009 19:59
Johnathan (guest) 07 Nov 2009 19:51
in discussion Hidden / Per page discussions » Advanced use of OpenAMQ

Thanks Pieter,

much appreciate your quick answer.


by Johnathan (guest), 07 Nov 2009 19:51