MassTransit 在单个服务器上消耗
MassTransit consumed on single server
我正在努力使用 RabbitMQ 配置 MassTransit 以发布消息和订阅队列。我认为这是一个需要完成的简单配置更改。我连接了多个服务,但消息交替使用,永远不会在另一台服务器上传递/使用。
我希望每条消息都能传送到每个连接/订阅者。
我 运行 在 ASP.net 核心 6 上使用最新版本的 MassTransit。
services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);
services.AddMassTransit(cfg =>
{
cfg.AddBus(context => Bus.Factory.CreateUsingRabbitMq(c =>
{
c.Host(connectionString, c =>
{
c.Heartbeat(10);
});
c.ConfigureEndpoints(
context,
KebabCaseEndpointNameFormatter.Instance);
c.Publish<VideoManagerResultEvent>(x =>
{
x.BindQueue("result", "video-msgs");
x.ExchangeType = Fanout;
});
c.ReceiveEndpoint("result:video-msgs", e =>
{
e.Consumer<VideoManagerResultConsumer>();
});
}));
// Request clients / DTO
RegisterRequestClients(cfg);
});
services.AddMassTransitHostedService();
}
private static void RegisterRequestClients(IServiceCollectionBusConfigurator cfg)
{
cfg.AddRequestClient<VideoManagerResultEvent>();
}
// consumer
public class VideoManagerResultConsumer : BaseConsumer<VideoManagerResultEvent>
{
public override async Task Consume(ConsumeContext<VideoManagerResultEvent> context)
{
Logger.Debug("Consumed video event");
await context.RespondAsync(new GenericResponse());
}
}
我调用“SendMessage()”向 RabbitMQ 发布消息。
// constructor and DI
public EventBusV2(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task SendMessage()
{
await _publishEndpoint.Publish<VideoManagerResultEvent>(msg);
}
要添加到原始问题的图表显示当前正在发生的事情。
图 1 - 请求或消息被发布到事件总线,但只被传送到消费者的一个实例。
图 2 - 消息发布到消费者的两个实例所需的结果。
RaabitMQ 队列
您唯一需要的配置如下:
services.AddMassTransit(cfg =>
{
cfg.AddConsumer<VideoManagerResultConsumer>()
.Endpoint(e => e.InstanceId = "Web1"); // or 2, etc.
cfg.SetKebabCaseEndpointNameFormatter();
cfg.UsingRabbitMq((context, c) =>
{
c.Host(connectionString, c =>
{
c.Heartbeat(10);
});
c.ConfigureEndpoints(context);
});
// Request clients / DTO
RegisterRequestClients(cfg);
});
services.AddMassTransitHostedService();
类型为 VideoManagerResultEvent
的任何已发布消息都将根据消费者名称在队列 video-manager-result
中结束。
我正在努力使用 RabbitMQ 配置 MassTransit 以发布消息和订阅队列。我认为这是一个需要完成的简单配置更改。我连接了多个服务,但消息交替使用,永远不会在另一台服务器上传递/使用。
我希望每条消息都能传送到每个连接/订阅者。
我 运行 在 ASP.net 核心 6 上使用最新版本的 MassTransit。
services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);
services.AddMassTransit(cfg =>
{
cfg.AddBus(context => Bus.Factory.CreateUsingRabbitMq(c =>
{
c.Host(connectionString, c =>
{
c.Heartbeat(10);
});
c.ConfigureEndpoints(
context,
KebabCaseEndpointNameFormatter.Instance);
c.Publish<VideoManagerResultEvent>(x =>
{
x.BindQueue("result", "video-msgs");
x.ExchangeType = Fanout;
});
c.ReceiveEndpoint("result:video-msgs", e =>
{
e.Consumer<VideoManagerResultConsumer>();
});
}));
// Request clients / DTO
RegisterRequestClients(cfg);
});
services.AddMassTransitHostedService();
}
private static void RegisterRequestClients(IServiceCollectionBusConfigurator cfg)
{
cfg.AddRequestClient<VideoManagerResultEvent>();
}
// consumer
public class VideoManagerResultConsumer : BaseConsumer<VideoManagerResultEvent>
{
public override async Task Consume(ConsumeContext<VideoManagerResultEvent> context)
{
Logger.Debug("Consumed video event");
await context.RespondAsync(new GenericResponse());
}
}
我调用“SendMessage()”向 RabbitMQ 发布消息。
// constructor and DI
public EventBusV2(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task SendMessage()
{
await _publishEndpoint.Publish<VideoManagerResultEvent>(msg);
}
要添加到原始问题的图表显示当前正在发生的事情。
图 1 - 请求或消息被发布到事件总线,但只被传送到消费者的一个实例。
图 2 - 消息发布到消费者的两个实例所需的结果。
RaabitMQ 队列
您唯一需要的配置如下:
services.AddMassTransit(cfg =>
{
cfg.AddConsumer<VideoManagerResultConsumer>()
.Endpoint(e => e.InstanceId = "Web1"); // or 2, etc.
cfg.SetKebabCaseEndpointNameFormatter();
cfg.UsingRabbitMq((context, c) =>
{
c.Host(connectionString, c =>
{
c.Heartbeat(10);
});
c.ConfigureEndpoints(context);
});
// Request clients / DTO
RegisterRequestClients(cfg);
});
services.AddMassTransitHostedService();
类型为 VideoManagerResultEvent
的任何已发布消息都将根据消费者名称在队列 video-manager-result
中结束。