MassTransit - 在使用过滤器时防止消息进入 RMQ 跳过队列

MassTransit - prevent a message going to the RMQ skipped queue, when a filter is used

我正在将 MassTransit 与 RabbitMQ 结合使用。按照自定义中间件 official documentation page 上的示例,我正在尝试在消息消费管道上创建一个过滤器,该过滤器将根据特定条件过滤掉一些消息。我的过滤器看起来像这样:

public class MyCustomFilter<T> : IFilter<T>
    where T : class, ConsumeContext
{
    public void Probe(ProbeContext context) { }

    public async Task Send(T context, IPipe<T> next)
    {
        if (/* certain condition */)
        {
            await next.Send(context);
        }
    }
}

问题在于,当消息未沿管道向下传递时(即未调用 await next.Send(context)),消息在 _skipped 消费者 RabbitMQ 队列中结束。有没有办法阻止消息进入该队列?

skipped(死信)队列通过 DeadLetterFilter 调用获取消息。这是代码:

async Task IFilter<ReceiveContext>.Send(ReceiveContext context, IPipe<ReceiveContext> next)
{
    await next.Send(context).ConfigureAwait(false);

    if (context.IsDelivered || context.IsFaulted)
        return;

    context.LogSkipped();

    await _deadLetterPipe.Send(context).ConfigureAwait(false);
}

因此,您可以想象如果上下文将 IsDeliveredIsFaulted 设置为 true,您的消息将不会在死信队列中结束。

如果你放入过滤器,你的消息最终会进入毒药 (error) 队列,所以我想这不是一个选择。

您可以通过对过滤器中过滤后的消息执行类似以下操作来模拟您的消息传送:

public Task Send(T context, IPipe<T> next)
    => condition
        ? next.Send(context)
        : context.NotifyConsumed(context as ConsumeContext<MyMessage>, TimeSpan.Zero, "Filtered");