MassTransit RabbitMQ 将错误队列上的一半消耗消息移动到 Error_Skipped 队列
MassTransit RabbitMQ moving half of Consumed messages on Error Queue to Error_Skipped Queue
我正在尝试将消息从错误队列移回错误队列。
为此,我在错误队列上创建了一个消费者,然后将其发布到必需队列。
当我尝试这样做时,一半的消费消息被发布,但另一半被发送到 Error_Skipped 队列。
我尝试了很多东西都没有成功,所以我可能缺少一些简单的东西。
以下是我的代码示例:
public class ClaimsMessage
{
public string Description { get; set; }
public DateTime Date { get; set; }
public bool Handled { get; set; }
}
public class ClaimsMessageErrorConsumer : IConsumer<Fault<ClaimsMessage>>
{
public async Task Consume(ConsumeContext<Fault<ClaimsMessage>> context)
{
try
{
await context.Publish<ClaimsMessage>(context.Message.Message);
}
catch (Exception e)
{
string error = e.Message;
}
}
}
public static IBusControl CreateClaimsErrorConsumerBus(string endPoint)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, endPoint, e =>
{
e.Consumer(() => new ClaimsMessageErrorConsumer());
});
});
return busControl;
}
如果您将消息从错误 queue 移回处理 queue,则不应调用 Publish
-- 这会将消息重新发送给所有订阅者。您已经知道 queue 个名字,所以直接将消息发送回 queue。您所看到的是,您已经针对错误 queue 创建了一个使用者,它为该消息创建了一个交换绑定。
所以,改为这样做:
sbc.ReceiveEndpoint("input_error", x =>
{
// this prevents extra message bindings from being created
x.BindMessageExchanges = false;
x.Consumer<MyMover>(() => new MyMover(inputQueueAddress);
});
public class MyMover :
IConsumer<ClaimsMessage>
{
public async Task Consume(ConsumeContext<ClaimsMessage> context)
{
try
{
var endpoint = await context.GetSendEndpoint(_inputQueueAddress);
await endpoint.Send<ClaimsMessage>(context.Message);
}
catch (Exception e)
{
string error = e.Message;
}
}
}
如需加分,请复制原始消息 headers 以保留消息的保真度。
我正在尝试将消息从错误队列移回错误队列。 为此,我在错误队列上创建了一个消费者,然后将其发布到必需队列。 当我尝试这样做时,一半的消费消息被发布,但另一半被发送到 Error_Skipped 队列。
我尝试了很多东西都没有成功,所以我可能缺少一些简单的东西。
以下是我的代码示例:
public class ClaimsMessage
{
public string Description { get; set; }
public DateTime Date { get; set; }
public bool Handled { get; set; }
}
public class ClaimsMessageErrorConsumer : IConsumer<Fault<ClaimsMessage>>
{
public async Task Consume(ConsumeContext<Fault<ClaimsMessage>> context)
{
try
{
await context.Publish<ClaimsMessage>(context.Message.Message);
}
catch (Exception e)
{
string error = e.Message;
}
}
}
public static IBusControl CreateClaimsErrorConsumerBus(string endPoint)
{
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ReceiveEndpoint(host, endPoint, e =>
{
e.Consumer(() => new ClaimsMessageErrorConsumer());
});
});
return busControl;
}
如果您将消息从错误 queue 移回处理 queue,则不应调用 Publish
-- 这会将消息重新发送给所有订阅者。您已经知道 queue 个名字,所以直接将消息发送回 queue。您所看到的是,您已经针对错误 queue 创建了一个使用者,它为该消息创建了一个交换绑定。
所以,改为这样做:
sbc.ReceiveEndpoint("input_error", x =>
{
// this prevents extra message bindings from being created
x.BindMessageExchanges = false;
x.Consumer<MyMover>(() => new MyMover(inputQueueAddress);
});
public class MyMover :
IConsumer<ClaimsMessage>
{
public async Task Consume(ConsumeContext<ClaimsMessage> context)
{
try
{
var endpoint = await context.GetSendEndpoint(_inputQueueAddress);
await endpoint.Send<ClaimsMessage>(context.Message);
}
catch (Exception e)
{
string error = e.Message;
}
}
}
如需加分,请复制原始消息 headers 以保留消息的保真度。