Oracle Advanced Queue - 出队不工作
Oracle Advance Queue - Dequeue not working
我似乎找不到解决问题的办法,我已经被困在这里好几个小时了。
我正在使用 Oracle AQ:
Dbms_Aqadm.Create_Queue_Table(Queue_Table => 'ITEM_EVENT_QT',
Queue_Payload_Type => 'ITEM_EVENT',
Multiple_Consumers => TRUE);
Dbms_Aqadm.Create_Queue(Queue_Name => 'ITEM_EVENT_QUEUE',
Queue_Table => 'ITEM_EVENT_QT',
Max_Retries => 5,
Retry_Delay => 0,
Retention_Time => 432000, -- 5 DAYS
Dependency_Tracking => FALSE,
COMMENT => 'Item Event Queue');
-- START THE QUEUE
Dbms_Aqadm.Start_Queue('ITEM_EVENT_QUEUE');
-- GRANT QUEUE PRIVILEGES
Dbms_Aqadm.Grant_Queue_Privilege(Privilege => 'ALL',
Queue_Name => 'ITEM_EVENT_QUEUE',
Grantee => 'PUBLIC',
Grant_Option => FALSE);
END;
这是我的订阅者之一:
Dbms_Aqadm.Add_Subscriber(Queue_Name => 'ITEM_EVENT_QUEUE',
Subscriber => Sys.Aq$_Agent('ITEM_SUBSCRIBER_1',
NULL,
NULL),
rule => 'tab.user_data.header.thread_no = 1');
Dbms_Aq.Register(Sys.Aq$_Reg_Info_List(Sys.Aq$_Reg_Info('ITEM_EVENT_QUEUE:ITEM_SUBSCRIBER_1',
Dbms_Aq.Namespace_Aq,
'plsql://ITEM_API.GET_QUEUE_FROM_QUEUE',
HEXTORAW('FF'))),1);
订户注册:
每当我的数据库发生某个事件时,我都会通过从我的 ITEM_API
包中调用以下过程来使用触发器将 "the event" 添加到我的 AQ:
PROCEDURE ADD_EVENT_TO_QUEUE(I_EVENT IN ITEM_EVENT,
O_STATUS_CODE OUT VARCHAR2,
O_ERROR_MSG OUT VARCHAR2) IS
ENQUEUE_OPTIONS DBMS_AQ.ENQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;
MESSAGE_HANDLE RAW(16);
EVENT ITEM_EVENT;
HEADER_PROP HEADER_PROPERTIES;
BEGIN
EVENT := I_EVENT;
EVENT.SEQ_NO := ITEM_EVENT_SEQ.NEXTVAL;
ENQUEUE_OPTIONS.VISIBILITY := DBMS_AQ.ON_COMMIT;
ENQUEUE_OPTIONS.SEQUENCE_DEVIATION := NULL;
MESSAGE_PROPERTIES.PRIORITY := 1;
MESSAGE_PROPERTIES.DELAY := DBMS_AQ.NO_DELAY;
MESSAGE_PROPERTIES.EXPIRATION := DBMS_AQ.NEVER;
HEADER_PROP := HEADER_PROPERTIES(1);
EVENT.HEADER := HEADER_PROP;
DBMS_AQ.ENQUEUE(QUEUE_NAME => 'ITEM_EVENT_QUEUE',
ENQUEUE_OPTIONS => ENQUEUE_OPTIONS,
MESSAGE_PROPERTIES => MESSAGE_PROPERTIES,
PAYLOAD => EVENT,
MSGID => MESSAGE_HANDLE);
EXCEPTION
WHEN OTHERS THEN
ERROR_HANDLER.LOG_ERROR(NULL,
EVENT.ITEM,
EVENT.SEQ_NO,
SQLCODE,
SQLERRM,
O_STATUS_CODE,
O_ERROR_MSG);
RAISE;
END ADD_EVENT_TO_QUEUE;
它正在工作,因为当我检查我的 AQ table 时,我可以找到 "the event",但是我的出队方法没有出队,如下图所示,没有 DEQ_TIME
.
这是我的出队方法,也来自我的 ITEM_API
包:
PROCEDURE GET_QUEUE_FROM_QUEUE(CONTEXT RAW,
REGINFO SYS.AQ$_REG_INFO,
DESCR SYS.AQ$_DESCRIPTOR,
PAYLOAD RAW,
PAYLOADL NUMBER) IS
R_DEQUEUE_OPTIONS DBMS_AQ.DEQUEUE_OPTIONS_T;
R_MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MESSAGE_HANDLE RAW(16);
I_PAYLOAD ITEM_EVENT;
L_PROC_EVENT BOOLEAN;
O_TARGETS CFG_EVENT_STAGE_TBL;
O_ERROR_MSG VARCHAR2(300);
O_STATUS_CODE VARCHAR2(100);
BEGIN
R_DEQUEUE_OPTIONS.MSGID := DESCR.MSG_ID;
R_DEQUEUE_OPTIONS.CONSUMER_NAME := DESCR.CONSUMER_NAME;
R_DEQUEUE_OPTIONS.DEQUEUE_MODE := DBMS_AQ.REMOVE;
--R_DEQUEUE_OPTIONS.WAIT := DBMS_AQ.NO_WAIT;
DBMS_AQ.DEQUEUE(QUEUE_NAME => DESCR.QUEUE_NAME,
DEQUEUE_OPTIONS => R_DEQUEUE_OPTIONS,
MESSAGE_PROPERTIES => R_MESSAGE_PROPERTIES,
PAYLOAD => I_PAYLOAD,
MSGID => V_MESSAGE_HANDLE);
IF I_PAYLOAD IS NOT NULL THEN
L_PROC_EVENT := PROCESS_EVENT(I_PAYLOAD,
O_TARGETS,
O_STATUS_CODE,
O_ERROR_MSG);
END IF;
EXCEPTION
WHEN OTHERS THEN
ERROR_HANDLER.LOG_ERROR(NULL,
NULL,
NULL,
SQLCODE,
SQLERRM,
O_STATUS_CODE,
O_ERROR_MSG);
RAISE;
END GET_QUEUE_FROM_QUEUE;
我做错了什么吗?我怎样才能解决这个问题?我想我的订阅者注册可能有问题,但我不确定。
编辑: 我刚刚发现,如果我删除订阅者和注册器,然后重新添加它们,它们将使所有消息出列。但是,如果另一个事件被排队,它会无限期地留在那里(或者直到我删除并再次添加订阅者):
状态为0且没有DEQ_TIME
的记录是新记录。
我需要调度程序或类似的东西吗?
编辑: 我已将调度程序传播添加到我的 AQ:
DBMS_AQADM.SCHEDULE_PROPAGATION('ITEM_EVENT_QUEUE');
甚至添加了 next_time 字段:
DBMS_AQADM.SCHEDULE_PROPAGATION('ITEM_EVENT_QUEUE', SYSDATE + 30/86400);
还是不行。有什么建议么?我猜 AQ Notifications 没有工作,我的回调过程从未被调用过。我该如何解决这个问题?
编辑: 我已经从包中删除了我的程序只是为了测试目的,所以我的队友可以编译 ITEM_API 包(我不知道如果重新编译包,可能会或可能不会影响出队过程)。
还是不行。
创建代码块并运行以下内容:
DECLARE
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_handle RAW (16);
I_PAYLOAD ITEM_EVENT;
no_messages exception;
msg_content VARCHAR2 (4000);
PRAGMA EXCEPTION_INIT (no_messages, -25228);
BEGIN
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.consumer_name := 'ITEM_SUBSCRIBER_1';
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
LOOP
DBMS_AQ.DEQUEUE (queue_name => 'ITEM_EVENT_QUEUE',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => I_PAYLOAD,
msgid => message_handle
);
END LOOP;
EXCEPTION
WHEN no_messages
THEN
DBMS_OUTPUT.PUT_LINE ('No more messages left');
END;
让我知道您排队的消息发生了什么。
您应该有一个 table 来处理数据。
能否也试试在agent中加入enqueudtable然后指定agent去dequeuetable.
DECLARE
aSubscriber sys.aq$_agent;
BEGIN
aSubscriber := sys.aq$_agent('ITEM_SUBSCRIBER_1',
'ITEM_EVENT_QUEUE',
0);
dbms_aqadm.add_subscriber
( queue_name => 'ITEM_EVENT_QUEUE'
,subscriber => aSubscriber);
END;
/
我遇到了同样的问题,但在更改这两个数据库参数后问题得到解决:
- job_queue_processes(必须大于 0)
- aq_tm_processes(自动调整)
希望对您有所帮助。
我们遇到了一个相关问题(至少与标题相关),我们无法延迟删除queue 消息。 queue 中的消息保持状态 "WAITING"。并且没有更改为 "READY".
负责将状态从 "WAITING" 更改为 "READY"(延迟到期后)的 Oracle AQ 监视进程未正常工作。
对我们来说,重启数据库解决了这个问题。
我似乎找不到解决问题的办法,我已经被困在这里好几个小时了。
我正在使用 Oracle AQ:
Dbms_Aqadm.Create_Queue_Table(Queue_Table => 'ITEM_EVENT_QT',
Queue_Payload_Type => 'ITEM_EVENT',
Multiple_Consumers => TRUE);
Dbms_Aqadm.Create_Queue(Queue_Name => 'ITEM_EVENT_QUEUE',
Queue_Table => 'ITEM_EVENT_QT',
Max_Retries => 5,
Retry_Delay => 0,
Retention_Time => 432000, -- 5 DAYS
Dependency_Tracking => FALSE,
COMMENT => 'Item Event Queue');
-- START THE QUEUE
Dbms_Aqadm.Start_Queue('ITEM_EVENT_QUEUE');
-- GRANT QUEUE PRIVILEGES
Dbms_Aqadm.Grant_Queue_Privilege(Privilege => 'ALL',
Queue_Name => 'ITEM_EVENT_QUEUE',
Grantee => 'PUBLIC',
Grant_Option => FALSE);
END;
这是我的订阅者之一:
Dbms_Aqadm.Add_Subscriber(Queue_Name => 'ITEM_EVENT_QUEUE',
Subscriber => Sys.Aq$_Agent('ITEM_SUBSCRIBER_1',
NULL,
NULL),
rule => 'tab.user_data.header.thread_no = 1');
Dbms_Aq.Register(Sys.Aq$_Reg_Info_List(Sys.Aq$_Reg_Info('ITEM_EVENT_QUEUE:ITEM_SUBSCRIBER_1',
Dbms_Aq.Namespace_Aq,
'plsql://ITEM_API.GET_QUEUE_FROM_QUEUE',
HEXTORAW('FF'))),1);
订户注册:
每当我的数据库发生某个事件时,我都会通过从我的 ITEM_API
包中调用以下过程来使用触发器将 "the event" 添加到我的 AQ:
PROCEDURE ADD_EVENT_TO_QUEUE(I_EVENT IN ITEM_EVENT,
O_STATUS_CODE OUT VARCHAR2,
O_ERROR_MSG OUT VARCHAR2) IS
ENQUEUE_OPTIONS DBMS_AQ.ENQUEUE_OPTIONS_T;
MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;
MESSAGE_HANDLE RAW(16);
EVENT ITEM_EVENT;
HEADER_PROP HEADER_PROPERTIES;
BEGIN
EVENT := I_EVENT;
EVENT.SEQ_NO := ITEM_EVENT_SEQ.NEXTVAL;
ENQUEUE_OPTIONS.VISIBILITY := DBMS_AQ.ON_COMMIT;
ENQUEUE_OPTIONS.SEQUENCE_DEVIATION := NULL;
MESSAGE_PROPERTIES.PRIORITY := 1;
MESSAGE_PROPERTIES.DELAY := DBMS_AQ.NO_DELAY;
MESSAGE_PROPERTIES.EXPIRATION := DBMS_AQ.NEVER;
HEADER_PROP := HEADER_PROPERTIES(1);
EVENT.HEADER := HEADER_PROP;
DBMS_AQ.ENQUEUE(QUEUE_NAME => 'ITEM_EVENT_QUEUE',
ENQUEUE_OPTIONS => ENQUEUE_OPTIONS,
MESSAGE_PROPERTIES => MESSAGE_PROPERTIES,
PAYLOAD => EVENT,
MSGID => MESSAGE_HANDLE);
EXCEPTION
WHEN OTHERS THEN
ERROR_HANDLER.LOG_ERROR(NULL,
EVENT.ITEM,
EVENT.SEQ_NO,
SQLCODE,
SQLERRM,
O_STATUS_CODE,
O_ERROR_MSG);
RAISE;
END ADD_EVENT_TO_QUEUE;
它正在工作,因为当我检查我的 AQ table 时,我可以找到 "the event",但是我的出队方法没有出队,如下图所示,没有 DEQ_TIME
.
这是我的出队方法,也来自我的 ITEM_API
包:
PROCEDURE GET_QUEUE_FROM_QUEUE(CONTEXT RAW,
REGINFO SYS.AQ$_REG_INFO,
DESCR SYS.AQ$_DESCRIPTOR,
PAYLOAD RAW,
PAYLOADL NUMBER) IS
R_DEQUEUE_OPTIONS DBMS_AQ.DEQUEUE_OPTIONS_T;
R_MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T;
V_MESSAGE_HANDLE RAW(16);
I_PAYLOAD ITEM_EVENT;
L_PROC_EVENT BOOLEAN;
O_TARGETS CFG_EVENT_STAGE_TBL;
O_ERROR_MSG VARCHAR2(300);
O_STATUS_CODE VARCHAR2(100);
BEGIN
R_DEQUEUE_OPTIONS.MSGID := DESCR.MSG_ID;
R_DEQUEUE_OPTIONS.CONSUMER_NAME := DESCR.CONSUMER_NAME;
R_DEQUEUE_OPTIONS.DEQUEUE_MODE := DBMS_AQ.REMOVE;
--R_DEQUEUE_OPTIONS.WAIT := DBMS_AQ.NO_WAIT;
DBMS_AQ.DEQUEUE(QUEUE_NAME => DESCR.QUEUE_NAME,
DEQUEUE_OPTIONS => R_DEQUEUE_OPTIONS,
MESSAGE_PROPERTIES => R_MESSAGE_PROPERTIES,
PAYLOAD => I_PAYLOAD,
MSGID => V_MESSAGE_HANDLE);
IF I_PAYLOAD IS NOT NULL THEN
L_PROC_EVENT := PROCESS_EVENT(I_PAYLOAD,
O_TARGETS,
O_STATUS_CODE,
O_ERROR_MSG);
END IF;
EXCEPTION
WHEN OTHERS THEN
ERROR_HANDLER.LOG_ERROR(NULL,
NULL,
NULL,
SQLCODE,
SQLERRM,
O_STATUS_CODE,
O_ERROR_MSG);
RAISE;
END GET_QUEUE_FROM_QUEUE;
我做错了什么吗?我怎样才能解决这个问题?我想我的订阅者注册可能有问题,但我不确定。
编辑: 我刚刚发现,如果我删除订阅者和注册器,然后重新添加它们,它们将使所有消息出列。但是,如果另一个事件被排队,它会无限期地留在那里(或者直到我删除并再次添加订阅者):
状态为0且没有DEQ_TIME
的记录是新记录。
我需要调度程序或类似的东西吗?
编辑: 我已将调度程序传播添加到我的 AQ:
DBMS_AQADM.SCHEDULE_PROPAGATION('ITEM_EVENT_QUEUE');
甚至添加了 next_time 字段:
DBMS_AQADM.SCHEDULE_PROPAGATION('ITEM_EVENT_QUEUE', SYSDATE + 30/86400);
还是不行。有什么建议么?我猜 AQ Notifications 没有工作,我的回调过程从未被调用过。我该如何解决这个问题?
编辑: 我已经从包中删除了我的程序只是为了测试目的,所以我的队友可以编译 ITEM_API 包(我不知道如果重新编译包,可能会或可能不会影响出队过程)。 还是不行。
创建代码块并运行以下内容:
DECLARE
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_handle RAW (16);
I_PAYLOAD ITEM_EVENT;
no_messages exception;
msg_content VARCHAR2 (4000);
PRAGMA EXCEPTION_INIT (no_messages, -25228);
BEGIN
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.consumer_name := 'ITEM_SUBSCRIBER_1';
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
LOOP
DBMS_AQ.DEQUEUE (queue_name => 'ITEM_EVENT_QUEUE',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => I_PAYLOAD,
msgid => message_handle
);
END LOOP;
EXCEPTION
WHEN no_messages
THEN
DBMS_OUTPUT.PUT_LINE ('No more messages left');
END;
让我知道您排队的消息发生了什么。
您应该有一个 table 来处理数据。
能否也试试在agent中加入enqueudtable然后指定agent去dequeuetable.
DECLARE
aSubscriber sys.aq$_agent;
BEGIN
aSubscriber := sys.aq$_agent('ITEM_SUBSCRIBER_1',
'ITEM_EVENT_QUEUE',
0);
dbms_aqadm.add_subscriber
( queue_name => 'ITEM_EVENT_QUEUE'
,subscriber => aSubscriber);
END;
/
我遇到了同样的问题,但在更改这两个数据库参数后问题得到解决:
- job_queue_processes(必须大于 0)
- aq_tm_processes(自动调整)
希望对您有所帮助。
我们遇到了一个相关问题(至少与标题相关),我们无法延迟删除queue 消息。 queue 中的消息保持状态 "WAITING"。并且没有更改为 "READY".
负责将状态从 "WAITING" 更改为 "READY"(延迟到期后)的 Oracle AQ 监视进程未正常工作。
对我们来说,重启数据库解决了这个问题。