暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

Oracle AQ

askTom 2015-09-09
195

问题描述

出队时出现错误 ORA-25247: %s 不是指定消息的接收者

消息已成功入队。
出队使用 PL/SQL 订阅者,并且已为该订阅者注册了 PL/SQL 回调过程。

随后出队时出现错误 ORA-25247: %s 不是指定消息的接收者

甚至尝试过在入队(Enqueue)过程中显式指定接收者,但仍然出现相同的错误。

相同的代码在 QA 环境中可以正常工作。

已验证所有权限,也授予了 AQ_ADMINISTRATOR_ROLE,但问题依旧。


GRANT CONNECT, RESOURCE TO TEST_AQ;

GRANT EXECUTE ON dbms_aqadm TO TEST_AQ WITH GRANT OPTION ;
GRANT EXECUTE ON dbms_aq TO TEST_AQ WITH GRANT OPTION ;

GRANT AQ_ADMINISTRATOR_ROLE TO TEST_AQ;

BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'MANAGE_ANY',
grantee => 'TEST_AQ',
admin_option => FALSE);
END;
/

BEGIN
DBMS_AQADM.ADD_SUBSCRIBER
(
queue_name => 'TEST_AQ.qu_datasync',
subscriber => SYS.AQ$_AGENT('QUSS_DATASYNC', NULL, NULL)
);

DBMS_AQ.REGISTER
(
SYS.AQ$_REG_INFO_LIST
(
SYS.AQ$_REG_INFO
(
'TEST_AQ.QU_DATASYNC:QUSS_DATASYNC',
DBMS_AQ.NAMESPACE_AQ,
'plsql://TEST_AQ.SP_SUBSCRIBER_DATASYNC?PR=0',
HEXTORAW('FF')
)
),
1
);
END;
/


