发布到 Azure 服务总线主题失败且没有错误

Publishing to Azure Service Bus Topic fails without error

10 月更新。 2021 年 20 日

发布到队列似乎按预期工作。当我发布到队列时,消息会保留在队列中。相反,当我发布到主题时,消息不会持久化。

已更新

我添加了一个简单的控制台应用程序,它在下面重现了相同的行为。

我正在尝试从 Azure 函数向服务总线中的主题发送消息。我已经使用 Mass Transit 通过托管身份尝试过此操作。我还使用 Azure.Messaging.ServiceBus nuget 包通过共享访问密钥尝试了此操作。两种方法均完成无异常,但消息不在主题中。

这是我从我的函数发送时看到的:

主题的附加设置:

我可以使用 Azure 门户中的服务总线资源管理器将消息放在主题上。

没有关于该主题的订阅。我之前确实有一个设置作为测试,但它已经被删除了。

公共交通设置(在 Startup.cs 中)

        private void ConfigureMassTransit(IServiceCollection services, IConfiguration config) {
            const string KEY_QUEUE_SERVER  = "REDACTED";
            const string EMAIL_RETRY_TOPIC = "REDACTED";
            const string EMAIL_SENT_TOPIC  = "REDACTED";

            services.AddMassTransit(x => {
                x.UsingAzureServiceBus((context, cfg) => {
                    cfg.Host(new Uri(config[KEY_QUEUE_SERVER]), host => {
                        host.TokenProvider = TokenProvider.CreateManagedIdentityTokenProvider();
                    });
                    cfg.Message<EmailSentEvent>(m => m.SetEntityName(EMAIL_SENT_TOPIC));
                    cfg.Message<TransactionEmailFailedEvent>(m => m.SetEntityName(EMAIL_RETRY_TOPIC));

                    cfg.ConfigureEndpoints(context);
                });
            });
        }

MassTransitQueueAdapter.cs

    public class MassTransitQueueAdapter : IQueueAdapter {
        #region attributes
        private readonly IBus _bus;
        #endregion

        #region ctor
        public MassTransitQueueAdapter(IBus bus) {
            _bus = bus;
        }
        #endregion

        #region methods
        public void PublishFailure(TransactionEmailFailedEvent failedEvent) {
            _bus.Publish(failedEvent);
        }

        public void PublishSuccess(EmailSentEvent sentEvent) {
            _bus.Publish(sentEvent);
        }
        #endregion
    }

ServiceBusQueueAdapter.cs

    public class ServiceBusQueueAdapter : IQueueAdapter {
        #region attributes
        private readonly QueueContext _context;
        #endregion

        #region ctor
        public ServiceBusQueueAdapter(QueueContext context) {
            _context = context;
        }
        #endregion

        #region methods
        private static ServiceBusClient BuildClient(string connectionString) => new ServiceBusClient(connectionString);

        public void PublishFailure(TransactionEmailFailedEvent failedEvent) {
            throw new System.NotImplementedException();
        }

        public void PublishSuccess(EmailSentEvent sentEvent) {
            ServiceBusClient client = BuildClient(_context.SentTopicConnectionString);
            ServiceBusSender sender = client.CreateSender(_context.SentTopicName);

            Task.Run(() => sender.SendMessageAsync(new ServiceBusMessage(JsonConvert.SerializeObject(sentEvent))));
        }
        #endregion
    }

简单的控制台应用程序

    class Program {
        static void Main() {
            string           cs     = "Endpoint=sb://REDACTED.servicebus.windows.net/;SharedAccessKeyName=test_with_manage;SharedAccessKey=REDACTED;";
            ServiceBusClient client = new ServiceBusClient(cs);
            ServiceBusSender sender = client.CreateSender("test_1");

            sender.SendMessageAsync(new ServiceBusMessage("Hello World!"))
                  .Wait();
        }
    }

这里的问题是我对 Azure 服务总线主题的理解。我期望主题充当消息的一种存储类型。我的假设是错误的。该主题只会将消息转发给订阅。订阅可以保留消息或转发到另一个主题或队列。

有了这些知识,我就能够让我的服务总线特定实现正常工作。我显然还有一点要了解公共交通。