Masstransit 和 RabbitMQ 到控制台应用程序(使用 Asp.net Core Web API)

Masstransit and RabbitMQ to Console Application (use Asp.net Core Web API)

我创建了一个包含 ASP.NET 核心 WEB API 项目、一些 class 库(域、DI 等)和控制台应用程序的解决方案。 我用作带有 Masstransit 库的 RabbitMQ Consumer 的控制台应用程序它应该从 RabbitMQ 获取消息(我有 Producer 项目并且它发送 RabbitMQ 消息没有问题)

我的控制台应用程序: 像这样 Program.cs:

 public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
    Host.CreateDefaultBuilder(args)
        .ConfigureServices((hostContext, services) =>
        {
            services.AddMassTransit(x =>
            {
                x.AddConsumer<MessageConsumer>();

                x.UsingRabbitMq((context, cfg) =>
                {
                    var connectionString = new Uri("RabbitMQ_URL");
                    cfg.Host(connectionString);

                    cfg.ConfigureEndpoints(context);
                });
            });
            services.AddMassTransitHostedService(true);


            services.AddHostedService<Worker>();
        });
}

与Worker.cs:

  public class Worker : BackgroundService
{
    readonly IBus _bus;

    public Worker(IBus bus)
    {
        _bus = bus;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        
        while (!stoppingToken.IsCancellationRequested)
        {
            var factory = new ConnectionFactory() { Uri = new 
            Uri("RabbitMQ_URL"), DispatchConsumersAsync = true };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "MessageQueue",
                 durable: true,
                 exclusive: false,
                 autoDelete: false,
                 arguments: null);

            var consumer = new AsyncEventingBasicConsumer(channel);
            consumer.Received += async (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);

                var @event = JsonConvert.DeserializeObject<Event> 
                    (message);

                await _bus.Publish(new Event { DataJson = @event });

                await Task.Yield(); 
            };

            channel.BasicConsume(queue: "MessageQueue",
                                 autoAck: true,
                                 consumer: consumer);
            
            _logger.LogInformation("Received Text: {Text}", context.Message.DataJson);

            return Task.CompletedTask;
        }
    }
}

MessageConsumer.cs:

 public class MessageConsumer :
    IConsumer<Event>
{
    readonly ILogger<MessageConsumer> _logger;

    public MessageConsumer(ILogger<MessageConsumer> logger)
    {
        _logger = logger;
    }

    public Task Consume(ConsumeContext<Event> context)
    {
        _logger.LogInInformation("Recieved Text: {Text}, 
          context.Message.DataJson");

        return Task.CompletedTask;
     }
    }

还有我的 Event.cs:

 public class Event
{
    public ServiceType ServiceType { get; set; }
    public string DataJson { get; set; }
}

public enum ServiceType
{
    ComplareSitter
}

请帮帮我 非常感谢。

您可以使用其中一个 MassTransit 模板从干净简单的工作人员服务开始,只是为了验证您的 setup/configuration。 video 显示了如何设置和使用模板。

但一个明显的问题是,你到底为什么要连接到 RabbitMQ 并在 Consume 方法中创建一个基本使用者?该消息已被反序列化为您的 Event 类型,可以使用了。使用 MassTransit 时,绝对不需要在您的应用程序中使用 RabbitMQ 客户端库的任何部分。