如何调用使用了 Service Broker 的存储过程
How to call stored procedures that use Service Broker
我不确定这个问题是否已经有人问过;如果是重复问题,请给我相关链接。
我的问题是:如何调用 2 个内部都使用了 BEGIN TRANSACTION 和 COMMIT TRANSACTION 的 Service Broker 存储过程。
我有 2 个存储过程,用于执行 Service Broker 的相关操作。
这是包含 BEGIN DIALOG CONVERSATION 的存储过程:
USE [EventCloud]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- Opens a Service Broker dialog from InitiatorService to 'TargetService' and
-- sends one hard-coded HelloWorld request message on it.
--
-- Parameters:
--   @reference_id  Used as the RELATED_CONVERSATION_GROUP of the new dialog.
--
-- Errors:
--   Any error raised while opening the dialog or sending is re-raised to the
--   caller after the transaction has been rolled back.
--
-- NOTE: SEND is transactional. If this procedure is executed inside a
-- transaction that the caller already opened, the COMMIT below only
-- decrements @@TRANCOUNT and the message is not delivered until the
-- outermost transaction commits.
ALTER PROCEDURE [dbo].[SendingMessage_Group_Id]
    @reference_id UNIQUEIDENTIFIER
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER;
    DECLARE @conversation_group_id UNIQUEIDENTIFIER;
    DECLARE @msg NVARCHAR(MAX);

    SET @conversation_group_id = @reference_id;

    BEGIN TRY
        BEGIN TRANSACTION;

        BEGIN DIALOG CONVERSATION @ch
            FROM SERVICE [InitiatorService]
            TO SERVICE 'TargetService'
            ON CONTRACT [http://ssb.csharp.at/SSB_Book/c03/HelloWorldContract]
            WITH RELATED_CONVERSATION_GROUP = @conversation_group_id,
                 ENCRYPTION = OFF;

        SET @msg = '<HelloWorldRequest>1234</HelloWorldRequest>';

        SEND ON CONVERSATION @ch MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c03/RequestMessage]
        (
            @msg
        );

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        -- Only roll back when a transaction is actually open; an unguarded
        -- ROLLBACK would raise a second error and hide the original one.
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;

        -- Re-raise instead of swallowing, so the caller knows nothing was sent.
        THROW;
    END CATCH
