Friday, December 21, 2018

Advanced Queuing & PL/SQL Notification -- Queue Propagate

Sample Code to propagate messages from one Queue to Another Queue

-- Run as SYSDBA: rebuild the AQ demo schema from scratch and grant it the
-- privileges used by the rest of this walkthrough.
connect "/ as sysdba"

-- On a first run the user does not exist yet and this DROP reports ORA-01918.
drop user aq cascade;
CREATE USER aq IDENTIFIED BY aq;
GRANT CONNECT, RESOURCE, aq_administrator_role TO aq;
GRANT EXECUTE ON dbms_aq TO aq;
GRANT EXECUTE ON dbms_aqadm TO aq;
-- The walkthrough later creates a loopback database link while connected as
-- AQ. Since Oracle 10.2 the CONNECT role only carries CREATE SESSION, and
-- RESOURCE does not include CREATE DATABASE LINK, so grant it explicitly.
GRANT CREATE DATABASE LINK TO aq;

-- Let AQ enqueue to and dequeue from any queue; FALSE = no admin option.
begin
dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','AQ',FALSE);
dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','AQ',FALSE);
end;
/

-- Log in as the demo user. The password is spelled in lower case to match
-- the account's "IDENTIFIED BY aq": passwords are case-sensitive by default
-- from Oracle 11g onward, so "AQ/AQ" would be rejected with ORA-01017.
connect aq/aq

-- Object type used as the payload of the demo queue tables.
CREATE TYPE aq.message_typ AS OBJECT (
   subject VARCHAR2(30),
   text    VARCHAR2(80)
);
/

Create Queue Table

-- Create the multi-consumer queue table aq.objmsgs80_qtab whose payload is
-- aq.Message_typ.
begin
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'aq.objmsgs80_qtab',
queue_payload_type => 'aq.Message_typ',
multiple_consumers => TRUE);
end;
-- The slash is required: without it SQL*Plus only buffers the block and
-- never executes it.
/

Create Queue & Start the Queue

-- Create MSG_QUEUE on top of aq.objmsgs80_qtab and enable it.
DECLARE
   c_queue_name CONSTANT VARCHAR2(30) := 'MSG_QUEUE';
BEGIN
   DBMS_AQADM.create_queue(
      queue_name  => c_queue_name,
      queue_table => 'aq.objmsgs80_qtab');

   DBMS_AQADM.start_queue(queue_name => c_queue_name);
END;
/

Set up an additional queue to propagate messages to:

-- Build the second queue: its own multi-consumer queue table with the same
-- payload type, then the queue MSG_QUEUEX itself, which is started at once.
DECLARE
   c_queue_table CONSTANT VARCHAR2(61) := 'aq.objmsgs80_qtabX';
   c_queue_name  CONSTANT VARCHAR2(30) := 'MSG_QUEUEX';
BEGIN
   DBMS_AQADM.create_queue_table(
      queue_table        => c_queue_table,
      queue_payload_type => 'aq.Message_typ',
      multiple_consumers => TRUE);

   DBMS_AQADM.create_queue(
      queue_name  => c_queue_name,
      queue_table => c_queue_table);

   DBMS_AQADM.start_queue(queue_name => c_queue_name);
END;
/

Create a procedure to enqueue the message:

create or replace procedure enqueue_msg(p_msg in varchar2)
-- Enqueue a single message on msg_queue, addressed to the agent RECIPIENT.
--   p_msg : value stored in the payload's TEXT attribute; the SUBJECT
--           attribute is always 'NORMAL MESSAGE'.
-- No COMMIT is issued here; the enqueue is part of the caller's transaction.
as
   l_enq_opts   dbms_aq.enqueue_options_t;
   l_msg_props  dbms_aq.message_properties_t;
   l_msg_id     RAW(16);
   l_payload    aq.message_typ := message_typ('NORMAL MESSAGE', p_msg );
BEGIN
   -- Explicit recipient list holding one agent; its address and protocol
   -- are both NULL.
   l_msg_props.recipient_list(1) := SYS.aq$_agent('RECIPIENT', null, null);

   dbms_aq.enqueue(
      queue_name         => 'msg_queue',
      enqueue_options    => l_enq_opts,
      message_properties => l_msg_props,
      payload            => l_payload,
      msgid              => l_msg_id);
END enqueue_msg;
/

-- Enqueue a first test message and commit it.
BEGIN
   enqueue_msg(p_msg => 'This is a test....');
   COMMIT;
END;
/

Create Database link to loopback:

-- Loopback link back into the same database as user AQ. The password is
-- written in lower case so that it matches the account's "IDENTIFIED BY aq";
-- with case-sensitive passwords (the default from 11g on) "AQ" would make
-- every use of the link fail with ORA-01017.
-- 'ORCL' is the connect identifier of the local database.
create database link AQ.LoopBack connect to aq identified by aq using 'ORCL';

Set up scheduling for messages to propagate:

-- Schedule propagation from MSG_QUEUE across the AQ.LOOPBACK database link,
-- starting now with a latency of 0.
BEGIN
   DBMS_AQADM.schedule_propagation(
      queue_name  => 'MSG_QUEUE',
      destination => 'AQ.LOOPBACK',
      start_time  => SYSDATE,
      latency     => 0);
END;
/

-- Verify the propagation schedule (author's note: everything checked out OK
-- at this point).
SELECT sched.*
  FROM user_queue_schedules sched;

-- Enqueue a second message now that propagation has been scheduled.
BEGIN
   enqueue_msg(p_msg => 'This should be propagated.');
   COMMIT;
END;
/
Check Queue query
-- List the current user's queues, sorted by queue name. NAME is the first
-- column of USER_QUEUES, so this gives the same order as the former
-- positional "ORDER BY 1" while stating the sort key explicitly.
SELECT   *
    FROM   user_queues
ORDER BY   name;
Check Subscribers
-- Show every row of USER_QUEUE_SUBSCRIBERS.
SELECT subs.*
  FROM user_queue_subscribers subs;

Check Scheduling
-- Show every row of USER_QUEUE_SCHEDULES.
SELECT sched.*
  FROM user_queue_schedules sched;

Drop Commands
-- Tear down both demo queues together with their queue tables.
DECLARE
   -- Stop and drop one queue, then drop the queue table behind it.
   PROCEDURE remove_queue(p_queue IN VARCHAR2, p_queue_table IN VARCHAR2) IS
   BEGIN
      DBMS_AQADM.stop_queue(queue_name => p_queue);
      DBMS_AQADM.drop_queue(queue_name => p_queue);
      DBMS_AQADM.drop_queue_table(queue_table => p_queue_table);
   END remove_queue;
BEGIN
   remove_queue('MSG_QUEUE',  'aq.objmsgs80_qtab');
   remove_queue('MSG_QUEUEX', 'aq.objmsgs80_qtabX');
END;
/

Create New Subscriber

-- Register the agent GW, whose address is the remote queue
-- "PHX_NCMS"."XXONT_XML_ECP_QUEUE_M" at NCMS2DPPCI, as a subscriber of
-- PHX_NCMS.XXONT_ECP_MSG_QUEUE_M.
DECLARE
   l_agent sys.aq$_agent :=
      sys.aq$_agent('GW', '"PHX_NCMS"."XXONT_XML_ECP_QUEUE_M"@NCMS2DPPCI', 0);
BEGIN
   DBMS_AQADM.add_subscriber(
      queue_name => 'PHX_NCMS.XXONT_ECP_MSG_QUEUE_M',
      subscriber => l_agent);
END;
/