How to clean up after BEGIN DIALOG CONVERSATION on disconnect?

--- Edited to include more and better details ---

In SQL Server, you can begin a dialog conversation on a queue:

https://msdn.microsoft.com/en-us/library/ms187377.aspx

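Multiple processes each begin a dialog, store the @dialog_handle in a table, [dbo].[ActiveConversations], and then wait for a message to arrive (sent by a trigger on [dbo].[myTable]).

Here is the gist of the code (omitting some irrelevant complications, error checking, and so on):

Beginning the conversation via a .NET SqlCommand: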

DECLARE @LoginTime datetime;
DECLARE @handle uniqueidentifier;
SELECT TOP 1 @LoginTime = login_time FROM sys.sysprocesses WHERE spid = @@SPID;
BEGIN DIALOG @handle
   FROM SERVICE [myService]
   TO SERVICE 'myService'
   ON CONTRACT myContract
   WITH ENCRYPTION=OFF;

INSERT INTO [dbo].[ActiveConversations]
(
   ConversationHandle,
   SysProcessID,
   SysProcessLoginTime
)
VALUES
(
   @handle,
   @@SPID,
   @LoginTime
)

Ending the conversation via a .NET SqlCommand:

DECLARE @handle uniqueidentifier;
SELECT TOP 1 @handle = conv.ConversationHandle
   FROM [dbo].[ActiveConversations] AS conv
   INNER JOIN sys.sysprocesses AS sysp
      ON sysp.spid = conv.SysProcessID
      AND sysp.login_time = conv.SysProcessLoginTime
      AND sysp.spid = @@SPID;

DELETE FROM [dbo].[ActiveConversations] WHERE ConversationHandle = @handle;
END CONVERSATION @handle;

Sending messages:

CREATE TRIGGER [dbo].[myTableChanged] ON [dbo].[myTable] AFTER UPDATE
AS
BEGIN
   DECLARE @handle uniqueidentifier;
   DECLARE curs CURSOR LOCAL STATIC READ_ONLY FORWARD_ONLY
   FOR
   SELECT ConversationHandle FROM [dbo].[ActiveConversations];

   OPEN curs
   FETCH NEXT FROM curs INTO @handle;
   WHILE @@FETCH_STATUS = 0
   BEGIN
      BEGIN TRY
         SEND ON CONVERSATION @handle
            MESSAGE TYPE [myType] ( '' );
      END TRY
      BEGIN CATCH
      END CATCH
      FETCH NEXT FROM curs INTO @handle;
   END
   CLOSE curs;
   DEALLOCATE curs;
END

Waiting for messages:

DECLARE @handle uniqueidentifier;
SELECT @handle = [ConversationHandle]
    FROM [dbo].[ActiveConversations] AS conv
    INNER JOIN sys.sysprocesses AS sysp
    ON sysp.spid = conv.SysProcessID
    AND sysp.login_time = conv.SysProcessLoginTime
    AND sysp.spid = @@SPID;

WAITFOR (RECEIVE * FROM [dbo].[myQueue] WHERE conversation_handle = @handle);

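When each process exits, it is responsible for calling END CONVERSATION. However, if a process dies prematurely, it never gets the chance to call END CONVERSATION, and the conversation will live forever. Or will it?

Are conversations cleaned up automatically? If not, how can I make sure dead conversations don't accumulate?

What is the best practice in this situation? Should I set a specific timeout and periodically re-initialize the conversation? If a conversation ends unexpectedly, is there a way to find out, so that it can be removed from the [dbo].[ActiveConversations] table?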

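My initial thought was to use conversation timers. While that might work, it is probably overkill. Instead, it may be easier to set a lifetime on the conversation when you begin it. When that lifetime expires, the conversation errors out automatically, which places an error message for that conversation in both the initiator and target queues. Here is a POC. As I mentioned in the comments below, a lot of it is plumbing code. Here it is: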

use master;
go
if exists (select 1 from sys.databases where name = 'TimerInitiator')
begin
    alter database [TimerInitiator] set offline with rollback immediate;
    alter database [TimerInitiator] set online;
    drop database [TimerInitiator];
end

if exists (select 1 from sys.databases where name = 'TimerTarget')
begin
    alter database [TimerTarget] set offline with rollback immediate;
    alter database [TimerTarget] set online;
    drop database [TimerTarget];
end

--execute as login = 'sa';
create database [TimerInitiator];
create database [TimerTarget];
revert;
go

use [TimerInitiator];
create master key encryption by password = 'f00bar!23';
create user [BrokerUser] without login;
CREATE CERTIFICATE [BrokerCert] AUTHORIZATION [BrokerUser] FROM BINARY = 0x308201E33082014CA00302010202101A83EEBC1E132A9149AC7D6D5AA1423F300D06092A864886F70D0101050500302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E3020170D3137303132323034303430315A180F32303939303130313030303030305A302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E30819F300D06092A864886F70D010101050003818D0030818902818100BC4C750343E36206C5A8D672C06D5A204DE499F1CF94A5D678F12DA2F9834877C901AEFE11CA64F9FEA46E31E65FA66FFD80DA139386F3F834C65114025563A9BBD85BDAAFAA694C1D2A36060B380BBD658DD2D93643303F2018F605AAB31840659EF0B5034766FF00F69EF0C1FF9A035686EC81C1E9995C599833ACCEF1BA230203010001300D06092A864886F70D01010505000381810081163FBED4A8B85429C6B6AC7D2123671F751EB72A468CCFBF2C593A8E2A7F51F59584D4EE7ECD247BD73D7A809007C0A8BE23E18A90AC927C124632578F971CD177269EF6752891DDFAB43DC0F1A24D509116EE578BAAC553B81376A69F386B6401AADF1C9D0D2121070B216C864C31B12D8B02081C35D70A8B7DFBD8904DF5 WITH PRIVATE KEY (BINARY = 
0x1EF1B5B000000000010000000100000010000000540200005795E582353CC30C984D89654C0B65170702000000A40000B92EB9B271982F1DA38DCC06FC333E60512569DD94A1B661FAAC382E73665869FF9F2995A58812E64163354FA81C957EBF29DA8F20699C59AA16268401C0679FEEF639AB9C38E0C4E4E605F8DCDFB6CC5CB011F7113170EE6CE49623DE061D6FB82F8ABE92284E9D1FA481FCD150AA0B356FCFC86593EF3D7DDC03D7893462A9C1AA628970C04FE4ECB92E8E5234600A059D4213CE51369E0C0D8B2676F9F4E7FC08A6043991A21716DBB8C05B62E78A36571361C646C3D3BEE252A816CDDF6184E4954CE8EF65A584CDC1C45E9D17CE5B2ABB4CCCEF86A4F943DA26792E5BFCEA7E379BE98E799DCBF06C4235F2B685772842F181383FC1DC420660B17F5A9FD8460C50054AC50CF1BB57C6DD36D6CA40AC596E0AA492254B3E7D52A9F2079FBEB7CA1B6659B9923CBA72FA7DAFA67B83726C623AE4568F83EB748CAB6D7A87DCFD964E90A92C14649B99CAEE56DAEC3139DF1B33B1683D1E67357C9A6B563686A16842C2BBE0CCA0C41A6565FB90505F83F1F1D6B28B786DD7FEDDE7DEBC5A3A7BC3407DC77DBC49DB27135F4E09BA23CCFAA7C551C020B06AF4F585A5AD72A19DF35D9B04F0956B9ADFFFF1BE5897026CEC6D1B9582E2559F9354D09844B4058E656780579CF6FBE342522B40126D095749AAFAAAD4C3D53B8E1C469CF4EB18150F3A7D2ED6E308BAA71375D3657688AC0DBD3709E4412B8625DB53516292CCBE5E324DD6139C9D6F6522CD5FCC1229284B2B78D1D5246461BA930428065E9FC6738EF885D9A3EED1D2CAEC0368CA6D09D0DF2DB9C9042C29076087EF043DD085AE12EC561AABADB92B5A, DECRYPTION BY PASSWORD = 'f00bar!23')
create message type [Request] validation = none;
create message type [Response] validation = none;
create contract [ConversationContract] (
    [Request] sent by initiator,
    [Response] sent by target
);
create queue [InitiatorQueue] ;
create service [InitiatorService] 
    authorization [BrokerUser] 
    on queue [InitiatorQueue]
    ( [ConversationContract] );

create remote service binding [TimerRSB]
    authorization [BrokerUser]
    to service 'TargetService'
    with user = [BrokerUser];

declare @broker_instance uniqueidentifier;
select @broker_instance = service_broker_guid
from sys.databases
where name = 'TimerTarget';

declare @sql nvarchar(max);
set @sql = concat(
'create route [TimerTargetRoute]
with service_name = ''TargetService'',
    broker_instance = ''', @broker_instance,
    ''', address = ''LOCAL''');
exec(@sql)
go

create table [dbo].[ConversationHistory] (
    conversation_handle uniqueidentifier,
    status varchar(100) null
);
go
create procedure [InitiatorActivation]
as
begin
    declare @ch uniqueidentifier, @message_type sysname;

    while(1=1)
    begin
        waitfor (
            receive top(1) @ch = conversation_handle,
                @message_type = message_type_name
            from [dbo].[InitiatorQueue]
        ), timeout 10000;

        if (@@ROWCOUNT = 0)
            break;

        if (@message_type = 'Response')
        begin
            update [dbo].[ConversationHistory]
            set status = 'Received response'
            where conversation_handle = @ch;
        end
        else
        begin
            update [dbo].[ConversationHistory]
            set status = 'Conversation timed out'
            where conversation_handle = @ch;
        end
        end conversation @ch
    end
end
go
alter queue [dbo].[InitiatorQueue] with activation (
    procedure_name = [dbo].[InitiatorActivation], 
    max_queue_readers = 5, 
    status = on,
    execute as owner
);

use [TimerTarget];
create master key encryption by password = 'f00bar!23';
create user [BrokerUser] without login;
CREATE CERTIFICATE [BrokerCert] AUTHORIZATION [BrokerUser] FROM BINARY = 0x308201E33082014CA00302010202101A83EEBC1E132A9149AC7D6D5AA1423F300D06092A864886F70D0101050500302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E3020170D3137303132323034303430315A180F32303939303130313030303030305A302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E30819F300D06092A864886F70D010101050003818D0030818902818100BC4C750343E36206C5A8D672C06D5A204DE499F1CF94A5D678F12DA2F9834877C901AEFE11CA64F9FEA46E31E65FA66FFD80DA139386F3F834C65114025563A9BBD85BDAAFAA694C1D2A36060B380BBD658DD2D93643303F2018F605AAB31840659EF0B5034766FF00F69EF0C1FF9A035686EC81C1E9995C599833ACCEF1BA230203010001300D06092A864886F70D01010505000381810081163FBED4A8B85429C6B6AC7D2123671F751EB72A468CCFBF2C593A8E2A7F51F59584D4EE7ECD247BD73D7A809007C0A8BE23E18A90AC927C124632578F971CD177269EF6752891DDFAB43DC0F1A24D509116EE578BAAC553B81376A69F386B6401AADF1C9D0D2121070B216C864C31B12D8B02081C35D70A8B7DFBD8904DF5 WITH PRIVATE KEY (BINARY = 
0x1EF1B5B000000000010000000100000010000000540200005795E582353CC30C984D89654C0B65170702000000A40000B92EB9B271982F1DA38DCC06FC333E60512569DD94A1B661FAAC382E73665869FF9F2995A58812E64163354FA81C957EBF29DA8F20699C59AA16268401C0679FEEF639AB9C38E0C4E4E605F8DCDFB6CC5CB011F7113170EE6CE49623DE061D6FB82F8ABE92284E9D1FA481FCD150AA0B356FCFC86593EF3D7DDC03D7893462A9C1AA628970C04FE4ECB92E8E5234600A059D4213CE51369E0C0D8B2676F9F4E7FC08A6043991A21716DBB8C05B62E78A36571361C646C3D3BEE252A816CDDF6184E4954CE8EF65A584CDC1C45E9D17CE5B2ABB4CCCEF86A4F943DA26792E5BFCEA7E379BE98E799DCBF06C4235F2B685772842F181383FC1DC420660B17F5A9FD8460C50054AC50CF1BB57C6DD36D6CA40AC596E0AA492254B3E7D52A9F2079FBEB7CA1B6659B9923CBA72FA7DAFA67B83726C623AE4568F83EB748CAB6D7A87DCFD964E90A92C14649B99CAEE56DAEC3139DF1B33B1683D1E67357C9A6B563686A16842C2BBE0CCA0C41A6565FB90505F83F1F1D6B28B786DD7FEDDE7DEBC5A3A7BC3407DC77DBC49DB27135F4E09BA23CCFAA7C551C020B06AF4F585A5AD72A19DF35D9B04F0956B9ADFFFF1BE5897026CEC6D1B9582E2559F9354D09844B4058E656780579CF6FBE342522B40126D095749AAFAAAD4C3D53B8E1C469CF4EB18150F3A7D2ED6E308BAA71375D3657688AC0DBD3709E4412B8625DB53516292CCBE5E324DD6139C9D6F6522CD5FCC1229284B2B78D1D5246461BA930428065E9FC6738EF885D9A3EED1D2CAEC0368CA6D09D0DF2DB9C9042C29076087EF043DD085AE12EC561AABADB92B5A, DECRYPTION BY PASSWORD = 'f00bar!23')
create message type [Request] validation = none;
create message type [Response] validation = none;
create contract [ConversationContract] (
    [Request] sent by initiator,
    [Response] sent by target
);
create queue [TargetQueue];
create service [TargetService] 
    authorization [BrokerUser] 
    on queue [TargetQueue]
    ( [ConversationContract] );

create remote service binding [TimerRSB]
    authorization [BrokerUser]
    to service 'InitiatorService'
    with user = [BrokerUser];


declare @broker_instance uniqueidentifier;
select @broker_instance = service_broker_guid
from sys.databases
where name = 'TimerInitiator';

declare @sql nvarchar(max);
set @sql = concat(
'create route [TimerInitiatorRoute]
with service_name = ''InitiatorService'',
    broker_instance = ''', @broker_instance,
    ''', address = ''LOCAL''');
exec(@sql);
go
create procedure [dbo].[TargetActivation]
as
begin
    set nocount on;
    declare @message_type sysname,
        @ch uniqueidentifier;
    while(1=1)
    begin
        waitfor(
            receive top(1) @message_type = message_type_name,
                @ch = conversation_handle
            from [dbo].[TargetQueue]
        ), timeout 10000;

        if (@@ROWCOUNT = 0)
            break;
        if @message_type = 'Request'
        begin
            if (datepart(millisecond, getdate()) < 500)
            begin
                -- simulate a long-running process
                waitfor delay '0:0:11';
            end
            begin try
                send on conversation (@ch)
                    message type [Response]
                    ('<Response />');
            end try
            begin catch
                if ERROR_NUMBER() = 8429 --tried to send on an errored conversation
                begin
                    continue;
                end
            end catch
        end
        end conversation @ch;
    end
end
go
alter queue [dbo].[TargetQueue] with activation (
    procedure_name = [dbo].[TargetActivation], 
    max_queue_readers = 5, 
    status = on,
    execute as owner
);
go
use [TimerInitiator];

declare @ch uniqueidentifier, @i int = 0;
set nocount on;

while(@i < 100)
begin
--declare @ch uniqueidentifier;
    begin dialog conversation @ch
        from service [InitiatorService]
        to service 'TargetService'
        on contract [ConversationContract]
        with lifetime = 60;

    send on conversation (@ch)
        message type [Request]
        ('<request />');

    insert into dbo.ConversationHistory (conversation_handle)
    values (@ch);

    set @i += 1;
end

waitfor delay '00:01:05';
select status, count(*)
from dbo.ConversationHistory
group by status;

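With the semantics above, I have designed a message protocol in which one request is sent and one response is returned. I deliberately put some random lag in the target's activation procedure. In both activation procedures, if I see a user-defined message type, I do what one would expect (I respond to the request and log the response). On both sides, I end the conversation in the activation procedure to keep stale conversations from building up. This appears to clear out all messages in that particular conversation.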
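
The POC's InitiatorActivation treats every message that is not a Response as a timeout. As a minimal sketch of a stricter variant, the loop body could branch on the system message types by name. The status text 'Conversation errored' is an assumption for illustration; the queue and table names are the ones from the POC above.

```sql
-- Sketch only: receive one message and branch on its type explicitly.
declare @ch uniqueidentifier, @message_type sysname;

waitfor (
    receive top(1)
        @ch = conversation_handle,
        @message_type = message_type_name
    from [dbo].[InitiatorQueue]
), timeout 10000;

if (@@ROWCOUNT > 0)
begin
    if (@message_type = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
    begin
        -- Lifetime expiry (and other failures) arrive as Error messages.
        update [dbo].[ConversationHistory]
        set status = 'Conversation errored'   -- assumed status text
        where conversation_handle = @ch;

        end conversation @ch;
    end
    else if (@message_type = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
    begin
        -- The far side ended the dialog normally; close our endpoint too.
        end conversation @ch;
    end
end
```

Separating Error from EndDialog keeps a normal close by the target from being logged as a timeout.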


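This is one of those things you can use conversation timers for. The idea is to augment your workflow by issuing a BEGIN CONVERSATION TIMER statement after you send a message. Pick a timeout value that is a reasonable time for message processing plus some wiggle room. Then "listen" for the DialogTimer event on the sender's side. I can flesh this out with a working example later, but hopefully this gives you some ideas.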
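
A minimal sketch of that send-then-arm-a-timer step, assuming the [InitiatorService], 'TargetService', and [ConversationContract] objects used elsewhere in this answer already exist. The 90-second budget is an arbitrary value chosen for illustration.

```sql
declare @ch uniqueidentifier;

begin dialog conversation @ch
    from service [InitiatorService]
    to service 'TargetService'
    on contract [ConversationContract];

send on conversation (@ch)
    message type [Request]
    ('<Request />');

-- Assumed budget: about 60 seconds of processing plus 30 seconds of slack.
-- When it elapses, a message of type
-- http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer
-- is placed on this side's queue for the conversation.
begin conversation timer (@ch) timeout = 90;
```

The timer message goes to the queue of the side that started the timer, so the sender's own activation procedure is where the DialogTimer branch would live.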

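There is no out-of-the-box mechanism that says "has the far side service received messages within some time span?". That said, we can use conversation timers to accomplish it. The gist is that the target service, as soon as it is signaled that a new conversation has started, starts a conversation timer and records the handle for that conversation in a table (along with a timestamp). When that timer expires, the activation procedure checks the table to see when the last heartbeat message from the initiator service was processed. If that is outside a predefined tolerance, the conversation is ended. Meanwhile, the initiator service sends heartbeat messages on a regular cadence to let the target service know it is still alive.

The setup code follows: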

use master;
go
if exists (select 1 from sys.databases where name = 'TimerInitiator')
begin
    alter database [TimerInitiator] set offline with rollback immediate;
    alter database [TimerInitiator] set online;
    drop database [TimerInitiator];
end

if exists (select 1 from sys.databases where name = 'TimerTarget')
begin
    alter database [TimerTarget] set offline with rollback immediate;
    alter database [TimerTarget] set online;
    drop database [TimerTarget];
end

create database [TimerInitiator];
create database [TimerTarget];
revert;
go

use [TimerInitiator];
create master key encryption by password = 'f00bar!23';
create user [BrokerUser] without login;
CREATE CERTIFICATE [BrokerCert] AUTHORIZATION [BrokerUser] FROM BINARY = 0x308201E33082014CA00302010202101A83EEBC1E132A9149AC7D6D5AA1423F300D06092A864886F70D0101050500302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E3020170D3137303132323034303430315A180F32303939303130313030303030305A302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E30819F300D06092A864886F70D010101050003818D0030818902818100BC4C750343E36206C5A8D672C06D5A204DE499F1CF94A5D678F12DA2F9834877C901AEFE11CA64F9FEA46E31E65FA66FFD80DA139386F3F834C65114025563A9BBD85BDAAFAA694C1D2A36060B380BBD658DD2D93643303F2018F605AAB31840659EF0B5034766FF00F69EF0C1FF9A035686EC81C1E9995C599833ACCEF1BA230203010001300D06092A864886F70D01010505000381810081163FBED4A8B85429C6B6AC7D2123671F751EB72A468CCFBF2C593A8E2A7F51F59584D4EE7ECD247BD73D7A809007C0A8BE23E18A90AC927C124632578F971CD177269EF6752891DDFAB43DC0F1A24D509116EE578BAAC553B81376A69F386B6401AADF1C9D0D2121070B216C864C31B12D8B02081C35D70A8B7DFBD8904DF5 WITH PRIVATE KEY (BINARY = 
0x1EF1B5B000000000010000000100000010000000540200005795E582353CC30C984D89654C0B65170702000000A40000B92EB9B271982F1DA38DCC06FC333E60512569DD94A1B661FAAC382E73665869FF9F2995A58812E64163354FA81C957EBF29DA8F20699C59AA16268401C0679FEEF639AB9C38E0C4E4E605F8DCDFB6CC5CB011F7113170EE6CE49623DE061D6FB82F8ABE92284E9D1FA481FCD150AA0B356FCFC86593EF3D7DDC03D7893462A9C1AA628970C04FE4ECB92E8E5234600A059D4213CE51369E0C0D8B2676F9F4E7FC08A6043991A21716DBB8C05B62E78A36571361C646C3D3BEE252A816CDDF6184E4954CE8EF65A584CDC1C45E9D17CE5B2ABB4CCCEF86A4F943DA26792E5BFCEA7E379BE98E799DCBF06C4235F2B685772842F181383FC1DC420660B17F5A9FD8460C50054AC50CF1BB57C6DD36D6CA40AC596E0AA492254B3E7D52A9F2079FBEB7CA1B6659B9923CBA72FA7DAFA67B83726C623AE4568F83EB748CAB6D7A87DCFD964E90A92C14649B99CAEE56DAEC3139DF1B33B1683D1E67357C9A6B563686A16842C2BBE0CCA0C41A6565FB90505F83F1F1D6B28B786DD7FEDDE7DEBC5A3A7BC3407DC77DBC49DB27135F4E09BA23CCFAA7C551C020B06AF4F585A5AD72A19DF35D9B04F0956B9ADFFFF1BE5897026CEC6D1B9582E2559F9354D09844B4058E656780579CF6FBE342522B40126D095749AAFAAAD4C3D53B8E1C469CF4EB18150F3A7D2ED6E308BAA71375D3657688AC0DBD3709E4412B8625DB53516292CCBE5E324DD6139C9D6F6522CD5FCC1229284B2B78D1D5246461BA930428065E9FC6738EF885D9A3EED1D2CAEC0368CA6D09D0DF2DB9C9042C29076087EF043DD085AE12EC561AABADB92B5A, DECRYPTION BY PASSWORD = 'f00bar!23')
create message type [Request] validation = none;
create message type [Response] validation = none;
create message type [Heartbeat] validation = empty;
create contract [ConversationContract] (
    [Request] sent by initiator,
    [Response] sent by target,
    [Heartbeat] sent by initiator
);
create queue [InitiatorQueue] ;
create service [InitiatorService] 
    authorization [BrokerUser] 
    on queue [InitiatorQueue]
    ( [ConversationContract] );

create remote service binding [TimerRSB]
    authorization [BrokerUser]
    to service 'TargetService'
    with user = [BrokerUser];

declare @broker_instance uniqueidentifier;
select @broker_instance = service_broker_guid
from sys.databases
where name = 'TimerTarget';

declare @sql nvarchar(max);
set @sql = concat(
'create route [TimerTargetRoute]
with service_name = ''TargetService'',
    broker_instance = ''', @broker_instance,
    ''', address = ''LOCAL''');
exec(@sql)
go

go
create procedure [InitiatorActivation]
as
begin
    declare @ch uniqueidentifier, @message_type sysname;

    while(1=1)
    begin
        waitfor (
            receive top(1) @ch = conversation_handle,
                @message_type = message_type_name
            from [dbo].[InitiatorQueue]
        ), timeout 1000;

        if (@@ROWCOUNT = 0)
            break;

        --if (@message_type = 'Response')
        --begin
        --  -- do something to process the message
        --end
        --else 
        if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
        begin
            end conversation @ch
        end
    end
end
go
alter queue [dbo].[InitiatorQueue] with activation (
    procedure_name = [dbo].[InitiatorActivation], 
    max_queue_readers = 5, 
    status = on,
    execute as owner
);
go

create procedure [dbo].[SendHeartbeat] (
    @ch uniqueidentifier
)
as
begin
    send on conversation (@ch)
        message type [Heartbeat];
end
go

use [TimerTarget];
create master key encryption by password = 'f00bar!23';
create user [BrokerUser] without login;
CREATE CERTIFICATE [BrokerCert] AUTHORIZATION [BrokerUser] FROM BINARY = 0x308201E33082014CA00302010202101A83EEBC1E132A9149AC7D6D5AA1423F300D06092A864886F70D0101050500302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E3020170D3137303132323034303430315A180F32303939303130313030303030305A302F312D302B06035504031324436572746966696361746520666F722062726F6B657220636F6D6D756E69636174696F6E30819F300D06092A864886F70D010101050003818D0030818902818100BC4C750343E36206C5A8D672C06D5A204DE499F1CF94A5D678F12DA2F9834877C901AEFE11CA64F9FEA46E31E65FA66FFD80DA139386F3F834C65114025563A9BBD85BDAAFAA694C1D2A36060B380BBD658DD2D93643303F2018F605AAB31840659EF0B5034766FF00F69EF0C1FF9A035686EC81C1E9995C599833ACCEF1BA230203010001300D06092A864886F70D01010505000381810081163FBED4A8B85429C6B6AC7D2123671F751EB72A468CCFBF2C593A8E2A7F51F59584D4EE7ECD247BD73D7A809007C0A8BE23E18A90AC927C124632578F971CD177269EF6752891DDFAB43DC0F1A24D509116EE578BAAC553B81376A69F386B6401AADF1C9D0D2121070B216C864C31B12D8B02081C35D70A8B7DFBD8904DF5 WITH PRIVATE KEY (BINARY = 
0x1EF1B5B000000000010000000100000010000000540200005795E582353CC30C984D89654C0B65170702000000A40000B92EB9B271982F1DA38DCC06FC333E60512569DD94A1B661FAAC382E73665869FF9F2995A58812E64163354FA81C957EBF29DA8F20699C59AA16268401C0679FEEF639AB9C38E0C4E4E605F8DCDFB6CC5CB011F7113170EE6CE49623DE061D6FB82F8ABE92284E9D1FA481FCD150AA0B356FCFC86593EF3D7DDC03D7893462A9C1AA628970C04FE4ECB92E8E5234600A059D4213CE51369E0C0D8B2676F9F4E7FC08A6043991A21716DBB8C05B62E78A36571361C646C3D3BEE252A816CDDF6184E4954CE8EF65A584CDC1C45E9D17CE5B2ABB4CCCEF86A4F943DA26792E5BFCEA7E379BE98E799DCBF06C4235F2B685772842F181383FC1DC420660B17F5A9FD8460C50054AC50CF1BB57C6DD36D6CA40AC596E0AA492254B3E7D52A9F2079FBEB7CA1B6659B9923CBA72FA7DAFA67B83726C623AE4568F83EB748CAB6D7A87DCFD964E90A92C14649B99CAEE56DAEC3139DF1B33B1683D1E67357C9A6B563686A16842C2BBE0CCA0C41A6565FB90505F83F1F1D6B28B786DD7FEDDE7DEBC5A3A7BC3407DC77DBC49DB27135F4E09BA23CCFAA7C551C020B06AF4F585A5AD72A19DF35D9B04F0956B9ADFFFF1BE5897026CEC6D1B9582E2559F9354D09844B4058E656780579CF6FBE342522B40126D095749AAFAAAD4C3D53B8E1C469CF4EB18150F3A7D2ED6E308BAA71375D3657688AC0DBD3709E4412B8625DB53516292CCBE5E324DD6139C9D6F6522CD5FCC1229284B2B78D1D5246461BA930428065E9FC6738EF885D9A3EED1D2CAEC0368CA6D09D0DF2DB9C9042C29076087EF043DD085AE12EC561AABADB92B5A, DECRYPTION BY PASSWORD = 'f00bar!23')
create message type [Request] validation = none;
create message type [Response] validation = none;
create message type [Heartbeat] validation = empty;
create contract [ConversationContract] (
    [Request] sent by initiator,
    [Response] sent by target,
    [Heartbeat] sent by initiator
);
create queue [TargetQueue];
create service [TargetService] 
    authorization [BrokerUser] 
    on queue [TargetQueue]
    ( [ConversationContract] );

create remote service binding [TimerRSB]
    authorization [BrokerUser]
    to service 'InitiatorService'
    with user = [BrokerUser];


declare @broker_instance uniqueidentifier;
select @broker_instance = service_broker_guid
from sys.databases
where name = 'TimerInitiator';

declare @sql nvarchar(max);
set @sql = concat(
'create route [TimerInitiatorRoute]
with service_name = ''InitiatorService'',
    broker_instance = ''', @broker_instance,
    ''', address = ''LOCAL''');
exec(@sql);
go
create table [dbo].[OpenConversations] (
    conversation_handle uniqueidentifier not null,
        constraint [PK_OpenConversations] primary key clustered (conversation_handle),
    heartbeat_ts datetime2(3) not null
);
go
create procedure [dbo].[TargetActivation]
as
begin
    set nocount on;
    declare @message_type sysname,
        @ch uniqueidentifier,
        @heartbeat_ts datetime2(3);
    while(1=1)
    begin
        waitfor(
            receive top(1) @message_type = message_type_name,
                @ch = conversation_handle
            from [dbo].[TargetQueue]
        ), timeout 1000;

        if (@@ROWCOUNT = 0)
            break;
        if @message_type = 'Request'
        begin
            insert into [dbo].[OpenConversations] 
                (conversation_handle, heartbeat_ts)
            values
                (@ch, SYSUTCDATETIME());
            begin conversation timer (@ch) timeout = 30; -- 30 seconds
        end
        else if (@message_type = 'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer')
        begin
            set @heartbeat_ts = (
                select [heartbeat_ts] 
                from [dbo].[OpenConversations] 
                where [conversation_handle] = @ch
            );

            if (datediff(second, @heartbeat_ts, SYSUTCDATETIME()) > 5*60) -- 5 minute time out
            begin
                end conversation @ch;
                delete [dbo].[OpenConversations]
                where [conversation_handle] = @ch;
            end
            else
            begin
                begin conversation timer (@ch) timeout = 30;
            end
        end
        else if (@message_type = 'Heartbeat')
        begin
            update [dbo].[OpenConversations]
            set [heartbeat_ts] = SYSUTCDATETIME()
            where [conversation_handle] = @ch;
        end
    end
end
go
alter queue [dbo].[TargetQueue] with activation (
    procedure_name = [dbo].[TargetActivation], 
    max_queue_readers = 5, 
    status = on,
    execute as owner
);
go

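Most of this is Service Broker plumbing. The "interesting" part is in the activation procedure TargetActivation. That is where the timer is tracked, along with whether a given conversation has expired.

Below is some code to exercise the setup: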

use [TimerInitiator]
go
if object_id('tempdb.dbo.#conversations') is not null
    drop table #conversations;
create table #conversations (conversation_handle uniqueidentifier not null);

declare @ch uniqueidentifier, @i tinyint = 0;

-- start 5 conversations
while(@i < 5)
begin
    begin dialog @ch
        from service [InitiatorService]
        to service 'TargetService'
        on contract [ConversationContract];

    send on conversation (@ch)
        message type [Request]
        ('<Request />');

    insert into #conversations ([conversation_handle])
    values (@ch);

    set @i += 1;
end

go

declare @ch uniqueidentifier = (select top(1) [conversation_handle] from #conversations)
while (1=1)
begin
    exec [TimerInitiator].[dbo].[SendHeartbeat] @ch = @ch;
    waitfor delay '0:00:30';
end

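All that happens here is that five conversations are started and one is chosen arbitrarily to send periodic heartbeats. In another window, you can run one or all of the following to track how things are progressing: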

select 'T', conversation_handle, message_type_name
from [TimerTarget].[dbo].[TargetQueue]
union all 
select 'I', conversation_handle, message_type_name
from [TimerInitiator].[dbo].[InitiatorQueue];

select 'I', conversation_handle, state_desc
from TimerInitiator.sys.conversation_endpoints
union all
select 'T', conversation_handle, state_desc
from TimerTarget.sys.conversation_endpoints;

select *
from [TimerTarget].[dbo].[OpenConversations];
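
As a further hedged sketch, the endpoint catalog view also exposes when each conversation's timer is next due and when its lifetime runs out, which can help confirm that the target really is ending conversations that stopped sending heartbeats. The filter on state_desc is an assumption about what is interesting to watch; drop it to see every endpoint.

```sql
select
    conversation_handle,
    is_initiator,
    far_service,
    state_desc,
    dialog_timer,   -- when the next DialogTimer message is due
    lifetime        -- expiration date/time of the conversation
from [TimerTarget].sys.conversation_endpoints
where state_desc <> 'CONVERSING'   -- assumed filter: only endpoints that are no longer active
order by dialog_timer;
```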

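In your real setup, it will be up to your application to send the heartbeats. It should do so regardless of whether it is processing messages (that is, if you have periods during which the target is not sending messages, your application still needs to send heartbeats). Other than that, this should work with minimal tweaking.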