Cannot receive messages with MassTransit in .NET Core

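I am using MassTransit, RabbitMQ, and .NET Core, and I am having trouble publishing and receiving messages. My UserEventHandler and UserCommandHandler classes are never called. Can anyone help? The bus does start. I have also noticed that when I remove the UserEventHandler class from the registration, my breakpoint in the UserCommandHandler class is hit.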

Here is my UserEventHandler class:

public class UserEventHandler : IConsumer<IUserEvent>
{
    private ConcertDbContext _context;
    public UserEventHandler(ConcertDbContext context)
    {
        _context = context;
    }
    public Task Consume(ConsumeContext<IUserEvent> context)
    {
        var concerts = _context.Concerts.Where(c => c.Name == context.Message.Name);
        foreach (var concert in concerts)
        {
            concert.SoldTickets = context.Message.TicketBuyed;
            _context.Concerts.Update(concert);
            _context.SaveChanges();
        }
        // Report success; throwing here would fault every consumed message.
        return Task.CompletedTask;
    }
}

Here is my UserCommandHandler class:

public class UserCommandHandler : IConsumer<IUserCommand>
{
    public async Task Consume(ConsumeContext<IUserCommand> context)
    {
        await context.Publish<IUserEvent>(new UserEvent(context.Message.Name, context.Message.TicketBuyed));
    }
}

This is where I register the UserCommandHandler class in Startup.cs:

        services.AddMassTransit(x =>
        {
            x.AddConsumer<ConcertEventHandler>();
            x.AddConsumer<UserCommandHandler>();
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
                    hst =>
                    {
                        hst.Username(RabbitMqConstants.Username);
                        hst.Password(RabbitMqConstants.Password);
                    });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<ConcertEventHandler>(provider);
                });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<UserCommandHandler>(provider);
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));
        });

And this is where I register the UserEventHandler class:

        services.AddMassTransit(x =>
        {
            x.AddConsumer<ConcertCommandHandler>();
            x.AddConsumer<UserEventHandler>();
            x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri),
                    hst =>
                    {
                        hst.Username(RabbitMqConstants.Username);
                        hst.Password(RabbitMqConstants.Password);
                    });

                cfg.ReceiveEndpoint(host, RabbitMqConstants.CreateConcertService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<ConcertCommandHandler>(provider);
                });
                cfg.ReceiveEndpoint(host, RabbitMqConstants.BuyCncertTicketService, e =>
                {
                    e.PrefetchCount = 16;

                    e.ConfigureConsumer<UserEventHandler>(provider);
                });

                // or, configure the endpoints by convention
                cfg.ConfigureEndpoints(provider);
            }));
        });

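Each service should use its own queue. Right now, each service configures different consumers on receive endpoints that share the same queue names. As a result, messages are moved to the _skipped queue, because the service instance that picks them up has no consumer for the published message type.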
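One way to keep the queues apart is to give every service its own queue-name constant. The following is only a sketch: the class name QueueNames, both constant names, and the string values are hypothetical and do not come from the question.

```csharp
// Hypothetical queue names: one queue per service, so that two services
// never attach different consumers to the same queue.
public static class QueueNames
{
    // Used only by the service that hosts UserCommandHandler.
    public const string BuyTicketCommands = "buy-ticket-commands";

    // Used only by the service that hosts UserEventHandler.
    public const string UserEvents = "user-events";
}
```

Each service would then pass its own constant to cfg.ReceiveEndpoint instead of reusing RabbitMqConstants.BuyCncertTicketService in both places.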

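Also, remove the cfg.ConfigureEndpoints call at the end of each service's configuration. You are already configuring the receive endpoints manually, so that call creates duplicate queues and endpoints for your consumers.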
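Putting both points together, the registration in the service that hosts UserEventHandler might look roughly like the fragment below. It is a sketch rather than a drop-in fix: it belongs inside ConfigureServices in Startup.cs, assumes the same MassTransit version and RabbitMqConstants class as the question, and uses "user-event-service" as a hypothetical queue name.

```csharp
// Fragment for Startup.ConfigureServices in the event-handling service.
services.AddMassTransit(x =>
{
    x.AddConsumer<UserEventHandler>();
    x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(RabbitMqConstants.RabbitMqUri), hst =>
        {
            hst.Username(RabbitMqConstants.Username);
            hst.Password(RabbitMqConstants.Password);
        });

        // Hypothetical queue name that no other service uses.
        cfg.ReceiveEndpoint(host, "user-event-service", e =>
        {
            e.PrefetchCount = 16;
            e.ConfigureConsumer<UserEventHandler>(provider);
        });

        // No cfg.ConfigureEndpoints(provider) here: the endpoint above is
        // configured manually, so the convention-based call is left out.
    }));
});
```

The service that hosts UserCommandHandler would follow the same pattern with a different queue name of its own.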