如何在 MassTransit 和 Automatonymous 中配置 EF Core 持久性?
How to configure EF Core persistence in MassTransit and Automatonymous?
我正在尝试使用 EF Core 作为持久化存储,来配置一个基于 Automatonymous 的 worker 实现。我通过 API 发布事件,并在托管服务中以 RabbitMQ 作为传输对其进行处理。不幸的是,数据库并没有保存状态机的状态。我已经应用了迁移,也能看到 OrderState 表,但在我发布 OrderSubmitted 事件之后,表中没有任何数据。事件确实被正确处理了,因为我在控制台中看到了日志,但数据库表仍然是空的。我漏掉了什么?
这是我的代码:
事件:
/// <summary>
/// Message contract for a submitted order. It declares no members of its own;
/// everything it exposes comes from <c>CorrelatedBy&lt;Guid&gt;</c>.
/// </summary>
public interface OrderSubmitted : CorrelatedBy<Guid> { }
消费者:
/// <summary>
/// Consumer that writes one console line for every <see cref="OrderSubmitted"/> message it receives.
/// </summary>
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
    /// <summary>
    /// Prints the correlation id carried by the message and returns an already-completed task.
    /// </summary>
    /// <param name="context">Consume context wrapping the received message.</param>
    /// <returns><see cref="Task.CompletedTask"/>.</returns>
    public Task Consume(ConsumeContext<OrderSubmitted> context)
    {
        var orderId = context.Message.CorrelationId;
        Console.Out.WriteLine($"Order with id {orderId} has been submitted.");

        return Task.CompletedTask;
    }
}
我的 saga 实例:
/// <summary>
/// Saga instance data for an order: the correlation id plus the name of the current state.
/// </summary>
public class OrderState : SagaStateMachineInstance
{
    /// <summary>Identifier that correlates messages to this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state of the instance, stored as a string.</summary>
    public string CurrentState { get; set; }
}
状态机:
/// <summary>
/// State machine over <see cref="OrderState"/> with a single declared state
/// (<see cref="Submitted"/>) and a single event (<see cref="OrderSubmitted"/>).
/// </summary>
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    /// <summary>State the instance is moved to when an order-submitted event is handled.</summary>
    public State Submitted { get; private set; }

    /// <summary>Event raised for incoming <c>OrderSubmitted</c> messages.</summary>
    public Event<OrderSubmitted> OrderSubmitted { get; set; }

    /// <summary>
    /// Declares the event, binds the instance state to <c>CurrentState</c>, and registers
    /// the same transition (to <see cref="Submitted"/>) under both Initially and DuringAny.
    /// </summary>
    public OrderStateMachine()
    {
        Event(() => OrderSubmitted);
        InstanceState(instance => instance.CurrentState);

        Initially(When(OrderSubmitted).TransitionTo(Submitted));
        DuringAny(When(OrderSubmitted).TransitionTo(Submitted));
    }
}
/// <summary>
/// Saga definition for <see cref="OrderState"/>; its only customization is the concurrent message limit.
/// </summary>
public class OrderStateMachineDefinition : SagaDefinition<OrderState>
{
    /// <summary>Sets <c>ConcurrentMessageLimit</c> to 15.</summary>
    public OrderStateMachineDefinition() => ConcurrentMessageLimit = 15;
}
数据库上下文:
/// <summary>
/// Saga DbContext whose only class map is <see cref="OrderStateMap"/>.
/// </summary>
public class OrderStateDbContext : SagaDbContext
{
    /// <summary>Passes the supplied options straight to the base context.</summary>
    /// <param name="options">EF Core options for this context.</param>
    public OrderStateDbContext(DbContextOptions options)
        : base(options)
    {
    }

