问题描述
ORA-25247出队时获取错误消息: %s不是指定邮件的收件人
已将消息入队,并且成功。
使用PLSQL订户出队并向接收者注册plsql过程。
然后,获取错误消息ORA-25247 : %s不是指定邮件的收件人
甚至尝试将收件人添加到Enqueue过程中,但仍然出现相同的错误。
相同的代码在QA ENv中工作。
已验证所有权限。。也给出了AQ_ administROR_ROLE ,但仍然没有运气。
授权连接,资源到测试AQ ;
使用授权选项对dbms_aqadm进行授权执行,以执行对测试AQ的授权;
使用授权选项对dbms_aq进行授权执行;
将AQ_管理-ROLE授予测试-AQ ;
开始
DBMS_AQADM.GANT_SER_PRIVILEGE(
特权=>'MANAGE_Y',
被授权者=>'Test_AQ',
admin_option =>假) ;
结束;
/
开始
DBMS_AQADM.Add_SUB订阅者
(
队列名称=>'TEST_AQ.qu_datasync',
订户=> SYS.AQ$_Agent ('QSS_DATAS', NULL, NULL )
);
DBMS_AQ.REG
(
SYS.AQ$_REG_INFO_列表
(
SYS.AQ$_REG_INFO
(
'TEST_AQ.Q_DATA Sync:QSS_DATA Sync',
DBMS_AQ.NAME_A_AQ ,
'plsql://TEST_AQ.SP_SUBCUR_DATASCH?PR=0',
六角形('FF')
)
),
1
);
结束;
/
创建或替换过程测试_AQ.sp_subchr_datasync
(
上下文,
reginfo SYS.AQ$_REG_INFO ,
描述SYS.AQ$_DE描述,
有效负载原始,
有效载荷编号
)组件
r_dequeue_options DBMS_AQ.DEQUE_OPIONS_T ;
r_message_properties DBMS_AQ.MESSAGE_PROPERTY_T ;
v_message_handle原始(16) ;
o_有效载荷ty_有效载荷数据同步;
v_dq_time时间戳;
v_ws_status VARCHAR2(2000) ;
v_ws_time编号;
no_message异常;
PRAGMA异常INIT (无消息, -25228 ) ;
开始
r_dequeue_options.msgid := descr.msg_id ;
r_dequeue_options.comsumer_name := descr.comsumer_name ;
r_dequeue_options. wait:=DBMS_AQ.NO_WAIT ;
开始
DBMS_AQ.DEQUE
(
队列名称=> desc.queue_name ,
dequeue_options => r_dequeue_options ,
message_properties => r_message_properties ,
有效负载=> o_有效负载,
消息=> v_message_handle
) ;
例外
当别人那么
dbms_输出.put_line ('DEQUE失败') ;
结束;
v_dq_time :=系统时间戳;
-- dbms_lock.sleep(0.5) ;
插入到test中(o__pload.client_id、o_pload.aid、o_pload.wfv_id、o_pload.rea、o_pload.action、o_pload.角色、, v_ws_status、v_ws_time) ;
提交;
例外
当别人那么
dbms_输出.put_line ('DEQUE失败') ;
承诺;
结束;
/
显示错误
宣布
r_enqueue_options DBMS_AQ.EN queue_OPIONS_T ;
r_message_properties DBMS_AQ.MESSAGE_PROPERTY_T ;
v_message_handle原始(16) ;
o_有效载荷ty_有效载荷数据同步;
开始
o_负载:= ty_负载负载数据同步
(
NVL ( : new.client_id,:old.client_id )
, NVL ( : new.区域, : old.区域)
, NVL(:new.action,:old.action)
, NVL ( : new.aid,:old.aid )
, NVL(:new.wfv_id,:old.wfv_id)
、'~'
、'~'
,'I'
, 0
, NVL(:new.modified_by,:old.modified_by)
);
DBMS_AQ.ENQUE
(
队列名称=>'TEST_AQ.qu_datasync',
enqueue_options => r_enqueue_options ,
message_properties => r_message_properties ,
有效负载=> o_有效负载,
消息=> v_message_handle
);
如果正在更新,则
←Wikivoyage :new.modified_on:=SYSSTAMP ;
结束IF ;
例外
当别人那么
插入到......中
承诺;
结束;
/
检查的进程还:
AQ_TM_过程=1
JOB_QUEUE_PROCESS=32
OK下面是完整的脚本
===============================
创建提示测试AQ模式
创建由adpadp标识的用户
缺省表空间数据1
临时表空间
数据1的配额不受限制;
授权连接,资源到测试AQ ;
使用授权选项对dbms_aqadm进行授权执行,以执行对测试AQ的授权;
使用授权选项对dbms_aq进行授权执行;
将AQ_管理-ROLE授予测试-AQ ;
开始
DBMS_AQADM.GANT_SER_PRIVILEGE(
特权=>'MANAGE_Y',
被授权者=>'Test_AQ',
admin_option =>假) ;
结束;
/
创建表测试AQ.MSG_TAb
(
客户端ID VARCHAR2(50)
,避免VARCHAR2(100)
, wfv_id VARCHAR2(100)
, dq_time时间戳默认SYSTAMP
, dq_完成时间戳默认SYSTAMP
, ws_status VARCHAR2(2000)
, ws_time编号
)
/
创建或替换类型:测试AQ.ty_Payload_datasync作为对象
(
客户端ID VARCHAR2(50)
,区域VARCHAR2(50)
,操作VARCHAR2(50)
,避免VARCHAR2(100)
, wfv_id VARCHAR2(50)
,角色VARCHAR2(20)
,滤芯类型VARCHAR2(10)
、f_dml VARCHAR2(10)
, no_of_attries编号
,修改_ by VARCHAR2(50)
) ;
/
创建表测试AQ.data_sync
(
客户端ID VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_client_id')
,区域VARCHAR2(50)
,操作VARCHAR2(50)
,避免VARCHAR2(100)
, wfv_id VARCHAR2(50)
,角色VARCHAR2(20) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_role_name')
,滤芯类型VARCHAR2(10)
、f_dml VARCHAR2(10)
, no_of_attries编号 DEFAULT 0
,创建时间TAMP默认SYSTAMP
,修改了时间戳上的默认SYSTAMP
,修改_ by VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd', 'ctx_mod_usr')
)
/
快速眼动
快速眼动 Create a queue table
快速眼动
开始
DBMS_AQADM.CREATE_ queUE_ TABLE
(
query_table =>'TEST_AQ.qt_datasync'
,quesquery_Payload_type =>'TEST_AQ.ty_Payload_datasync'
,多个消费者=>正确
) ;
结束;
/
快速眼动
快速眼动 Create a queue using the table.
快速眼动
开始
DBMS_AQADM.CREATE_ queUE
(
队列名称=>'TEST_AQ.qu_datasync'
,query_table =>'TEST_AQ.qt_datasync'
,最大重试次数=> 4
) ;
DBMS_AQADM.开始_ queUE
(
队列名称=>'TEST_AQ.qu_datasync'
) ;
结束;
/
快速眼动
快速眼动 Subscriber code
快速眼动
创建或替换过程测试_AQ.sp_subchr_datasync
(
上下文,
reginfo SYS.AQ$_REG_INFO ,
描述SYS.AQ$_DE描述,
有效负载原始,
有效载荷编号
)组件
r_dequeue_options DBMS_AQ.DEQUE_OPIONS_T ;
r_message_properties DBMS_AQ.MESSAGE_PROPERTY_T ;
v_message_handle原始(16) ;
o_有效载荷ty_有效载荷数据同步;
v_dq_time时间戳;
v_ws_status VARCHAR2(2000) ;
v_ws_time编号;
no_message异常;
PRAGMA异常INIT (无消息, -25228 ) ;
开始
r_dequeue_options.msgid := descr.msg_id ;
r_dequeue_options.comsumer_name := descr.comsumer_name ;
r_dequeue_options. wait:=DBMS_AQ.NO_WAIT ;
开始
DBMS_AQ.DEQUE
(
队列名称=> desc.queue_name ,
dequeue_options => r_dequeue_options ,
message_properties => r_message_properties ,
有效负载=> o_有效负载,
消息=> v_message_handle
) ;
例外
当别人那么
dbms_输出.put_line ('DEQUE失败') ;
结束;
v_dq_time :=系统时间戳;
-- dbms_lock.sleep(0.5) ;
insert into msg_tab (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimpestamp,systimestamp, v_ws_status, v_ws_time) ;
提交;
例外
当别人那么
dbms_输出.put_line ('DEQUE失败') ;
承诺;
结束;
/
显示错误
快速眼动
快速眼动 Subscribe the code to automatically process the message.
快速眼动
开始
DBMS_AQADM.Add_SUB订阅者
(
队列名称=>'TEST_AQ.qu_datasync',
订户=> SYS.AQ$_Agent ('QSS_DATAS', NULL, NULL )
);
DBMS_AQ.REG
(
SYS.AQ$_REG_INFO_列表
(
SYS.AQ$_REG_INFO
(
'TEST_AQ.Q_DATA Sync:QSS_DATA Sync',
DBMS_AQ.NAME_A_AQ ,
'plsql://TEST_AQ.SP_SUBCUR_DATASCH?PR=0',
六角形('FF')
)
),
1
);
结束;
/
创建或替换触发器测试AQ.tr_bIU_data_sync
在插入或更新测试AQ.data_sync之前
将新引用为新
对于每行
宣布
r_enqueue_options DBMS_AQ.EN queue_OPIONS_T ;
r_message_properties DBMS_AQ.MESSAGE_PROPERTY_T ;
v_message_handle原始(16) ;
o_有效载荷ty_有效载荷数据同步;
开始
o_负载:= ty_负载负载数据同步
(
NVL ( : new.client_id,:old.client_id )
, NVL ( : new.区域, : old.区域)
, NVL(:new.action,:old.action)
, NVL ( : new.aid,:old.aid )
, NVL(:new.wfv_id,:old.wfv_id)
、'~'
、'~'
,'I'
, 0
, NVL(:new.modified_by,:old.modified_by)
);
DBMS_AQ.ENQUE
(
队列名称=>'TEST_AQ.qu_datasync',
enqueue_options => r_enqueue_options ,
message_properties => r_message_properties ,
有效负载=> o_有效负载,
消息=> v_message_handle
);
如果正在更新,则
←Wikivoyage :new.modified_on:=SYSSTAMP ;
结束IF ;
例外
当别人那么
insert into msg_tab (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimpestamp,systimestamp, v_ws_status, v_ws_time) ;
承诺;
结束;
/
已将消息入队,并且成功。
使用PLSQL订户出队并向接收者注册plsql过程。
然后,获取错误消息ORA-25247 : %s不是指定邮件的收件人
甚至尝试将收件人添加到Enqueue过程中,但仍然出现相同的错误。
相同的代码在QA ENv中工作。
已验证所有权限。。也给出了AQ_ administROR_ROLE ,但仍然没有运气。
授权连接,资源到测试AQ ;
使用授权选项对dbms_aqadm进行授权执行,以执行对测试AQ的授权;
使用授权选项对dbms_aq进行授权执行;
将AQ_管理-ROLE授予测试-AQ ;
开始
DBMS_AQADM.GANT_SER_PRIVILEGE(
特权=>'MANAGE_Y',
被授权者=>'Test_AQ',
admin_option =>假) ;
结束;
/
开始
DBMS_AQADM.Add_SUB订阅者
(
队列名称=>'TEST_AQ.qu_datasync',
订户=> SYS.AQ$_Agent ('QSS_DATAS', NULL, NULL )
);
DBMS_AQ.REG
(
SYS.AQ$_REG_INFO_列表
(
SYS.AQ$_REG_INFO
(
'TEST_AQ.Q_DATA Sync:QSS_DATA Sync',
DBMS_AQ.NAME_A_AQ ,
'plsql://TEST_AQ.SP_SUBCUR_DATASCH?PR=0',
六角形('FF')
)
),
1
);
结束;
/
创建或替换过程测试_AQ.sp_subchr_datasync
(
上下文,
reginfo SYS.AQ$_REG_INFO ,
描述SYS.AQ$_DE描述,
有效负载原始,
有效载荷编号
)组件
r_dequeue_options DBMS_AQ.DEQUE_OPIONS_T ;
r_message_properties DBMS_AQ.MESSAGE_PROPERTY_T ;
v_message_handle原始(16) ;
o_有效载荷ty_有效载荷数据同步;
v_dq_time时间戳;
v_ws_status VARCHAR2(2000) ;
v_ws_time编号;
no_message异常;
PRAGMA异常INIT (无消息, -25228 ) ;
开始
r_dequeue_options.msgid := descr.msg_id ;
r_dequeue_options.comsumer_name := descr.comsumer_name ;
r_dequeue_options. wait:=DBMS_AQ.NO_WAIT ;
开始
DBMS_AQ.DEQUE
(
队列名称=> desc.queue_name ,
dequeue_options => r_dequeue_options ,
message_properties => r_message_properties ,
有效负载=> o_有效负载,
消息=> v_message_handle
) ;
例外
当别人那么
dbms_输出.put_line ('DEQUE失败') ;
结束;
v_dq_time :=系统时间戳;
-- dbms_lock.sleep(0.5) ;
插入到test中(o__pload.client_id、o_pload.aid、o_pload.wfv_id、o_pload.rea、o_pload.action、o_pload.角色、, v_ws_status、v_ws_time) ;
提交;
例外
当别人那么
dbms_输出.put_line ('DEQUE失败') ;
承诺;
结束;
/
显示错误
宣布
r_enqueue_options DBMS_AQ.EN queue_OPIONS_T ;
r_message_properties DBMS_AQ.MESSAGE_PROPERTY_T ;
v_message_handle原始(16) ;
o_有效载荷ty_有效载荷数据同步;
开始
o_负载:= ty_负载负载数据同步
(
NVL ( : new.client_id,:old.client_id )
, NVL ( : new.区域, : old.区域)
, NVL(:new.action,:old.action)
, NVL ( : new.aid,:old.aid )
, NVL(:new.wfv_id,:old.wfv_id)
、'~'
、'~'
,'I'
, 0
, NVL(:new.modified_by,:old.modified_by)
);
DBMS_AQ.ENQUE
(
队列名称=>'TEST_AQ.qu_datasync',
enqueue_options => r_enqueue_options ,
message_properties => r_message_properties ,
有效负载=> o_有效负载,
消息=> v_message_handle
);
如果正在更新,则
←Wikivoyage :new.modified_on:=SYSSTAMP ;
结束IF ;
例外
当别人那么
插入到......中
承诺;
结束;
/
检查的进程还:
AQ_TM_过程=1
JOB_QUEUE_PROCESS=32
OK下面是完整的脚本
===============================
创建提示测试AQ模式
创建由adpadp标识的用户
缺省表空间数据1
临时表空间
数据1的配额不受限制;
授权连接,资源到测试AQ ;
使用授权选项对dbms_aqadm进行授权执行,以执行对测试AQ的授权;
使用授权选项对dbms_aq进行授权执行;
将AQ_管理-ROLE授予测试-AQ ;
开始
DBMS_AQADM.GANT_SER_PRIVILEGE(
特权=>'MANAGE_Y',
被授权者=>'Test_AQ',
admin_option =>假) ;
结束;
/
创建表测试AQ.MSG_TAb
(
客户端ID VARCHAR2(50)
,避免VARCHAR2(100)
, wfv_id VARCHAR2(100)
, dq_time时间戳默认SYSTAMP
, dq_完成时间戳默认SYSTAMP
, ws_status VARCHAR2(2000)
, ws_time编号
)
/
创建或替换类型:测试AQ.ty_Payload_datasync作为对象
(
客户端ID VARCHAR2(50)
,区域VARCHAR2(50)
,操作VARCHAR2(50)
,避免VARCHAR2(100)
, wfv_id VARCHAR2(50)
,角色VARCHAR2(20)
,滤芯类型VARCHAR2(10)
、f_dml VARCHAR2(10)
, no_of_attries编号
,修改_ by VARCHAR2(50)
) ;
/
创建表测试AQ.data_sync
(
客户端ID VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_client_id')
,区域VARCHAR2(50)
,操作VARCHAR2(50)
,避免VARCHAR2(100)
, wfv_id VARCHAR2(50)
,角色VARCHAR2(20) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_role_name')
,滤芯类型VARCHAR2(10)
、f_dml VARCHAR2(10)
, no_of_attries编号 DEFAULT 0
,创建时间TAMP默认SYSTAMP
,修改了时间戳上的默认SYSTAMP
,修改_ by VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd', 'ctx_mod_usr')
)
/
快速眼动
快速眼动 Create a queue table
快速眼动
开始
DBMS_AQADM.CREATE_ queUE_ TABLE
(
query_table =>'TEST_AQ.qt_datasync'
,quesquery_Payload_type =>'TEST_AQ.ty_Payload_datasync'
,多个消费者=>正确
) ;
结束;
/
快速眼动
快速眼动 Create a queue using the table.
快速眼动
开始
DBMS_AQADM.CREATE_ queUE
(
队列名称=>'TEST_AQ.qu_datasync'
,query_table =>'TEST_AQ.qt_datasync'
,最大重试次数=> 4
) ;
DBMS_AQADM.开始_ queUE
(
队列名称=>'TEST_AQ.qu_datasync'
) ;
结束;
/
快速眼动
快速眼动 Subscriber code
快速眼动
创建或替换过程测试_AQ.sp_subchr_datasync
(
上下文,
reginfo SYS.AQ$_REG_INFO ,
描述SYS.AQ$_DE描述,
有效负载原始,
有效载荷编号
)组件
r_dequeue_options DBMS_AQ.DEQUE_OPIONS_T ;
r_message_properties DBMS_AQ.MESSAGE_PROPERTY_T ;
v_message_handle原始(16) ;
o_有效载荷ty_有效载荷数据同步;
v_dq_time时间戳;
v_ws_status VARCHAR2(2000) ;
v_ws_time编号;
no_message异常;
PRAGMA异常INIT (无消息, -25228 ) ;
开始
r_dequeue_options.msgid := descr.msg_id ;
r_dequeue_options.comsumer_name := descr.comsumer_name ;
r_dequeue_options. wait:=DBMS_AQ.NO_WAIT ;
开始
DBMS_AQ.DEQUE
(
队列名称=> desc.queue_name ,
dequeue_options => r_dequeue_options ,
message_properties => r_message_properties ,
有效负载=> o_有效负载,
消息=> v_message_handle
) ;
例外
当别人那么
dbms_输出.put_line ('DEQUE失败') ;
结束;
v_dq_time :=系统时间戳;
-- dbms_lock.sleep(0.5) ;
insert into msg_tab (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimpestamp,systimestamp, v_ws_status, v_ws_time) ;
提交;
例外
当别人那么
dbms_输出.put_line ('DEQUE失败') ;
承诺;
结束;
/
显示错误
快速眼动
快速眼动 Subscribe the code to automatically process the message.
快速眼动
开始
DBMS_AQADM.Add_SUB订阅者
(
队列名称=>'TEST_AQ.qu_datasync',
订户=> SYS.AQ$_Agent ('QSS_DATAS', NULL, NULL )
);
DBMS_AQ.REG
(
SYS.AQ$_REG_INFO_列表
(
SYS.AQ$_REG_INFO
(
'TEST_AQ.Q_DATA Sync:QSS_DATA Sync',
DBMS_AQ.NAME_A_AQ ,
'plsql://TEST_AQ.SP_SUBCUR_DATASCH?PR=0',
六角形('FF')
)
),
1
);
结束;
/
创建或替换触发器测试AQ.tr_bIU_data_sync
在插入或更新测试AQ.data_sync之前
将新引用为新
对于每行
宣布
r_enqueue_options DBMS_AQ.EN queue_OPIONS_T ;
r_message_properties DBMS_AQ.MESSAGE_PROPERTY_T ;
v_message_handle原始(16) ;
o_有效载荷ty_有效载荷数据同步;
开始
o_负载:= ty_负载负载数据同步
(
NVL ( : new.client_id,:old.client_id )
, NVL ( : new.区域, : old.区域)
, NVL(:new.action,:old.action)
, NVL ( : new.aid,:old.aid )
, NVL(:new.wfv_id,:old.wfv_id)
、'~'
、'~'
,'I'
, 0
, NVL(:new.modified_by,:old.modified_by)
);
DBMS_AQ.ENQUE
(
队列名称=>'TEST_AQ.qu_datasync',
enqueue_options => r_enqueue_options ,
message_properties => r_message_properties ,
有效负载=> o_有效负载,
消息=> v_message_handle
);
如果正在更新,则
←Wikivoyage :new.modified_on:=SYSSTAMP ;
结束IF ;
例外
当别人那么
insert into msg_tab (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimpestamp,systimestamp, v_ws_status, v_ws_time) ;
承诺;
结束;
/
专家解答
谢谢你的问题,
您提供的代码对我们有很大帮助。但是,它缺少一个关键组件-创建队列本身的代码!
请用您用来构建队列和关联表的脚本更新问题?
增编:
你好,阿曼迪普,
我已经运行了你的脚本(仅对MSG_TAB插入进行了一些错误更正,在11.2.0.4和12.1.0.2上运行一切正常。).每种产品的输出如下。很抱歉,但我认为您可能需要使用Oracle支持,因为我认为您的代码很好。
12c
=====================
11.2
============================
您提供的代码对我们有很大帮助。但是,它缺少一个关键组件-创建队列本身的代码!
请用您用来构建队列和关联表的脚本更新问题?
增编:
你好,阿曼迪普,
我已经运行了你的脚本(仅对MSG_TAB插入进行了一些错误更正,在11.2.0.4和12.1.0.2上运行一切正常。).每种产品的输出如下。很抱歉,但我认为您可能需要使用Oracle支持,因为我认为您的代码很好。
12c
=====================
SQL> select * from v$version;
BANNER CON_ID
-------------------------------------------------------------------------------- ----------
Oracle Database 12c Enterprise Edition Release 12.1.0.2.0 - 64bit Production 0
PL/SQL Release 12.1.0.2.0 - Production 0
CORE 12.1.0.2.0 Production 0
TNS for 64-bit Windows: Version 12.1.0.2.0 - Production 0
NLSRTL Version 12.1.0.2.0 - Production 0
SQL> drop user TEST_AQ cascade;
User dropped.
SQL>
SQL> PROMPT TEST_AQ SCHEMA CREATION
TEST_AQ SCHEMA CREATION
SQL>
SQL>
SQL> create user TEST_AQ identified by adpadp
2 default tablespace users
3 temporary tablespace TEMP
4 quota unlimited on users;
User created.
SQL>
SQL> GRANT CONNECT, RESOURCE TO TEST_AQ;
Grant succeeded.
SQL>
SQL> GRANT EXECUTE ON dbms_aqadm TO TEST_AQ WITH GRANT OPTION ;
Grant succeeded.
SQL> GRANT EXECUTE ON dbms_aq TO TEST_AQ WITH GRANT OPTION ;
Grant succeeded.
SQL>
SQL> GRANT AQ_ADMINISTRATOR_ROLE TO TEST_AQ;
Grant succeeded.
SQL>
SQL> BEGIN
2 DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
3 privilege => 'MANAGE_ANY',
4 grantee => 'TEST_AQ',
5 admin_option => FALSE);
6 END;
7 /
PL/SQL procedure successfully completed.
SQL>
SQL>
SQL> CREATE TABLE TEST_AQ.MSG_TAb
2 (
3 client_id VARCHAR2(50)
4 , aoid VARCHAR2(100)
5 , wfv_oid VARCHAR2(100)
6 , dq_time TIMESTAMP DEFAULT SYSTIMESTAMP
7 , dq_complete TIMESTAMP DEFAULT SYSTIMESTAMP
8 , ws_status VARCHAR2(2000)
9 , ws_time NUMBER
10 )
11 /
Table created.
SQL>
SQL>
SQL> CREATE OR REPLACE TYPE TEST_AQ.ty_payload_datasync AS OBJECT
2 (
3 client_id VARCHAR2(50)
4 , area VARCHAR2(50)
5 , action VARCHAR2(50)
6 , aoid VARCHAR2(100)
7 , wfv_oid VARCHAR2(50)
8 , role VARCHAR2(20)
9 , filtype VARCHAR2(10)
10 , f_dml VARCHAR2(10)
11 , no_of_attempts NUMBER
12 , modified_by VARCHAR2(50)
13 ) ;
14 /
Type created.
SQL>
SQL> CREATE TABLE TEST_AQ.data_sync
2 (
3 client_id VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_client_id')
4 , area VARCHAR2(50)
5 , action VARCHAR2(50)
6 , aoid VARCHAR2(100)
7 , wfv_oid VARCHAR2(50)
8 , role VARCHAR2(20) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_role_name')
9 , filtype VARCHAR2(10)
10 , f_dml VARCHAR2(10)
11 , no_of_attempts NUMBER DEFAULT 0
12 , created_on TIMESTAMP DEFAULT SYSTIMESTAMP
13 , modified_on TIMESTAMP DEFAULT SYSTIMESTAMP
14 , modified_by VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd', 'ctx_mod_usr')
15 )
16 /
Table created.
SQL>
SQL>
SQL>
SQL> rem
SQL> rem Create a queue table
SQL> rem
SQL>
SQL> BEGIN
2 DBMS_AQADM.CREATE_QUEUE_TABLE
3 (
4 queue_table => 'TEST_AQ.qt_datasync'
5 , queue_payload_type => 'TEST_AQ.ty_payload_datasync'
6 , multiple_consumers => TRUE
7 ) ;
8 END ;
9 /
PL/SQL procedure successfully completed.
SQL>
SQL> rem
SQL> rem Create a queue using the table.
SQL> rem
SQL>
SQL> BEGIN
2 DBMS_AQADM.CREATE_QUEUE
3 (
4 queue_name => 'TEST_AQ.qu_datasync'
5 , queue_table => 'TEST_AQ.qt_datasync'
6 , max_retries => 4
7 ) ;
8
9 DBMS_AQADM.START_QUEUE
10 (
11 queue_name => 'TEST_AQ.qu_datasync'
12 ) ;
13
14 END ;
15 /
PL/SQL procedure successfully completed.
SQL>
SQL>
SQL>
SQL> rem
SQL> rem Subscriber code
SQL> rem
SQL>
SQL> CREATE OR REPLACE PROCEDURE TEST_AQ.sp_subscriber_datasync
2 (
3 context RAW,
4 reginfo SYS.AQ$_REG_INFO,
5 descr SYS.AQ$_DESCRIPTOR,
6 payload RAW,
7 payloadl NUMBER
8 ) AS
9 r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
10 r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
11 v_message_handle RAW(16);
12 o_payload ty_payload_datasync ;
13 v_dq_time TIMESTAMP ;
14 v_ws_status VARCHAR2(2000) ;
15 v_ws_time NUMBER ;
16 no_messages exception;
17 PRAGMA EXCEPTION_INIT (no_messages, -25228);
18 BEGIN
19 r_dequeue_options.msgid := descr.msg_id;
20 r_dequeue_options.consumer_name := descr.consumer_name;
21 r_dequeue_options.wait:=DBMS_AQ.NO_WAIT;
22
23
24 BEGIN
25 DBMS_AQ.DEQUEUE
26 (
27 queue_name => descr.queue_name,
28 dequeue_options => r_dequeue_options,
29 message_properties => r_message_properties,
30 payload => o_payload,
31 msgid => v_message_handle
32 ) ;
33 EXCEPTIOn
34 WHEN OTHERS THEn
35 dbms_output.put_line('DEQUEUE FAILEd');
36 insert into msg_tab values ('x', 'x','x',systimestamp,systimestamp, 'deq err', 'deq err') ;
37 commit;
38 END;
39
40 v_dq_time := SYSTIMESTAMP ;
41 -- dbms_lock.sleep(0.5) ;
42 insert into msg_tab values (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimestamp,systimestamp, 'success', v_ws_time) ;
43
44
45
46 COMMIT;
47 EXCEPTION
48 WHEN others THEN
49 dbms_output.put_line('DEQUEUE FAILEd');
50 commit;
51 END;
52 /
Procedure created.
SQL> SHOW ERRORS
No errors.
SQL>
SQL> rem
SQL> rem Subscribe the code to automatically process the message.
SQL> rem
SQL>
SQL> BEGIN
2 DBMS_AQADM.ADD_SUBSCRIBER
3 (
4 queue_name => 'TEST_AQ.qu_datasync',
5 subscriber => SYS.AQ$_AGENT('QUSS_DATASYNC', NULL, NULL)
6 );
7
8 DBMS_AQ.REGISTER
9 (
10 SYS.AQ$_REG_INFO_LIST
11 (
12 SYS.AQ$_REG_INFO
13 (
14 'TEST_AQ.QU_DATASYNC:QUSS_DATASYNC',
15 DBMS_AQ.NAMESPACE_AQ,
16 'plsql://TEST_AQ.SP_SUBSCRIBER_DATASYNC?PR=0',
17 HEXTORAW('FF')
18 )
19 ),
20 1
21 );
22 END;
23 /
PL/SQL procedure successfully completed.
SQL>
SQL> CREATE OR REPLACE TRIGGER TEST_AQ.tr_biu_data_sync
2 BEFORE INSERT OR UPDATE ON TEST_AQ.data_sync
3 referencing new as new
4 FOR EACH ROW
5 DECLARE
6 r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
7 r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
8 v_message_handle RAW(16);
9 o_payload ty_payload_datasync;
10
11 BEGIN
12
13 o_payload := ty_payload_datasync
14 (
15 NVL(:new.client_id, :old.client_id)
16 , NVL(:new.area, :old.area)
17 , NVL(:new.action, :old.action)
18 , NVL(:new.aoid, :old.aoid)
19 , NVL(:new.wfv_oid, :old.wfv_oid)
20 , '~'
21 , '~'
22 , 'I'
23 , 0
24 , NVL(:new.modified_by, :old.modified_by)
25 );
26
27 DBMS_AQ.ENQUEUE
28 (
29 queue_name => 'TEST_AQ.qu_datasync',
30 enqueue_options => r_enqueue_options,
31 message_properties => r_message_properties,
32 payload => o_payload,
33 msgid => v_message_handle
34 );
35 IF UPDATING THEN
36 :new.modified_on:=SYSTIMESTAMP;
37 END IF;
38 EXCEPTION
39 WHEN OTHERS THEN
40 insert into msg_tab values (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimestamp,systimestamp, 'trig fail', null) ;
41
42
43
44 commit;
45 END;
46 /
Trigger created.
SQL>
SQL> conn TEST_AQ/adpadp
Connected.
SQL>
SQL> insert into TEST_AQ.data_sync values ('x','a','a','oid','wfv','ro','fi','dml',1,systimestamp,systimestamp,'me');
1 row created.
SQL>
SQL> select count(*) from test_aq.QT_DATASYNC;
COUNT(*)
----------
1
SQL>
SQL> select * from TEST_AQ.msg_tab;
no rows selected
SQL>
SQL> commit;
Commit complete.
(wait for few seconds)
SQL>
SQL> select * from TEST_AQ.msg_tab;
CLIENT_ID
--------------------------------------------------
AOID
----------------------------------------------------------------------------------------------------
WFV_OID
----------------------------------------------------------------------------------------------------
DQ_TIME
---------------------------------------------------------------------------
DQ_COMPLETE
---------------------------------------------------------------------------
WS_STATUS
----------------------------------------------------------------------------------------------------------------------------------
WS_TIME
----------
x
oid
wfv
10-SEP-15 11.29.34.731000 AM
10-SEP-15 11.29.34.731000 AM
success
11.2
============================
SQL>
SQL> select * from v$version;
BANNER
--------------------------------------------------------------------------------
Oracle Database 11g Enterprise Edition Release 11.2.0.4.0 - 64bit Production
PL/SQL Release 11.2.0.4.0 - Production
CORE 11.2.0.4.0 Production
TNS for 64-bit Windows: Version 11.2.0.4.0 - Production
NLSRTL Version 11.2.0.4.0 - Production
SQL>
SQL> drop user TEST_AQ cascade;
drop user TEST_AQ cascade
*
ERROR at line 1:
ORA-01918: user 'TEST_AQ' does not exist
SQL>
SQL> PROMPT TEST_AQ SCHEMA CREATION
TEST_AQ SCHEMA CREATION
SQL>
SQL>
SQL> create user TEST_AQ identified by adpadp
2 default tablespace users
3 temporary tablespace TEMP
4 quota unlimited on users;
User created.
SQL>
SQL> GRANT CONNECT, RESOURCE TO TEST_AQ;
Grant succeeded.
SQL>
SQL> GRANT EXECUTE ON dbms_aqadm TO TEST_AQ WITH GRANT OPTION ;
Grant succeeded.
SQL> GRANT EXECUTE ON dbms_aq TO TEST_AQ WITH GRANT OPTION ;
Grant succeeded.
SQL>
SQL> GRANT AQ_ADMINISTRATOR_ROLE TO TEST_AQ;
Grant succeeded.
SQL>
SQL> BEGIN
2 DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(
3 privilege => 'MANAGE_ANY',
4 grantee => 'TEST_AQ',
5 admin_option => FALSE);
6 END;
7 /
PL/SQL procedure successfully completed.
SQL>
SQL>
SQL> CREATE TABLE TEST_AQ.MSG_TAb
2 (
3 client_id VARCHAR2(50)
4 , aoid VARCHAR2(100)
5 , wfv_oid VARCHAR2(100)
6 , dq_time TIMESTAMP DEFAULT SYSTIMESTAMP
7 , dq_complete TIMESTAMP DEFAULT SYSTIMESTAMP
8 , ws_status VARCHAR2(2000)
9 , ws_time NUMBER
10 )
11 /
Table created.
SQL>
SQL>
SQL> CREATE OR REPLACE TYPE TEST_AQ.ty_payload_datasync AS OBJECT
2 (
3 client_id VARCHAR2(50)
4 , area VARCHAR2(50)
5 , action VARCHAR2(50)
6 , aoid VARCHAR2(100)
7 , wfv_oid VARCHAR2(50)
8 , role VARCHAR2(20)
9 , filtype VARCHAR2(10)
10 , f_dml VARCHAR2(10)
11 , no_of_attempts NUMBER
12 , modified_by VARCHAR2(50)
13 ) ;
14 /
Type created.
SQL>
SQL> CREATE TABLE TEST_AQ.data_sync
2 (
3 client_id VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_client_id')
4 , area VARCHAR2(50)
5 , action VARCHAR2(50)
6 , aoid VARCHAR2(100)
7 , wfv_oid VARCHAR2(50)
8 , role VARCHAR2(20) DEFAULT SYS_CONTEXT('ctx_ng_vpd','ctx_role_name')
9 , filtype VARCHAR2(10)
10 , f_dml VARCHAR2(10)
11 , no_of_attempts NUMBER DEFAULT 0
12 , created_on TIMESTAMP DEFAULT SYSTIMESTAMP
13 , modified_on TIMESTAMP DEFAULT SYSTIMESTAMP
14 , modified_by VARCHAR2(50) DEFAULT SYS_CONTEXT('ctx_ng_vpd', 'ctx_mod_usr')
15 )
16 /
Table created.
SQL>
SQL>
SQL>
SQL> rem
SQL> rem Create a queue table
SQL> rem
SQL>
SQL> BEGIN
2 DBMS_AQADM.CREATE_QUEUE_TABLE
3 (
4 queue_table => 'TEST_AQ.qt_datasync'
5 , queue_payload_type => 'TEST_AQ.ty_payload_datasync'
6 , multiple_consumers => TRUE
7 ) ;
8 END ;
9 /
PL/SQL procedure successfully completed.
SQL>
SQL> rem
SQL> rem Create a queue using the table.
SQL> rem
SQL>
SQL> BEGIN
2 DBMS_AQADM.CREATE_QUEUE
3 (
4 queue_name => 'TEST_AQ.qu_datasync'
5 , queue_table => 'TEST_AQ.qt_datasync'
6 , max_retries => 4
7 ) ;
8
9 DBMS_AQADM.START_QUEUE
10 (
11 queue_name => 'TEST_AQ.qu_datasync'
12 ) ;
13
14 END ;
15 /
PL/SQL procedure successfully completed.
SQL>
SQL>
SQL>
SQL> rem
SQL> rem Subscriber code
SQL> rem
SQL>
SQL> CREATE OR REPLACE PROCEDURE TEST_AQ.sp_subscriber_datasync
2 (
3 context RAW,
4 reginfo SYS.AQ$_REG_INFO,
5 descr SYS.AQ$_DESCRIPTOR,
6 payload RAW,
7 payloadl NUMBER
8 ) AS
9 r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
10 r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
11 v_message_handle RAW(16);
12 o_payload ty_payload_datasync ;
13 v_dq_time TIMESTAMP ;
14 v_ws_status VARCHAR2(2000) ;
15 v_ws_time NUMBER ;
16 no_messages exception;
17 PRAGMA EXCEPTION_INIT (no_messages, -25228);
18 BEGIN
19 r_dequeue_options.msgid := descr.msg_id;
20 r_dequeue_options.consumer_name := descr.consumer_name;
21 r_dequeue_options.wait:=DBMS_AQ.NO_WAIT;
22
23
24 BEGIN
25 DBMS_AQ.DEQUEUE
26 (
27 queue_name => descr.queue_name,
28 dequeue_options => r_dequeue_options,
29 message_properties => r_message_properties,
30 payload => o_payload,
31 msgid => v_message_handle
32 ) ;
33 EXCEPTIOn
34 WHEN OTHERS THEn
35 dbms_output.put_line('DEQUEUE FAILEd');
36 insert into msg_tab values ('x', 'x','x',systimestamp,systimestamp, 'deq err', 'deq err') ;
37 commit;
38 END;
39
40 v_dq_time := SYSTIMESTAMP ;
41 -- dbms_lock.sleep(0.5) ;
42 insert into msg_tab values (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimestamp,systimestamp, 'success', v_ws_time) ;
43
44
45
46 COMMIT;
47 EXCEPTION
48 WHEN others THEN
49 dbms_output.put_line('DEQUEUE FAILEd');
50 commit;
51 END;
52 /
Procedure created.
SQL> SHOW ERRORS
No errors.
SQL>
SQL> rem
SQL> rem Subscribe the code to automatically process the message.
SQL> rem
SQL>
SQL> BEGIN
2 DBMS_AQADM.ADD_SUBSCRIBER
3 (
4 queue_name => 'TEST_AQ.qu_datasync',
5 subscriber => SYS.AQ$_AGENT('QUSS_DATASYNC', NULL, NULL)
6 );
7
8 DBMS_AQ.REGISTER
9 (
10 SYS.AQ$_REG_INFO_LIST
11 (
12 SYS.AQ$_REG_INFO
13 (
14 'TEST_AQ.QU_DATASYNC:QUSS_DATASYNC',
15 DBMS_AQ.NAMESPACE_AQ,
16 'plsql://TEST_AQ.SP_SUBSCRIBER_DATASYNC?PR=0',
17 HEXTORAW('FF')
18 )
19 ),
20 1
21 );
22 END;
23 /
PL/SQL procedure successfully completed.
SQL>
SQL> CREATE OR REPLACE TRIGGER TEST_AQ.tr_biu_data_sync
2 BEFORE INSERT OR UPDATE ON TEST_AQ.data_sync
3 referencing new as new
4 FOR EACH ROW
5 DECLARE
6 r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
7 r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
8 v_message_handle RAW(16);
9 o_payload ty_payload_datasync;
10
11 BEGIN
12
13 o_payload := ty_payload_datasync
14 (
15 NVL(:new.client_id, :old.client_id)
16 , NVL(:new.area, :old.area)
17 , NVL(:new.action, :old.action)
18 , NVL(:new.aoid, :old.aoid)
19 , NVL(:new.wfv_oid, :old.wfv_oid)
20 , '~'
21 , '~'
22 , 'I'
23 , 0
24 , NVL(:new.modified_by, :old.modified_by)
25 );
26
27 DBMS_AQ.ENQUEUE
28 (
29 queue_name => 'TEST_AQ.qu_datasync',
30 enqueue_options => r_enqueue_options,
31 message_properties => r_message_properties,
32 payload => o_payload,
33 msgid => v_message_handle
34 );
35 IF UPDATING THEN
36 :new.modified_on:=SYSTIMESTAMP;
37 END IF;
38 EXCEPTION
39 WHEN OTHERS THEN
40 insert into msg_tab values (o_payload.client_id, o_payload.aoid,o_payload.wfv_oid,systimestamp,systimestamp, 'trig fail', null) ;
41
42
43
44 commit;
45 END;
46 /
Trigger created.
SQL>
SQL> conn TEST_AQ/adpadp
Connected.
SQL>
SQL> insert into TEST_AQ.data_sync values ('x','a','a','oid','wfv','ro','fi','dml',1,systimestamp,systimestamp,'me');
1 row created.
SQL>
SQL> select count(*) from test_aq.QT_DATASYNC;
COUNT(*)
----------
1
SQL>
SQL> select * from TEST_AQ.msg_tab;
no rows selected
SQL>
SQL> commit;
Commit complete.
SQL> select * from TEST_AQ.msg_tab;
CLIENT_ID
--------------------------------------------------
AOID
----------------------------------------------------------------------------------------------------
WFV_OID
----------------------------------------------------------------------------------------------------
DQ_TIME
---------------------------------------------------------------------------
DQ_COMPLETE
---------------------------------------------------------------------------
WS_STATUS
----------------------------------------------------------------------------------------------------------------------------------
WS_TIME
----------
x
oid
wfv
10-SEP-15 11.32.10.158000 AM
10-SEP-15 11.32.10.158000 AM
success
SQL>
「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。




