无法在 .NetCore 中使用 Masstransit 接收消息
Can not receive message with Masstransit in .NetCore
我正在使用 Masstransit、RabbitMq 和 .NET 核心。我想发布和接收消息时遇到问题。我的 UserEventHandler
和 UserCommandHandler
class 从不打电话。有谁能够帮助我?总线启动。我也有一种情况,当我从注册中删除 UserEventHandler
class 我的断点停止在 UserCommandHandler
class.
这是我的 UserEventHandler
class:
public class UserEventHandler : IConsumer<IUserEvent>
{
private ConcertDbContext _context;
public UserEventHandler(ConcertDbContext context)
{
_context = context;
}
public Task Consume(ConsumeContext<IUserEvent> context)
{
var concerts = _context.Concerts.Where(c => c.Name == context.Message.Name);
foreach (var concert in concerts)
{
concert.SoldTickets = context.Message.TicketBuyed;
_context.Concerts.Update(concert);
_context.SaveChanges();
}
throw new NotImplementedException();
}
}
这是我的 UserCommandHandler class:
public class UserCommandHandler : IConsumer<IUserCommand>
{
public async Task Consume(ConsumeContext<IUserCommand> context)
{
await context.Publish<IUserEvent>(new UserEvent(context.Message.Name, context.Message.TicketBuyed));
}
}
我在Startup.cs这里注册了UserCommandHandler class:
services.AddMassTransit(x =>
{
x.AddConsumer<ConcertEventHandler>();
x.AddConsumer<UserCommandHandler>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
hst =>
{
hst.Username(RabbitMqConstants.Username);
hst.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<ConcertEventHandler>(provider);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<UserCommandHandler>(provider);
});
// or, configure the endpoints by convention
cfg.ConfigureEndpoints(provider);
}));
这里是我注册的UserEventHandlerclass
services.AddMassTransit(x =>
{
x.AddConsumer<ConcertCommandHandler>();
x.AddConsumer<UserEventHandler >();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
hst =>
{
hst.Username(RabbitMqConstants.Username);
hst.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<ConcertCommandHandler>(provider);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<UserEventHandler>(provider);
});
// or, configure the endpoints by convention
cfg.ConfigureEndpoints(provider);
}));
每个服务都应该使用单独的队列,目前您正在为每个服务配置相同接收端点队列名称上的不同消费者。这将导致消息被移动到 _skipped 队列,因为该服务实例上没有已发布消息的消费者。
此外,删除每个服务配置末尾的 cfg.ConfigureEndpoints
。您正在手动配置接收端点,因此它将为您的消费者创建重复的队列和端点。
我正在使用 Masstransit、RabbitMq 和 .NET 核心。我想发布和接收消息时遇到问题。我的 UserEventHandler
和 UserCommandHandler
class 从不打电话。有谁能够帮助我?总线启动。我也有一种情况,当我从注册中删除 UserEventHandler
class 我的断点停止在 UserCommandHandler
class.
这是我的 UserEventHandler
class:
public class UserEventHandler : IConsumer<IUserEvent>
{
private ConcertDbContext _context;
public UserEventHandler(ConcertDbContext context)
{
_context = context;
}
public Task Consume(ConsumeContext<IUserEvent> context)
{
var concerts = _context.Concerts.Where(c => c.Name == context.Message.Name);
foreach (var concert in concerts)
{
concert.SoldTickets = context.Message.TicketBuyed;
_context.Concerts.Update(concert);
_context.SaveChanges();
}
throw new NotImplementedException();
}
}
这是我的 UserCommandHandler class:
public class UserCommandHandler : IConsumer<IUserCommand>
{
public async Task Consume(ConsumeContext<IUserCommand> context)
{
await context.Publish<IUserEvent>(new UserEvent(context.Message.Name, context.Message.TicketBuyed));
}
}
我在Startup.cs这里注册了UserCommandHandler class:
services.AddMassTransit(x =>
{
x.AddConsumer<ConcertEventHandler>();
x.AddConsumer<UserCommandHandler>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
hst =>
{
hst.Username(RabbitMqConstants.Username);
hst.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<ConcertEventHandler>(provider);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<UserCommandHandler>(provider);
});
// or, configure the endpoints by convention
cfg.ConfigureEndpoints(provider);
}));
这里是我注册的UserEventHandlerclass
services.AddMassTransit(x =>
{
x.AddConsumer<ConcertCommandHandler>();
x.AddConsumer<UserEventHandler >();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
hst =>
{
hst.Username(RabbitMqConstants.Username);
hst.Password(RabbitMqConstants.Password);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<ConcertCommandHandler>(provider);
});
cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
{
e.PrefetchCount = 16;
e.ConfigureConsumer<UserEventHandler>(provider);
});
// or, configure the endpoints by convention
cfg.ConfigureEndpoints(provider);
}));
每个服务都应该使用单独的队列,目前您正在为每个服务配置相同接收端点队列名称上的不同消费者。这将导致消息被移动到 _skipped 队列,因为该服务实例上没有已发布消息的消费者。
此外,删除每个服务配置末尾的 cfg.ConfigureEndpoints
。您正在手动配置接收端点,因此它将为您的消费者创建重复的队列和端点。