如何调用由Service Broker组成的Stored Procedure

How to call Stored Procedure that consists of Service Broker

我不知道这个问题是否重复。如果这个问题重复,请给我link。

我的问题是如何调用由 BEGIN TRANSACTION & COMMIT TRANSACTION (Service Broker) 组成的 2 个存储过程。

我有 2 个存储过程用于执行 Service Broker 的某些操作。

这是包含BEGIN CONVERSATION的存储过程:

USE [EventCloud]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

ALTER PROCEDURE [dbo].[SendingMessage_Group_Id]
    @reference_id UNIQUEIDENTIFIER
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER
    DECLARE @conversation_group_id UNIQUEIDENTIFIER
    DECLARE @msg NVARCHAR(MAX)

    SET @conversation_group_id = @reference_id

    BEGIN TRY
        BEGIN TRANSACTION

            BEGIN DIALOG CONVERSATION @ch
                FROM SERVICE [InitiatorService]
                TO SERVICE 'TargetService'
                ON CONTRACT [http://ssb.csharp.at/SSB_Book/c03/HelloWorldContract]
                WITH RELATED_CONVERSATION_GROUP = @conversation_group_id,
                    ENCRYPTION = OFF

            SET @msg = '<HelloWorldRequest>1234</HelloWorldRequest>'

            ;SEND ON CONVERSATION @ch MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c03/RequestMessage]
            (
                @msg
            )

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        ROLLBACK TRANSACTION
    END CATCH
END

下面的代码是TargetQueue的内部激活存储过程:

USE [EventCloud]
GO

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

