MassTransit 未能处理作业

MassTransit failes to process job

我正在尝试使用 RabbitMQ 队列连接两个服务。第一个服务将值推送到队列,第二个服务检索它并进行处理。一切都很好,但是当第二个服务尝试处理作业时,它会抛出异常。队列项停留在 JobAttempt 队列中,没有任何信息,消费者服务重试处理作业,但每次都抛出相同的异常。

异常

    fail: MassTransit.ReceiveTransport[0]
      S-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-edb0-08d97cd26cf9 MassTransit.Contracts.JobService.JobStatusCheckRequested
fail: MassTransit.ReceiveTransport[0]
    T-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-8129-08d97cca994f
      System.Threading.Tasks.TaskCanceledException: A task was canceled.
         at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContext`2.Delete(SagaConsumeContext`1 context)
         at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
         at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
         at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
         at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
         at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
         at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
         at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
         at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
         at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
         at GreenPipes.Partitioning.Partition.Send[T](T context, IPipe`1 next)
         at GreenPipes.Filters.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext()
      --- End of stack trace from previous location ---
         at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
         at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
         at GreenPipes.Filters.DynamicFilter`1.<>c__DisplayClass10_0.<<Send>g__SendAsync|0>d.MoveNext()
      --- End of stack trace from previous location ---
         at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
         at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
         at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
         at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
         at MassTransit.RabbitMqTransport.Pipeline.RabbitMqBasicConsumer.<>c__DisplayClass24_0.<<HandleBasicDeliver>b__0>d.MoveNext()

生产者启动:

services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.UsingRabbitMq((context, cfg) =>
    {
       cfg.ConfigureEndpoints(context);
    });
});
services.AddMassTransitHostedService();

消费者启动:

        services.AddMassTransit(x =>
        {
            x.AddDelayedMessageScheduler();

            x.AddConsumer<LoanRequestJobConsumer>(cfg =>
            {
                cfg.Options<JobOptions<LoanRequestBroker>>(options =>
                {
                    options.SetJobTimeout(TimeSpan.FromMinutes(5));
                    options.SetConcurrentJobLimit(10);
                });
            });

            x.SetKebabCaseEndpointNameFormatter();

            x.UsingRabbitMq((context, cfg) =>
            {
                cfg.UseDelayedMessageScheduler();

                cfg.ServiceInstance(instance =>
                {
                    instance.ConfigureJobServiceEndpoints(js =>
                    {
                        js.SagaPartitionCount = 1;
                        js.FinalizeCompleted = true;
                    });

                    cfg.ReceiveEndpoint("loan-request-processing", e =>
                    {
                        e.ConfigureConsumer<LoanRequestJobConsumer>(context);
                    });

                    instance.ConfigureEndpoints(context);
                });
            });
        });
        services.AddMassTransitHostedService();

工作消费者

public class LoanRequestJobConsumer : IJobConsumer<LoanRequestBroker>
{
    private readonly ILogger<LoanRequestJobConsumer> _logger;
    private readonly ILoanProcessingService _processingService;

    public LoanRequestJobConsumer(
        ILogger<LoanRequestJobConsumer> logger,
        ILoanProcessingService processingService)
    {
        _logger = logger;
        _processingService = processingService;
    }

    public async Task Run(JobContext<LoanRequestBroker> context)
    {
        _logger.LogInformation($"{nameof(LoanRequestJobConsumer)}: start processing loan request id = {context.Job.Id}");

        var processingInfo = new LoanProcessingInfo
        {
            Status = TaskStatus.InProgress,
            LoanRequest = context.Job.Adapt<LoanRequest>()
        };
        processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);

        processingInfo = await _processingService.ProcessAsync(processingInfo);

        processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);

        _logger.LogInformation($"{nameof(LoanRequestJobConsumer)}: end processing loan request id = {context.Job.Id}" +
                               $"\nResult: {JsonConvert.SerializeObject(processingInfo)}");
    }
}

我如何将项目推送到队列

var endpoint = await _sendEndpointProvider.GetSendEndpoint(_brokerEndpoints.LoanProcessingQueue);
await endpoint.Send(loanRequest.Adapt<LoanRequestBroker>());

如果我不得不猜测,没有任何其他错误日志详细信息,我会认为延迟交换插件不是 installed/enabled 在 RabbitMQ 上。