52 - Queue busy when trying to consume messages.

Reported by mclaughlin77 (Unix timestamp 1235576646)

I wrote a very simple test program that creates a direct exchange and two queues bound to the exchange with different routing keys. I then publish 10 messages to the exchange for each routing key and sleep for 10 seconds. Then I call amq_client_session_basic_consume for the first queue and attempt to consume the messages. Next I call amq_client_session_basic_consume for the second queue and consume its messages. Most of the time this works with no problems, but about a third of the time no messages are consumed from the first queue and amq_server prints output like this:

2009-02-24 08:38:21: Q: dispatch queue=test_queue nbr_messages=10 nbr_consumers=1
2009-02-24 08:38:21: Q: busy queue=test_queue message=

Any idea what's happening here? I don't know if this is something I am doing wrong or a bug.

I am running 1.3c5 and was using the --debug_queue and --debug_route options to get the above output from the server.

Here's some sample C code I've used to observe this issue.

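//Standard headers for printf, memset and sleep
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include "asl.h"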
#include "amq_client_connection.h"
#include "amq_client_session.h"

int main (int argc, char *argv [])
{
    int message_count = 0;
    amq_client_connection_t *connection = NULL;
    amq_client_session_t *session = NULL;
    amq_content_basic_t *content = NULL;
    amq_content_basic_t *content2 = NULL;
    amq_content_basic_t *incoming_content = NULL;
    char message_text [1024];
    size_t message_size;
    icl_longstr_t *auth_data;
    icl_system_initialise (argc, argv);
    auth_data  = amq_client_connection_auth_plain ("guest", "guest");
    connection = amq_client_connection_new ("localhost", "/", auth_data, "test", 0, 30000);
    icl_longstr_destroy (&auth_data);

    session = amq_client_session_new (connection);
    content = amq_content_basic_new ();
    amq_content_basic_set_body(content, (void*)"0123456789", 10, NULL);

    content2 = amq_content_basic_new();
    amq_content_basic_set_body(content2, (void*)"0123456789", 10, NULL);

    amq_client_session_exchange_declare(session, 0, "test_exchange", "direct", 0, FALSE, TRUE, 0, NULL);
    amq_client_session_queue_declare(session, 0, "test_queue1", FALSE, FALSE, TRUE, TRUE, NULL);             
    amq_client_session_queue_bind(session, 0, "test_queue1", "test_exchange", "rk1", NULL);
    amq_client_session_queue_declare(session, 0, "test_queue2", FALSE, FALSE, TRUE, TRUE, NULL);             
    amq_client_session_queue_bind(session, 0, "test_queue2", "test_exchange", "rk2", NULL);

    //Publish 10 messages to each routing key
    int i = 0;
    for (i=0; i != 10; ++i)
    {
        amq_client_session_basic_publish(session, content, 0, "test_exchange", "rk1", FALSE, FALSE);
    }
    amq_content_basic_unlink(&content);

    for (i=0; i != 10; ++i)
    {
        amq_client_session_basic_publish(session, content2, 0, "test_exchange", "rk2", FALSE, FALSE);
    }
    amq_content_basic_unlink(&content2);

    sleep(10);

    //Consume messages from first queue
    amq_client_session_basic_consume(session, 0,"test_queue1", NULL, FALSE, TRUE, FALSE, NULL);
    message_count = 0;
    short done = 0;
    while (!done)
    {
        amq_client_session_wait(session, 3000);
        //Take one arrived message per pass; NULL means nothing arrived before the timeout
        incoming_content = amq_client_session_basic_arrived(session);
        if (incoming_content)
        {
            memset(message_text, 0, 1024);
            message_size = amq_content_basic_get_body (incoming_content, (byte*) message_text, 1024);
            //Release the content once its body has been copied out
            amq_content_basic_unlink(&incoming_content);
            message_count++;
        }
        else
        {
            done = 1;
        }
    }

    printf("Consumed %d messages from test_queue1\n", message_count);

    //Consume messages from second queue
    amq_client_session_basic_consume(session, 0,"test_queue2", NULL, FALSE, TRUE, FALSE, NULL);
    message_count = 0;
    done = 0;
    while (!done)
    {
        amq_client_session_wait(session, 3000);
        //Take one arrived message per pass; NULL means nothing arrived before the timeout
        incoming_content = amq_client_session_basic_arrived(session);
        if (incoming_content)
        {
            memset(message_text, 0, 1024);
            message_size = amq_content_basic_get_body (incoming_content, (byte*) message_text, 1024);
            //Release the content once its body has been copied out
            amq_content_basic_unlink(&incoming_content);
            message_count++;
        }
        else
        {
            done = 1;
        }
    }

    printf("Consumed %d messages from test_queue2\n", message_count);

    amq_client_session_destroy(&session);
    amq_client_connection_destroy(&connection);
    icl_system_terminate ();

    return (0);
}
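
For comparison, here is a minimal self-contained sketch of the outcome the test expects from a direct exchange: each message goes only to the queue whose binding key exactly matches the routing key. Everything prefixed `toy_` is hypothetical and not part of the OpenAMQ API. The model has no network, no acknowledgements and no server-side dispatch, so it says nothing about why the broker reports the queue as busy; it only pins down the expected per-queue counts.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_BINDINGS 8

/* Hypothetical toy model: a queue is just a count of pending messages */
typedef struct
{
    const char *name;
    int pending;
} toy_queue_t;

typedef struct
{
    const char *routing_key;
    toy_queue_t *queue;
} toy_binding_t;

typedef struct
{
    toy_binding_t bindings[TOY_MAX_BINDINGS];
    size_t binding_count;
} toy_exchange_t;

/* Bind a queue to the exchange under one routing key */
void toy_bind(toy_exchange_t *exchange, toy_queue_t *queue, const char *routing_key)
{
    assert(exchange->binding_count < TOY_MAX_BINDINGS);
    exchange->bindings[exchange->binding_count].routing_key = routing_key;
    exchange->bindings[exchange->binding_count].queue = queue;
    exchange->binding_count++;
}

/* Direct routing: deliver to every queue whose binding key equals the
   routing key exactly. Returns the number of queues that got the message. */
int toy_publish(toy_exchange_t *exchange, const char *routing_key)
{
    size_t i;
    int delivered = 0;
    for (i = 0; i != exchange->binding_count; ++i)
    {
        if (strcmp(exchange->bindings[i].routing_key, routing_key) == 0)
        {
            exchange->bindings[i].queue->pending++;
            delivered++;
        }
    }
    return delivered;
}

/* Drain a queue and return how many messages were consumed */
int toy_consume_all(toy_queue_t *queue)
{
    int consumed = 0;
    while (queue->pending > 0)
    {
        queue->pending--;
        consumed++;
    }
    return consumed;
}

/* Replay the scenario from the report: two queues, two keys, 10 messages each */
void toy_run_scenario(int *consumed1, int *consumed2)
{
    toy_queue_t queue1 = { "test_queue1", 0 };
    toy_queue_t queue2 = { "test_queue2", 0 };
    toy_exchange_t exchange;
    int i;

    memset(&exchange, 0, sizeof exchange);
    toy_bind(&exchange, &queue1, "rk1");
    toy_bind(&exchange, &queue2, "rk2");

    for (i = 0; i != 10; ++i)
        toy_publish(&exchange, "rk1");
    for (i = 0; i != 10; ++i)
        toy_publish(&exchange, "rk2");

    *consumed1 = toy_consume_all(&queue1);
    *consumed2 = toy_consume_all(&queue2);
}
```

Calling `toy_run_scenario(&n1, &n2)` leaves both counts at 10, one per published message. A run where the first queue yields 0 therefore points at delivery or dispatch rather than at the routing setup.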


Submitted by mclaughlin77
