发布到 Azure 服务总线主题失败且没有错误
Publishing to Azure Service Bus Topic fails without error
10 月更新。 2021 年 20 日
发布到队列似乎按预期工作。当我发布到队列时,消息会保留在队列中。相反,当我发布到主题时,消息不会持久化。
已更新
我添加了一个简单的控制台应用程序,它在下面重现了相同的行为。
我正在尝试从 Azure 函数向服务总线中的主题发送消息。我已经使用 Mass Transit 通过托管身份尝试过此操作。我还使用 Azure.Messaging.ServiceBus nuget 包通过共享访问密钥尝试了此操作。两种方法均完成无异常,但消息不在主题中。
这是我从我的函数发送时看到的:
主题的附加设置:
我可以使用 Azure 门户中的服务总线资源管理器将消息放在主题上。
没有关于该主题的订阅。我之前确实有一个设置作为测试,但它已经被删除了。
公共交通设置(在 Startup.cs 中)
private void ConfigureMassTransit(IServiceCollection services, IConfiguration config) {
const string KEY_QUEUE_SERVER = "REDACTED";
const string EMAIL_RETRY_TOPIC = "REDACTED";
const string EMAIL_SENT_TOPIC = "REDACTED";
services.AddMassTransit(x => {
x.UsingAzureServiceBus((context, cfg) => {
cfg.Host(new Uri(config[KEY_QUEUE_SERVER]), host => {
host.TokenProvider = TokenProvider.CreateManagedIdentityTokenProvider();
});
cfg.Message<EmailSentEvent>(m => m.SetEntityName(EMAIL_SENT_TOPIC));
cfg.Message<TransactionEmailFailedEvent>(m => m.SetEntityName(EMAIL_RETRY_TOPIC));
cfg.ConfigureEndpoints(context);
});
});
}
MassTransitQueueAdapter.cs
public class MassTransitQueueAdapter : IQueueAdapter {
#region attributes
private readonly IBus _bus;
#endregion
#region ctor
public MassTransitQueueAdapter(IBus bus) {
_bus = bus;
}
#endregion
#region methods
public void PublishFailure(TransactionEmailFailedEvent failedEvent) {
_bus.Publish(failedEvent);
}
public void PublishSuccess(EmailSentEvent sentEvent) {
_bus.Publish(sentEvent);
}
#endregion
}
ServiceBusQueueAdapter.cs
public class ServiceBusQueueAdapter : IQueueAdapter {
#region attributes
private readonly QueueContext _context;
#endregion
#region ctor
public ServiceBusQueueAdapter(QueueContext context) {
_context = context;
}
#endregion
#region methods
private static ServiceBusClient BuildClient(string connectionString) => new ServiceBusClient(connectionString);
public void PublishFailure(TransactionEmailFailedEvent failedEvent) {
throw new System.NotImplementedException();
}
public void PublishSuccess(EmailSentEvent sentEvent) {
ServiceBusClient client = BuildClient(_context.SentTopicConnectionString);
ServiceBusSender sender = client.CreateSender(_context.SentTopicName);
Task.Run(() => sender.SendMessageAsync(new ServiceBusMessage(JsonConvert.SerializeObject(sentEvent))));
}
#endregion
}
简单的控制台应用程序
class Program {
static void Main() {
string cs = "Endpoint=sb://REDACTED.servicebus.windows.net/;SharedAccessKeyName=test_with_manage;SharedAccessKey=REDACTED;";
ServiceBusClient client = new ServiceBusClient(cs);
ServiceBusSender sender = client.CreateSender("test_1");
sender.SendMessageAsync(new ServiceBusMessage("Hello World!"))
.Wait();
}
}
这里的问题是我对 Azure 服务总线主题的理解。我期望主题充当消息的一种存储类型。我的假设是错误的。该主题只会将消息转发给订阅。订阅可以保留消息或转发到另一个主题或队列。
有了这些知识,我就能够让我的服务总线特定实现正常工作。我显然还有一点要了解公共交通。
10 月更新。 2021 年 20 日
发布到队列似乎按预期工作。当我发布到队列时,消息会保留在队列中。相反,当我发布到主题时,消息不会持久化。
已更新
我添加了一个简单的控制台应用程序,它在下面重现了相同的行为。
我正在尝试从 Azure 函数向服务总线中的主题发送消息。我已经使用 Mass Transit 通过托管身份尝试过此操作。我还使用 Azure.Messaging.ServiceBus nuget 包通过共享访问密钥尝试了此操作。两种方法均完成无异常,但消息不在主题中。
这是我从我的函数发送时看到的:
主题的附加设置:
我可以使用 Azure 门户中的服务总线资源管理器将消息放在主题上。
没有关于该主题的订阅。我之前确实有一个设置作为测试,但它已经被删除了。
公共交通设置(在 Startup.cs 中)
private void ConfigureMassTransit(IServiceCollection services, IConfiguration config) {
const string KEY_QUEUE_SERVER = "REDACTED";
const string EMAIL_RETRY_TOPIC = "REDACTED";
const string EMAIL_SENT_TOPIC = "REDACTED";
services.AddMassTransit(x => {
x.UsingAzureServiceBus((context, cfg) => {
cfg.Host(new Uri(config[KEY_QUEUE_SERVER]), host => {
host.TokenProvider = TokenProvider.CreateManagedIdentityTokenProvider();
});
cfg.Message<EmailSentEvent>(m => m.SetEntityName(EMAIL_SENT_TOPIC));
cfg.Message<TransactionEmailFailedEvent>(m => m.SetEntityName(EMAIL_RETRY_TOPIC));
cfg.ConfigureEndpoints(context);
});
});
}
MassTransitQueueAdapter.cs
public class MassTransitQueueAdapter : IQueueAdapter {
#region attributes
private readonly IBus _bus;
#endregion
#region ctor
public MassTransitQueueAdapter(IBus bus) {
_bus = bus;
}
#endregion
#region methods
public void PublishFailure(TransactionEmailFailedEvent failedEvent) {
_bus.Publish(failedEvent);
}
public void PublishSuccess(EmailSentEvent sentEvent) {
_bus.Publish(sentEvent);
}
#endregion
}
ServiceBusQueueAdapter.cs
public class ServiceBusQueueAdapter : IQueueAdapter {
#region attributes
private readonly QueueContext _context;
#endregion
#region ctor
public ServiceBusQueueAdapter(QueueContext context) {
_context = context;
}
#endregion
#region methods
private static ServiceBusClient BuildClient(string connectionString) => new ServiceBusClient(connectionString);
public void PublishFailure(TransactionEmailFailedEvent failedEvent) {
throw new System.NotImplementedException();
}
public void PublishSuccess(EmailSentEvent sentEvent) {
ServiceBusClient client = BuildClient(_context.SentTopicConnectionString);
ServiceBusSender sender = client.CreateSender(_context.SentTopicName);
Task.Run(() => sender.SendMessageAsync(new ServiceBusMessage(JsonConvert.SerializeObject(sentEvent))));
}
#endregion
}
简单的控制台应用程序
class Program {
static void Main() {
string cs = "Endpoint=sb://REDACTED.servicebus.windows.net/;SharedAccessKeyName=test_with_manage;SharedAccessKey=REDACTED;";
ServiceBusClient client = new ServiceBusClient(cs);
ServiceBusSender sender = client.CreateSender("test_1");
sender.SendMessageAsync(new ServiceBusMessage("Hello World!"))
.Wait();
}
}
这里的问题是我对 Azure 服务总线主题的理解。我期望主题充当消息的一种存储类型。我的假设是错误的。该主题只会将消息转发给订阅。订阅可以保留消息或转发到另一个主题或队列。
有了这些知识,我就能够让我的服务总线特定实现正常工作。我显然还有一点要了解公共交通。