在 Amazon SQS 传输上发布的 NServiceBus 路由器事件不由 Azure 服务总线传输端点处理

NServiceBus Router events published on Amazon SQS transport are not handled by an Azure Service Bus transport endpoint

我一直在努力 NServiceBus.Router 允许使用 AmazonSQS 传输和 AzureServiceBus 传输的端点相互通信。到目前为止,我能够通过路由器获得从 ASB 端点发送并由 SQS 端点处理的命令。但是,当我从 SQS 端点发布事件时,即使我已将 SQS 端点注册为发布者,ASB 端点也不会处理该事件。我不知道我做错了什么,但看看我能从 from the docs 中找到的每个示例,它似乎应该有效。

我已经尝试添加另一条与下面相反的转发路由(SQS 到 ASB),但这并没有解决问题。

.net 5 worker 服务中的端点和路由器各自 运行。

我制作了一个重现问题的示例项目 here,但这里有一些显示相关设置的快速概览片段:

路由器设置

var routerConfig = new RouterConfiguration("ASBToSQS.Router");

var azureInterface = routerConfig.AddInterface<AzureServiceBusTransport>("ASB", t =>
{
    t.ConnectionString(Environment.GetEnvironmentVariable("ASB_CONNECTION_STRING"));

    t.Transactions(TransportTransactionMode.ReceiveOnly);
    t.SubscriptionRuleNamingConvention((entityType) =>
    {
        var entityPathOrName = entityType.Name;
        if (entityPathOrName.Length >= 50)
        {
            return entityPathOrName.Split('.').Last();
        }

        return entityPathOrName;
    });
});

var sqsInterface = routerConfig.AddInterface<SqsTransport>("SQS", t =>
{
    t.UnrestrictedDurationDelayedDelivery();

    t.Transactions(TransportTransactionMode.ReceiveOnly);

    var settings = t.GetSettings();

    // Avoids a missing setting error
    //https://github.com/SzymonPobiega/NServiceBus.Raw/blob/master/src/AcceptanceTests.SQS/Helper.cs#L18
    bool isMessageType(Type t) => true;
    var ctor = typeof(MessageMetadataRegistry).GetConstructor(
        BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance, null,
        new[] {typeof(Func<Type, bool>)}, null);
#pragma warning disable CS0618 // Type or member is obsolete
    settings.Set<MessageMetadataRegistry>(ctor.Invoke(new object[] {(Func<Type, bool>) isMessageType}));
#pragma warning restore CS0618 // Type or member is obsolete

});

var staticRouting = routerConfig.UseStaticRoutingProtocol();

staticRouting.AddForwardRoute("ASB", "SQS");

routerConfig.AutoCreateQueues();

ASB 端点设置

var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.ASBEndpoint");

var transport = endpointConfiguration.UseTransport<AzureServiceBusTransport>();

transport.SubscriptionRuleNamingConvention((entityType) =>
{
    var entityPathOrName = entityType.Name;
    if (entityPathOrName.Length >= 50)
    {
        return entityPathOrName.Split('.').Last();
    }

    return entityPathOrName;
});

transport.Transactions(TransportTransactionMode.ReceiveOnly);
transport.ConnectionString(Environment.GetEnvironmentVariable("ASB_CONNECTION_STRING"));

var bridge = transport.Routing().ConnectToRouter("ASBToSQS.Router");

bridge.RouteToEndpoint(typeof(ASBToSQSCommand), "ASBToSQSRouter.SQSEndpoint");
bridge.RegisterPublisher(typeof(ASBToSQSEvent), "ASBToSQSRouter.SQSEndpoint");

endpointConfiguration.EnableInstallers();

SQS 端点设置(没什么特别的,因为它不需要了解路由器)

var endpointConfiguration = new EndpointConfiguration("ASBToSQSRouter.SQSEndpoint");

var transport = endpointConfiguration.UseTransport<SqsTransport>();

transport.UnrestrictedDurationDelayedDelivery();

transport.Transactions(TransportTransactionMode.ReceiveOnly);

endpointConfiguration.EnableInstallers();

如有任何帮助,我们将不胜感激!

不幸的是,最近的 SQS 传输版本之一包含一项更改,使订阅仅在默认情况下在完整 NServiceBus 端点的上下文中工作。此功能为订阅批处理。

为了让路由器正常工作(路由器没有运行一个完整的端点,只是NServiceBus传输),你需要在SQS接口配置中添加这条魔法线:

settings.Set("NServiceBus.AmazonSQS.DisableSubscribeBatchingOnStart", true);

这是一个未记录的标志,它禁用订阅批处理并允许路由器正常完成订阅操作。

对于给您带来的不便,我们深表歉意。