Oracle aq 传播,ORA-25215:user_data 类型和队列类型不匹配

Oracle aq propagation, ORA-25215: user_data type and queue type do not match

我正在处理从一个数据库到另一个数据库的 AQ 传播,但是当我安排传播并将第一条消息排队到 LOCAL AQ table 时,我在 DBA_QUEUE_SCHEDULES.LAST_ERROR_MSG、"ORA-25215: user_data type and queue type do not match"。请注意,AQ tables 中使用的两种对象类型是相同的,出于测试目的,我使用的是这个:

create or replace type LOCAL_OBJ_MSG as object(
    test varchar2(4000))
/

和两个 AQ table 也是相同的,我使用相同的脚本创建了它们,只是更改了名称,一个是本地的,另一个是远程的。 LOCAL AQ table 在 LOCAL 数据库中,REMOTE 一个在 SCHEMA_NAME.REMOTE 数据库中。

这是我用来创建 AQ tables 的脚本:

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
  dbms_aqadm.stop_queue(
    queue_name => 'REMOTE_iTEST');
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
  dbms_aqadm.drop_queue('REMOTE_iTEST');
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24002);
begin
  dbms_aqadm.drop_queue_table('REMOTE_iTEST', force => true);
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
  dbms_aqadm.stop_queue(
    queue_name => 'REMOTE_oTEST');
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24010);
begin
  dbms_aqadm.drop_queue('REMOTE_oTEST');
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  QUEUE_NOT_FOUND exception;
  PRAGMA EXCEPTION_INIT(QUEUE_NOT_FOUND, -24002);
begin
  dbms_aqadm.drop_queue_table('REMOTE_oTEST', force => true);
exception
  when QUEUE_NOT_FOUND then
    null;
  when others then
    raise;
end;
/

declare
  l_exist number;
  cursor c_index is
    select 1 from user_objects u where u.OBJECT_NAME = upper('LOCAL_OBJ_MSG') and u.OBJECT_TYPE = 'TYPE';
begin
  open c_index;
  fetch c_index into l_exist;
  close c_index;

  if l_exist = 1 then
    execute immediate 'drop type LOCAL_OBJ_MSG';
  end if;
end;
/

create or replace type LOCAL_OBJ_MSG as object(
    test varchar2(4000))    
/

begin
  dbms_aqadm.create_queue_table (
    queue_table => 'REMOTE_iTEST',
    queue_payload_type => 'LOCAL_OBJ_MSG',
    storage_clause => 'pctfree 5 pctused 90 tablespace SEPA_INTEG_AQ',
    message_grouping => DBMS_AQADM.NONE,
    sort_list => 'ENQ_TIME',
    multiple_consumers => true,
    comment => 'Incoming TEST message table.');
end;
/

begin
  dbms_aqadm.create_queue (
    queue_table    => 'REMOTE_iTEST',
    queue_name     => 'REMOTE_iTEST',
    queue_type => sys.dbms_aqadm.normal_queue,
    retention_time => sys.dbms_aqadm.INFINITE,
    comment => 'Incoming TEST messages.',
    max_retries => 5);
end;
/

begin
  dbms_aqadm.start_queue(
    queue_name  => 'REMOTE_iTEST',
    dequeue     => true,
    enqueue     => true);
end;
/
begin
  dbms_aqadm.create_queue_table (
    queue_table => 'REMOTE_oTEST',
    queue_payload_type => 'LOCAL_OBJ_MSG',
    storage_clause => 'pctfree 5 pctused 90 tablespace SEPA_INTEG_AQ',
    message_grouping => DBMS_AQADM.NONE,
    sort_list => 'ENQ_TIME',
    multiple_consumers => true,
    comment => 'Outgoing TEST message table.');
end;
/

begin
  dbms_aqadm.create_queue (
    queue_table    => 'REMOTE_oTEST',
    queue_name     => 'REMOTE_oTEST',
    queue_type => sys.dbms_aqadm.normal_queue,
    retention_time => sys.dbms_aqadm.INFINITE,
    comment => 'Outgoing TEST messages.',
    max_retries => 5);