    /// <summary>Yields a single, freshly created <see cref="OrderStateMap"/>.</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new OrderStateMap(); }
    }
}
/// <summary>
/// Entity mapping for <see cref="OrderState"/>; limits <c>CurrentState</c> to 64 characters.
/// </summary>
public class OrderStateMap : SagaClassMap<OrderState>
{
    /// <summary>Applies the max-length constraint to the <c>CurrentState</c> property.</summary>
    /// <param name="entity">Builder for the <see cref="OrderState"/> entity.</param>
    /// <param name="model">Model builder; not used by this map.</param>
    protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
    {
        var currentState = entity.Property(state => state.CurrentState);
        currentState.HasMaxLength(64);
    }
}
Program 类:
/// <summary>
/// Process entry point. Builds the generic host and registers the saga DbContext,
/// MassTransit over RabbitMQ, and the hosted <see cref="Worker"/> that starts the bus.
/// </summary>
public class Program
{
    /// <summary>Builds the host from <paramref name="args"/> and runs it.</summary>
    /// <param name="args">Command-line arguments forwarded to the host builder.</param>
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    /// <summary>
    /// Creates the host builder and wires up EF Core, MassTransit and the hosted worker.
    /// </summary>
    /// <param name="args">Command-line arguments passed to <c>Host.CreateDefaultBuilder</c>.</param>
    /// <returns>The configured host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddDbContext<OrderStateDbContext>(builder =>
                    // Verbatim literal: in a regular string "\m" is an unrecognized escape
                    // sequence (compile error), and the backslash is part of the server name.
                    builder.UseSqlServer(@"Server=(localdb)\mssqllocaldb;Database=OrderState;Trusted_Connection=True;", m =>
                    {
                        m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                        m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                    }));

                services.AddMassTransit(config =>
                {
                    config.AddSagaRepository<OrderState>()
                        .EntityFrameworkRepository(r =>
                        {
                            r.ExistingDbContext<OrderStateDbContext>();
                            r.LockStatementProvider = new SqlServerLockStatementProvider();
                        });

                    config.AddConsumer<OrderSubmittedConsumer>();

                    config.UsingRabbitMq((ctx, cfg) =>
                    {
                        cfg.Host("amqp://guest:guest@localhost:5672");
                        cfg.ReceiveEndpoint("order-queue", c =>
                        {
                            c.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
                            // I'm assuming this is the place where something like c.StateMachineSaga() is missing, but I don't know how should this look like with EF Core
                            // NOTE: only the consumer is configured on this endpoint; no saga state
                            // machine is registered or configured anywhere in this method.
                        });
                    });
                });

                services.AddHostedService<Worker>();
            });
}
Worker 类:
/// <summary>
/// Hosted service that starts the bus in <see cref="StartAsync"/> and stops it in <see cref="StopAsync"/>.
/// </summary>
public class Worker : IHostedService
{
    private readonly IBusControl _bus;

    /// <summary>Stores the bus control that this service starts and stops.</summary>
    /// <param name="bus">Bus control to manage.</param>
    public Worker(IBusControl bus) => _bus = bus;