ALTER PROCEDURE [dbo].[ProcessRequestMessage]
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER
    DECLARE @messagetypename NVARCHAR(256)
    DECLARE @messagebody XML
    DECLARE @responsemessage XML;
    DECLARE @errorcode INT
    DECLARE @errormessage NVARCHAR(3000);

    WHILE (1 = 1)
    BEGIN
        BEGIN TRY
            BEGIN TRANSACTION;
                WAITFOR(
                    RECEIVE TOP (1)
                        @ch = conversation_handle,
                        @messagetypename = message_type_name,
                        @messagebody = CAST(message_body AS XML)
                    FROM TargetQueue
                )

                IF (@@ROWCOUNT = 0)
                BEGIN
                    ROLLBACK TRANSACTION
                    BREAK
                END

                -- Process the requested message and send back to Initiator
                ELSE IF (@messagetypename = 'http://ssb.csharp.at/SSB_Book/c03/RequestMessage')
                BEGIN
                    -- Store the received request message in a table
                    INSERT INTO ProcessedMessages (ID, MessageBody, ServiceName, ProcessedDateTime)
                    VALUES (NEWID(), @messagebody, 'TargetService', GETDATE())

                    -- Construct the response message
                    SET @responsemessage =
                        '<HelloWorldResponse>' +
                            @messagebody.value('/HelloWorldRequest[1]', 'NVARCHAR(MAX)') +
                        '</HelloWorldResponse>';

                    -- Send the response message back to the initiating service
                    SEND ON CONVERSATION @ch MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c03/ResponseMessage]
                    (
                        @responsemessage
                    );

                    -- END the conversation on the target's side
                    END CONVERSATION @ch;
                END

                -- End the conversation if meet the message type
                IF (@messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
                BEGIN
                    -- End the conversation
                    END CONVERSATION @ch;
                END

            COMMIT TRANSACTION
        END TRY
        BEGIN CATCH
            ROLLBACK TRANSACTION
            PRINT ERROR_MESSAGE()
        END CATCH
    END
END

此代码用于接收来自 InitiatorQueue 的响应消息:

ALTER PROCEDURE [dbo].[ProcessMessageWithTimeOut]
    @reference_id UNIQUEIDENTIFIER,
    @receive_timeout INT
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER
    DECLARE @conversation_group_id UNIQUEIDENTIFIER
    DECLARE @messagetypename NVARCHAR(256)
    DECLARE @messagebody XML
    DECLARE @responsemessage XML
    DECLARE @errorcode INT
    DECLARE @errormessage NVARCHAR(3000)
    DECLARE @queuing_order BIGINT
    DECLARE @timeout INT

    SET @conversation_group_id = @reference_id
    SET @timeout = @receive_timeout

    DECLARE @tableMessage TABLE
    (
        queuing_order BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type_name NVARCHAR(256),
        message_body VARBINARY(MAX)
    )

    BEGIN TRY
        BEGIN TRANSACTION
            WAITFOR(
                RECEIVE
                    queuing_order,
                    conversation_handle,
                    message_type_name,
                    message_body
                FROM InitiatorQueue INTO @tableMessage
                WHERE conversation_group_id = @conversation_group_id
            ), TIMEOUT @timeout;

            DECLARE @count INT
            SET @count = (SELECT COUNT(*) FROM @tableMessage)
            IF (@count = 0)
            BEGIN;
                THROW 50001, 'No message response within 5 seconds.', 1
            END

            IF (@count <>2)
            BEGIN
                DECLARE @timeout2 INT
                SET @timeout2 = ABS(@timeout * 0.5)

                WAITFOR(
                    RECEIVE
                        queuing_order,
                        conversation_handle,
                        message_type_name,
                        message_body
                    FROM InitiatorQueue INTO @tableMessage
                    WHERE conversation_group_id = @conversation_group_id
                ), TIMEOUT 5000
                SET @count = (SELECT COUNT(*) FROM @tableMessage)
                IF (@count <> 2)
                BEGIN;
                    THROW 50002, 'End Dialog without Response Message', 1
                END
            END

            WHILE (@count <> 0)
            BEGIN
            SET @queuing_order = (SELECT TOP 1 queuing_order FROM @tableMessage)
            SET @ch = (SELECT conversation_handle FROM @tableMessage WHERE queuing_order = @queuing_order)
            SET @messagetypename = (SELECT message_type_name FROM @tableMessage WHERE queuing_order = @queuing_order)
            SET @messagebody = CAST((SELECT message_body FROM @tableMessage WHERE queuing_order = @queuing_order) AS XML)

                IF (@messagetypename = 'http://ssb.csharp.at/SSB_Book/c03/ResponseMessage')
                BEGIN
                    -- Store the received response message in a table
                    INSERT INTO ProcessedMessages (ID, MessageBody, ServiceName, ProcessedDateTime)
                    VALUES (NEWID(), @messagebody, 'InitiatorService', GETDATE())
                END

                ELSE IF (@messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
                BEGIN
                    -- End the conversation on the initiator's side
                    END CONVERSATION @ch
                END
                DELETE FROM @tableMessage WHERE queuing_order = @queuing_order
                SET @count = (SELECT COUNT(*) FROM @tableMessage)
            END

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        IF ERROR_NUMBER() = 50001
        BEGIN
            ;THROW
        END

        IF ERROR_NUMBER() = 50002
        BEGIN
            ;THROW
        END
        ;THROW
        ROLLBACK TRANSACTION
        PRINT ERROR_MESSAGE()
    END CATCH
END

这是我用于调用存储过程的 C# 代码:

public async Task<TestObject> Begin_Conversation_With_Group_Id(Guid ch)
{
    try
    {
        return await Context.Database.SqlQuery<TestObject>(
        "EXEC SendingMessage_Group_Id @ch",
        new SqlParameter("ch", ch))
            .SingleOrDefaultAsync();
    }
    catch (Exception e)
    {
        TestObject exception = new TestObject();
        exception.Data = "Cannot get the data due to: " + " " + e.Message;
        return exception;
    }
}

public async Task<TestObject> Process_Response_Message_With_TimeOut(Guid ch)
{
    var timeout = 5000;
    try
    {
        return await Context.Database.SqlQuery<TestObject>(
            "EXEC ProcessMessageWithTimeOut @ch, @timeout",
            new SqlParameter("ch", ch),
            new SqlParameter("timeout", timeout))
                .SingleOrDefaultAsync();
    }
    catch (SqlException ex)
    {
        TestObject exception = new TestObject();
        exception.Data = "Process is not finish yet due to: " + ex.Message + " " + ex.Number;
        return exception;
    }
}

问题是我运行代码的时候,消息还在TargetQueue,按理说应该由内部激活接收和处理。

但如果只调用 1 个存储过程 (SendingMessage_Group_Id),它工作正常。该消息能够响应 InitiatorQueue 而不是停留在 TargetQueue.

据我猜测,是因为事务还没有提交。

问题已解决。

出现这个问题的原因是因为我使用了一个框架导致出现这种情况。 框架在启动时自带事务,所以我只需要关闭框架的事务即可解决问题

我使用的框架是ASP.NET Zero