Reported by
mclaughlin77 (25 Feb 2009)
I wrote a very simple test program that creates a direct exchange and two queues bound to the exchange with different routing keys. I then publish 10 messages to the exchange for each routing key and sleep for 10 seconds. Next I call amq_client_session_basic_consume for the first queue and attempt to consume its messages, then do the same for the second queue. Most of the time this works with no problems, but about a third of the time no messages are consumed from the first queue and amq_server prints output like this:
2009-02-24 08:38:21: Q: dispatch queue=test_queue nbr_messages=10 nbr_consumers=1
2009-02-24 08:38:21: Q: busy queue=test_queue message=
Any idea what's happening here? I don't know if this is something I am doing wrong or a bug.
I am running 1.3c5 and was using the --debug_queue and --debug_route options to get the above output from the server.
Here's some sample C code I've used to observe this issue.
#include "asl.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

int main (int argc, char *argv [])
{
    int message_count = 0;
    amq_client_connection_t *connection = NULL;
    amq_client_session_t *session = NULL;
    amq_content_basic_t *content = NULL;
    amq_content_basic_t *content2 = NULL;
    amq_content_basic_t *incoming_content = NULL;
    char message_text [1024];
    size_t message_size;
    icl_longstr_t *auth_data;

    icl_system_initialise (argc, argv);
    auth_data = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new ("localhost", "/", auth_data, "test", 0, 30000);
    icl_longstr_destroy (&auth_data);
    session = amq_client_session_new (connection);

    content = amq_content_basic_new ();
    amq_content_basic_set_body(content, (void*)"0123456789", 10, NULL);
    content2 = amq_content_basic_new();
    amq_content_basic_set_body(content2, (void*)"0123456789", 10, NULL);

    amq_client_session_exchange_declare(session, 0, "test_exchange", "direct", 0, FALSE, TRUE, 0, NULL);
    amq_client_session_queue_declare(session, 0, "test_queue1", FALSE, FALSE, TRUE, TRUE, NULL);
    amq_client_session_queue_bind(session, 0, "test_queue1", "test_exchange", "rk1", NULL);
    amq_client_session_queue_declare(session, 0, "test_queue2", FALSE, FALSE, TRUE, TRUE, NULL);
    amq_client_session_queue_bind(session, 0, "test_queue2", "test_exchange", "rk2", NULL);

    //Publish 10 messages to each routing key
    int i = 0;
    for (i=0; i != 10; ++i)
    {
        amq_client_session_basic_publish(session, content, 0, "test_exchange", "rk1", FALSE, FALSE);
    }
    amq_content_basic_unlink(&content);
    for (i=0; i != 10; ++i)
    {
        amq_client_session_basic_publish(session, content2, 0, "test_exchange", "rk2", FALSE, FALSE);
    }
    amq_content_basic_unlink(&content2);
    sleep(10);

    //Consume messages from first queue
    amq_client_session_basic_consume(session, 0,"test_queue1", NULL, FALSE, TRUE, FALSE, NULL);
    message_count = 0;
    short done = 0;
    while (!done)
    {
        amq_client_session_wait(session, 3000);
        incoming_content = amq_content_basic_new();
        incoming_content = amq_client_session_basic_arrived(session);
        if (incoming_content)
        {
            memset(message_text, 0, 1024);
            message_size = amq_content_basic_get_body (incoming_content, (byte*) message_text, 1024);
            amq_content_basic_unlink(&incoming_content);
            incoming_content = amq_client_session_basic_arrived(session);
            message_count++;
        }
        else
        {
            done = 1;
        }
        amq_content_basic_unlink(&incoming_content);
    }
    printf("Consumed %d messages from test_queue1\n", message_count);

    //Consume messages from second queue
    amq_client_session_basic_consume(session, 0,"test_queue2", NULL, FALSE, TRUE, FALSE, NULL);
    message_count = 0;
    done = 0;
    while (!done)
    {
        amq_client_session_wait(session, 3000);
        incoming_content = amq_content_basic_new();
        incoming_content = amq_client_session_basic_arrived(session);
        if(incoming_content)
        {
            memset(message_text, 0, 1024);
            message_size = amq_content_basic_get_body (incoming_content, (byte*) message_text, 1024);
            amq_content_basic_unlink(&incoming_content);
            incoming_content = amq_client_session_basic_arrived(session);
            message_count++;
        }
        else
        {
            done = 1;
        }
        amq_content_basic_unlink(&incoming_content);
    }
    printf("Consumed %d messages from test_queue2\n", message_count);

    amq_client_session_destroy(&session);
    amq_client_connection_destroy(&connection);
    icl_system_terminate ();
    return (0);
}
Comments
The problem was due to a race condition in amq_server_channel_consume().
You can fix it by patching amq_server_channel_agent.c:2083:
The line 'amq_queue_consume (queue, consumer, self->active, nowait);' must be moved so that it comes after the channel credit calculation.
On fast multicore systems, the amq_queue_consume() call was starting to dispatch messages before the channel had been given credit, so the dispatch failed with a "busy" message (which normally indicates that the channel has used up all its credit because of an overloaded network).
This patch will be in the next 1.3 and 1.4 releases.
Portfolio
@mclaughlin77:
There is an error in the sample code that I'll point out in case you're using this in real apps:
    incoming_content = amq_content_basic_new();
    incoming_content = amq_client_session_basic_arrived(session);
This will cause a memory leak: the first statement is not needed, because the arrived() method automatically creates a new content to return to the caller.
Portfolio
Pieter,
Thanks for the quick fix on this and the tip about the sample code. I appreciate your help.
Kelly