CREATE OR REPLACE PROCEDURE TEST_AQ.sp_subscriber_datasync
(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload ty_payload_datasync ;
v_dq_time TIMESTAMP ;
v_ws_status VARCHAR2(2000) ;
v_ws_time NUMBER ;
no_messages exception;
PRAGMA EXCEPTION_INIT (no_messages, -25228);
BEGIN
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
r_dequeue_options.wait := DBMS_AQ.NO_WAIT;


BEGIN
DBMS_AQ.DEQUEUE
(
queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
) ;
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('DEQUEUE FAILEd');
END;
v_dq_time := SYSTIMESTAMP ;
-- dbms_lock.sleep(0.5) ;
insert into test (o_payload.client_id, o_payload.aoid, o_payload.wfv_oid, o_payload.area, o_payload.action, o_payload.role, , v_ws_status, v_ws_time) ;



COMMIT;
EXCEPTION
WHEN others THEN
dbms_output.put_line('DEQUEUE FAILEd');
commit;
END;
/
SHOW ERRORS


DECLARE
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload ty_payload_datasync;

BEGIN

o_payload := ty_payload_datasync
(
NVL(:new.client_id, :old.client_id)
, NVL(:new.area, :old.area)
, NVL(:new.action, :old.action)
, NVL(:new.aoid, :old.aoid)
, NVL(:new.wfv_oid, :old.wfv_oid)
, '~'
, '~'
, 'I'
, 0
, NVL(:new.modified_by, :old.modified_by)
);

DBMS_AQ.ENQUEUE
(
queue_name => 'TEST_AQ.qu_datasync',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
IF UPDATING THEN
:new.modified_on := SYSTIMESTAMP;
END IF;
EXCEPTION
WHEN OTHERS THEN
insert into ......
commit;
END;
/


另外还检查了以下进程参数:
AQ_TM_PROCESSES=1
JOB_QUEUE_PROCESSES=32

OK下面是完整的脚本
===============================

PROMPT TEST_AQ SCHEMA CREATION

create user TEST_AQ identified by adpadp
default tablespace DATA1
temporary tablespace TEMP
quota unlimited on DATA1;

GRANT CONNECT, RESOURCE TO TEST_AQ;

GRANT EXECUTE ON dbms_aqadm TO TEST_AQ WITH GRANT OPTION ;
GRANT EXECUTE ON dbms_aq TO TEST_AQ WITH GRANT OPTION ;

GRANT AQ_ADMINISTRATOR_ROLE TO TEST_AQ;

BEGIN
DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
privilege => 'MANAGE_ANY',
grantee => 'TEST_AQ',
admin_option => FALSE);
END;
/


CREATE TABLE TEST_AQ.MSG_TAb
(
client_id VARCHAR2(50)
, aoid VARCHAR2(100)
, wfv_oid VARCHAR2(100)
, dq_time TIMESTAMP DEFAULT SYSTIMESTAMP
, dq_complete TIMESTAMP DEFAULT SYSTIMESTAMP
, ws_status VARCHAR2(2000)
, ws_time NUMBER
)
/


CREATE OR REPLACE TYPE TEST_AQ.ty_payload_datasync AS OBJECT
(
client_id VARCHAR2(50)
, area VARCHAR2(50)
, action VARCHAR2(50)
, aoid VARCHAR2(100)
, wfv_oid VARCHAR2(50)
, role VARCHAR2(20)
, filtype VARCHAR2(10)
, f_dml VARCHAR2(10)
, no_of_attempts NUMBER
, modified_by VARCHAR2(50)
) ;
/

CREATE TABLE TEST_AQ.data_sync
(
client_id VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_client_id')
, area VARCHAR2(50)
, action VARCHAR2(50)
, aoid VARCHAR2(100)
, wfv_oid VARCHAR2(50)
, role VARCHAR2(20) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_role_name')
, filtype VARCHAR2(10)
, f_dml VARCHAR2(10)
, no_of_attempts NUMBER DEFAULT 0
, created_on TIMESTAMP DEFAULT SYSTIMESTAMP
, modified_on TIMESTAMP DEFAULT SYSTIMESTAMP
, modified_by VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd', 'ctx_mod_usr')
)
/



rem
rem Create a queue table
rem

BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE
(
queue_table => 'TEST_AQ.qt_datasync'
, queue_payload_type => 'TEST_AQ.ty_payload_datasync'
, multiple_consumers => TRUE
) ;
END ;
/

rem
rem Create a queue using the table.
rem

BEGIN
DBMS_AQADM.CREATE_QUEUE
(
queue_name => 'TEST_AQ.qu_datasync'
, queue_table => 'TEST_AQ.qt_datasync'
, max_retries => 4
) ;

DBMS_AQADM.START_QUEUE
(
queue_name => 'TEST_AQ.qu_datasync'
) ;

END ;
/



rem
rem Subscriber code
rem

CREATE OR REPLACE PROCEDURE TEST_AQ.sp_subscriber_datasync
(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload ty_payload_datasync ;
v_dq_time TIMESTAMP ;
v_ws_status VARCHAR2(2000) ;
v_ws_time NUMBER ;
no_messages exception;
PRAGMA EXCEPTION_INIT (no_messages, -25228);
BEGIN
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
r_dequeue_options.wait := DBMS_AQ.NO_WAIT;


BEGIN
DBMS_AQ.DEQUEUE
(
queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
) ;
EXCEPTION
WHEN OTHERS THEN
dbms_output.put_line('DEQUEUE FAILEd');
END;
v_dq_time := SYSTIMESTAMP ;
-- dbms_lock.sleep(0.5) ;
insert into msg_tab (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimpestamp,systimestamp, v_ws_status, v_ws_time) ;



COMMIT;
EXCEPTION
WHEN others THEN
dbms_output.put_line('DEQUEUE FAILEd');
commit;
END;
/
SHOW ERRORS

rem
rem Subscribe the code to automatically process the message.
rem

BEGIN
DBMS_AQADM.ADD_SUBSCRIBER
(
queue_name => 'TEST_AQ.qu_datasync',
subscriber => SYS.AQ$_AGENT('QUSS_DATASYNC', NULL, NULL)
);

DBMS_AQ.REGISTER
(
SYS.AQ$_REG_INFO_LIST
(
SYS.AQ$_REG_INFO
(
'TEST_AQ.QU_DATASYNC:QUSS_DATASYNC',
DBMS_AQ.NAMESPACE_AQ,
'plsql://TEST_AQ.SP_SUBSCRIBER_DATASYNC?PR=0',
HEXTORAW('FF')
)
),
1
);
END;
/

CREATE OR REPLACE TRIGGER TEST_AQ.tr_biu_data_sync
BEFORE INSERT OR UPDATE ON TEST_AQ.data_sync
referencing new as new
FOR EACH ROW
DECLARE
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload ty_payload_datasync;

BEGIN

o_payload := ty_payload_datasync
(
NVL(:new.client_id, :old.client_id)
, NVL(:new.area, :old.area)
, NVL(:new.action, :old.action)
, NVL(:new.aoid, :old.aoid)
, NVL(:new.wfv_oid, :old.wfv_oid)
, '~'
, '~'
, 'I'
, 0
, NVL(:new.modified_by, :old.modified_by)
);

DBMS_AQ.ENQUEUE
(
queue_name => 'TEST_AQ.qu_datasync',
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
IF UPDATING THEN
:new.modified_on := SYSTIMESTAMP;
END IF;
EXCEPTION
WHEN OTHERS THEN
insert into msg_tab (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimpestamp,systimestamp, v_ws_status, v_ws_time) ;



commit;
END;
/





专家解答

谢谢你的问题,

您提供的代码对我们有很大帮助。但是,它缺少一个关键组件-创建队列本身的代码!

能否请您补充用于创建队列及其关联表的脚本,并据此更新问题?

补充:

你好,阿曼迪普,

我已经运行了你的脚本(仅对 MSG_TAB 的插入语句做了一些错误修正),在 11.2.0.4 和 12.1.0.2 上运行一切正常。两个版本各自的输出如下。很抱歉,我认为您可能需要联系 Oracle 支持,因为在我看来您的代码没有问题。

12c
=====================

SQL> select * from v$version;

BANNER                                                                               CON_ID
-------------------------------------------------------------------------------- ----------
Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production              0
PL/SQL Release 12.1.0.2.0 - Production                                                    0
CORE    12.1.0.2.0      Production                                                                0
TNS for 64-bit Windows: Version 12.1.0.2.0 - Production                                   0
NLSRTL Version 12.1.0.2.0 - Production                                                    0


SQL> drop user TEST_AQ cascade;

User dropped.

SQL>
SQL> PROMPT TEST_AQ SCHEMA CREATION
TEST_AQ SCHEMA CREATION
SQL>
SQL>
SQL> create user TEST_AQ identified by adpadp
  2  default tablespace users
  3  temporary tablespace TEMP
  4  quota unlimited on users;

User created.

SQL>
SQL> GRANT CONNECT, RESOURCE TO TEST_AQ;

Grant succeeded.

SQL>
SQL> GRANT EXECUTE ON dbms_aqadm TO TEST_AQ WITH GRANT OPTION ;

Grant succeeded.

SQL> GRANT EXECUTE ON dbms_aq TO TEST_AQ WITH GRANT OPTION ;

Grant succeeded.

SQL>
SQL> GRANT AQ_ADMINISTRATOR_ROLE TO TEST_AQ;

Grant succeeded.

SQL>
SQL> BEGIN
  2  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
  3  privilege => 'MANAGE_ANY',
  4  grantee => 'TEST_AQ',
  5  admin_option => FALSE);
  6  END;
  7  /

PL/SQL procedure successfully completed.

SQL>
SQL>
SQL> CREATE TABLE TEST_AQ.MSG_TAb
  2  (
  3    client_id VARCHAR2(50)
  4  , aoid VARCHAR2(100)
  5  , wfv_oid VARCHAR2(100)
  6  , dq_time TIMESTAMP DEFAULT SYSTIMESTAMP
  7  , dq_complete TIMESTAMP DEFAULT SYSTIMESTAMP
  8  , ws_status VARCHAR2(2000)
  9  , ws_time NUMBER
 10  )
 11  /

Table created.

SQL>
SQL>
SQL> CREATE OR REPLACE TYPE TEST_AQ.ty_payload_datasync AS OBJECT
  2  (
  3  client_id VARCHAR2(50)
  4  , area VARCHAR2(50)
  5  , action VARCHAR2(50)
  6  , aoid VARCHAR2(100)
  7  , wfv_oid VARCHAR2(50)
  8  , role VARCHAR2(20)
  9  , filtype VARCHAR2(10)
 10  , f_dml VARCHAR2(10)
 11  , no_of_attempts NUMBER
 12  , modified_by VARCHAR2(50)
 13  ) ;
 14  /

Type created.

SQL>
SQL> CREATE TABLE TEST_AQ.data_sync
  2  (
  3  client_id VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_client_id')
  4  , area VARCHAR2(50)
  5  , action VARCHAR2(50)
  6  , aoid VARCHAR2(100)
  7  , wfv_oid VARCHAR2(50)
  8  , role VARCHAR2(20) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_role_name')
  9  , filtype VARCHAR2(10)
 10  , f_dml VARCHAR2(10)
 11  , no_of_attempts NUMBER DEFAULT 0
 12  , created_on TIMESTAMP DEFAULT SYSTIMESTAMP
 13  , modified_on TIMESTAMP DEFAULT SYSTIMESTAMP
 14  , modified_by VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd', 'ctx_mod_usr')
 15  )
 16  /

Table created.

SQL>
SQL>
SQL>
SQL> rem
SQL> rem Create a queue table
SQL> rem
SQL>
SQL> BEGIN
  2  DBMS_AQADM.CREATE_QUEUE_TABLE
  3  (
  4  queue_table => 'TEST_AQ.qt_datasync'
  5  , queue_payload_type => 'TEST_AQ.ty_payload_datasync'
  6  , multiple_consumers => TRUE
  7  ) ;
  8  END ;
  9  /

PL/SQL procedure successfully completed.

SQL>
SQL> rem
SQL> rem Create a queue using the table.
SQL> rem
SQL>
SQL> BEGIN
  2  DBMS_AQADM.CREATE_QUEUE
  3  (
  4  queue_name => 'TEST_AQ.qu_datasync'
  5  , queue_table => 'TEST_AQ.qt_datasync'
  6  , max_retries => 4
  7  ) ;
  8
  9  DBMS_AQADM.START_QUEUE
 10  (
 11  queue_name => 'TEST_AQ.qu_datasync'
 12  ) ;
 13
 14  END ;
 15  /

PL/SQL procedure successfully completed.

SQL>
SQL>
SQL>
SQL> rem
SQL> rem Subscriber code
SQL> rem
SQL>
SQL> CREATE OR REPLACE PROCEDURE TEST_AQ.sp_subscriber_datasync
  2  (
  3    context RAW,
  4    reginfo SYS.AQ$_REG_INFO,
  5    descr SYS.AQ$_DESCRIPTOR,
  6    payload RAW,
  7    payloadl NUMBER
  8    ) AS
  9    r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
 10    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
 11    v_message_handle RAW(16);
 12    o_payload ty_payload_datasync ;
 13    v_dq_time TIMESTAMP ;
 14    v_ws_status VARCHAR2(2000) ;
 15    v_ws_time NUMBER ;
 16    no_messages exception;
 17  PRAGMA EXCEPTION_INIT (no_messages, -25228);
 18  BEGIN
 19  r_dequeue_options.msgid := descr.msg_id;
 20  r_dequeue_options.consumer_name := descr.consumer_name;
 21  r_dequeue_options.wait:=DBMS_AQ.NO_WAIT;
 22
 23
 24    BEGIN
 25    DBMS_AQ.DEQUEUE
 26    (
 27    queue_name => descr.queue_name,
 28    dequeue_options => r_dequeue_options,
 29    message_properties => r_message_properties,
 30    payload => o_payload,
 31    msgid => v_message_handle
 32    ) ;
 33    EXCEPTIOn
 34    WHEN OTHERS THEn
 35    dbms_output.put_line('DEQUEUE FAILEd');
 36    insert into msg_tab values ('x', 'x','x',systimestamp,systimestamp, 'deq err', 'deq err') ;
 37    commit;
 38    END;
 39
 40  v_dq_time := SYSTIMESTAMP ;
 41  -- dbms_lock.sleep(0.5) ;
 42  insert into msg_tab values (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimestamp,systimestamp, 'success', v_ws_time) ;
 43
 44
 45
 46  COMMIT;
 47  EXCEPTION
 48  WHEN others THEN
 49  dbms_output.put_line('DEQUEUE FAILEd');
 50  commit;
 51  END;
 52  /

Procedure created.

SQL> SHOW ERRORS
No errors.
SQL>
SQL> rem
SQL> rem Subscribe the code to automatically process the message.
SQL> rem
SQL>
SQL> BEGIN
  2  DBMS_AQADM.ADD_SUBSCRIBER
  3  (
  4  queue_name => 'TEST_AQ.qu_datasync',
  5  subscriber => SYS.AQ$_AGENT('QUSS_DATASYNC', NULL, NULL)
  6  );
  7
  8  DBMS_AQ.REGISTER
  9  (
 10  SYS.AQ$_REG_INFO_LIST
 11  (
 12  SYS.AQ$_REG_INFO
 13  (
 14  'TEST_AQ.QU_DATASYNC:QUSS_DATASYNC',
 15  DBMS_AQ.NAMESPACE_AQ,
 16  'plsql://TEST_AQ.SP_SUBSCRIBER_DATASYNC?PR=0',
 17  HEXTORAW('FF')
 18  )
 19  ),
 20  1
 21  );
 22  END;
 23  /

PL/SQL procedure successfully completed.

SQL>
SQL> CREATE OR REPLACE TRIGGER TEST_AQ.tr_biu_data_sync
  2  BEFORE INSERT OR UPDATE ON TEST_AQ.data_sync
  3  referencing new as new
  4  FOR EACH ROW
  5  DECLARE
  6  r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  7  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  8  v_message_handle RAW(16);
  9  o_payload ty_payload_datasync;
 10
 11  BEGIN
 12
 13  o_payload := ty_payload_datasync
 14  (
 15  NVL(:new.client_id, :old.client_id)
 16  , NVL(:new.area, :old.area)
 17  , NVL(:new.action, :old.action)
 18  , NVL(:new.aoid, :old.aoid)
 19  , NVL(:new.wfv_oid, :old.wfv_oid)
 20  , '~'
 21  , '~'
 22  , 'I'
 23  , 0
 24  , NVL(:new.modified_by, :old.modified_by)
 25  );
 26
 27  DBMS_AQ.ENQUEUE
 28  (
 29  queue_name => 'TEST_AQ.qu_datasync',
 30  enqueue_options => r_enqueue_options,
 31  message_properties => r_message_properties,
 32  payload => o_payload,
 33  msgid => v_message_handle
 34  );
 35  IF UPDATING THEN
 36  :new.modified_on:=SYSTIMESTAMP;
 37  END IF;
 38  EXCEPTION
 39  WHEN OTHERS THEN
 40  insert into msg_tab values (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimestamp,systimestamp, 'trig fail', null) ;
 41
 42
 43
 44  commit;
 45  END;
 46  /

Trigger created.

SQL>
SQL> conn TEST_AQ/adpadp
Connected.

SQL>
SQL> insert into TEST_AQ.data_sync values ('x','a','a','oid','wfv','ro','fi','dml',1,systimestamp,systimestamp,'me');

1 row created.

SQL>
SQL> select count(*) from test_aq.QT_DATASYNC;

  COUNT(*)
----------
         1

SQL>
SQL> select * from TEST_AQ.msg_tab;

no rows selected

SQL>
SQL> commit;

Commit complete.

(wait for few seconds)

SQL>
SQL> select * from TEST_AQ.msg_tab;


CLIENT_ID
--------------------------------------------------
AOID
----------------------------------------------------------------------------------------------------
WFV_OID
----------------------------------------------------------------------------------------------------
DQ_TIME
---------------------------------------------------------------------------
DQ_COMPLETE
---------------------------------------------------------------------------
WS_STATUS
----------------------------------------------------------------------------------------------------------------------------------
   WS_TIME
----------
x
oid
wfv
10-SEP-15 11.29.34.731000 AM
10-SEP-15 11.29.34.731000 AM
success



11.2
============================

SQL>


SQL> select * from v$version;

BANNER
--------------------------------------------------------------------------------
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
PL/SQL Release 11.2.0.4.0 - Production
CORE    11.2.0.4.0      Production
TNS for 64-bit Windows: Version 11.2.0.4.0 - Production
NLSRTL Version 11.2.0.4.0 - Production

SQL>
SQL> drop user TEST_AQ cascade;
drop user TEST_AQ cascade
          *
ERROR at line 1:
ORA-01918: user 'TEST_AQ' does not exist


SQL>
SQL> PROMPT TEST_AQ SCHEMA CREATION
TEST_AQ SCHEMA CREATION
SQL>
SQL>
SQL> create user TEST_AQ identified by adpadp
  2  default tablespace users
  3  temporary tablespace TEMP
  4  quota unlimited on users;

User created.

SQL>
SQL> GRANT CONNECT, RESOURCE TO TEST_AQ;

Grant succeeded.

SQL>
SQL> GRANT EXECUTE ON dbms_aqadm TO TEST_AQ WITH GRANT OPTION ;

Grant succeeded.

SQL> GRANT EXECUTE ON dbms_aq TO TEST_AQ WITH GRANT OPTION ;

Grant succeeded.

SQL>
SQL> GRANT AQ_ADMINISTRATOR_ROLE TO TEST_AQ;

Grant succeeded.

SQL>
SQL> BEGIN
  2  DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
  3  privilege => 'MANAGE_ANY',
  4  grantee => 'TEST_AQ',
  5  admin_option => FALSE);
  6  END;
  7  /

PL/SQL procedure successfully completed.

SQL>
SQL>
SQL> CREATE TABLE TEST_AQ.MSG_TAb
  2  (
  3    client_id VARCHAR2(50)
  4  , aoid VARCHAR2(100)
  5  , wfv_oid VARCHAR2(100)
  6  , dq_time TIMESTAMP DEFAULT SYSTIMESTAMP
  7  , dq_complete TIMESTAMP DEFAULT SYSTIMESTAMP
  8  , ws_status VARCHAR2(2000)
  9  , ws_time NUMBER
 10  )
 11  /

Table created.

SQL>
SQL>
SQL> CREATE OR REPLACE TYPE TEST_AQ.ty_payload_datasync AS OBJECT
  2  (
  3  client_id VARCHAR2(50)
  4  , area VARCHAR2(50)
  5  , action VARCHAR2(50)
  6  , aoid VARCHAR2(100)
  7  , wfv_oid VARCHAR2(50)
  8  , role VARCHAR2(20)
  9  , filtype VARCHAR2(10)
 10  , f_dml VARCHAR2(10)
 11  , no_of_attempts NUMBER
 12  , modified_by VARCHAR2(50)
 13  ) ;
 14  /

Type created.

SQL>
SQL> CREATE TABLE TEST_AQ.data_sync
  2  (
  3  client_id VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_client_id')
  4  , area VARCHAR2(50)
  5  , action VARCHAR2(50)
  6  , aoid VARCHAR2(100)
  7  , wfv_oid VARCHAR2(50)
  8  , role VARCHAR2(20) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_role_name')
  9  , filtype VARCHAR2(10)
 10  , f_dml VARCHAR2(10)
 11  , no_of_attempts NUMBER DEFAULT 0
 12  , created_on TIMESTAMP DEFAULT SYSTIMESTAMP
 13  , modified_on TIMESTAMP DEFAULT SYSTIMESTAMP
 14  , modified_by VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd', 'ctx_mod_usr')
 15  )
 16  /

Table created.

SQL>
SQL>
SQL>
SQL> rem
SQL> rem Create a queue table
SQL> rem
SQL>
SQL> BEGIN
  2  DBMS_AQADM.CREATE_QUEUE_TABLE
  3  (
  4  queue_table => 'TEST_AQ.qt_datasync'
  5  , queue_payload_type => 'TEST_AQ.ty_payload_datasync'
  6  , multiple_consumers => TRUE
  7  ) ;
  8  END ;
  9  /

PL/SQL procedure successfully completed.

SQL>
SQL> rem
SQL> rem Create a queue using the table.
SQL> rem
SQL>
SQL> BEGIN
  2  DBMS_AQADM.CREATE_QUEUE
  3  (
  4  queue_name => 'TEST_AQ.qu_datasync'
  5  , queue_table => 'TEST_AQ.qt_datasync'
  6  , max_retries => 4
  7  ) ;
  8
  9  DBMS_AQADM.START_QUEUE
 10  (
 11  queue_name => 'TEST_AQ.qu_datasync'
 12  ) ;
 13
 14  END ;
 15  /

PL/SQL procedure successfully completed.

SQL>
SQL>
SQL>
SQL> rem
SQL> rem Subscriber code
SQL> rem
SQL>
SQL> CREATE OR REPLACE PROCEDURE TEST_AQ.sp_subscriber_datasync
  2  (
  3    context RAW,
  4    reginfo SYS.AQ$_REG_INFO,
  5    descr SYS.AQ$_DESCRIPTOR,
  6    payload RAW,
  7    payloadl NUMBER
  8    ) AS
  9    r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
 10    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
 11    v_message_handle RAW(16);
 12    o_payload ty_payload_datasync ;
 13    v_dq_time TIMESTAMP ;
 14    v_ws_status VARCHAR2(2000) ;
 15    v_ws_time NUMBER ;
 16    no_messages exception;
 17  PRAGMA EXCEPTION_INIT (no_messages, -25228);
 18  BEGIN
 19  r_dequeue_options.msgid := descr.msg_id;
 20  r_dequeue_options.consumer_name := descr.consumer_name;
 21  r_dequeue_options.wait:=DBMS_AQ.NO_WAIT;
 22
 23
 24    BEGIN
 25    DBMS_AQ.DEQUEUE
 26    (
 27    queue_name => descr.queue_name,
 28    dequeue_options => r_dequeue_options,
 29    message_properties => r_message_properties,
 30    payload => o_payload,
 31    msgid => v_message_handle
 32    ) ;
 33    EXCEPTIOn
 34    WHEN OTHERS THEn
 35    dbms_output.put_line('DEQUEUE FAILEd');
 36    insert into msg_tab values ('x', 'x','x',systimestamp,systimestamp, 'deq err', 'deq err') ;
 37    commit;
 38    END;
 39
 40  v_dq_time := SYSTIMESTAMP ;
 41  -- dbms_lock.sleep(0.5) ;
 42  insert into msg_tab values (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimestamp,systimestamp, 'success', v_ws_time) ;
 43
 44
 45
 46  COMMIT;
 47  EXCEPTION
 48  WHEN others THEN
 49  dbms_output.put_line('DEQUEUE FAILEd');
 50  commit;
 51  END;
 52  /

Procedure created.

SQL> SHOW ERRORS
No errors.
SQL>
SQL> rem
SQL> rem Subscribe the code to automatically process the message.
SQL> rem
SQL>
SQL> BEGIN
  2  DBMS_AQADM.ADD_SUBSCRIBER
  3  (
  4  queue_name => 'TEST_AQ.qu_datasync',
  5  subscriber => SYS.AQ$_AGENT('QUSS_DATASYNC', NULL, NULL)
  6  );
  7
  8  DBMS_AQ.REGISTER
  9  (
 10  SYS.AQ$_REG_INFO_LIST
 11  (
 12  SYS.AQ$_REG_INFO
 13  (
 14  'TEST_AQ.QU_DATASYNC:QUSS_DATASYNC',
 15  DBMS_AQ.NAMESPACE_AQ,
 16  'plsql://TEST_AQ.SP_SUBSCRIBER_DATASYNC?PR=0',
 17  HEXTORAW('FF')
 18  )
 19  ),
 20  1
 21  );
 22  END;
 23  /

PL/SQL procedure successfully completed.

SQL>
SQL> CREATE OR REPLACE TRIGGER TEST_AQ.tr_biu_data_sync
  2  BEFORE INSERT OR UPDATE ON TEST_AQ.data_sync
  3  referencing new as new
  4  FOR EACH ROW
  5  DECLARE
  6  r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
  7  r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
  8  v_message_handle RAW(16);
  9  o_payload ty_payload_datasync;
 10
 11  BEGIN
 12
 13  o_payload := ty_payload_datasync
 14  (
 15  NVL(:new.client_id, :old.client_id)
 16  , NVL(:new.area, :old.area)
 17  , NVL(:new.action, :old.action)
 18  , NVL(:new.aoid, :old.aoid)
 19  , NVL(:new.wfv_oid, :old.wfv_oid)
 20  , '~'
 21  , '~'
 22  , 'I'
 23  , 0
 24  , NVL(:new.modified_by, :old.modified_by)
 25  );
 26
 27  DBMS_AQ.ENQUEUE
 28  (
 29  queue_name => 'TEST_AQ.qu_datasync',
 30  enqueue_options => r_enqueue_options,
 31  message_properties => r_message_properties,
 32  payload => o_payload,
 33  msgid => v_message_handle
 34  );
 35  IF UPDATING THEN
 36  :new.modified_on:=SYSTIMESTAMP;
 37  END IF;
 38  EXCEPTION
 39  WHEN OTHERS THEN
 40  insert into msg_tab values (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimestamp,systimestamp, 'trig fail', null) ;
 41
 42
 43
 44  commit;
 45  END;
 46  /

Trigger created.

SQL>
SQL> conn TEST_AQ/adpadp
Connected.

SQL>
SQL> insert into TEST_AQ.data_sync values ('x','a','a','oid','wfv','ro','fi','dml',1,systimestamp,systimestamp,'me');

1 row created.

SQL>
SQL> select count(*) from test_aq.QT_DATASYNC;

  COUNT(*)
----------
         1

SQL>
SQL> select * from TEST_AQ.msg_tab;

no rows selected

SQL>
SQL> commit;

Commit complete.

SQL> select * from TEST_AQ.msg_tab;

CLIENT_ID
--------------------------------------------------
AOID
----------------------------------------------------------------------------------------------------
WFV_OID
----------------------------------------------------------------------------------------------------
DQ_TIME
---------------------------------------------------------------------------
DQ_COMPLETE
---------------------------------------------------------------------------
WS_STATUS
----------------------------------------------------------------------------------------------------------------------------------
   WS_TIME
----------
x
oid
wfv
10-SEP-15 11.32.10.158000 AM
10-SEP-15 11.32.10.158000 AM
success



SQL>



「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论