52 - Queue busy when trying to consume messages.

Reported by mclaughlin77 (2009-02-25 15:44 UTC)

I wrote a very simple test program that creates a direct exchange and two queues bound to it with different routing keys. I then publish 10 messages to the exchange for each routing key and sleep for 10 seconds. Next I call amq_client_session_basic_consume for the first queue and attempt to consume its messages, then do the same for the second queue. Most of the time this works with no problems, but about a third of the time no messages are consumed from the first queue and amq_server prints output like this:

2009-02-24 08:38:21: Q: dispatch queue=test_queue nbr_messages=10 nbr_consumers=1
2009-02-24 08:38:21: Q: busy queue=test_queue message=

Any idea what's happening here? I don't know if this is something I am doing wrong or a bug.

I am running 1.3c5 and was using the --debug_queue and --debug_route options to get the above output from the server.

Here's some sample C code I've used to observe this issue.

#include <stdio.h>   //printf
#include <string.h>  //memset
#include <unistd.h>  //sleep
#include "asl.h"
#include "amq_client_connection.h"
#include "amq_client_session.h"

int main (int argc, char *argv [])
{
    int message_count = 0;
    amq_client_connection_t *connection = NULL;
    amq_client_session_t *session = NULL;
    amq_content_basic_t *content = NULL;
    amq_content_basic_t *content2 = NULL;
    amq_content_basic_t *incoming_content = NULL;
    char message_text [1024];
    size_t message_size;
    icl_longstr_t *auth_data;
    icl_system_initialise (argc, argv);
    auth_data  = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new ("localhost", "/", auth_data, "test", 0, 30000);
    icl_longstr_destroy (&auth_data);

    session = amq_client_session_new (connection);
    content = amq_content_basic_new ();
    amq_content_basic_set_body(content, (void*)"0123456789", 10, NULL);

    content2 = amq_content_basic_new();
    amq_content_basic_set_body(content2, (void*)"0123456789", 10, NULL);

    amq_client_session_exchange_declare(session, 0, "test_exchange", "direct", 0, FALSE, TRUE, 0, NULL);
    amq_client_session_queue_declare(session, 0, "test_queue1", FALSE, FALSE, TRUE, TRUE, NULL);             
    amq_client_session_queue_bind(session, 0, "test_queue1", "test_exchange", "rk1", NULL);
    amq_client_session_queue_declare(session, 0, "test_queue2", FALSE, FALSE, TRUE, TRUE, NULL);             
    amq_client_session_queue_bind(session, 0, "test_queue2", "test_exchange", "rk2", NULL);

    //Publish 10 messages to each routing key
    int i = 0;
    for (i=0; i != 10; ++i)
    {
        amq_client_session_basic_publish(session, content, 0, "test_exchange", "rk1", FALSE, FALSE);
    }
    amq_content_basic_unlink(&content);

    for (i=0; i != 10; ++i)
    {
        amq_client_session_basic_publish(session, content2, 0, "test_exchange", "rk2", FALSE, FALSE);
    }
    amq_content_basic_unlink(&content2);

    sleep(10);

    //Consume messages from first queue
    amq_client_session_basic_consume(session, 0,"test_queue1", NULL, FALSE, TRUE, FALSE, NULL);
    message_count = 0;
    short done = 0;
    while (!done)
    {
        //Wait up to 3 seconds for content to arrive
        amq_client_session_wait(session, 3000);
        incoming_content = amq_client_session_basic_arrived(session);
        if (!incoming_content)
        {
            //Nothing arrived within the timeout, so stop
            done = 1;
        }
        //Drain and count every message that has arrived so far
        while (incoming_content)
        {
            memset(message_text, 0, 1024);
            message_size = amq_content_basic_get_body (incoming_content, (byte*) message_text, 1024);
            amq_content_basic_unlink(&incoming_content);
            message_count++;
            incoming_content = amq_client_session_basic_arrived(session);
        }
    }

    printf("Consumed %d messages from test_queue1\n", message_count);

    //Consume messages from second queue
    amq_client_session_basic_consume(session, 0,"test_queue2", NULL, FALSE, TRUE, FALSE, NULL);
    message_count = 0;
    done = 0;
    while (!done)
    {
        //Wait up to 3 seconds for content to arrive
        amq_client_session_wait(session, 3000);
        incoming_content = amq_client_session_basic_arrived(session);
        if (!incoming_content)
        {
            //Nothing arrived within the timeout, so stop
            done = 1;
        }
        //Drain and count every message that has arrived so far
        while (incoming_content)
        {
            memset(message_text, 0, 1024);
            message_size = amq_content_basic_get_body (incoming_content, (byte*) message_text, 1024);
            amq_content_basic_unlink(&incoming_content);
            message_count++;
            incoming_content = amq_client_session_basic_arrived(session);
        }
    }

    printf("Consumed %d messages from test_queue2\n", message_count);

    amq_client_session_destroy(&session);
    amq_client_connection_destroy(&connection);
    icl_system_terminate ();

    return (0);
}




Page tags: closed