Masstransit 无法从 C# 应用程序连接到 RabbitMQ 中的队列
Masstransit can't connect to a queue in RabbitMQ from c# app
我在 RabbitMQ 中有一个队列。我无法配置此队列,我必须使用其中的消息。 Publisher 不使用 Masstransit 进行发布。我正在使用 Masstransit 来使用队列中的消息。
当我尝试配置与队列的连接时,收到此错误:
The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'my_vhost': received 'fanout' but current is 'direct'', classId=40, methodId=10
我的配置如下:
Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("127.0.0.1", "my_virtual_host", credintials =>
{
credintials.Username("myuser");
credintials.Password("mypassword");
});
cfg.ReceiveEndpoint("my_queue", e =>
{
e.UseRawJsonSerializer();
e.Consumer(() => _messageConsumer);
});
}).Start();
队列配置 Durable = true 就这样,没什么特别的。
当我尝试通过 RabbitMQ.Client 连接到队列时,连接没有问题。消耗效果也很好。
我该如何解决这个问题?
问题是 my_queue
交换已经存在 直接 交换类型。默认情况下,MassTransit 会将此交换创建为扇出交换。 Direct 交换用于通过路由键路由消息。有关使用 MassTransit 直接交换的示例,请查看 direct 示例。
您可以看到 MassTransit 为 RabbitMQ 配置的broker topology。
我解决了我的问题。与 Masstransit 的连接如下所示:
private async Task InitMasstransitBusAsync(CancellationToken cancellationToken)
{
await Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(_rabbitMqConfig.HostName), credintials =>
{
credintials.Username(_rabbitMqConfig.UserName);
credintials.Password(_rabbitMqConfig.Password);
});
cfg.ReceiveEndpoint(_rabbitMqConfig.QueueName, e =>
{
e.PrefetchCount = 20;
e.ExchangeType = "direct";
e.ConfigureConsumeTopology = false;
e.AddMessageDeserializer(new ContentType("text/plain"),
() => new CustomMessageDeserializer("text/plain"));
e.Consumer(() => _messageConsumer);
});
}).StartAsync(cancellationToken);
}
CustomMessageDeserializer:
public class CustomMessageDeserializer : IMessageDeserializer
{
private readonly string _contentType;
private readonly JsonSerializer _serializer = JsonSerializer.Create();
public CustomMessageDeserializer(string contentType)
{
_contentType = contentType;
}
public ContentType ContentType => new(_contentType);
public ConsumeContext Deserialize(ReceiveContext receiveContext)
{
try
{
var messageEncoding = GetMessageEncoding(receiveContext);
using var body = receiveContext.GetBodyStream();
using var reader = new StreamReader(body, messageEncoding, false, 1024, true);
using var jsonReader = new JsonTextReader(reader);
var messageToken = _serializer.Deserialize<JToken>(jsonReader);
return new RawJsonConsumeContext(_serializer, receiveContext, messageToken);
}
catch (JsonSerializationException ex)
{
throw new SerializationException("A JSON serialization exception occurred while deserializing the message", ex);
}
catch (SerializationException)
{
throw;
}
catch (Exception ex)
{
throw new SerializationException("An exception occurred while deserializing the message", ex);
}
}
public void Probe(ProbeContext context) { }
public static Encoding GetMessageEncoding(ReceiveContext receiveContext)
{
var contentEncoding = receiveContext.TransportHeaders.Get("Content-Encoding", default(string));
return string.IsNullOrWhiteSpace(contentEncoding) ? Encoding.UTF8 : Encoding.GetEncoding(contentEncoding);
}
}
我在 RabbitMQ 中有一个队列。我无法配置此队列,我必须使用其中的消息。 Publisher 不使用 Masstransit 进行发布。我正在使用 Masstransit 来使用队列中的消息。
当我尝试配置与队列的连接时,收到此错误:
The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=406, text='PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'my_queue' in vhost 'my_vhost': received 'fanout' but current is 'direct'', classId=40, methodId=10
我的配置如下:
Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host("127.0.0.1", "my_virtual_host", credintials =>
{
credintials.Username("myuser");
credintials.Password("mypassword");
});
cfg.ReceiveEndpoint("my_queue", e =>
{
e.UseRawJsonSerializer();
e.Consumer(() => _messageConsumer);
});
}).Start();
队列配置 Durable = true 就这样,没什么特别的。
当我尝试通过 RabbitMQ.Client 连接到队列时,连接没有问题。消耗效果也很好。
我该如何解决这个问题?
问题是 my_queue
交换已经存在 直接 交换类型。默认情况下,MassTransit 会将此交换创建为扇出交换。 Direct 交换用于通过路由键路由消息。有关使用 MassTransit 直接交换的示例,请查看 direct 示例。
您可以看到 MassTransit 为 RabbitMQ 配置的broker topology。
我解决了我的问题。与 Masstransit 的连接如下所示:
private async Task InitMasstransitBusAsync(CancellationToken cancellationToken)
{
await Bus.Factory.CreateUsingRabbitMq(cfg =>
{
cfg.Host(new Uri(_rabbitMqConfig.HostName), credintials =>
{
credintials.Username(_rabbitMqConfig.UserName);
credintials.Password(_rabbitMqConfig.Password);
});
cfg.ReceiveEndpoint(_rabbitMqConfig.QueueName, e =>
{
e.PrefetchCount = 20;
e.ExchangeType = "direct";
e.ConfigureConsumeTopology = false;
e.AddMessageDeserializer(new ContentType("text/plain"),
() => new CustomMessageDeserializer("text/plain"));
e.Consumer(() => _messageConsumer);
});
}).StartAsync(cancellationToken);
}
CustomMessageDeserializer:
public class CustomMessageDeserializer : IMessageDeserializer
{
private readonly string _contentType;
private readonly JsonSerializer _serializer = JsonSerializer.Create();
public CustomMessageDeserializer(string contentType)
{
_contentType = contentType;
}
public ContentType ContentType => new(_contentType);
public ConsumeContext Deserialize(ReceiveContext receiveContext)
{
try
{
var messageEncoding = GetMessageEncoding(receiveContext);
using var body = receiveContext.GetBodyStream();
using var reader = new StreamReader(body, messageEncoding, false, 1024, true);
using var jsonReader = new JsonTextReader(reader);
var messageToken = _serializer.Deserialize<JToken>(jsonReader);
return new RawJsonConsumeContext(_serializer, receiveContext, messageToken);
}
catch (JsonSerializationException ex)
{
throw new SerializationException("A JSON serialization exception occurred while deserializing the message", ex);
}
catch (SerializationException)
{
throw;
}
catch (Exception ex)
{
throw new SerializationException("An exception occurred while deserializing the message", ex);
}
}
public void Probe(ProbeContext context) { }
public static Encoding GetMessageEncoding(ReceiveContext receiveContext)
{
var contentEncoding = receiveContext.TransportHeaders.Get("Content-Encoding", default(string));
return string.IsNullOrWhiteSpace(contentEncoding) ? Encoding.UTF8 : Encoding.GetEncoding(contentEncoding);
}
}