end;
/

begin
  dbms_aqadm.start_queue(
    queue_name  => 'REMOTE_oTEST',
    dequeue     => TRUE,
    enqueue     => TRUE);
end;
/

下面是用于创建订阅者、出列程序等的脚本:

本地数据库:

begin
  -- Add the remote subscriber.
  dbms_aqadm.add_subscriber(queue_name     => 'LOCAL_oTEST',
                            subscriber     => sys.aq$_agent(name     => 'LOCAL_oTEST_subscriber',
                                                            address  => 'SCHEMA_NAME.REMOTE_oTEST@DB_LINK_NAME',
                                                            protocol => 0),
                            queue_to_queue => true);
  -- Start the propagation of messages.
  dbms_aqadm.schedule_propagation(queue_name        => 'LOCAL_oTEST',
                                  latency           => 0,
                                  destination       => 'DB_LINK_NAME',
                                  destination_queue => 'SCHEMA_NAME.REMOTE_oTEST');
end;
/

远程数据库:

-- Create a table to store the messages received.
create table sepa_omsg_aq_demo
  (received timestamp default systimestamp,
   message LOCAL_OBJ_MSG);

-- Create a callback procedure that dequeues the received message and saves it
create or replace
procedure REMOTE_CALLBACK_TEST
  (
    context raw,
    reginfo sys.aq$_reg_info,
    descr sys.aq$_descriptor,
    payload raw,
    payloadl number
  )
as
  r_dequeue_options dbms_aq.dequeue_options_t;
  r_message_properties dbms_aq.message_properties_t;
  v_message_handle raw(26);
  o_payload LOCAL_OBJ_MSG;
begin
  r_dequeue_options.msgid         := descr.msg_id;
  r_dequeue_options.consumer_name := descr.consumer_name;
  dbms_aq.dequeue(queue_name => descr.queue_name, 
                  dequeue_options => r_dequeue_options, 
                  message_properties => r_message_properties, 
                  payload => o_payload, 
                  msgid => v_message_handle);
  insert into sepa_omsg_aq_demo 
    (message) 
    values (o_payload);
  commit;
exception
  when others then
    rollback;
end;
/

-- Register the procedure for dequeuing the messages received.
-- I'd like to point out that the subscriber is the one defined for the local database
begin
  dbms_aq.register (
     sys.aq$_reg_info_list(
        sys.aq$_reg_info('REMOTE_oTEST:LOCAL_oTEST_subscriber',
                         dbms_aq.namespace_aq,
                         'plsql://REMOTE_CALLBACK_TEST',
                         hextoraw('FF'))
                        ), 
        1);
end;
/

将消息排队到本地 AQ 的脚本 table:

declare
  enq_msgid raw(16);
  eopt      dbms_aq.enqueue_options_t;
  mprop     dbms_aq.message_properties_t;

  message local_obj_msg;
begin
  message := local_obj_msg('a');
  dbms_aq.enqueue(queue_name         => 'LOCAL_oTEST',
                  enqueue_options    => eopt,
                  message_properties => mprop,
                  payload            => message,
                  msgid              => enq_msgid);
  commit;
end;
/

另请注意,在 SYS.AQ$_MESSAGE_TYPES table(LOCAL db) 中,创建传播的验证状态 = 'F',并且:

declare
  rc binary_integer;
begin
  dbms_aqadm.verify_queue_types(src_queue_name  => 'local_otest',
                                dest_queue_name => 'schema_name.remote_otest',
                                rc              => rc,
                                destination     => 'db_link_name');
  dbms_output.put_line('Compatible: ' || rc);
end;
/

returns 0,这意味着 table 类型不兼容。在我做了一些挖掘之后,我发现如果 LOCAL 和 REMOTE 数据库 NLS_LENGTH_SEMANTICS 不同,则无法完成远程传播,但是在这种情况下情况并非如此。我查过了。任何想法是我做错了什么,或者我怎样才能找到这两个 table 之间的区别以及如何解决它?或者它可能是某些数据库参数值之间的差异?

