公共交通 - 如何使用 Azure 服务总线安排消息
Mass Transit - How to Schedule a message using Azure Service Bus
我已经查看了有关 Scheduling with Azure Service Bus 的文档,但我不清楚如何从 "disconnected" 总线发送消息。
以下是我如何配置在服务器上处理消息的服务:
builder.AddMassTransit(mt =>
{
mt.AddConsumers(cqrsAssembly);
mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MaxConcurrentCalls = 500;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
x.UseServiceBusMessageScheduler();
var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = "key-name";
s.SharedAccessKey = "access-key";
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});
h.OperationTimeout = TimeSpan.FromMinutes(2);
});
x.ReceiveEndpoint(host, $"mt.myqueue", ep =>
{
ep.RequiresSession = true;
ep.MaxConcurrentCalls = 500;
ep.RemoveSubscriptions = true;
ep.UseMessageRetry(r =>
{
r.Interval(4, TimeSpan.FromSeconds(30));
r.Handle<TransientCommandException>();
});
ep.ConfigureConsumers(context);
});
});
});
我明确调用了 UseServiceBusMessageScheduler()
。
在创建消息并将消息发送到队列的项目中(在不同的上下文中运行,"send only" 也是如此),我们有:
var bus = Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
x.Send<ICommand>(s => s.UseSessionIdFormatter(ctx => ctx.Message.SessionId ?? Guid.NewGuid().ToString()));
var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = "key-name";
s.SharedAccessKey = "key";
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});
h.OperationTimeout = TimeSpan.FromMinutes(2);
});
EndpointConvention.Map<ICommand>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
EndpointConvention.Map<Command>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
});
现在,要发送预定消息,我们这样做:
var dest = "what?";
await bus.ScheduleSend(dest, scheduledEnqueueTimeUtc.Value, message);
我不确定需要将什么传递到 destinationAddress
。
我试过:
- serviceUri
-`{serviceUri}mt.myqueue"
但是检查队列,我在基本队列、skipped
队列或 error
队列中都没有看到我的消息。
我是否遗漏了一些其他配置,如果没有,如何确定目标队列?
我正在使用版本 5.5.4
的 Mass Transit,每次过载到 ScheduleSend()
都需要它。
首先,是的,您的 Uri 格式是正确的。最后格式化后你需要这样的东西:
new Uri(@"sb://yourdomain.servicebus.windows.net/yourapp/your_message_queue")
还要确保在配置端点时添加了。 (见下面的link)
configurator.UseServiceBusMessageScheduler();
如果您遵循公共交通文档,调度是从 ConsumeContext 完成的。参见 Mass-Transit Azure Scheduling
public class ScheduleNotificationConsumer :
IConsumer<AssignSeat>
{
Uri _schedulerAddress;
Uri _notificationService;
public async Task Consume(ConsumeContext<AssignSeat> context)
{
if(context.Message.ReservationTime - DateTime.Now < TimeSpan.FromHours(8))
{
// assign the seat for the reservation
}
else
{
// seats can only be assigned eight hours before the reservation
context.ScheduleMessage(context.Message.ReservationTime - TimeSpan.FromHours(8), context.Message);
}
}
}
然而,在本周我们遇到的一个用例中,我们需要从 consumeContext 外部进行调度,或者根本不想将上下文转发到我们调度的位置。使用 IBusControl.ScheduleSend
时,我们没有得到任何错误反馈,但我们也没有真正完成任何调度。
在查看了 Mass-Transit 的作用后发现,它从 IBusControl 创建了一个新的调度提供程序。而从上下文来看,它使用 ServiceBusScheduleMessageProvider。
因此,在我们清理这一点之前,我们现在正在做的是直接调用 ServiceBusScheduleMessageProvider。
await new ServiceBusScheduleMessageProvider(_busControl).ScheduleSend(destinationUri
, scheduleDateTime.UtcDateTime
, Task.FromResult<T>(message)
, Pipe.Empty<SendContext>()
, default);
希望它有道理并能有所帮助。
我已经查看了有关 Scheduling with Azure Service Bus 的文档,但我不清楚如何从 "disconnected" 总线发送消息。
以下是我如何配置在服务器上处理消息的服务:
builder.AddMassTransit(mt =>
{
mt.AddConsumers(cqrsAssembly);
mt.AddBus(context => Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MaxConcurrentCalls = 500;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
x.UseServiceBusMessageScheduler();
var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = "key-name";
s.SharedAccessKey = "access-key";
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});
h.OperationTimeout = TimeSpan.FromMinutes(2);
});
x.ReceiveEndpoint(host, $"mt.myqueue", ep =>
{
ep.RequiresSession = true;
ep.MaxConcurrentCalls = 500;
ep.RemoveSubscriptions = true;
ep.UseMessageRetry(r =>
{
r.Interval(4, TimeSpan.FromSeconds(30));
r.Handle<TransientCommandException>();
});
ep.ConfigureConsumers(context);
});
});
});
我明确调用了 UseServiceBusMessageScheduler()
。
在创建消息并将消息发送到队列的项目中(在不同的上下文中运行,"send only" 也是如此),我们有:
var bus = Bus.Factory.CreateUsingAzureServiceBus(x =>
{
x.RequiresSession = true;
x.MessageWaitTimeout = TimeSpan.FromMinutes(5);
x.UseRenewLock(TimeSpan.FromMinutes(4));
x.Send<ICommand>(s => s.UseSessionIdFormatter(ctx => ctx.Message.SessionId ?? Guid.NewGuid().ToString()));
var host = x.Host(serviceUri, h =>
{
h.SharedAccessSignature(s =>
{
s.KeyName = "key-name";
s.SharedAccessKey = "key";
s.TokenTimeToLive = TimeSpan.FromDays(1);
s.TokenScope = TokenScope.Namespace;
});
h.OperationTimeout = TimeSpan.FromMinutes(2);
});
EndpointConvention.Map<ICommand>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
EndpointConvention.Map<Command>(new Uri($"{serviceUri.ToString()}mt.myqueue"));
});
现在,要发送预定消息,我们这样做:
var dest = "what?";
await bus.ScheduleSend(dest, scheduledEnqueueTimeUtc.Value, message);
我不确定需要将什么传递到 destinationAddress
。
我试过:
- serviceUri
-`{serviceUri}mt.myqueue"
但是检查队列,我在基本队列、skipped
队列或 error
队列中都没有看到我的消息。
我是否遗漏了一些其他配置,如果没有,如何确定目标队列?
我正在使用版本 5.5.4
的 Mass Transit,每次过载到 ScheduleSend()
都需要它。
首先,是的,您的 Uri 格式是正确的。最后格式化后你需要这样的东西:
new Uri(@"sb://yourdomain.servicebus.windows.net/yourapp/your_message_queue")
还要确保在配置端点时添加了。 (见下面的link)
configurator.UseServiceBusMessageScheduler();
如果您遵循公共交通文档,调度是从 ConsumeContext 完成的。参见 Mass-Transit Azure Scheduling
public class ScheduleNotificationConsumer :
IConsumer<AssignSeat>
{
Uri _schedulerAddress;
Uri _notificationService;
public async Task Consume(ConsumeContext<AssignSeat> context)
{
if(context.Message.ReservationTime - DateTime.Now < TimeSpan.FromHours(8))
{
// assign the seat for the reservation
}
else
{
// seats can only be assigned eight hours before the reservation
context.ScheduleMessage(context.Message.ReservationTime - TimeSpan.FromHours(8), context.Message);
}
}
}
然而,在本周我们遇到的一个用例中,我们需要从 consumeContext 外部进行调度,或者根本不想将上下文转发到我们调度的位置。使用 IBusControl.ScheduleSend
时,我们没有得到任何错误反馈,但我们也没有真正完成任何调度。
在查看了 Mass-Transit 的作用后发现,它从 IBusControl 创建了一个新的调度提供程序。而从上下文来看,它使用 ServiceBusScheduleMessageProvider。
因此,在我们清理这一点之前,我们现在正在做的是直接调用 ServiceBusScheduleMessageProvider。
await new ServiceBusScheduleMessageProvider(_busControl).ScheduleSend(destinationUri
, scheduleDateTime.UtcDateTime
, Task.FromResult<T>(message)
, Pipe.Empty<SendContext>()
, default);
希望它有道理并能有所帮助。