    /// <summary>Starts the bus and awaits the start without capturing the context.</summary>
    /// <param name="cancellationToken">Token forwarded to the bus start call.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var starting = _bus.StartAsync(cancellationToken);
        await starting.ConfigureAwait(false);
    }

    /// <summary>Returns the task produced by stopping the bus.</summary>
    /// <param name="cancellationToken">Token forwarded to the bus stop call.</param>
    public Task StopAsync(CancellationToken cancellationToken) => _bus.StopAsync(cancellationToken);
}
在您的代码示例中,您既没有添加 saga,也没有在接收端点上配置 saga。消费者是一个独立的组件,与 saga 完全无关。您应该调用:
AddSagaStateMachine<OrderStateMachine, OrderState, OrderStateMachineDefinition>()
然后使用 ConfigureSaga<OrderState>,或者改用 ConfigureEndpoints 来自动配置端点。
我正在尝试使用 EF Core 作为持久化存储,来配置一个基于 Automatonymous 的 worker 实现。我通过 API 发布事件,并在托管服务中以 RabbitMQ 作为传输对其进行处理。不幸的是,数据库并没有保存状态机的状态。我已经应用了迁移,也能看到 OrderState 表,但在我发布 OrderSubmitted 事件之后,表中没有任何数据。事件确实被正确处理了,因为我在控制台中看到了日志,但数据库表仍然是空的。我漏掉了什么?
这是我的代码:
事件:
/// <summary>
/// Message contract for a submitted order. It declares no members of its own;
/// everything it exposes comes from <c>CorrelatedBy&lt;Guid&gt;</c>.
/// </summary>
public interface OrderSubmitted : CorrelatedBy<Guid> { }
消费者:
/// <summary>
/// Consumer that writes one console line for every <see cref="OrderSubmitted"/> message it receives.
/// </summary>
public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
    /// <summary>
    /// Prints the correlation id carried by the message and returns an already-completed task.
    /// </summary>
    /// <param name="context">Consume context wrapping the received message.</param>
    /// <returns><see cref="Task.CompletedTask"/>.</returns>
    public Task Consume(ConsumeContext<OrderSubmitted> context)
    {
        var orderId = context.Message.CorrelationId;
        Console.Out.WriteLine($"Order with id {orderId} has been submitted.");

        return Task.CompletedTask;
    }
}
我的 saga 实例:
/// <summary>
/// Saga instance data for an order: the correlation id plus the name of the current state.
/// </summary>
public class OrderState : SagaStateMachineInstance
{
    /// <summary>Identifier that correlates messages to this saga instance.</summary>
    public Guid CorrelationId { get; set; }

    /// <summary>Current state of the instance, stored as a string.</summary>
    public string CurrentState { get; set; }
}
状态机:
/// <summary>
/// State machine over <see cref="OrderState"/> with a single declared state
/// (<see cref="Submitted"/>) and a single event (<see cref="OrderSubmitted"/>).
/// </summary>
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    /// <summary>State the instance is moved to when an order-submitted event is handled.</summary>
    public State Submitted { get; private set; }

    /// <summary>Event raised for incoming <c>OrderSubmitted</c> messages.</summary>
    public Event<OrderSubmitted> OrderSubmitted { get; set; }

    /// <summary>
    /// Declares the event, binds the instance state to <c>CurrentState</c>, and registers
    /// the same transition (to <see cref="Submitted"/>) under both Initially and DuringAny.
    /// </summary>
    public OrderStateMachine()
    {
        Event(() => OrderSubmitted);
        InstanceState(instance => instance.CurrentState);

        Initially(When(OrderSubmitted).TransitionTo(Submitted));
        DuringAny(When(OrderSubmitted).TransitionTo(Submitted));
    }
}
/// <summary>
/// Saga definition for <see cref="OrderState"/>; its only customization is the concurrent message limit.
/// </summary>
public class OrderStateMachineDefinition : SagaDefinition<OrderState>
{
    /// <summary>Sets <c>ConcurrentMessageLimit</c> to 15.</summary>
    public OrderStateMachineDefinition() => ConcurrentMessageLimit = 15;
}
数据库上下文:
/// <summary>
/// Saga DbContext whose only class map is <see cref="OrderStateMap"/>.
/// </summary>
public class OrderStateDbContext : SagaDbContext
{
    /// <summary>Passes the supplied options straight to the base context.</summary>
    /// <param name="options">EF Core options for this context.</param>
    public OrderStateDbContext(DbContextOptions options)
        : base(options)
    {
    }