Oracle 数据库 11g 版本 11.2.0.3.0

要检查的一个区域是每个数据库上的 NLS_LENGHT_SEMANTICS 参数值。

NLS_LENGTH_SEMANTICS allows you to specify the length of a column datatype in terms of characters rather in terms of bytes.

您的 PL/SQL 类型创建脚本未在 VARCHAR2 长度内隐式使用 BYTE 或 CHAR 转换。这将导致生成的类型继承为 NLS_LENGTH_SEMANTICS 设置的值。

确保检查您的两个数据库 NLS_LENGTH_SEMANTICS 值。如果值不同(BYTE 与 CHAR),则可能是问题的原因。您可以通过显式使用 VARCHAR2 中的 BYTE 或 CHAR 在其中一个数据库上重建类型和队列 即

create or replace type LOCAL_OBJ_MSG as object(
    test varchar2(4000 CHAR))
/

或更改其中一个数据库上的 NLS_LENGTH_SEMANTICS 以确保它们匹配。更改NLS_LENGTH_SEMANTICS

后重新编译数据库很重要

我发布的有点晚了,但我们有解决方法。在具有不同语义的两个数据库之间的传播中,您不能使用对象类型作为有效负载,但是您可以指定将该对象类型传播到 xmltype 的转换,后者可以传播。为此,您将需要在远程数据库中接收该 xmltype 的 temp 或非 queueu。然后你可以有一个调度程序作业,它将定期从该队列中取出消息并将其进一步排队到你的远程目标队列。在出队时,您可以指定另一个转换,将您的 xmltype 有效负载转换为所需的对象类型。以下是示例:

假设我们有对象类型:

create or replace type OBJ_MSG as object(
    id number,
    value varchar2(4000)
)

我们将在本地和远程数据库中使用它。下一步是拥有一个有效负载为 xmltype 的远程队列。接下来我们需要转换函数,它将我们的 OBJ_MSG 对象类型作为参数和 returns xmltype.

function trans_to_xml_type(
  p_payload in OBJ_MSG)
  return xmltype is

  cursor c_trans_payload(
    cp_id number,
    cp_value varchar2) is
  select
    xmlelement(
      "envelope",
      xmlelement(
        "id", cp_client),
      xmlelement(
        "value", cp_value))
  from
    dual;

  l_return xmltype;
begin
  open c_trans_payload(
    p_payload.id,
    p_payload.value);
  fetch c_trans_payload
    into l_return;
  close c_trans_payload;

  return l_return;

  exception
    when others then
      raise_application_error(-20001, '.trans_to_xml_type .Failed to transform OBJ_MSG payload to sys.xmltype: ' || sqlerrm);
end;

接下来,我们创建转换:

begin
  dbms_transform.create_transformation(
    'LOCAL',--SCHEMA
    'LOCAL_TRANS_TO_XML_TYPE',--Our transformation name
    'LOCAL',--SCHEMA
    'OBJ_MSG',--Our object name
    'SYS',
    'XMLTYPE',--Transformation result
    'PACKAGE.trans_to_xml_type(source.user_data)');--function name, which takes payload as a parameter
end;
/

接下来我们用我们的转换创建一个订阅者并将其添加到 queueu:

declare
  l_agent sys.aq$_agent := sys.aq$_agent(
    name => 'local_agent',
    address => 'REMOTE.REMOTE_TEMP_QUEUE@REMOTE_DB',
    protocol => 0);
begin
  dbms_aqadm.add_subscriber(
    queue_name => 'LOCAL.LOCAL_QUEUE',
    subscriber => l_agent,
    transformation => 'LOCAL_TRANS_TO_XML_TYPE',
    queue_to_queue => true);
end;
/

所以现在我们安排传播到我们的临时队列。传播消息的对象类型将转换为 xmltype 并排入我们的远程临时队列。在远程数据库中创建一个 dbms_scheduler 作业,该作业将使用指定的转换定期使消息出列并将其排入远程目标队列。