带有自定义绑定到队列的公共交通发布

mass transit publisher winth custom bind to queue

我想将消息发布到不是公共交通图书馆的客户端,我想为我的实体自定义命名交换点,并让这个交换点与队列有我自己的自定义连接,请告诉我该怎么做?

我试过了,但是没用 =(

启动

public void ConfigureServices(IServiceCollection services)
        {
            services.AddMassTransit(busConfig =>
            {
                busConfig.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host("localhost", 5672, "/", configurator =>
                    {
                        configurator.Username("guest");
                        configurator.Password("guest");
                    });
                    
                    cfg.UsePrometheusMetrics(serviceName: "publisher_service");

                    cfg.ConfigurePublish(x =>
                    {
                        x.UseExecute(publishContext =>
                        {
                            publishContext.Serializer =
                                new RawJsonMessageSerializer(RawJsonSerializerOptions.AnyMessageType);
                            publishContext.ContentType.MediaType = "application/json";
                        });
                    });

                    cfg.Message<Message>(configurator => configurator.SetEntityName("mass_transit_exchange"));
                    
                    cfg.Publish<Message>(configurator =>
                    {   
                        configurator.BindQueue("mass_transit_exchange", "testQUEUE", bindingConfigurator =>
                        {
                            bindingConfigurator.ExchangeType = ExchangeType.Direct;
                            bindingConfigurator.RoutingKey = "key";
                        });
                    });
                });
            });

            services.AddMassTransitHostedService();
            services.AddControllers();
        }

控制器方法

[HttpPost]
        public async Task<ActionResult> SentMessage([FromBody] Message message, CancellationToken cancellationToken)
        {
            message.CreatedOn = DateTimeOffset.Now;

            await _endpoint.Publish<Message>(message,
                context =>
                {
                    context.Durable = true;
                    var a = context.Headers.GetAll();
                    foreach (var keyValuePair in a)
                    {
                        context.Headers.Set(keyValuePair.Key, null);
                    }
                    context.MessageId = null;
                    context.TimeToLive = TimeSpan.FromMilliseconds(30000);
                },
                cancellationToken);
            _logger.LogInformation($"message was sent: {JsonConvert.SerializeObject(message, Formatting.Indented)}");

            return Ok();
        }

我想在发布者中创建这个拓扑 topology

错误

MassTransit.RabbitMqTransport.RabbitMqAddressException: The entity name must be a sequence of these characters: letters, digits, hyphen, underscore, period, or colon.
   at MassTransit.RabbitMqTransport.Topology.RabbitMqEntityNameValidator.ThrowIfInvalidEntityName(String name)
   at MassTransit.RabbitMqTransport.RabbitMqEndpointAddress..ctor(Uri hostAddress, Uri address)
   at MassTransit.RabbitMqTransport.Integration.ConnectionContextSupervisor.NormalizeAddress(Uri address)
   at MassTransit.RabbitMqTransport.Transport.RabbitMqSendTransportProvider.NormalizeAddress(Uri address)
   at MassTransit.Transports.SendEndpointProvider.GetSendEndpoint(Uri address)
   at MassTransit.Transports.ReceiveEndpoint.GetSendEndpoint(Uri address)
   at MassTransit.MassTransitBus.MassTransit.ISendEndpointProvider.GetSendEndpoint(Uri address)
   at MassTransit.Scoping.ScopedSendEndpointProvider`1.MassTransit.ISendEndpointProvider.GetSendEndpoint(Uri address)
   at publisherMassTransit.Controllers.ApiController.SentMessage(Message message, CancellationToken cancellationToken) in C:\Users\qualita\Desktop\examples\publisherMassTransit\Controllers\ApiController.cs:line 32
   at lambda_method(Closure , Object )
   at Microsoft.Extensions.Internal.ObjectMethodExecutorAwaitable.Awaiter.GetResult()
   at Microsoft.AspNetCore.Mvc.Infrastructure.ActionMethodExecutor.TaskOfActionResultExecutor.Execute(IActionResultTypeMapper mapper, ObjectMethodExecutor executor, Object controller, Object[] arguments)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeActionMethodAsync>g__Awaited|12_0(ControllerActionInvoker invoker, ValueTask`1 actionResultValueTask)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.<InvokeNextActionFilterAsync>g__Awaited|10_0(ControllerActionInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Rethrow(ActionExecutedContextSealed context)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.Next(State& next, Scope& scope, Object& state, Boolean& isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ControllerActionInvoker.InvokeInnerFilterAsync()
--- End of stack trace from previous location where exception was thrown ---
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeFilterPipelineAsync>g__Awaited|19_0(ResourceInvoker invoker, Task lastTask, State next, Scope scope, Object state, Boolean isCompleted)
   at Microsoft.AspNetCore.Mvc.Infrastructure.ResourceInvoker.<InvokeAsync>g__Awaited|17_0(ResourceInvoker invoker, Task task, IDisposable scope)
   at Microsoft.AspNetCore.Routing.EndpointMiddleware.<Invoke>g__AwaitRequestTask|6_0(Endpoint endpoint, Task requestTask, ILogger logger)
   at Microsoft.AspNetCore.Diagnostics.DeveloperExceptionPageMiddleware.Invoke(HttpContext context)

HEADERS
=======
Connection: keep-alive
Content-Type: application/json
Accept: */*
Accept-Encoding: gzip, deflate, br
Host: localhost:5000
User-Agent: PostmanRuntime/7.28.4
Content-Length: 29
Postman-Token: f93cd39a-0757-4929-9c1a-4bc63393f1cf

您可以使用原始的JSON序列化器,只需要为发布的消息配置交换类型。

public void ConfigureServices(IServiceCollection services)
{
    services.AddMassTransit(busConfig =>
    {
        busConfig.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("localhost", 5672, "/", configurator =>
            {
                configurator.Username("guest");
                configurator.Password("guest");
            });

            cfg.UseRawJsonSerializer();
 
             cfg.UsePrometheusMetrics(serviceName: "publisher_service");

        });
    });
    services.AddMassTransitHostedService();
}

无需连接所有令人困惑的发布拓扑,只需发送消息:

public class Controller 
{
    readonly ISendEndpointProvider _sendEndpointProvider;

    public Controller(ISendEndpointProvider sendEndpointProvider)
    {
        _sendEndpointProvider = sendEndpointProvider;
    }

    [HttpPost]
    public async Task<ActionResult> SentMessage([FromBody] Message message, CancellationToken cancellationToken)
    {
        message.CreatedOn = DateTimeOffset.Now;

        var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri('queue:name&type=direct'));

        await endpoint.Send<Message>(message,
            context =>
            {
                var a = context.Headers.GetAll();
                foreach (var keyValuePair in a)
                {
                    context.Headers.Set(keyValuePair.Key, null);
                }
                context.MessageId = null;
                context.TimeToLive = TimeSpan.FromMilliseconds(30000);
            },
            cancellationToken);

        _logger.LogInformation($"message was sent: {JsonConvert.SerializeObject(message, Formatting.Indented)}");

        return Ok();
    }
}

或者类似的东西。您可以根据需要指定其他查询字符串参数来调整交换类型。