    /// <summary>Yields a single, freshly created <see cref="OrderStateMap"/>.</summary>
    protected override IEnumerable<ISagaClassMap> Configurations
    {
        get { yield return new OrderStateMap(); }
    }
}
/// <summary>
/// Entity mapping for <see cref="OrderState"/>; limits <c>CurrentState</c> to 64 characters.
/// </summary>
public class OrderStateMap : SagaClassMap<OrderState>
{
    /// <summary>Applies the max-length constraint to the <c>CurrentState</c> property.</summary>
    /// <param name="entity">Builder for the <see cref="OrderState"/> entity.</param>
    /// <param name="model">Model builder; not used by this map.</param>
    protected override void Configure(EntityTypeBuilder<OrderState> entity, ModelBuilder model)
    {
        var currentState = entity.Property(state => state.CurrentState);
        currentState.HasMaxLength(64);
    }
}
Program 类:
/// <summary>
/// Process entry point. Builds the generic host and registers the saga DbContext,
/// MassTransit over RabbitMQ, and the hosted <see cref="Worker"/> that starts the bus.
/// </summary>
public class Program
{
    /// <summary>Builds the host from <paramref name="args"/> and runs it.</summary>
    /// <param name="args">Command-line arguments forwarded to the host builder.</param>
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    /// <summary>
    /// Creates the host builder and wires up EF Core, MassTransit and the hosted worker.
    /// </summary>
    /// <param name="args">Command-line arguments passed to <c>Host.CreateDefaultBuilder</c>.</param>
    /// <returns>The configured host builder.</returns>
    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddDbContext<OrderStateDbContext>(builder =>
                    // Verbatim literal: in a regular string "\m" is an unrecognized escape
                    // sequence (compile error), and the backslash is part of the server name.
                    builder.UseSqlServer(@"Server=(localdb)\mssqllocaldb;Database=OrderState;Trusted_Connection=True;", m =>
                    {
                        m.MigrationsAssembly(Assembly.GetExecutingAssembly().GetName().Name);
                        m.MigrationsHistoryTable($"__{nameof(OrderStateDbContext)}");
                    }));

                services.AddMassTransit(config =>
                {
                    config.AddSagaRepository<OrderState>()
                        .EntityFrameworkRepository(r =>
                        {
                            r.ExistingDbContext<OrderStateDbContext>();
                            r.LockStatementProvider = new SqlServerLockStatementProvider();
                        });

                    config.AddConsumer<OrderSubmittedConsumer>();

                    config.UsingRabbitMq((ctx, cfg) =>
                    {
                        cfg.Host("amqp://guest:guest@localhost:5672");
                        cfg.ReceiveEndpoint("order-queue", c =>
                        {
                            c.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
                            // I'm assuming this is the place where something like c.StateMachineSaga() is missing, but I don't know how should this look like with EF Core
                            // NOTE: only the consumer is configured on this endpoint; no saga state
                            // machine is registered or configured anywhere in this method.
                        });
                    });
                });

                services.AddHostedService<Worker>();
            });
}
Worker 类:
/// <summary>
/// Hosted service that starts the bus in <see cref="StartAsync"/> and stops it in <see cref="StopAsync"/>.
/// </summary>
public class Worker : IHostedService
{
    private readonly IBusControl _bus;

    /// <summary>Stores the bus control that this service starts and stops.</summary>
    /// <param name="bus">Bus control to manage.</param>
    public Worker(IBusControl bus) => _bus = bus;

    /// <summary>Starts the bus and awaits the start without capturing the context.</summary>
    /// <param name="cancellationToken">Token forwarded to the bus start call.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        var starting = _bus.StartAsync(cancellationToken);
        await starting.ConfigureAwait(false);
    }

    /// <summary>Returns the task produced by stopping the bus.</summary>
    /// <param name="cancellationToken">Token forwarded to the bus stop call.</param>
    public Task StopAsync(CancellationToken cancellationToken) => _bus.StopAsync(cancellationToken);
}
在您的代码示例中,您既没有添加 saga,也没有在接收端点上配置 saga。消费者是一个独立的组件,与 saga 完全无关。您应该调用:
AddSagaStateMachine<OrderStateMachine, OrderState, OrderStateMachineDefinition>()
然后使用 ConfigureSaga<OrderState>,或者改用 ConfigureEndpoints 来自动配置端点。