Timeout while waiting for response in request-response pattern
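I am trying to set up the request-response pattern using MassTransit and RabbitMQ. The request is delivered to the consumer and RespondAsync succeeds, but GetResponse on the client side hangs and eventually times out. What am I doing wrong? Everything on the RabbitMQ side appears to be configured correctly (see the screenshots below).
Here is how I add MassTransit on the "service-server" side: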
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using MassTransit;

namespace Airstrike.Backend.GroupService.Service
{
    public class Program
    {
        public static async Task Main(string[] args)
        {
            await CreateHostBuilder(args).Build().RunAsync();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.SetKebabCaseEndpointNameFormatter();
                        var entryAssembly = Assembly.GetEntryAssembly();
                        x.AddConsumers(entryAssembly);
                        x.UsingRabbitMq((context, cfg) =>
                        {
                            cfg.ConfigureEndpoints(context);
                        });
                    });
                    services.AddMassTransitHostedService(true);
                });
    }
}
This is what my consumer and consumer definition look like:
using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.ConsumeConfigurators;
using MassTransit.Definition;
using Airstrike.Backend.GroupService.ServiceContract.Actions.CreateGroup;

namespace Airstrike.Backend.GroupService.Service.Actions.CreateGroup
{
    public class CreateGroupAction :
        IConsumer<ICreateGroupPayload>
    {
        public async Task Consume(ConsumeContext<ICreateGroupPayload> consumeContext)
        {
            await consumeContext.RespondAsync<ICreateGroupResult>(new
            {
                GroupGuid = Guid.NewGuid()
            });
        }
    }

    public class CreateGroupActionDefinition : ConsumerDefinition<CreateGroupAction>
    {
        public CreateGroupActionDefinition()
        {
            EndpointName = "group-service";
        }
    }
}
This is what the "service-client" side looks like:
using System;
using System.Collections.Generic;
using Airstrike.Backend.GraphqlPublicGateway.Services;
using Airstrike.Backend.GroupService.ServiceContract.Actions.CreateGroup;
using GraphQL.Server.Ui.Altair;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using GraphQL.Server;
using MassTransit;

namespace Airstrike.Backend.GraphqlPublicGateway
{
    using Schema;

    public class Startup
    {
        public Startup(IConfiguration configuration, IWebHostEnvironment environment)
        {
            Configuration = configuration;
            Environment = environment;
        }

        public IConfiguration Configuration { get; }
        public IWebHostEnvironment Environment { get; }

        public void ConfigureServices(IServiceCollection services)
        {
            services.AddScoped<IGroupService, Services.GroupService>();
            services.AddMassTransit(x =>
            {
                x.SetKebabCaseEndpointNameFormatter();
                x.UsingRabbitMq((context, cfg) =>
                {
                });
                x.AddRequestClient<ICreateGroupPayload>();
            });
        }
    }
}
And here is how I call the service (from the service-client):
using System;
using Airstrike.Backend.GroupService.ServiceContract.Actions.CreateGroup;
using System.Threading.Tasks;
using MassTransit;

namespace Airstrike.Backend.GraphqlPublicGateway.Services
{
    public class GroupService : IGroupService
    {
        private readonly IRequestClient<ICreateGroupPayload> _createGroupClient;

        public GroupService(
            IRequestClient<ICreateGroupPayload> createGroupClient)
        {
            _createGroupClient = createGroupClient;
        }

        public async Task<ICreateGroupResult> CreateGroup(ICreateGroupPayload payload)
        {
            var response = await _createGroupClient.GetResponse<ICreateGroupResult>(new
            {
                Name = payload.Name,
                Description = payload.Description
            });
            return response.Message;
        }
    }
}
You don't appear to be starting the bus in the service client. Why not use the same AddMassTransitHostedService call that you use in the service-server application?
Always, always start the bus.
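As a minimal sketch of that suggestion, the client's Startup could register the hosted service right after AddMassTransit, mirroring the server's Program. This assumes the same MassTransit version and packages as the question's server code; the unrelated constructor and properties of Startup are left out for brevity. The likely reason it matters: the response is delivered to the client bus's own receive endpoint, and nothing consumes from that endpoint until the bus has been started.

```csharp
using Airstrike.Backend.GraphqlPublicGateway.Services;
using Airstrike.Backend.GroupService.ServiceContract.Actions.CreateGroup;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

namespace Airstrike.Backend.GraphqlPublicGateway
{
    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddScoped<IGroupService, Services.GroupService>();

            services.AddMassTransit(x =>
            {
                x.SetKebabCaseEndpointNameFormatter();
                x.UsingRabbitMq((context, cfg) =>
                {
                });
                x.AddRequestClient<ICreateGroupPayload>();
            });

            // Same call as in the service-server Program: starts the bus
            // together with the host (and stops it on shutdown).
            services.AddMassTransitHostedService(true);
        }
    }
}
```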
From the Requests section of the documentation:
The bus must always be started, so if the hosted service is not included, be sure to start the bus manually using IBusControl.
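If the hosted service is not an option, the manual route mentioned in the documentation could look roughly like the sketch below. BusStarter is a hypothetical name introduced here for illustration, not something from the question or from MassTransit; it assumes IBusControl is resolvable from the container after AddMassTransit.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Hosting;

// Hypothetical helper: starts and stops the bus with the application host.
public class BusStarter : IHostedService
{
    private readonly IBusControl _bus;

    public BusStarter(IBusControl bus)
    {
        _bus = bus;
    }

    // Start the bus so that responses can be received by the request client.
    public Task StartAsync(CancellationToken cancellationToken) =>
        _bus.StartAsync(cancellationToken);

    // Stop the bus cleanly when the host shuts down.
    public Task StopAsync(CancellationToken cancellationToken) =>
        _bus.StopAsync(cancellationToken);
}
```

It would then be registered in ConfigureServices with services.AddHostedService<BusStarter>(). In practice this duplicates what AddMassTransitHostedService already does, so the built-in call is usually the simpler choice.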