调用所有处理程序后如何提交 UoW

How to commit UoW once all handlers are invoked

我有一个场景,其中单个节点上的所有处理程序都应该在调用所有处理程序后提交的单个 "Unit of Work" 上运行。我认为最好的方法是执行以下操作:

收到消息后,将这些操作作为管道的一部分执行:

  1. 创建新的 DbContext 实例 (UoW)
  2. 调用处理程序并传递 DbContext 实例
  3. 如果调用所有处理程序而没有错误调用 DbContext.SaveChanges
  4. 处置 DbContext

能否提示我如何自定义 Rebus 管道以满足上述要求?

编辑:

我已经结束了:

private static IBus _bus;

public void ConfigureServices(IServiceCollection services)
{
    services.AddDbContext<MyDbContext>();
    services.AddMvc();
    services.AddTransient<IBus>(sp => _bus);
}

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
   StartRebus(app);
   ...
}

public static void StartRebus(this IApplicationBuilder app)
{
    var rebusServices = new ServiceCollection();
    rebusServices.AutoRegisterHandlersFromAssemblyOf<ActivityDenormalizer>();
    rebusServices.AddTransient<MyDbContext>(sp =>
    {
        var messageContext = MessageContext.Current
            ?? throw new InvalidOperationException("MessageContext.Current is null.");

        return messageContext.TransactionContext.Items
            .GetOrThrow<MyDbContext>(nameof(MyDbContext));
    });


    rebusServices.AddRebus((configure, sp) => configure
        .Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "Messages"))
        .Options(o =>
        {
            o.EnableUnitOfWork<MyDbContext>(
                unitOfWorkFactoryMethod: messageContext =>
                {
                    //create new dbcontext instance regardless of ServiceLifeTime.
                    //Notice that I'm using ApplicationServices here, not RebusServices.
                    var dbContext = ActivatorUtilities.CreateInstance<MyDbContext>(app.ApplicationServices);
                    messageContext.TransactionContext.Items[nameof(MyDbContext)] = dbContext;
                    return dbContext;
                },
                commitAction: (messageContext, dbContext) => dbContext.SaveChanges(),
                cleanupAction: (messageContext, dbContext) => dbContext.Dispose());

        }));

    var rebusServiceProvider = rebusServices.BuildServiceProvider();
    rebusServiceProvider.UseRebus();
    _bus = rebusServiceProvider.GetRequiredService<IBus>();
}

应用程序服务和 rebus 服务在两个地方互连:

  1. IBus 由 rebusServiceProvider 解析,但该实例也在应用程序服务中注册,因此我可以从我的应用程序向它发送消息。

  2. MyDbContext 依赖项(DbContextOptions 等..)由 ApplicationServices 解析,但 DbContext 也在 Rebus 中实例化 unitOfWorkFactoryMethod 并在 rebusServices 中注册,以便它可以注入到 Rebus 处理程序.

是的 – 查看 Rebus.UnitOfWork – 它有您需要的挂钩。

具体来说,对于 Entity Framework,您可以这样做:

Configure.With(new CastleWindsorContainerAdapter(container))
    .Transport(t => t.UseMsmq("test"))
    .Options(o => o.EnableUnitOfWork(
        async context => new CustomUnitOfWork(context, connectionString),
        commitAction: async (context, uow) => await uow.Commit()
    ))
    .Start();

其中 CustomUnitOfWork 看起来像这样:

class CustomUnitOfWork
{
    public const string ItemsKey = "current-db-context";

    readonly MyDbContext _dbContext;

    public CustomUnitOfWork(IMessageContext messageContext, string connectionString)
    {
        _dbContext = new MyDbContext(connectionString);
        messageContext.TransactionContext.Items[ItemsKey] = this;
    }

    public async Task Commit()
    {
        await _dbContext.SaveChangesAsync();
    }

    public MyDbContext GetDbContext() => _dbContext;
}

然后您将设置 IoC 容器以通过从当前消息上下文中获取它来解析 MyDbContext – 对于 Castle Windsor,可以这样完成:

container.Register(
    Component.For<CustomUnitOfWork>()
        .UsingFactoryMethod(k =>
        {
            var messageContext = MessageContext.Current
                ?? throw new InvalidOperationException("Can't inject uow outside of Rebus handler");

            return messageContext.TransactionContext.Items
                .GetOrThrow<CustomUnitOfWork>(CustomUnitOfWork.ItemsKey);
        })
        .LifestyleTransient(),

    Component.For<MyDbContext>()
        .UsingFactoryMethod(k => k.Resolve<CustomUnitOfWork>().GetDbContext())
        .LifestyleTransient()
);