使用 MassTransit 重新传送 RabbitMq 消息时坚持 headers
Persist headers when redelivering a RabbitMq message using MassTransit
目的:我需要在重新发送消息时跟踪headers。
配置:
- RabbitMQ 3.7.9
- 二郎 21.2
- 地铁 5.1.5
- MySql 8.0 用于 Quartz 数据库
我试过没有成功:
第一次尝试:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
{
sendCtx.Headers.Set("SenderApp", sender);
}
}).ConfigureAwait(false);
第二次尝试:
protected Task ScheduleSend(Uri rabbitUri, double delay)
{
return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
rabbitUri,
TimeSpan.FromSeconds(delay),
_Data,
new HeaderPipe(_SenderApp, 0));
}
public class HeaderPipe : IPipe<SendContext>
{
private readonly byte _Priority;
private readonly string _SenderApp;
public HeaderPipe (byte priority)
{
_Priority = priority;
_SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
}
public HeaderPipe (string senderApp, byte priority)
{
_Priority = priority;
_SenderApp = senderApp;
}
public void Probe (ProbeContext context)
{ }
public Task Send (SendContext context)
{
context.Headers.Set("SenderApp", _SenderApp);
context.SetPriority(_Priority);
return Task.CompletedTask;
}
}
预期:FinQuest.Robot.DBProcess
结果: null
我登录 Consume 方法我的 SenderApp。第一次是这样的
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)
重新投递后看起来像这样
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)
我做错了什么?我不想使用重试功能,因为它有最大重试次数(我不想被限制)。
提前致谢。
您可能想要使用重新传递过滤器使用的一种方法:
https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/SendContextExtensions.cs#L90
public static void TransferConsumeContextHeaders(this SendContext sendContext, ConsumeContext consumeContext)
在您的代码中,您将使用它:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
sendCtx.TransferConsumeContextHeaders(consumeCtx);
});
目的:我需要在重新发送消息时跟踪headers。
配置:
- RabbitMQ 3.7.9
- 二郎 21.2
- 地铁 5.1.5
- MySql 8.0 用于 Quartz 数据库
我试过没有成功:
第一次尝试:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
if (consumeCtx.Headers.TryGetHeader("SenderApp", out object sender))
{
sendCtx.Headers.Set("SenderApp", sender);
}
}).ConfigureAwait(false);
第二次尝试:
protected Task ScheduleSend(Uri rabbitUri, double delay)
{
return GetBus().ScheduleSend<IProcessOrganisationUpdate>(
rabbitUri,
TimeSpan.FromSeconds(delay),
_Data,
new HeaderPipe(_SenderApp, 0));
}
public class HeaderPipe : IPipe<SendContext>
{
private readonly byte _Priority;
private readonly string _SenderApp;
public HeaderPipe (byte priority)
{
_Priority = priority;
_SenderApp = Assembly.GetEntryAssembly()?.GetName()?.Name ?? "Default";
}
public HeaderPipe (string senderApp, byte priority)
{
_Priority = priority;
_SenderApp = senderApp;
}
public void Probe (ProbeContext context)
{ }
public Task Send (SendContext context)
{
context.Headers.Set("SenderApp", _SenderApp);
context.SetPriority(_Priority);
return Task.CompletedTask;
}
}
预期:FinQuest.Robot.DBProcess
结果: null
我登录 Consume 方法我的 SenderApp。第一次是这样的
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: FinQuest.Robot.DBProcess, modif: LinkedIn)
重新投递后看起来像这样
Initial trigger checking returns true for FinQuest.Robots.OrganisationLinkedinFeed (id: 001ae487-ad3d-4619-8d34-367881ec91ba, sender: , modif: LinkedIn)
我做错了什么?我不想使用重试功能,因为它有最大重试次数(我不想被限制)。
提前致谢。
您可能想要使用重新传递过滤器使用的一种方法:
https://github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/SendContextExtensions.cs#L90
public static void TransferConsumeContextHeaders(this SendContext sendContext, ConsumeContext consumeContext)
在您的代码中,您将使用它:
await context.Redeliver(TimeSpan.FromSeconds(5), (consumeCtx, sendCtx) => {
sendCtx.TransferConsumeContextHeaders(consumeCtx);
});