52 - Queue busy when trying to consume messages.

Reported by mclaughlin77 (25 Feb 2009, 15:44 UTC)

I wrote a very simple test program that creates a direct exchange and two queues, bound to the exchange with different routing keys. I then publish 10 messages to the exchange for each routing key and sleep for 10 seconds. Next I call amq_client_session_basic_consume for the first queue and consume its messages, then call amq_client_session_basic_consume for the second queue and consume its messages. Most of the time this works with no problems, but about a third of the time no messages are consumed from the first queue, and amq_server prints output like this:

2009-02-24 08:38:21: Q: dispatch queue=test_queue nbr_messages=10 nbr_consumers=1
2009-02-24 08:38:21: Q: busy queue=test_queue message=

Any idea what's happening here? I don't know if this is something I am doing wrong or a bug.

I am running 1.3c5 and was using the --debug_queue and --debug_route options to get the above output from the server.

Here's some sample C code I've used to observe this issue.

#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include "asl.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

//  Start consuming from a queue and read messages until none arrives
//  within 3 seconds; returns the number of messages received
static int consume_queue (amq_client_session_t *session, char *queue)
{
    amq_content_basic_t *incoming_content;
    char message_text [1024];
    int message_count = 0;
    int done = 0;

    amq_client_session_basic_consume (session, 0, queue, NULL, FALSE, TRUE, FALSE, NULL);
    while (!done)
    {
        amq_client_session_wait (session, 3000);
        //  basic_arrived() creates the content it returns, so don't allocate one first
        incoming_content = amq_client_session_basic_arrived (session);
        if (!incoming_content)
            done = 1;

        //  Drain every content that has arrived, unlinking each exactly once
        while (incoming_content)
        {
            memset (message_text, 0, sizeof (message_text));
            amq_content_basic_get_body (incoming_content, (byte *) message_text, sizeof (message_text) - 1);
            amq_content_basic_unlink (&incoming_content);
            message_count++;
            incoming_content = amq_client_session_basic_arrived (session);
        }
    }
    return message_count;
}

int main (int argc, char *argv [])
{
    amq_client_connection_t *connection;
    amq_client_session_t *session;
    amq_content_basic_t *content;
    amq_content_basic_t *content2;
    icl_longstr_t *auth_data;
    int i;

    icl_system_initialise (argc, argv);
    auth_data  = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new ("localhost", "/", auth_data, "test", 0, 30000);
    icl_longstr_destroy (&auth_data);
    if (!connection)
    {
        fprintf (stderr, "E: could not connect to server\n");
        icl_system_terminate ();
        return (1);
    }
    session = amq_client_session_new (connection);

    content = amq_content_basic_new ();
    amq_content_basic_set_body (content, (void *) "0123456789", 10, NULL);
    content2 = amq_content_basic_new ();
    amq_content_basic_set_body (content2, (void *) "0123456789", 10, NULL);

    amq_client_session_exchange_declare (session, 0, "test_exchange", "direct", 0, FALSE, TRUE, 0, NULL);
    amq_client_session_queue_declare (session, 0, "test_queue1", FALSE, FALSE, TRUE, TRUE, NULL);
    amq_client_session_queue_bind (session, 0, "test_queue1", "test_exchange", "rk1", NULL);
    amq_client_session_queue_declare (session, 0, "test_queue2", FALSE, FALSE, TRUE, TRUE, NULL);
    amq_client_session_queue_bind (session, 0, "test_queue2", "test_exchange", "rk2", NULL);

    //  Publish 10 messages to each routing key
    for (i = 0; i < 10; i++)
        amq_client_session_basic_publish (session, content, 0, "test_exchange", "rk1", FALSE, FALSE);
    amq_content_basic_unlink (&content);

    for (i = 0; i < 10; i++)
        amq_client_session_basic_publish (session, content2, 0, "test_exchange", "rk2", FALSE, FALSE);
    amq_content_basic_unlink (&content2);

    sleep (10);

    //  Consume from the first queue, then from the second
    printf ("Consumed %d messages from test_queue1\n", consume_queue (session, "test_queue1"));
    printf ("Consumed %d messages from test_queue2\n", consume_queue (session, "test_queue2"));

    amq_client_session_destroy (&session);
    amq_client_connection_destroy (&connection);
    icl_system_terminate ();
    return (0);
}

Comments

Fixed
pieterh, 27 Feb 2009, 14:02 UTC

The problem was due to a race condition in amq_server_channel_consume().

You can fix it by patching amq_server_channel_agent.c at line 2083 so that it reads:

...
if (self->credit == 0)
    self->credit = 1;
amq_queue_consume (queue, consumer, self->active, nowait);
...

That is, the line 'amq_queue_consume (queue, consumer, self->active, nowait);' must be moved so that it comes after the channel credit calculation.

What was happening was that, on fast multicore systems, the amq_queue_consume() call could start dispatching messages before the channel had been given credit. The dispatch therefore failed with a "busy" message, which normally indicates that the channel has used up all its credit because the network is overloaded.

This patch will be in the next 1.3 and 1.4 releases.

Errors in sample code
pieterh, 27 Feb 2009, 14:05 UTC

@mclaughlin77:

There is an error in the sample code that I'll point out in case you're using this in real apps:

incoming_content = amq_content_basic_new();
incoming_content = amq_client_session_basic_arrived(session);

This causes a memory leak. The first statement is not needed: the arrived() method automatically creates a new content and returns it to the caller.

Re: Errors in sample code
mclaughlin77, 27 Feb 2009, 14:55 UTC

Pieter,

Thanks for the quick fix on this and the tip about the sample code. I appreciate your help.

Kelly

Page tags: closed