END
下面的代码是TargetQueue的内部激活存储过程:
USE [EventCloud]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- Drains TargetQueue one message per transaction.
--
-- For each message:
--   * RequestMessage  -> stored in ProcessedMessages, answered with a
--                        ResponseMessage, then the conversation is ended on
--                        this side.
--   * EndDialog/Error -> the conversation is ended on this side.
--   * anything else   -> consumed without further action.
--
-- The loop ends when no message arrives within the RECEIVE timeout, or when an
-- error occurs (the open transaction is rolled back and the error re-raised).
ALTER PROCEDURE [dbo].[ProcessRequestMessage]
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER;
    DECLARE @messagetypename NVARCHAR(256);
    DECLARE @messagebody XML;
    DECLARE @responsemessage XML;

    WHILE (1 = 1)
    BEGIN
        BEGIN TRY
            BEGIN TRANSACTION;

            -- Without a TIMEOUT the WAITFOR blocks indefinitely, so the
            -- "queue is empty" exit below could never be taken.
            WAITFOR(
                RECEIVE TOP (1)
                    @ch = conversation_handle,
                    @messagetypename = message_type_name,
                    @messagebody = CAST(message_body AS XML)
                FROM TargetQueue
            ), TIMEOUT 5000;

            IF (@@ROWCOUNT = 0)
            BEGIN
                -- Nothing arrived within the timeout: release the transaction and stop.
                ROLLBACK TRANSACTION;
                BREAK;
            END

            IF (@messagetypename = 'http://ssb.csharp.at/SSB_Book/c03/RequestMessage')
            BEGIN
                -- Store the received request message in a table
                INSERT INTO ProcessedMessages (ID, MessageBody, ServiceName, ProcessedDateTime)
                VALUES (NEWID(), @messagebody, 'TargetService', GETDATE());

                -- Construct the response message from the request's content
                SET @responsemessage =
                    '<HelloWorldResponse>' +
                    @messagebody.value('/HelloWorldRequest[1]', 'NVARCHAR(MAX)') +
                    '</HelloWorldResponse>';

                -- Send the response message back to the initiating service
                SEND ON CONVERSATION @ch MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c03/ResponseMessage]
                (
                    @responsemessage
                );

                -- End the conversation on the target's side
                END CONVERSATION @ch;
            END
            ELSE IF (@messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
                  OR @messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
            BEGIN
                -- The other side ended or failed the dialog; close our endpoint
                -- too so the conversation does not stay open on this side.
                END CONVERSATION @ch;
            END

            COMMIT TRANSACTION;
        END TRY
        BEGIN CATCH
            IF @@TRANCOUNT > 0
                ROLLBACK TRANSACTION;

            -- Re-raise and leave the procedure. Looping again on a persistent
            -- error (for example a disabled queue) would spin forever.
            -- NOTE(review): repeated rollbacks of the same RECEIVE can trigger
            -- Service Broker poison-message handling, which disables the queue.
            THROW;
        END CATCH
    END
END
此代码用于接收来自 InitiatorQueue 的响应消息:
-- Receives the reply messages for one conversation group from InitiatorQueue
-- and processes them inside a single transaction.
--
-- Parameters:
--   @reference_id     Conversation group id whose messages are received.
--   @receive_timeout  Timeout, in milliseconds, of the first WAITFOR(RECEIVE).
--
-- Behaviour:
--   * Exactly two messages are expected for the group. If the first RECEIVE
--     does not return exactly two, a second RECEIVE waits half of the
--     original timeout.
--   * ResponseMessage rows are stored in ProcessedMessages; an EndDialog
--     message ends the conversation on the initiator's side.
--
-- Errors (re-raised to the caller after the transaction is rolled back):
--   50001  no message arrived within the first timeout
--   50002  the group did not hold exactly two messages after the second wait
ALTER PROCEDURE [dbo].[ProcessMessageWithTimeOut]
    @reference_id UNIQUEIDENTIFIER,
    @receive_timeout INT
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER;
    DECLARE @conversation_group_id UNIQUEIDENTIFIER;
    DECLARE @messagetypename NVARCHAR(256);
    DECLARE @messagebody XML;
    DECLARE @errormessage NVARCHAR(2048);
    DECLARE @queuing_order BIGINT;
    DECLARE @timeout INT;
    DECLARE @timeout2 INT;
    DECLARE @count INT;

    SET @conversation_group_id = @reference_id;
    SET @timeout = @receive_timeout;

    DECLARE @tableMessage TABLE
    (
        queuing_order BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type_name NVARCHAR(256),
        message_body VARBINARY(MAX)
    );

    BEGIN TRY
        BEGIN TRANSACTION;

        WAITFOR(
            RECEIVE
                queuing_order,
                conversation_handle,
                message_type_name,
                message_body
            FROM InitiatorQueue INTO @tableMessage
            WHERE conversation_group_id = @conversation_group_id
        ), TIMEOUT @timeout;

        SET @count = (SELECT COUNT(*) FROM @tableMessage);

        IF (@count = 0)
        BEGIN
            -- Report the timeout that was actually used, not a fixed "5 seconds".
            SET @errormessage = CONCAT('No message response within ', @timeout, ' milliseconds.');
            THROW 50001, @errormessage, 1;
        END

        IF (@count <> 2)
        BEGIN
            -- Give the missing message half of the original timeout to arrive.
            SET @timeout2 = ABS(@timeout * 0.5);

            WAITFOR(
                RECEIVE
                    queuing_order,
                    conversation_handle,
                    message_type_name,
                    message_body
                FROM InitiatorQueue INTO @tableMessage
                WHERE conversation_group_id = @conversation_group_id
            ), TIMEOUT @timeout2;

            SET @count = (SELECT COUNT(*) FROM @tableMessage);

            IF (@count <> 2)
            BEGIN
                THROW 50002, 'End Dialog without Response Message', 1;
            END
        END

        WHILE (@count <> 0)
        BEGIN
            -- ORDER BY makes the processing order deterministic: lowest
            -- queuing_order first.
            SET @queuing_order = (SELECT TOP 1 queuing_order FROM @tableMessage ORDER BY queuing_order);
            SET @ch = (SELECT conversation_handle FROM @tableMessage WHERE queuing_order = @queuing_order);
            SET @messagetypename = (SELECT message_type_name FROM @tableMessage WHERE queuing_order = @queuing_order);
            SET @messagebody = CAST((SELECT message_body FROM @tableMessage WHERE queuing_order = @queuing_order) AS XML);

            IF (@messagetypename = 'http://ssb.csharp.at/SSB_Book/c03/ResponseMessage')
            BEGIN
                -- Store the received response message in a table
                INSERT INTO ProcessedMessages (ID, MessageBody, ServiceName, ProcessedDateTime)
                VALUES (NEWID(), @messagebody, 'InitiatorService', GETDATE());
            END
            ELSE IF (@messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
            BEGIN
                -- End the conversation on the initiator's side
                END CONVERSATION @ch;
            END

            DELETE FROM @tableMessage WHERE queuing_order = @queuing_order;
            SET @count = (SELECT COUNT(*) FROM @tableMessage);
        END

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        -- Roll back BEFORE re-raising: THROW leaves the CATCH block at once,
        -- so anything placed after it never runs and the transaction would
        -- stay open.
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;

        -- Re-raise the original error; its number (e.g. 50001 / 50002) is preserved.
        THROW;
    END CATCH
END
这是我用于调用存储过程的 C# 代码:
/// <summary>
/// Executes the <c>SendingMessage_Group_Id</c> stored procedure, binding <paramref name="ch"/>
/// to the <c>@ch</c> placeholder, and returns the single mapped row (or the default value when
/// the query yields none).
/// </summary>
/// <param name="ch">Identifier passed to the stored procedure.</param>
/// <returns>
/// The query result; if any exception is caught, a <c>TestObject</c> whose <c>Data</c>
/// contains the exception message instead.
/// </returns>
public async Task<TestObject> Begin_Conversation_With_Group_Id(Guid ch)
{
    try
    {
        var groupIdParameter = new SqlParameter("ch", ch);
        var query = Context.Database.SqlQuery<TestObject>(
            "EXEC SendingMessage_Group_Id @ch",
            groupIdParameter);

        return await query.SingleOrDefaultAsync();
    }
    catch (Exception e)
    {
        // The double space after the colon is part of the emitted text.
        return new TestObject
        {
            Data = $"Cannot get the data due to:  {e.Message}"
        };
    }
}
/// <summary>
/// Executes the <c>ProcessMessageWithTimeOut</c> stored procedure with <paramref name="ch"/>
/// and a fixed timeout value of 5000, and returns the single mapped row (or the default value
/// when the query yields none).
/// </summary>
/// <param name="ch">Identifier bound to the <c>@ch</c> placeholder.</param>
/// <returns>
/// The query result; if a <c>SqlException</c> is caught, a <c>TestObject</c> whose
/// <c>Data</c> contains the exception message and error number instead.
/// </returns>
public async Task<TestObject> Process_Response_Message_With_TimeOut(Guid ch)
{
    int timeout = 5000;

    try
    {
        var arguments = new object[]
        {
            new SqlParameter("ch", ch),
            new SqlParameter("timeout", timeout)
        };
        var query = Context.Database.SqlQuery<TestObject>(
            "EXEC ProcessMessageWithTimeOut @ch, @timeout",
            arguments);

        return await query.SingleOrDefaultAsync();
    }
    catch (SqlException ex)
    {
        return new TestObject
        {
            Data = $"Process is not finish yet due to: {ex.Message} {ex.Number}"
        };
    }
}
问题是:当我运行这段代码时,消息仍然停留在 TargetQueue 中,而按理说它应该由内部激活的存储过程接收并处理。
但如果只调用其中 1 个存储过程(SendingMessage_Group_Id),一切正常:响应消息会到达 InitiatorQueue,而不是停留在 TargetQueue。
我猜测,这是因为事务还没有提交。
问题已解决。
原因在于我使用的框架:该框架在启动时会自动开启事务,因此存储过程发送的消息要等到这个外层事务提交后才会真正发出。
我只需要关闭框架的事务即可解决问题。
我使用的框架是 ASP.NET Zero。
我不确定这个问题是否已经有人问过;如果是重复问题,请给我相关链接。
我的问题是:如何调用 2 个内部都使用了 BEGIN TRANSACTION 和 COMMIT TRANSACTION 的 Service Broker 存储过程。
我有 2 个存储过程,用于执行 Service Broker 的相关操作。
这是包含 BEGIN DIALOG CONVERSATION 的存储过程:
USE [EventCloud]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- Opens a Service Broker dialog from InitiatorService to 'TargetService' and
-- sends one hard-coded HelloWorld request message on it.
--
-- Parameters:
--   @reference_id  Used as the RELATED_CONVERSATION_GROUP of the new dialog.
--
-- Errors:
--   Any error raised while opening the dialog or sending is re-raised to the
--   caller after the transaction has been rolled back.
--
-- NOTE: SEND is transactional. If this procedure is executed inside a
-- transaction that the caller already opened, the COMMIT below only
-- decrements @@TRANCOUNT and the message is not delivered until the
-- outermost transaction commits.
ALTER PROCEDURE [dbo].[SendingMessage_Group_Id]
    @reference_id UNIQUEIDENTIFIER
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER;
    DECLARE @conversation_group_id UNIQUEIDENTIFIER;
    DECLARE @msg NVARCHAR(MAX);

    SET @conversation_group_id = @reference_id;

    BEGIN TRY
        BEGIN TRANSACTION;

        BEGIN DIALOG CONVERSATION @ch
            FROM SERVICE [InitiatorService]
            TO SERVICE 'TargetService'
            ON CONTRACT [http://ssb.csharp.at/SSB_Book/c03/HelloWorldContract]
            WITH RELATED_CONVERSATION_GROUP = @conversation_group_id,
                 ENCRYPTION = OFF;

        SET @msg = '<HelloWorldRequest>1234</HelloWorldRequest>';

        SEND ON CONVERSATION @ch MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c03/RequestMessage]
        (
            @msg
        );

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        -- Only roll back when a transaction is actually open; an unguarded
        -- ROLLBACK would raise a second error and hide the original one.
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;

        -- Re-raise instead of swallowing, so the caller knows nothing was sent.
        THROW;
    END CATCH
END
下面的代码是TargetQueue的内部激活存储过程:
USE [EventCloud]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
-- Drains TargetQueue one message per transaction.
--
-- For each message:
--   * RequestMessage  -> stored in ProcessedMessages, answered with a
--                        ResponseMessage, then the conversation is ended on
--                        this side.
--   * EndDialog/Error -> the conversation is ended on this side.
--   * anything else   -> consumed without further action.
--
-- The loop ends when no message arrives within the RECEIVE timeout, or when an
-- error occurs (the open transaction is rolled back and the error re-raised).
ALTER PROCEDURE [dbo].[ProcessRequestMessage]
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER;
    DECLARE @messagetypename NVARCHAR(256);
    DECLARE @messagebody XML;
    DECLARE @responsemessage XML;

    WHILE (1 = 1)
    BEGIN
        BEGIN TRY
            BEGIN TRANSACTION;

            -- Without a TIMEOUT the WAITFOR blocks indefinitely, so the
            -- "queue is empty" exit below could never be taken.
            WAITFOR(
                RECEIVE TOP (1)
                    @ch = conversation_handle,
                    @messagetypename = message_type_name,
                    @messagebody = CAST(message_body AS XML)
                FROM TargetQueue
            ), TIMEOUT 5000;

            IF (@@ROWCOUNT = 0)
            BEGIN
                -- Nothing arrived within the timeout: release the transaction and stop.
                ROLLBACK TRANSACTION;
                BREAK;
            END

            IF (@messagetypename = 'http://ssb.csharp.at/SSB_Book/c03/RequestMessage')
            BEGIN
                -- Store the received request message in a table
                INSERT INTO ProcessedMessages (ID, MessageBody, ServiceName, ProcessedDateTime)
                VALUES (NEWID(), @messagebody, 'TargetService', GETDATE());

                -- Construct the response message from the request's content
                SET @responsemessage =
                    '<HelloWorldResponse>' +
                    @messagebody.value('/HelloWorldRequest[1]', 'NVARCHAR(MAX)') +
                    '</HelloWorldResponse>';

                -- Send the response message back to the initiating service
                SEND ON CONVERSATION @ch MESSAGE TYPE [http://ssb.csharp.at/SSB_Book/c03/ResponseMessage]
                (
                    @responsemessage
                );

                -- End the conversation on the target's side
                END CONVERSATION @ch;
            END
            ELSE IF (@messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
                  OR @messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
            BEGIN
                -- The other side ended or failed the dialog; close our endpoint
                -- too so the conversation does not stay open on this side.
                END CONVERSATION @ch;
            END

            COMMIT TRANSACTION;
        END TRY
        BEGIN CATCH
            IF @@TRANCOUNT > 0
                ROLLBACK TRANSACTION;

            -- Re-raise and leave the procedure. Looping again on a persistent
            -- error (for example a disabled queue) would spin forever.
            -- NOTE(review): repeated rollbacks of the same RECEIVE can trigger
            -- Service Broker poison-message handling, which disables the queue.
            THROW;
        END CATCH
    END
END
此代码用于接收来自 InitiatorQueue 的响应消息:
-- Receives the reply messages for one conversation group from InitiatorQueue
-- and processes them inside a single transaction.
--
-- Parameters:
--   @reference_id     Conversation group id whose messages are received.
--   @receive_timeout  Timeout, in milliseconds, of the first WAITFOR(RECEIVE).
--
-- Behaviour:
--   * Exactly two messages are expected for the group. If the first RECEIVE
--     does not return exactly two, a second RECEIVE waits half of the
--     original timeout.
--   * ResponseMessage rows are stored in ProcessedMessages; an EndDialog
--     message ends the conversation on the initiator's side.
--
-- Errors (re-raised to the caller after the transaction is rolled back):
--   50001  no message arrived within the first timeout
--   50002  the group did not hold exactly two messages after the second wait
ALTER PROCEDURE [dbo].[ProcessMessageWithTimeOut]
    @reference_id UNIQUEIDENTIFIER,
    @receive_timeout INT
AS
BEGIN
    DECLARE @ch UNIQUEIDENTIFIER;
    DECLARE @conversation_group_id UNIQUEIDENTIFIER;
    DECLARE @messagetypename NVARCHAR(256);
    DECLARE @messagebody XML;
    DECLARE @errormessage NVARCHAR(2048);
    DECLARE @queuing_order BIGINT;
    DECLARE @timeout INT;
    DECLARE @timeout2 INT;
    DECLARE @count INT;

    SET @conversation_group_id = @reference_id;
    SET @timeout = @receive_timeout;

    DECLARE @tableMessage TABLE
    (
        queuing_order BIGINT,
        conversation_handle UNIQUEIDENTIFIER,
        message_type_name NVARCHAR(256),
        message_body VARBINARY(MAX)
    );

    BEGIN TRY
        BEGIN TRANSACTION;

        WAITFOR(
            RECEIVE
                queuing_order,
                conversation_handle,
                message_type_name,
                message_body
            FROM InitiatorQueue INTO @tableMessage
            WHERE conversation_group_id = @conversation_group_id
        ), TIMEOUT @timeout;

        SET @count = (SELECT COUNT(*) FROM @tableMessage);

        IF (@count = 0)
        BEGIN
            -- Report the timeout that was actually used, not a fixed "5 seconds".
            SET @errormessage = CONCAT('No message response within ', @timeout, ' milliseconds.');
            THROW 50001, @errormessage, 1;
        END

        IF (@count <> 2)
        BEGIN
            -- Give the missing message half of the original timeout to arrive.
            SET @timeout2 = ABS(@timeout * 0.5);

            WAITFOR(
                RECEIVE
                    queuing_order,
                    conversation_handle,
                    message_type_name,
                    message_body
                FROM InitiatorQueue INTO @tableMessage
                WHERE conversation_group_id = @conversation_group_id
            ), TIMEOUT @timeout2;

            SET @count = (SELECT COUNT(*) FROM @tableMessage);

            IF (@count <> 2)
            BEGIN
                THROW 50002, 'End Dialog without Response Message', 1;
            END
        END

        WHILE (@count <> 0)
        BEGIN
            -- ORDER BY makes the processing order deterministic: lowest
            -- queuing_order first.
            SET @queuing_order = (SELECT TOP 1 queuing_order FROM @tableMessage ORDER BY queuing_order);
            SET @ch = (SELECT conversation_handle FROM @tableMessage WHERE queuing_order = @queuing_order);
            SET @messagetypename = (SELECT message_type_name FROM @tableMessage WHERE queuing_order = @queuing_order);
            SET @messagebody = CAST((SELECT message_body FROM @tableMessage WHERE queuing_order = @queuing_order) AS XML);

            IF (@messagetypename = 'http://ssb.csharp.at/SSB_Book/c03/ResponseMessage')
            BEGIN
                -- Store the received response message in a table
                INSERT INTO ProcessedMessages (ID, MessageBody, ServiceName, ProcessedDateTime)
                VALUES (NEWID(), @messagebody, 'InitiatorService', GETDATE());
            END
            ELSE IF (@messagetypename = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
            BEGIN
                -- End the conversation on the initiator's side
                END CONVERSATION @ch;
            END

            DELETE FROM @tableMessage WHERE queuing_order = @queuing_order;
            SET @count = (SELECT COUNT(*) FROM @tableMessage);
        END

        COMMIT TRANSACTION;
    END TRY
    BEGIN CATCH
        -- Roll back BEFORE re-raising: THROW leaves the CATCH block at once,
        -- so anything placed after it never runs and the transaction would
        -- stay open.
        IF @@TRANCOUNT > 0
            ROLLBACK TRANSACTION;

        -- Re-raise the original error; its number (e.g. 50001 / 50002) is preserved.
        THROW;
    END CATCH
END
这是我用于调用存储过程的 C# 代码:
/// <summary>
/// Executes the <c>SendingMessage_Group_Id</c> stored procedure, binding <paramref name="ch"/>
/// to the <c>@ch</c> placeholder, and returns the single mapped row (or the default value when
/// the query yields none).
/// </summary>
/// <param name="ch">Identifier passed to the stored procedure.</param>
/// <returns>
/// The query result; if any exception is caught, a <c>TestObject</c> whose <c>Data</c>
/// contains the exception message instead.
/// </returns>
public async Task<TestObject> Begin_Conversation_With_Group_Id(Guid ch)
{
    try
    {
        var groupIdParameter = new SqlParameter("ch", ch);
        var query = Context.Database.SqlQuery<TestObject>(
            "EXEC SendingMessage_Group_Id @ch",
            groupIdParameter);

        return await query.SingleOrDefaultAsync();
    }
    catch (Exception e)
    {
        // The double space after the colon is part of the emitted text.
        return new TestObject
        {
            Data = $"Cannot get the data due to:  {e.Message}"
        };
    }
}
/// <summary>
/// Executes the <c>ProcessMessageWithTimeOut</c> stored procedure with <paramref name="ch"/>
/// and a fixed timeout value of 5000, and returns the single mapped row (or the default value
/// when the query yields none).
/// </summary>
/// <param name="ch">Identifier bound to the <c>@ch</c> placeholder.</param>
/// <returns>
/// The query result; if a <c>SqlException</c> is caught, a <c>TestObject</c> whose
/// <c>Data</c> contains the exception message and error number instead.
/// </returns>
public async Task<TestObject> Process_Response_Message_With_TimeOut(Guid ch)
{
    int timeout = 5000;

    try
    {
        var arguments = new object[]
        {
            new SqlParameter("ch", ch),
            new SqlParameter("timeout", timeout)
        };
        var query = Context.Database.SqlQuery<TestObject>(
            "EXEC ProcessMessageWithTimeOut @ch, @timeout",
            arguments);

        return await query.SingleOrDefaultAsync();
    }
    catch (SqlException ex)
    {
        return new TestObject
        {
            Data = $"Process is not finish yet due to: {ex.Message} {ex.Number}"
        };
    }
}
问题是:当我运行这段代码时,消息仍然停留在 TargetQueue 中,而按理说它应该由内部激活的存储过程接收并处理。
但如果只调用其中 1 个存储过程(SendingMessage_Group_Id),一切正常:响应消息会到达 InitiatorQueue,而不是停留在 TargetQueue。
我猜测,这是因为事务还没有提交。
问题已解决。
原因在于我使用的框架:该框架在启动时会自动开启事务,因此存储过程发送的消息要等到这个外层事务提交后才会真正发出。我只需要关闭框架的事务即可解决问题。
我使用的框架是 ASP.NET Zero。