Reported by
mclaughlin77 (2009-02-25)
I wrote a very simple test program that creates a direct exchange and 2 queues that are bound to the exchange with different routing keys. I then publish 10 messages to the exchange for each routing key. Next I sleep for 10 seconds. Then I call amq_client_session_basic_consume for the first queue and attempt to consume the messages. Next I call amq_client_session_basic_consume for the second queue and consume the messages. Most of the time it works with no problems, but about a third of the time no messages are consumed from the first queue and amq_server prints output like this:
2009-02-24 08:38:21: Q: dispatch queue=test_queue nbr_messages=10 nbr_consumers=1
2009-02-24 08:38:21: Q: busy queue=test_queue message=
Any idea what's happening here? I don't know if this is something I am doing wrong or a bug.
I am running 1.3c5 and was using the --debug_queue and --debug_route options to get the above output from the server.
Here's some sample C code I've used to observe this issue.
#include "asl.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

int main (int argc, char *argv [])
{
    int message_count = 0;
    amq_client_connection_t *connection = NULL;
    amq_client_session_t *session = NULL;
    amq_content_basic_t *content = NULL;
    amq_content_basic_t *content2 = NULL;
    amq_content_basic_t *incoming_content = NULL;
    char message_text [1024];
    size_t message_size;
    icl_longstr_t *auth_data;

    icl_system_initialise (argc, argv);
    auth_data = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new ("localhost", "/", auth_data, "test", 0, 30000);
    icl_longstr_destroy (&auth_data);
    session = amq_client_session_new (connection);

    content = amq_content_basic_new ();
    amq_content_basic_set_body(content, (void*)"0123456789", 10, NULL);
    content2 = amq_content_basic_new();
    amq_content_basic_set_body(content2, (void*)"0123456789", 10, NULL);

    amq_client_session_exchange_declare(session, 0, "test_exchange", "direct", 0, FALSE, TRUE, 0, NULL);
    amq_client_session_queue_declare(session, 0, "test_queue1", FALSE, FALSE, TRUE, TRUE, NULL);
    amq_client_session_queue_bind(session, 0, "test_queue1", "test_exchange", "rk1", NULL);
    amq_client_session_queue_declare(session, 0, "test_queue2", FALSE, FALSE, TRUE, TRUE, NULL);
    amq_client_session_queue_bind(session, 0, "test_queue2", "test_exchange", "rk2", NULL);

    //Publish 10 messages to each routing key
    int i = 0;
    for (i=0; i != 10; ++i)
    {
        amq_client_session_basic_publish(session, content, 0, "test_exchange", "rk1", FALSE, FALSE);
    }
    amq_content_basic_unlink(&content);
    for (i=0; i != 10; ++i)
    {
        amq_client_session_basic_publish(session, content2, 0, "test_exchange", "rk2", FALSE, FALSE);
    }
    amq_content_basic_unlink(&content2);
    sleep(10);

    //Consume messages from first queue
    amq_client_session_basic_consume(session, 0,"test_queue1", NULL, FALSE, TRUE, FALSE, NULL);
    message_count = 0;
    short done = 0;
    while (!done)
    {
        amq_client_session_wait(session, 3000);
        incoming_content = amq_content_basic_new();
        incoming_content = amq_client_session_basic_arrived(session);
        if (incoming_content)
        {
            memset(message_text, 0, 1024);
            message_size = amq_content_basic_get_body (incoming_content, (byte*) message_text, 1024);
            amq_content_basic_unlink(&incoming_content);
            incoming_content = amq_client_session_basic_arrived(session);
            message_count++;
        }
        else
        {
            done = 1;
        }
        amq_content_basic_unlink(&incoming_content);
    }
    printf("Consumed %d messages from test_queue1\n", message_count);

    //Consume messages from second queue
    amq_client_session_basic_consume(session, 0,"test_queue2", NULL, FALSE, TRUE, FALSE, NULL);
    message_count = 0;
    done = 0;
    while (!done)
    {
        amq_client_session_wait(session, 3000);
        incoming_content = amq_content_basic_new();
        incoming_content = amq_client_session_basic_arrived(session);
        if(incoming_content)
        {
            memset(message_text, 0, 1024);
            message_size = amq_content_basic_get_body (incoming_content, (byte*) message_text, 1024);
            amq_content_basic_unlink(&incoming_content);
            incoming_content = amq_client_session_basic_arrived(session);
            message_count++;
        }
        else
        {
            done = 1;
        }
        amq_content_basic_unlink(&incoming_content);
    }
    printf("Consumed %d messages from test_queue2\n", message_count);

    amq_client_session_destroy(&session);
    amq_client_connection_destroy(&connection);
    icl_system_terminate ();
    return (0);
}
Comments
The problem was due to a race condition in amq_server_channel_consume().
You can fix it by patching amq_server_channel_agent.c:2083:
The line 'amq_queue_consume (queue, consumer, self->active, nowait);' must be moved to after the channel credit calculation.
On fast multicore systems, the amq_queue_consume() call could start dispatching messages before the channel had been given credit, so the dispatch failed with a "busy" message (which normally indicates that the channel has used up all its credit because of an overloaded network).
This patch will be in the next 1.3 and 1.4 releases.
Portfolio
@mclaughlin77:
There is an error in the sample code that I'll point out in case you're using this in real apps:
incoming_content = amq_content_basic_new();
incoming_content = amq_client_session_basic_arrived(session);
This will cause a memory leak. The first statement is not needed. The arrived() method automatically creates a new content to return to the caller.
Portfolio
Pieter,
Thanks for the quick fix on this and the tip about the sample code. I appreciate your help.
Kelly