MassTransit 未能处理作业
MassTransit fails to process job
我正在尝试使用 RabbitMQ 队列连接两个服务。第一个服务将值推送到队列,第二个服务检索它并进行处理。一切都很好,但是当第二个服务尝试处理作业时,它会抛出异常。队列项停留在 JobAttempt 队列中,没有任何信息,消费者服务重试处理作业,但每次都抛出相同的异常。
异常
fail: MassTransit.ReceiveTransport[0]
S-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-edb0-08d97cd26cf9 MassTransit.Contracts.JobService.JobStatusCheckRequested
fail: MassTransit.ReceiveTransport[0]
T-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-8129-08d97cca994f
System.Threading.Tasks.TaskCanceledException: A task was canceled.
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContext`2.Delete(SagaConsumeContext`1 context)
at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Partitioning.Partition.Send[T](T context, IPipe`1 next)
at GreenPipes.Filters.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext()
--- End of stack trace from previous location ---
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.DynamicFilter`1.<>c__DisplayClass10_0.<<Send>g__SendAsync|0>d.MoveNext()
--- End of stack trace from previous location ---
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.RabbitMqTransport.Pipeline.RabbitMqBasicConsumer.<>c__DisplayClass24_0.<<HandleBasicDeliver>b__0>d.MoveNext()
生产者启动:
// Producer service: register MassTransit on the RabbitMQ transport with
// kebab-case endpoint names and let it configure endpoints from the registrations.
services.AddMassTransit(registration =>
{
    registration.SetKebabCaseEndpointNameFormatter();

    registration.UsingRabbitMq((busContext, bus) => bus.ConfigureEndpoints(busContext));
});

// Registers the MassTransit hosted service with the container.
services.AddMassTransitHostedService();
消费者启动:
// Consumer service: register the job consumer and host it, together with the
// job service endpoints, inside a RabbitMQ service instance.
services.AddMassTransit(registration =>
{
    // NOTE(review): delayed scheduling on RabbitMQ presumably requires the delayed
    // message exchange plug-in to be installed and enabled on the broker - confirm.
    registration.AddDelayedMessageScheduler();

    // Job options: 5 minute timeout per job, at most 10 jobs running concurrently.
    registration.AddConsumer<LoanRequestJobConsumer>(consumer =>
    {
        consumer.Options<JobOptions<LoanRequestBroker>>(jobOptions =>
        {
            jobOptions.SetJobTimeout(TimeSpan.FromMinutes(5));
            jobOptions.SetConcurrentJobLimit(10);
        });
    });

    registration.SetKebabCaseEndpointNameFormatter();

    registration.UsingRabbitMq((busContext, bus) =>
    {
        bus.UseDelayedMessageScheduler();

        bus.ServiceInstance(instance =>
        {
            instance.ConfigureJobServiceEndpoints(jobService =>
            {
                jobService.SagaPartitionCount = 1;
                jobService.FinalizeCompleted = true;
            });

            // NOTE(review): this endpoint is declared on the bus configurator, not on
            // `instance`, and it binds the job consumer by hand before
            // instance.ConfigureEndpoints runs - confirm this is intended.
            bus.ReceiveEndpoint("loan-request-processing", endpoint =>
            {
                endpoint.ConfigureConsumer<LoanRequestJobConsumer>(busContext);
            });

            instance.ConfigureEndpoints(busContext);
        });
    });
});

// Registers the MassTransit hosted service with the container.
services.AddMassTransitHostedService();
作业消费者
/// <summary>
/// Job consumer that runs a <see cref="LoanRequestBroker"/> job through the
/// loan processing service and logs the outcome.
/// </summary>
public class LoanRequestJobConsumer : IJobConsumer<LoanRequestBroker>
{
    private readonly ILogger<LoanRequestJobConsumer> _logger;
    private readonly ILoanProcessingService _processingService;

    /// <summary>
    /// Creates the consumer with its logger and processing service.
    /// </summary>
    /// <param name="logger">Logger used for the start/end processing messages.</param>
    /// <param name="processingService">Service that saves and processes the loan processing info.</param>
    public LoanRequestJobConsumer(
        ILogger<LoanRequestJobConsumer> logger,
        ILoanProcessingService processingService)
    {
        _logger = logger;
        _processingService = processingService;
    }

    /// <summary>
    /// Maps the job to a <c>LoanRequest</c>, saves it as in progress, processes it,
    /// saves the processed result and logs that result as JSON.
    /// </summary>
    /// <param name="context">Job context carrying the loan request message.</param>
    /// <returns>A task that completes when the result has been saved and logged.</returns>
    public async Task Run(JobContext<LoanRequestBroker> context)
    {
        // Constant message templates with arguments (instead of interpolated strings)
        // keep the rendered text the same while exposing the values as structured
        // log properties and avoiding a new template string per call.
        _logger.LogInformation(
            "{Consumer}: start processing loan request id = {LoanRequestId}",
            nameof(LoanRequestJobConsumer),
            context.Job.Id);

        var processingInfo = new LoanProcessingInfo
        {
            Status = TaskStatus.InProgress,
            LoanRequest = context.Job.Adapt<LoanRequest>()
        };

        // NOTE(review): none of these calls receives a cancellation token; confirm
        // whether ILoanProcessingService offers token-accepting overloads.
        // Save the in-progress record first, then process, then save the outcome.
        processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);
        processingInfo = await _processingService.ProcessAsync(processingInfo);
        processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);

        _logger.LogInformation(
            "{Consumer}: end processing loan request id = {LoanRequestId}\nResult: {Result}",
            nameof(LoanRequestJobConsumer),
            context.Job.Id,
            JsonConvert.SerializeObject(processingInfo));
    }
}
我如何将项目推送到队列
// Resolve the send endpoint for the loan processing queue, map the request to
// the broker contract, and send it to that endpoint.
var loanProcessingEndpoint = await _sendEndpointProvider.GetSendEndpoint(_brokerEndpoints.LoanProcessingQueue);
var brokerMessage = loanRequest.Adapt<LoanRequestBroker>();
await loanProcessingEndpoint.Send(brokerMessage);
在没有其他错误日志细节的情况下,如果让我猜测,我认为是 RabbitMQ 上没有安装或启用延迟交换(delayed exchange)插件。
我正在尝试使用 RabbitMQ 队列连接两个服务。第一个服务将值推送到队列,第二个服务检索它并进行处理。一切都很好,但是当第二个服务尝试处理作业时,它会抛出异常。队列项停留在 JobAttempt 队列中,没有任何信息,消费者服务重试处理作业,但每次都抛出相同的异常。
异常
fail: MassTransit.ReceiveTransport[0]
S-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-edb0-08d97cd26cf9 MassTransit.Contracts.JobService.JobStatusCheckRequested
fail: MassTransit.ReceiveTransport[0]
T-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-8129-08d97cca994f
System.Threading.Tasks.TaskCanceledException: A task was canceled.
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContext`2.Delete(SagaConsumeContext`1 context)
at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Partitioning.Partition.Send[T](T context, IPipe`1 next)
at GreenPipes.Filters.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext()
--- End of stack trace from previous location ---
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.DynamicFilter`1.<>c__DisplayClass10_0.<<Send>g__SendAsync|0>d.MoveNext()
--- End of stack trace from previous location ---
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.RabbitMqTransport.Pipeline.RabbitMqBasicConsumer.<>c__DisplayClass24_0.<<HandleBasicDeliver>b__0>d.MoveNext()
生产者启动:
// Producer service: register MassTransit on the RabbitMQ transport with
// kebab-case endpoint names and let it configure endpoints from the registrations.
services.AddMassTransit(registration =>
{
    registration.SetKebabCaseEndpointNameFormatter();

    registration.UsingRabbitMq((busContext, bus) => bus.ConfigureEndpoints(busContext));
});

// Registers the MassTransit hosted service with the container.
services.AddMassTransitHostedService();
消费者启动:
// Consumer service: register the job consumer and host it, together with the
// job service endpoints, inside a RabbitMQ service instance.
services.AddMassTransit(registration =>
{
    // NOTE(review): delayed scheduling on RabbitMQ presumably requires the delayed
    // message exchange plug-in to be installed and enabled on the broker - confirm.
    registration.AddDelayedMessageScheduler();

    // Job options: 5 minute timeout per job, at most 10 jobs running concurrently.
    registration.AddConsumer<LoanRequestJobConsumer>(consumer =>
    {
        consumer.Options<JobOptions<LoanRequestBroker>>(jobOptions =>
        {
            jobOptions.SetJobTimeout(TimeSpan.FromMinutes(5));
            jobOptions.SetConcurrentJobLimit(10);
        });
    });

    registration.SetKebabCaseEndpointNameFormatter();

    registration.UsingRabbitMq((busContext, bus) =>
    {
        bus.UseDelayedMessageScheduler();

        bus.ServiceInstance(instance =>
        {
            instance.ConfigureJobServiceEndpoints(jobService =>
            {
                jobService.SagaPartitionCount = 1;
                jobService.FinalizeCompleted = true;
            });

            // NOTE(review): this endpoint is declared on the bus configurator, not on
            // `instance`, and it binds the job consumer by hand before
            // instance.ConfigureEndpoints runs - confirm this is intended.
            bus.ReceiveEndpoint("loan-request-processing", endpoint =>
            {
                endpoint.ConfigureConsumer<LoanRequestJobConsumer>(busContext);
            });

            instance.ConfigureEndpoints(busContext);
        });
    });
});

// Registers the MassTransit hosted service with the container.
services.AddMassTransitHostedService();
作业消费者
/// <summary>
/// Job consumer that runs a <see cref="LoanRequestBroker"/> job through the
/// loan processing service and logs the outcome.
/// </summary>
public class LoanRequestJobConsumer : IJobConsumer<LoanRequestBroker>
{
    private readonly ILogger<LoanRequestJobConsumer> _logger;
    private readonly ILoanProcessingService _processingService;

    /// <summary>
    /// Creates the consumer with its logger and processing service.
    /// </summary>
    /// <param name="logger">Logger used for the start/end processing messages.</param>
    /// <param name="processingService">Service that saves and processes the loan processing info.</param>
    public LoanRequestJobConsumer(
        ILogger<LoanRequestJobConsumer> logger,
        ILoanProcessingService processingService)
    {
        _logger = logger;
        _processingService = processingService;
    }

    /// <summary>
    /// Maps the job to a <c>LoanRequest</c>, saves it as in progress, processes it,
    /// saves the processed result and logs that result as JSON.
    /// </summary>
    /// <param name="context">Job context carrying the loan request message.</param>
    /// <returns>A task that completes when the result has been saved and logged.</returns>
    public async Task Run(JobContext<LoanRequestBroker> context)
    {
        // Constant message templates with arguments (instead of interpolated strings)
        // keep the rendered text the same while exposing the values as structured
        // log properties and avoiding a new template string per call.
        _logger.LogInformation(
            "{Consumer}: start processing loan request id = {LoanRequestId}",
            nameof(LoanRequestJobConsumer),
            context.Job.Id);

        var processingInfo = new LoanProcessingInfo
        {
            Status = TaskStatus.InProgress,
            LoanRequest = context.Job.Adapt<LoanRequest>()
        };

        // NOTE(review): none of these calls receives a cancellation token; confirm
        // whether ILoanProcessingService offers token-accepting overloads.
        // Save the in-progress record first, then process, then save the outcome.
        processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);
        processingInfo = await _processingService.ProcessAsync(processingInfo);
        processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);

        _logger.LogInformation(
            "{Consumer}: end processing loan request id = {LoanRequestId}\nResult: {Result}",
            nameof(LoanRequestJobConsumer),
            context.Job.Id,
            JsonConvert.SerializeObject(processingInfo));
    }
}
我如何将项目推送到队列
// Resolve the send endpoint for the loan processing queue, map the request to
// the broker contract, and send it to that endpoint.
var loanProcessingEndpoint = await _sendEndpointProvider.GetSendEndpoint(_brokerEndpoints.LoanProcessingQueue);
var brokerMessage = loanRequest.Adapt<LoanRequestBroker>();
await loanProcessingEndpoint.Send(brokerMessage);
在没有其他错误日志细节的情况下,如果让我猜测,我认为是 RabbitMQ 上没有安装或启用延迟交换(delayed exchange)插件。