在 request-response 模式中等待响应时超时

Timeout while waiting for response in request-response pattern

我正在尝试使用 MassTransit 和 RabbitMq 设置 request-response 模式。请求已交付给消费者,RespondAsync 成功,但客户端的 GetResponse 永远挂起,最后超时。我做错了什么?似乎 RabbitMQ 端的所有配置都正确(见下面的屏幕)。

以下是我在“service-server”上添加公共交通的方法:

using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using MassTransit;

namespace Airstrike.Backend.GroupService.Service
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            await CreateHostBuilder(args).Build().RunAsync();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.SetKebabCaseEndpointNameFormatter();

                        var entryAssembly = Assembly.GetEntryAssembly();

                        x.AddConsumers(entryAssembly);

                        x.UsingRabbitMq((context, cfg) =>
                        {
                            cfg.ConfigureEndpoints(context);
                        });
                    });

                    services.AddMassTransitHostedService(true);
                });
    }
}

这是我的消费者/消费者定义的样子:

using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.Definition;
using Airstrike.Backend.GroupService.ServiceContract.Actions.CreateGroup;

namespace Airstrike.Backend.GroupService.Service.Actions.CreateGroup
{
    public class CreateGroupAction :
        IConsumer<ICreateGroupPayload>
    {
        public async Task Consume(ConsumeContext<ICreateGroupPayload> consumeContext)
        {
            await consumeContext.RespondAsync<ICreateGroupResult>(new
            {
                GroupGuid = Guid.NewGuid()
            });
        }
    }

    public class CreateGroupActionDefinition : ConsumerDefinition<CreateGroupAction>
    {
        public CreateGroupActionDefinition()
        {
            EndpointName = "group-service";
        }
    }
}

这是“service-client”一侧的样子:

using System;
using System.Collections.Generic;
using Airstrike.Backend.GraphqlPublicGateway.Services;
using Airstrike.Backend.GroupService.ServiceContract.Actions.CreateGroup;
using GraphQL.Server.Ui.Altair;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using GraphQL.Server;
using MassTransit;

namespace Airstrike.Backend.GraphqlPublicGateway
{
    using Schema;
    public class Startup
    {
        public Startup(IConfiguration configuration, IWebHostEnvironment environment)
        {
            Configuration = configuration;
            Environment = environment;
        }

        public IConfiguration Configuration { get; }

        public IWebHostEnvironment Environment { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddScoped<IGroupService, Services.GroupService>();
            
            services.AddMassTransit(x =>
            {
                x.SetKebabCaseEndpointNameFormatter();
                x.UsingRabbitMq((context, cfg) =>
                {
                    
                });
                x.AddRequestClient<ICreateGroupPayload>();
            });
        }
    }
}

这里是我调用服务的方式(来自 service-client):

using System;
using Airstrike.Backend.GroupService.ServiceContract.Actions.CreateGroup;
using System.Threading.Tasks;
using MassTransit;

namespace Airstrike.Backend.GraphqlPublicGateway.Services
{
    public class GroupService : IGroupService
    {
        private readonly IRequestClient<ICreateGroupPayload> _createGroupClient;
        
        public GroupService(
            IRequestClient<ICreateGroupPayload> createGroupClient)
        {
            _createGroupClient = createGroupClient;
        }
        
        public async Task<ICreateGroupResult> CreateGroup(ICreateGroupPayload payload)
        {
            var response = await _createGroupClient.GetResponse<ICreateGroupResult>(new
            {
                Name = payload.Name,
                Description = payload.Description
            });

            return response.Message;
        }
    }
}

您似乎没有在服务客户端启动总线。为什么不使用与在服务服务器应用程序中相同的 AddMassTransitHostedService

Always, always start the bus.

来自文档 Requests 部分:

The bus must always be started, so if the hosted service is not included, be sure to start the bus manually using IBusControl.