Sample Code to propagate messages from one Queue to Another Queue
connect "/ as sysdba"
drop user aq cascade;
CREATE USER aq IDENTIFIED BY aq;
GRANT CONNECT, RESOURCE, aq_administrator_role TO aq;
GRANT EXECUTE ON dbms_aq TO aq;
GRANT EXECUTE ON dbms_aqadm TO aq;
begin
dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','AQ',FALSE);
dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','AQ',FALSE);
end;
/
connect AQ/AQ
CREATE type aq.Message_typ as object(subject VARCHAR2(30), text VARCHAR2(80));
/
Create Queue Table
begin
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'aq.objmsgs80_qtab',
queue_payload_type => 'aq.Message_typ',
multiple_consumers => TRUE);
end;
Create Queue & Start the Queue
begin
DBMS_AQADM.CREATE_QUEUE(queue_name => 'MSG_QUEUE',
queue_table => 'aq.objmsgs80_qtab');
DBMS_AQADM.START_QUEUE(queue_name => 'MSG_QUEUE');
end;
/
Setup Addition Queue to propagate messages to:
begin
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'aq.objmsgs80_qtabX',
queue_payload_type => 'aq.Message_typ',
multiple_consumers => TRUE);
DBMS_AQADM.CREATE_QUEUE(queue_name => 'MSG_QUEUEX',
queue_table => 'aq.objmsgs80_qtabX');
DBMS_AQADM.START_QUEUE(queue_name => 'MSG_QUEUEX');
end;
/
Create Procedure to Enqueue the messsage:
create or replace procedure enqueue_msg(p_msg in varchar2)
as
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message aq.message_typ;
recipients DBMS_AQ.aq$_recipient_list_t;
BEGIN
-- ADDED
recipients(1) := SYS.aq$_agent('RECIPIENT', null, null);
message_properties.recipient_list := recipients;
message := message_typ('NORMAL MESSAGE', p_msg );
dbms_aq.enqueue(queue_name => 'msg_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
end;
/
begin enqueue_msg('This is a test....'); commit; end;
/
Create Database link to loopback:
create database link AQ.LoopBack connect to AQ identified by AQ using 'ORCL';
Setup Scheduling for messages to propagate:
begin DBMS_AQADM.Schedule_Propagation(Queue_Name => 'MSG_QUEUE',
Destination => 'AQ.LOOPBACK',
Start_Time => sysdate,
Latency => 0);
end;
/
-- Check scheduling: Everything checked out OK.
select * from user_queue_schedules;
begin enqueue_msg('This should be propagated.'); commit; end;
/
Check Queue query
Check Scheduling
select * from user_queue_schedules;
Drop Commands
begin
DBMS_AQADM.stop_queue('MSG_QUEUE');
DBMS_AQADM.drop_queue('MSG_QUEUE');
DBMS_AQADM.drop_queue_table('aq.objmsgs80_qtab');
DBMS_AQADM.stop_queue('MSG_QUEUEX');
DBMS_AQADM.drop_queue('MSG_QUEUEX');
DBMS_AQADM.drop_queue_table('aq.objmsgs80_qtabX');
end;
/
Create New Subscriber
DECLARE
aSubscriber sys.aq$_agent;
BEGIN
aSubscriber :=
sys.aq$_agent ('GW'
, '"PHX_NCMS"."XXONT_XML_ECP_QUEUE_M"@NCMS2DPPCI'
, 0);
DBMS_AQADM.add_subscriber (queue_name => 'PHX_NCMS.XXONT_ECP_MSG_QUEUE_M'
, subscriber => aSubscriber);
END;
/
connect "/ as sysdba"
drop user aq cascade;
CREATE USER aq IDENTIFIED BY aq;
GRANT CONNECT, RESOURCE, aq_administrator_role TO aq;
GRANT EXECUTE ON dbms_aq TO aq;
GRANT EXECUTE ON dbms_aqadm TO aq;
begin
dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','AQ',FALSE);
dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','AQ',FALSE);
end;
/
connect AQ/AQ
CREATE type aq.Message_typ as object(subject VARCHAR2(30), text VARCHAR2(80));
/
Create Queue Table
begin
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'aq.objmsgs80_qtab',
queue_payload_type => 'aq.Message_typ',
multiple_consumers => TRUE);
end;
Create Queue & Start the Queue
begin
DBMS_AQADM.CREATE_QUEUE(queue_name => 'MSG_QUEUE',
queue_table => 'aq.objmsgs80_qtab');
DBMS_AQADM.START_QUEUE(queue_name => 'MSG_QUEUE');
end;
/
Setup Addition Queue to propagate messages to:
begin
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'aq.objmsgs80_qtabX',
queue_payload_type => 'aq.Message_typ',
multiple_consumers => TRUE);
DBMS_AQADM.CREATE_QUEUE(queue_name => 'MSG_QUEUEX',
queue_table => 'aq.objmsgs80_qtabX');
DBMS_AQADM.START_QUEUE(queue_name => 'MSG_QUEUEX');
end;
/
Create Procedure to Enqueue the messsage:
create or replace procedure enqueue_msg(p_msg in varchar2)
as
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message aq.message_typ;
recipients DBMS_AQ.aq$_recipient_list_t;
BEGIN
-- ADDED
recipients(1) := SYS.aq$_agent('RECIPIENT', null, null);
message_properties.recipient_list := recipients;
message := message_typ('NORMAL MESSAGE', p_msg );
dbms_aq.enqueue(queue_name => 'msg_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
end;
/
begin enqueue_msg('This is a test....'); commit; end;
/
Create Database link to loopback:
create database link AQ.LoopBack connect to AQ identified by AQ using 'ORCL';
Setup Scheduling for messages to propagate:
begin DBMS_AQADM.Schedule_Propagation(Queue_Name => 'MSG_QUEUE',
Destination => 'AQ.LOOPBACK',
Start_Time => sysdate,
Latency => 0);
end;
/
-- Check scheduling: Everything checked out OK.
select * from user_queue_schedules;
begin enqueue_msg('This should be propagated.'); commit; end;
/
Check Queue query
SELECT *
FROM
user_queues
ORDER BY
1;
Check Subscribers
SELECT *
FROM
user_queue_subscribers;
Check Scheduling
select * from user_queue_schedules;
Drop Commands
begin
DBMS_AQADM.stop_queue('MSG_QUEUE');
DBMS_AQADM.drop_queue('MSG_QUEUE');
DBMS_AQADM.drop_queue_table('aq.objmsgs80_qtab');
DBMS_AQADM.stop_queue('MSG_QUEUEX');
DBMS_AQADM.drop_queue('MSG_QUEUEX');
DBMS_AQADM.drop_queue_table('aq.objmsgs80_qtabX');
end;
/
Create New Subscriber
DECLARE
aSubscriber sys.aq$_agent;
BEGIN
aSubscriber :=
sys.aq$_agent ('GW'
, '"PHX_NCMS"."XXONT_XML_ECP_QUEUE_M"@NCMS2DPPCI'
, 0);
DBMS_AQADM.add_subscriber (queue_name => 'PHX_NCMS.XXONT_ECP_MSG_QUEUE_M'
, subscriber => aSubscriber);
END;
/