SQL Message Broker 在发送队列中留下消息

SQL Message Broker leaving messages in Sending Queue

我们通过 table 上的触发器将消息放入 SQL 服务器消息队列。 (当一个字段更新时,我们构建一些XML,并调用下面的触发器)。

CREATE PROCEDURE [dbo].[up_CarePay_BrokerSendXml] 
    -- Add the parameters for the stored procedure here
    @Data VARCHAR(MAX) 

AS
BEGIN

    DECLARE @InitDlgHandle UNIQUEIDENTIFIER
    DECLARE @RequestMessage VARCHAR(1000) 
    BEGIN TRY
          BEGIN TRAN

                BEGIN DIALOG CONVERSATION @InitDlgHandle 
                FROM SERVICE [//IcmsCarePay/Service/Initiator]
                TO SERVICE N'//IcmsCarePay/Service/Target'
                ON CONTRACT [//IcmsCarePay/Contract]
                WITH ENCRYPTION = OFF;

                SEND ON CONVERSATION @InitDlgHandle
                MESSAGE TYPE [//IcmsCarePay/Message/Request] (@Data);

          COMMIT TRAN;
    END TRY
    BEGIN CATCH
          ROLLBACK TRAN;
          DECLARE @Message VARCHAR(MAX);
          SELECT @Message = ERROR_MESSAGE();
          PRINT @Message
    END CATCH;

END

这行得通。一条消息被放入队列中。

然后消息被发送到同一服务器上的接收队列 - 不同的数据库。然后我们每分钟运行一个proc,它从目标队列中抓取消息,并将其处理成staging table进行处理。然后消息就离开了目标队列,这一切都没有错误。

然而...

当我检查消息来源的启动器队列时,它正在填满消息。

SELECT TOP 1000 *, casted_message_body = 
CASE message_type_name WHEN 'X' 
  THEN CAST(message_body AS NVARCHAR(MAX)) 
  ELSE message_body 
END 
FROM [ICMS].[dbo].[IcmsCarePayInitiatorQueue] WITH(NOLOCK)

我原以为当消息从发起者传到目标时,发起者就会消失。不过好像快满了。

我注意到启动器中的消息的 'message_type_id' 为 2,'validation' 为 'E',并且消息正文和已转换的消息正文均为 NULL。都有一个 message_type_name 的 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'。

在目标数据库端,这是用于从队列中获取消息的过程:

CREATE PROCEDURE [dbo].[up_CarePayBrokerReceiveXml]   
AS
BEGIN  
  SET NOCOUNT ON;  

  DECLARE @XML XML, @Response XML = 'OK', @ConversationHandle UNIQUEIDENTIFIER, @message_type_name SYSNAME, @message_body VARBINARY(MAX), @source_table VARCHAR(100)
  DECLARE @Message VARCHAR(MAX), @Line INT, @Proc VARCHAR(MAX), @Exception VARCHAR(MAX)  

  WHILE ( 1 = 1 )
  BEGIN  
    -- Clear variables, as they may have been populated in previous loop.
    SET @message_type_name = NULL
    SET @message_body = NULL
    SET @ConversationHandle = NULL  
    SET @source_table = NULL

    BEGIN TRY 
      BEGIN TRAN

        WAITFOR (    -- Pop off a message at a time, and add to storage table.
           RECEIVE TOP (1) 
               @message_type_name = message_type_name  
             , @message_body = message_body  
             , @ConversationHandle = conversation_handle  
             , @source_table = CAST([message_body] AS XML).value('(/row/@SourceTable)[1]', 'varchar(50)')  
           FROM dbo.IcmsCarePayTargetQueue  
        ), TIMEOUT 3000;  

        IF @@ROWCOUNT = 0
        BEGIN  
          ROLLBACK  -- Complete the Transaction (Rollback, as opposeed to Commit, as there is nothing to commit).
          BREAK  
        END

        -- Code removed for example, but the fields are saved to a staging table in the database here...

         -- Respond to Initiator  
        SEND ON CONVERSATION @ConversationHandle MESSAGE TYPE [//IcmsCarePay/Message/Response](@Response);  
        END CONVERSATION @ConversationHandle;  

      COMMIT -- End of Transaction

    END TRY
    BEGIN CATCH
      -- End the conversation
      END CONVERSATION @ConversationHandle WITH CLEANUP  

      -- Get details about the issue.
      SELECT  @Exception = ERROR_MESSAGE(), @Line = ERROR_LINE(), @Proc = ERROR_PROCEDURE(), @Message = 'proc: ' + @Proc + '; line: ' + CAST(@Line AS VARCHAR) + '; msg: ' + @Exception  
      SELECT  @Message -- Displays on Concole when debugging.

      -- Log the issue to the Application Log.
      INSERT  INTO dbo.ApplicationLog
              ( LogDate ,
                Thread ,
                Level ,
                Logger ,
                Message ,
                Exception  
              )
      VALUES  ( GETDATE() , -- LogDate - datetime  
                'None' , -- Thread - varchar(255)  
                'FATAL' , -- Level - varchar(50)  
                '____up_CarePayBrokerReceiveXml' , -- Logger - varchar(255)  
                @Message , -- Message - varchar(4000)  
                @Exception  -- Exception - varchar(2000)  
              )  
      COMMIT -- We have stored the erronous message, and popped it off the queue. Commit these changes.
    END CATCH 
  END  -- end while  

END

为什么这些消息会留在那里?

保留在发起程序队列中的消息的详细信息是:

Status: 1
Priority: 5
queuing_order: 395
mess_sequence_number: 0
service_name: //IcmsCarePay/Service/Initiator
service_contract_name: //IcmsCarePay/Contract
message_type_name: http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog
message_type_id: 2
validation: E
message_body: NULL
casted_message_body: NULL

看起来你使用一次性对话进行这些对话。您的目标存储过程从目标队列中检索消息,然后关闭它们的对话框,但您不在发起程序队列上处理它。

由于dialog是一个分布式的东西,为了关闭,它必须在发起方和目标端都关闭。当您的目标过程在目标上发出 end conversation @Handle; 时,Service Broker 会将您提到的类型的消息发送给发起者,以通知它此特定对话已成为历史。

正确完成后,发起程序激活程序将收到此消息,在其端发出相应的end conversation,然后关闭对话框。

由于您不在发起端处理任何消息,因此这些系统消息会在那里累积。

这里有 2 种可能的解决方案:

  1. 处理 EndDialog 条消息。这实际上应该在两侧都完成,因为对话可以在其任一侧关闭。
  2. 重复使用对话框,这样您就不必在每次需要发送内容时都创建一个新对话框。它将节省一些重要的资源,尤其是在流量足够大的情况下。

请注意,无论您是使用持久性对话还是一次性对话,都应该完成#1。

编辑: 这是默认处理过程的示例,取自我的一个项目:

create procedure [dbo].[ssb_Queue_DefaultProcessor]
(
    @Handle uniqueidentifier,
    @MessageType sysname,
    @Body xml,
    @ProcId int
) with execute as owner as

set nocount, ansi_nulls, ansi_padding, ansi_warnings, concat_null_yields_null, quoted_identifier, arithabort on;
set numeric_roundabort, xact_abort, implicit_transactions off;

declare @Error int, @ErrorMessage nvarchar(2048);

declare @Action varchar(20);

begin try

-- System stuff
if @MessageType in (
    N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog',
    N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
    ) begin

    -- Depending on the actual message, action type will be different
    if @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' begin
        set @Action = 'PURGE';
    end else if @MessageType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
        set @Action = 'CLOSE';

    -- Close the dialog
    exec dbo.ssb_DialogPools_Maintain @Action = @Action, @DialogHandle = @Handle, @Error = @Error output, @ErrorMessage = @ErrorMessage output;

    if nullif(@Error, 0) is not null
        throw 50000, @ErrorMessage, 1;

end else
    -- Some unknown messages may end up here, log them
    throw 50011, 'Unknown message type has been passed into default processor.', 1;

end try
begin catch

if nullif(@Error, 0) is null
    select @Error = error_number(), @ErrorMessage = error_message();

-- Don't try to resend messages from default processing
exec dbo.ssb_Poison_Log @ErrorNumber = @Error, @ErrorMessage = @ErrorMessage, @MessageType = @MessageType, @MessageBody = @Body, @ProcId = @ProcId;

end catch;
return;

当它们遇到任何类型的消息而不是它们应该处理的消息时,它会从所有激活过程中调用。 以下是此类激活程序之一的示例:

create procedure [dbo].[ssb_QProcessor_Clients]
with execute as owner as


set nocount, ansi_nulls, ansi_padding, ansi_warnings, concat_null_yields_null, quoted_identifier, arithabort on;
set numeric_roundabort, xact_abort, implicit_transactions off;

declare @Handle uniqueidentifier, @MessageType sysname, @Body xml, @MessageTypeId int;
declare @Error int, @ErrorMessage nvarchar(2048), @ProcId int = @@procid;
declare @TS datetime2(4), @Diff int, @Delay datetime;


-- Fast entry check for queue contents
if not exists (select 0 from dbo.ssb_OY_Clients with (nolock))
    return;

while exists (select 0 from sys.service_queues where name = 'ssb_OY_Clients' and is_receive_enabled = 1) begin

    begin try
    begin tran;

    -- Receive something, if any
    waitfor (
        receive top (1) @Handle = conversation_handle,
            @MessageType = message_type_name,
            @Body = message_body
        from dbo.ssb_OY_Clients
    ), timeout 3000;

    if @Handle is null begin

        -- Empty, get out
        rollback;
        break;

    end;

    -- Check for allowed message type
    select @MessageTypeId = mt.Id
    from dbo.ExportMessageTypes mt
        inner join dbo.ExportSystems xs on xs.Id = mt.ExportSystemId
    where mt.MessageTypeName = @MessageType
        and xs.Name = N'AUDIT.OY.Clients';

    if @MessageTypeId is not null begin

        -- Store the data
        exec dbo.log_Clients @MessageType = @MessageType, @Body = @Body, @Error = @Error output, @ErrorMessage = @ErrorMessage output;

        -- Check the result
        if nullif(@Error, 0) is not null
            throw 50000, @ErrorMessage, 1;

    end else
        -- Put it into default processor
        exec dbo.ssb_Queue_DefaultProcessor @Handle = @Handle, @MessageType = @MessageType, @Body = @Body, @ProcId = @ProcId;

    commit;
    end try
    begin catch

    if nullif(@Error, 0) is null
        select @Error = error_number(), @ErrorMessage = error_message();

    -- Check commitability of the transaction
    if xact_state() = -1
        rollback;
    else if xact_state() = 1
        commit;

    -- Try to resend the message again
    exec dbo.[ssb_Poison_Retry] @MessageType = @MessageType, @MessageBody = @Body, @ProcId = @ProcId, @ErrorNumber = @Error, @ErrorMessage = @ErrorMessage;

    end catch;

    -- Reset dialog handle
    select @Handle = null, @Error = null, @ErrorMessage = null;

end;

-- Done!
return;

当然,此示例中的内容比您可能需要的要多一些,但我希望一般方法是显而易见的。并且你需要处理发起者和目标上的 EndDialogError 消息类型,因为你永远不知道它们会出现在哪里。