MassTransit Courier 稍后重新发送 activity
MassTransit Courier resends activity after a while
我通过一系列活动实施了路由单。其中之一在 Azure 中执行长 运行 过程,甚至需要 15-20 分钟。我注意到 10 分钟后,虽然进程仍在 运行,但 activity 再次执行。它破坏了整个路由单,因为第一个过程仍在进行中,重试会产生错误。
我正在使用 Azure 服务总线作为消息代理。我在文档中找不到关于这个主题的任何参考,所以我想知道这是否是 Courier 特有的,它是否可以更改,以便它根本不会重新投递,或者这是某种不正确的行为?
编辑:
修改后的代码如下:
services.AddMassTransit(c =>
{
c.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(false));
c.AddConsumersFromNamespaceContaining<TestConsumer>();
c.AddActivitiesFromNamespaceContaining<ActivityBase<IArguments, LogBase>>();
c.UsingAzureServiceBus((ctx, cfg) =>
{
cfg.Host(hostContext.Configuration.GetConnectionString("connection_string"));
cfg.LockDuration = TimeSpan.FromMinutes(25);
cfg.MaxAutoRenewDuration = TimeSpan.FromMinutes(25);
cfg.MaxDeliveryCount = 1;
cfg.ConfigureEndpoints(ctx);
});
});
由于使用消息所需的时间长度,消息可能正在由 Azure 服务总线重新传送。
在接收端点上,MaxAutoRenewDuration
用于调整更新消息锁的最大时间,直接传递给Azure客户端库。默认值为五分钟,与默认值 LockDuration
相同。您可以将此时间增加到 activity 完成的最大预期时间。
可选地,您可以将 MaxDeliveryCount
属性 更改为 1,这样 Azure 将只尝试传递一次消息,之后如果锁定超时并且没有更新,它会将消息移动到该接收端点的死信队列。
更新:如果您在 Activity 定义中配置它,您可以如下所示进行配置:
public class CustomActivityDefinition :
ActivityDefinition<TActivity, TArguments, TLog>
{
protected override void ConfigureExecuteActivity(IReceiveEndpointConfigurator endpointConfigurator, IExecuteActivityConfigurator<TActivity, TArguments> executeActivityConfigurator)
{
if(endpointConfigurator is IServiceBusReceiveEndpointConfigurator sb)
{
sb.MaxDeliveryCount = 1;
sb.MaxAutoRenewDuration = TimeSpan.FromMinutes(67);
}
}
}
我通过一系列活动实施了路由单。其中之一在 Azure 中执行长 运行 过程,甚至需要 15-20 分钟。我注意到 10 分钟后,虽然进程仍在 运行,但 activity 再次执行。它破坏了整个路由单,因为第一个过程仍在进行中,重试会产生错误。
我正在使用 Azure 服务总线作为消息代理。我在文档中找不到关于这个主题的任何参考,所以我想知道这是否是 Courier 特有的,它是否可以更改,以便它根本不会重新投递,或者这是某种不正确的行为?
编辑:
修改后的代码如下:
services.AddMassTransit(c =>
{
c.SetEndpointNameFormatter(new DefaultEndpointNameFormatter(false));
c.AddConsumersFromNamespaceContaining<TestConsumer>();
c.AddActivitiesFromNamespaceContaining<ActivityBase<IArguments, LogBase>>();
c.UsingAzureServiceBus((ctx, cfg) =>
{
cfg.Host(hostContext.Configuration.GetConnectionString("connection_string"));
cfg.LockDuration = TimeSpan.FromMinutes(25);
cfg.MaxAutoRenewDuration = TimeSpan.FromMinutes(25);
cfg.MaxDeliveryCount = 1;
cfg.ConfigureEndpoints(ctx);
});
});
由于使用消息所需的时间长度,消息可能正在由 Azure 服务总线重新传送。
在接收端点上,MaxAutoRenewDuration
用于调整更新消息锁的最大时间,直接传递给Azure客户端库。默认值为五分钟,与默认值 LockDuration
相同。您可以将此时间增加到 activity 完成的最大预期时间。
可选地,您可以将 MaxDeliveryCount
属性 更改为 1,这样 Azure 将只尝试传递一次消息,之后如果锁定超时并且没有更新,它会将消息移动到该接收端点的死信队列。
更新:如果您在 Activity 定义中配置它,您可以如下所示进行配置:
public class CustomActivityDefinition :
ActivityDefinition<TActivity, TArguments, TLog>
{
protected override void ConfigureExecuteActivity(IReceiveEndpointConfigurator endpointConfigurator, IExecuteActivityConfigurator<TActivity, TArguments> executeActivityConfigurator)
{
if(endpointConfigurator is IServiceBusReceiveEndpointConfigurator sb)
{
sb.MaxDeliveryCount = 1;
sb.MaxAutoRenewDuration = TimeSpan.FromMinutes(67);
}
}
}