NServiceBus router between two SQL transports failing
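I have two business domains (DomainA and DomainB). Each uses its own SQL Server transport for its NServiceBus endpoints, in separate databases on separate servers.
I read that you can send/publish messages from one domain to another by connecting them through a router. The official documentation explicitly says this is possible.
To reproduce the problem, I took the update-and-publish sample and modified it so that both endpoints use the SQL Server transport + SQL persistence.
The goal is to publish a message from DomainA and handle it in DomainB.
DomainA (a web application) - has no router configuration, since it only publishes something for the demo: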
var endpointConfiguration = new EndpointConfiguration("DomainA-Endpoint");
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(ConnectionStrings.DomainA);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(ConnectionStrings.DomainA);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();
endpointConfiguration.EnableInstallers();
DomainB - connected to the router for the published event:
var endpointConfiguration = new EndpointConfiguration("DomainB-Endpoint");
var transport = endpointConfiguration.UseTransport<SqlServerTransport>();
transport.ConnectionString(ConnectionStrings.DomainB);
transport.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
var persistence = endpointConfiguration.UsePersistence<SqlPersistence>();
persistence.SqlDialect<SqlDialect.MsSqlServer>();
persistence.ConnectionBuilder(
    connectionBuilder: () =>
    {
        return new SqlConnection(ConnectionStrings.DomainB);
    });
var subscriptions = persistence.SubscriptionSettings();
subscriptions.DisableCache();
endpointConfiguration.EnableInstallers();
var routerConnector = transport.Routing().ConnectToRouter("DomainA-B-Router");
routerConnector.RegisterPublisher(
    eventType: typeof(OrderAccepted),
    publisherEndpointName: "DomainA-Endpoint");
Router:
var routerConfig = new RouterConfiguration("DomainA-B-Router");
var domainAInterface = routerConfig.AddInterface<SqlServerTransport>("DomainA", t =>
{
    t.ConnectionString(ConnectionStrings.DomainA);
    t.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
});
var domainASqlSubscriptionStorage = new SqlSubscriptionStorage(
    () => new SqlConnection(ConnectionStrings.Router),
    "DomainA-", new SqlDialect.MsSqlServer(), null);
domainAInterface.EnableMessageDrivenPublishSubscribe(domainASqlSubscriptionStorage);
var domainBInterface = routerConfig.AddInterface<SqlServerTransport>("DomainB", t =>
{
    t.ConnectionString(ConnectionStrings.DomainB);
    t.Transactions(TransportTransactionMode.SendsAtomicWithReceive);
});
var domainBSqlSubscriptionStorage = new SqlSubscriptionStorage(
    () => new SqlConnection(ConnectionStrings.Router),
    "DomainB-", new SqlDialect.MsSqlServer(), null);
domainBInterface.EnableMessageDrivenPublishSubscribe(domainBSqlSubscriptionStorage);
var staticRouting = routerConfig.UseStaticRoutingProtocol();
staticRouting.AddForwardRoute("DomainA", "DomainB");
staticRouting.AddForwardRoute("DomainB", "DomainA");
domainASqlSubscriptionStorage.Install().GetAwaiter().GetResult();
domainBSqlSubscriptionStorage.Install().GetAwaiter().GetResult();
routerConfig.AutoCreateQueues();
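The code that starts the endpoints and the router is omitted for brevity.
The connection strings are as follows; DomainA and DomainB use different SQL Server instances: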
public class ConnectionStrings
{
    public const string DomainA = @"Data Source=(local);Initial Catalog=Nsb-DomainA-Endpoint-DB;Integrated Security=True;Max Pool Size=100";
    public const string DomainB = @"Data Source=(localDB)\MSSQLLocalDB;Initial Catalog=Nsb-DomainB-Endpoint-DB;Integrated Security=True;Max Pool Size=100";
    public const string Router = @"Data Source=(local);Initial Catalog=Nsb-DomainA-B-Router-DB;Integrated Security=True;Max Pool Size=100";
}
When I run the sample, I get the following error in the router:
2019-07-10 11:46:08.889 WARN RepeatedFailuresCircuitBreaker The circuit breaker for DomainA-B-Router is now in the armed state
2019-07-10 11:46:15.955 WARN RepeatedFailuresCircuitBreaker The circuit breaker for DomainA-B-Router will now be triggered
2019-07-10 11:46:15.991 ERROR ThrottlingRawEndpointConfig`1[[NServiceBus.SqlServerTransport, NServiceBus.Transport.SqlServer, Version=4.0.0.0, Culture=neutral, PublicKeyToken=9fc386479f8a226c]] Persistent error while processing messages in DomainA-B-Router. Entering throttled mode.
NServiceBus.Unicast.Queuing.QueueNotFoundException: Failed to send message to [Nsb-DomainA-Endpoint-DB].[dbo].[DomainA-Endpoint] ---> System.Data.SqlClient.SqlException: Invalid object name 'Nsb-DomainA-Endpoint-DB.dbo.DomainA-Endpoint'.
at System.Data.SqlClient.SqlConnection.OnError(SqlException exception, Boolean breakConnection, Action`1 wrapCloseInAction)
at System.Data.SqlClient.TdsParser.ThrowExceptionAndWarning(TdsParserStateObject stateObj, Boolean callerHasConnectionLock, Boolean asyncClose)
at System.Data.SqlClient.TdsParser.TryRun(RunBehavior runBehavior, SqlCommand cmdHandler, SqlDataReader dataStream, BulkCopySimpleResultSet bulkCopyHandler, TdsParserStateObject stateObj, Boolean& dataReady)
at System.Data.SqlClient.SqlCommand.FinishExecuteReader(SqlDataReader ds, RunBehavior runBehavior, String resetOptionsString, Boolean isInternal, Boolean forDescribeParameterEncryption, Boolean shouldCacheForAlwaysEncrypted)
at System.Data.SqlClient.SqlCommand.CompleteAsyncExecuteReader(Boolean isInternal, Boolean forDescribeParameterEncryption)
at System.Data.SqlClient.SqlCommand.InternalEndExecuteNonQuery(IAsyncResult asyncResult, String endMethod, Boolean isInternal)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryInternal(IAsyncResult asyncResult)
at System.Data.SqlClient.SqlCommand.EndExecuteNonQueryAsync(IAsyncResult asyncResult)
at System.Threading.Tasks.TaskFactory`1.FromAsyncCoreLogic(IAsyncResult iar, Func`2 endFunction, Action`1 endAction, Task`1 promise, Boolean requiresSynchronization)
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.TableBasedQueue.<SendRawMessage>d__10.MoveNext()
--- End of inner exception stack trace ---
at NServiceBus.Transport.SQLServer.TableBasedQueue.ThrowQueueNotFoundException(SqlException ex)
at NServiceBus.Transport.SQLServer.TableBasedQueue.<SendRawMessage>d__10.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<Send>d__5.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<DispatchUsingReceiveTransaction>d__4.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.TableBasedQueueDispatcher.<DispatchAsNonIsolated>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.MessageDispatcher.<Dispatch>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at PostroutingTerminator.<Terminate>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at ForwardSubscribeMessageDrivenRule.<Terminate>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at SubscribePreroutingTerminator.<Terminate>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at StorageDrivenSubscriptionRule.<Invoke>d__2.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at PreroutingToSubscribePreroutingFork.<Terminate>d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable`1.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.ChainTerminator`1.<Invoke>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at NServiceBus.Router.TerminatorInvocationRule`1.<Invoke>d__3.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
at ThrottlingRawEndpointConfig`1.<>c__DisplayClass1_0.<<PrepareConfig>b__1>d.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.ReceiveStrategy.<TryProcessingMessage>d__13.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.SQLServer.ProcessWithNativeTransaction.<TryProcess>d__3.MoveNext()
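We can see that the router is trying to access the DomainA database, and while debugging we can see that it does so over the DomainB connection. Because the two databases are on different servers, this cannot work.
If I change DomainB's connection string to point to the same SQL Server instance, everything works (provided the same user has access to all the databases).
I thought the router's job was to move messages between instances, but I could not get it to do that.
Am I doing something wrong?
Why should the router and the endpoints use the same connection for subscription data?
Thanks for your help!
The full code is available here.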
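I downloaded the code and upgraded to the latest version of Router (3.8.1), and it ran without any issues: the message was published and delivered to both subscribers.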
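As a rough illustration of that upgrade, the router project's package reference might look like the fragment below. This is only a sketch and is not taken from the linked code: it assumes the router is pulled in through the `NServiceBus.Router` NuGet package in an SDK-style project file. Only the version number 3.8.1 comes from this answer.

```xml
<!-- Assumed SDK-style .csproj fragment for the project that hosts the router -->
<ItemGroup>
  <!-- 3.8.1 is the Router version mentioned in this answer; the package ID is an assumption -->
  <PackageReference Include="NServiceBus.Router" Version="3.8.1" />
</ItemGroup>
```

After restoring packages and rebuilding, rerunning the sample against the two separate SQL Server instances should show whether the QueueNotFoundException still appears.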