oracle - dbms_aq.dequeue_array returns the first message twice

Repost. Author: 行者123. Updated: 2023-12-04 21:10:54
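Introduction

While using Oracle Advanced Queuing, I ran into some very strange behavior on my Oracle database server (to be exact: Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production).

Problem

I enqueue X messages, but dequeue_array returns X+1 messages: the first message is returned twice, as the identical message ID shows.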

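Reproduction:

I was able to write a simple PoC that reproduces the error. The code is very simple, and the enqueue/dequeue calls are standard Oracle AQ. The code performs the following steps twice (two test runs):

  • Purge the queue table
  • Enqueue X messages
  • Dequeue all messages with a single dbms_aq.dequeue_array call
  • Check how many messages were dequeued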

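When the PoC runs on a new connection, the first test run succeeds without errors, but every following test run fails. After that, whenever the script is executed again on the same connection, both test runs fail.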

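What I have tried so far:
  • Running the script on a "new" connection: only the first run succeeds
  • Executing the script again on the same connection: all runs fail
  • Using/creating a new queue: only the first run succeeds
  • Using "delete from <queue_table>" instead of dbms_aqadm.purge_queue_table(): everything is fine
  • Using the "normal" one-by-one dequeue: everything is fine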
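
For comparison, the one-by-one dequeue from the last item could look roughly like this. It is a minimal sketch, assuming the PoC's queue TEST_QUEUE already exists and is started; the local names (L_Payload, E_NoMoreMessages) are illustrative and not taken from the original PoC. It sets dbms_aq.no_wait so that the loop ends with ORA-25228 once the queue is empty.

```sql
declare
  -- Assumption: same queue as in the PoC.
  C_QueueName constant varchar2(30) := 'TEST_QUEUE';

  L_DequeueOptions    dbms_aq.dequeue_options_t;
  L_MessageProperties dbms_aq.message_properties_t;
  L_Payload           sys.aq$_jms_bytes_message;
  L_MsgId             raw(16);
  L_MsgCnt            pls_integer := 0;

  -- ORA-25228: timeout or end-of-fetch during message dequeue
  E_NoMoreMessages exception;
  pragma exception_init(E_NoMoreMessages, -25228);
begin
  -- Do not block when the queue is empty.
  L_DequeueOptions.wait := dbms_aq.no_wait;

  loop
    begin
      dbms_aq.dequeue(queue_name         => C_QueueName
                     ,dequeue_options    => L_DequeueOptions
                     ,message_properties => L_MessageProperties
                     ,payload            => L_Payload
                     ,msgid              => L_MsgId);
      L_MsgCnt := L_MsgCnt + 1;
      dbms_output.put_line(' #' || L_MsgCnt || ' MsgId=' || rawtohex(L_MsgId));
    exception
      when E_NoMoreMessages then
        exit; -- queue is empty, leave the loop
    end;
  end loop;

  dbms_output.put_line('Dequeued one by one: ' || L_MsgCnt);
end;
/
```

Like the PoC, the sketch does not commit; with the default ON_COMMIT visibility the dequeues only become permanent once the session commits.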

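Conclusion:

I can neither explain this behavior nor find an error in my code. Please have a look; it should be directly executable in your favorite SQL client (tested with PL/SQL Developer).

If you need any further information or have trouble getting the PoC to work, please ask; I check this thread regularly. I tried to keep the PoC as readable as possible, including detailed output about what is happening.

Code: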
    declare
      C_QueueName        constant varchar2(32767) := 'TEST_QUEUE';
      C_QueueTable       constant varchar2(32767) := 'TEST_Q_TABLE';
      C_MsgCount         constant pls_integer := 1;
      C_TestRuns         constant pls_integer := 2;
      C_DequeueArraySize constant pls_integer := 10;

      /*
       * Create the queue and the queue table used for these tests
       */
      procedure CreateQueueIfMissing is
        L_Present pls_integer;
      begin
        dbms_output.put_line('START CreateQueueIfMissing');

        execute immediate 'select count(*) from USER_OBJECTS where OBJECT_NAME = ''' || C_QueueName || ''' and OBJECT_TYPE = ''QUEUE''' into L_Present;
        if L_Present = 1 then
          dbms_output.put_line('Skipping queue creation, already present.');
          dbms_output.put_line('END CreateQueueIfMissing');
          return;
        end if;

        dbms_output.put_line(' Creating queue table ' || C_QueueTable);
        DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table        => C_QueueTable
                                     ,storage_clause     => 'LOGGING NOCACHE NOPARALLEL MONITORING'
                                     ,sort_list          => 'priority,enq_time'
                                     ,multiple_consumers => false
                                     ,queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE'
                                     ,comment            => 'Queue for messages');

        dbms_output.put_line(' Creating queue ' || C_QueueName);
        DBMS_AQADM.CREATE_QUEUE(queue_name  => C_QueueName
                               ,queue_table => C_QueueTable
                               ,max_retries => 8640
                               ,retry_delay => 30
                               ,comment     => 'Queue for messages');

        dbms_output.put_line(' Starting queue ' || C_QueueName);
        DBMS_AQADM.START_QUEUE(queue_name => C_QueueName);
        dbms_output.put_line('END CreateQueueIfMissing');
      end CreateQueueIfMissing;
      -- ================================================================================================


      /*
       * This procedure is the root of all evil.
       * The error only occurs when using the purge_queue_table procedure.
       * When using a normal "delete from <queue_table>" then everything is just fine.
       */
      procedure CleanQueueTable is
        L_PurgeOptions dbms_aqadm.aq$_purge_options_t;
        L_Count        pls_integer;
      begin
        dbms_output.put_line('START CleanQueueTable');

        execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
        dbms_output.put_line(' Messages in queue table BEFORE purge: ' || L_Count);

        dbms_aqadm.purge_queue_table(queue_table     => C_QueueTable
                                    ,purge_condition => null
                                    ,purge_options   => L_PurgeOptions);

        execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
        dbms_output.put_line(' Messages in queue table AFTER purge: ' || L_Count);

        dbms_output.put_line('END CleanQueueTable');
      end CleanQueueTable;
      -- ================================================================================================


      /*
       * Enqueue the configured count of messages on the queue
       */
      procedure EnqueueMessages is
        L_BodyId pls_integer;
        L_Msg    sys.aq$_jms_bytes_message;
        L_MsgId  raw(16);
        L_Count  pls_integer;

        L_EnqueueOptions    DBMS_AQ.ENQUEUE_OPTIONS_T;
        L_MessageProperties DBMS_AQ.MESSAGE_PROPERTIES_T;
      begin
        dbms_output.put_line('START EnqueueMessages');

        execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
        dbms_output.put_line(' Messages in queue table BEFORE enqueue: ' || L_Count);

        for i in 1 .. C_MsgCount
        loop
          dbms_output.put_line(' Construct #' || i);
          L_Msg := sys.aq$_jms_bytes_message.construct;

          -- set the JMS header
          L_Msg.set_type('JmsBytesMessage');
          L_Msg.set_userid(1);
          L_Msg.set_appid('test');
          L_Msg.set_groupid('cs');
          L_Msg.set_groupseq(1);

          -- set JMS message content
          L_BodyId := L_Msg.clear_body(-1);
          L_Msg.write_bytes(L_BodyId, to_blob(utl_raw.cast_to_raw('<test>Lorem Ipsum</test>')));
          L_Msg.flush(L_BodyId);
          L_Msg.clean(L_BodyId);

          dbms_output.put_line(' Enqueue #' || i);
          DBMS_AQ.ENQUEUE (queue_name         => C_QueueName
                          ,enqueue_options    => L_EnqueueOptions
                          ,message_properties => L_MessageProperties
                          ,payload            => L_Msg
                          ,msgid              => L_MsgId);
        end loop;

        execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
        dbms_output.put_line(' Messages in queue table AFTER enqueue: ' || L_Count);
        dbms_output.put_line('END EnqueueMessages');
      end EnqueueMessages;
      -- ================================================================================================


      /*
       * Dequeues messages using dequeue_array from the configured queue.
       */
      procedure DequeueMessages is
        L_DequeueOptions dbms_aq.dequeue_options_t;
        L_MsgPropArr     dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t();
        L_PayloadArr     sys.aq$_jms_bytes_messages;
        L_MsgIdArr       dbms_aq.msgid_array_t;

        L_MsgCnt pls_integer := 0;
        L_Count  pls_integer;
      begin
        dbms_output.put_line('START DequeueMessages');

        execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
        dbms_output.put_line(' Messages in queue table BEFORE dequeue: ' || L_Count);

        L_MsgCnt := dbms_aq.dequeue_array(queue_name               => C_QueueName
                                         ,dequeue_options          => L_DequeueOptions
                                         ,array_size               => C_DequeueArraySize
                                         ,message_properties_array => L_MsgPropArr
                                         ,payload_array            => L_PayloadArr
                                         ,msgid_array              => L_MsgIdArr);

        execute immediate 'select count(*) from ' || C_QueueTable into L_Count;
        dbms_output.put_line(' Messages in queue table AFTER dequeue: ' || L_Count);

        dbms_output.put_line(' Expected: ' || C_MsgCount || ', Received: ' || L_MsgCnt);
        if C_MsgCount != L_MsgCnt then
          dbms_output.put_line(' *****************************************');
          dbms_output.put_line(' TOO MANY ITEMS DEQUEUED?!?');
          dbms_output.put_line(' *****************************************');
          for i in 1 .. L_MsgCnt
          loop
            dbms_output.put_line(' #' || i || ' MsdId=' || L_MsgIdArr(i));
          end loop;
        end if;
        dbms_output.put_line('END DequeueMessages');
      end DequeueMessages;
      -- ================================================================================================

      /*
       * This is the testcase
       */
      procedure RunTestCase is
      begin
        CreateQueueIfMissing;

        for i in 1 .. C_TestRuns
        loop
          dbms_output.put_line(null);
          dbms_output.put_line('=========== START test run #' || i || '===========');
          CleanQueueTable;
          EnqueueMessages;
          DequeueMessages;
        end loop;
      end;
      -- ================================================================================================
    begin
      RunTestCase;
    end;

Sample output:
    START CreateQueueIfMissing
    Skipping queue creation, already present.
    END CreateQueueIfMissing

    =========== START test run #1===========
    START CleanQueueTable
    Messages in queue table BEFORE purge: 0
    Messages in queue table AFTER purge: 0
    END CleanQueueTable
    START EnqueueMessages
    Messages in queue table BEFORE enqueue: 0
    Construct #1
    Enqueue #1
    Messages in queue table AFTER enqueue: 1
    END EnqueueMessages
    START DequeueMessages
    Messages in queue table BEFORE dequeue: 1
    Messages in queue table AFTER dequeue: 0
    Expected: 1, Received: 1
    END DequeueMessages

    =========== START test run #2===========
    START CleanQueueTable
    Messages in queue table BEFORE purge: 0
    Messages in queue table AFTER purge: 0
    END CleanQueueTable
    START EnqueueMessages
    Messages in queue table BEFORE enqueue: 0
    Construct #1
    Enqueue #1
    Messages in queue table AFTER enqueue: 1
    END EnqueueMessages
    START DequeueMessages
    Messages in queue table BEFORE dequeue: 1
    Messages in queue table AFTER dequeue: 0
    Expected: 1, Received: 2
    *****************************************
    TOO MANY ITEMS DEQUEUED?!?
    *****************************************
    #1 MsdId=2949A0FF2EE456A7E0540010E0467A30
    #2 MsdId=2949A0FF2EE456A7E0540010E0467A30
    END DequeueMessages
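
The duplicate in test run #2 can also be detected in code by comparing message IDs. The following is a minimal sketch, assuming the IDs arrive in a dbms_aq.msgid_array_t as in the PoC. To stay self-contained it fills the array by hand with the two IDs from the output above instead of calling dequeue_array; T_SeenIds, L_Seen and L_Unique are hypothetical names that do not appear in the original code.

```sql
declare
  -- Hypothetical helper type: set of message IDs already seen, keyed by hex string.
  type T_SeenIds is table of boolean index by varchar2(32);

  L_Seen     T_SeenIds;
  L_MsgIdArr dbms_aq.msgid_array_t;
  L_Key      varchar2(32);
  L_Unique   pls_integer := 0;
begin
  -- Stand-in for the dequeue_array result of test run #2 (values from the sample output).
  L_MsgIdArr(1) := hextoraw('2949A0FF2EE456A7E0540010E0467A30');
  L_MsgIdArr(2) := hextoraw('2949A0FF2EE456A7E0540010E0467A30');

  for i in 1 .. L_MsgIdArr.count
  loop
    L_Key := rawtohex(L_MsgIdArr(i));
    if L_Seen.exists(L_Key) then
      dbms_output.put_line(' Duplicate at #' || i || ' MsgId=' || L_Key);
    else
      L_Seen(L_Key) := true;
      L_Unique := L_Unique + 1;
    end if;
  end loop;

  dbms_output.put_line(' Unique messages: ' || L_Unique);
end;
/
```

With the two identical IDs above, the second entry is reported as a duplicate and the unique count is 1, which matches the single message that was enqueued.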

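Best answer

This looks like bug 20659700. There is more information in document 2002148.1.

You (or your DBA) should raise a service request to confirm this and to find out whether a patch is available for your platform.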
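
A service request usually needs the exact database version and patch level. The queries below are a small sketch of how that information might be collected; they are not part of the original answer. The second query assumes the session is allowed to read DBA views and that the 11g view DBA_REGISTRY_HISTORY is available.

```sql
-- Exact database version (the question reports 11.2.0.4.0).
select banner from v$version;

-- Assumption: DBA privileges. Lists patch set updates and bundle patches
-- recorded in the database registry.
select action_time, action, version, id, comments
  from dba_registry_history
 order by action_time;
```

Interim (one-off) patches are typically not recorded in this view; the OPatch utility (opatch lsinventory) run on the database server is the usual place to look for those.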

Source: a similar question about "oracle - dbms_aq.dequeue_array returns the first message twice" on Stack Overflow: https://stackoverflow.com/questions/34787888/
