使用 MassTransit 重新传送 RabbitMq 消息时坚持 headers

Persist headers when redelivering a RabbitMq message using MassTransit

目的:我需要在重新发送消息时跟踪headers。

配置

我试过没有成功:

第一次尝试:

await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
   if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
   {
      sendCtx.Headers.Set("SenderApp", sender);
   }
}).ConfigureAwait(false);

第二次尝试:

protected Task ScheduleSend(Uri rabbitUri, double delay)
{
  return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
    rabbitUri,
    TimeSpan.FromSeconds(delay),
    _Data,
    new HeaderPipe(_SenderApp, 0));
}

public class HeaderPipe : IPipe<SendContext>
{
  private readonly byte   _Priority;
  private readonly string _SenderApp;

  public HeaderPipe (byte priority)
  {
    _Priority  = priority;
    _SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
  }

  public HeaderPipe (string senderApp, byte priority)
  {
    _Priority  = priority;
    _SenderApp = senderApp;
  }

  public void Probe (ProbeContext context)
  { }

  public Task Send (SendContext context)
  {
    context.Headers.Set("SenderApp", _SenderApp);
    context.SetPriority(_Priority);
    return Task.CompletedTask;
  }
}

预期:FinQuest.Robot.DBProcess

结果: null

我登录 Consume 方法我的 SenderApp。第一次是这样的

Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)

重新投递后看起来像这样

Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)

我做错了什么?我不想使用重试功能,因为它有最大重试次数(我不想被限制)。

提前致谢。

您可能想要使用重新传递过滤器使用的一种方法:

https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/SendContextExtensions.cs#L90

public static void TransferConsumeContextHeaders(this SendContext sendContext, ConsumeContext consumeContext)

在您的代码中,您将使用它:

await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
    sendCtx.TransferConsumeContextHeaders(consumeCtx);
});