透明地批处理存储

Batching stores transparently

我们使用以下框架和版本:

我有一个问题,我们的一些业务逻辑被划分为逻辑单元,如下所示:

在代码中,大致如下所示:

TransactionRecord transaction = transactionRepository.create();
transaction.create(creationCommand);`

Transaction#create(以事务方式运行)中,会发生如下情况:

storeTransaction();
storePayments();
storeProducts();
// ... other relevant information

一个给定的交易可以有许多不同类型的产品和属性,所有这些都被存储。其中许多属性会导致 UPDATE 语句,而有些属性可能会导致 INSERT 语句 - 事先很难完全知道。

例如,storeProducts 方法大致如下所示:

products.forEach(product -> {
    ProductRecord record = productRepository.findProductByX(...);
    if (record == null) {
        record = productRepository.create();
        record.setX(...);
        record.store();
    } else {
      // do something else
    }
});

如果产品是新的,则 INSERTed。否则,可能会发生其他计算。根据事务的大小,此单个用户事务显然可能导致多达 O(n) 数据库 calls/roundtrips,甚至更多,具体取决于存在的其他属性。在存在大量属性的事务中,这可能会导致针对单个请求 (!) 的数百次数据库调用。我想尽可能将其降低到 O(1),以便我们的数据库负载更可预测。

自然地,batch 和 bulk inserts/updates 会想到这里。我想做的是使用 jOOQ 将所有这些语句批处理到一个批处理中,并在提交之前成功调用方法后执行。我发现了几个 (, jOOQ API, jOOQ GitHub Feature Request) posts where this topic is implicitly mentioned, and one user groups post 似乎与我的问题明确相关。

由于我将 SpringjOOQ 一起使用,我相信我理想的解决方案(最好是声明式的)如下所示:

@Batched(100) // batch size as parameter, potentially
@Transactional
public void createTransaction(CreationCommand creationCommand) {
    // all inserts/updates above are added to a batch and executed on successful invocation
}

为了让它工作,我想我需要管理一个范围(ThreadLocal/Transactional/Session 范围)资源,它可以跟踪当前批次,例如那:

  1. 如果方法是 @Batched
  2. ,则在进入方法之前创建一个空批次
  3. 通过 DI 提供的自定义 DSLContext(可能扩展 DefaultDSLContext)有一个 ThreadLocal 标志,用于跟踪是否应批处理任何当前语句,并且如果是的话
  4. 拦截调用并将它们添加到当前批处理而不是立即执行它们。

但是,第 3 步需要重写我们的大部分代码(IMO)相对可读:

records.forEach(record -> {
    record.setX(...);
    // ...
    record.store();
}

至:

userObjects.forEach(userObject -> {
    dslContext.insertInto(...).values(userObject.getX(), ...).execute();
}

这首先会破坏这种抽象的目的,因为第二种形式也可以使用 DSLContext#batchStoreDSLContext#batchInsert 重写。然而,IMO,批处理和批量插入不应该由单个开发人员决定,应该能够在更高级别透明地处理(例如由框架)。

我发现 jOOQ API 的可读性是使用它的一个惊人的好处,但是它似乎不适合(据我所知)interception/extension 非常适合此类情况。使用 jOOQ 3.11.1(甚至当前)API,是否有可能通过透明 batch/bulk 处理获得类似于前者的行为?这意味着什么?


编辑:

我想到的一个可能但非常棘手的解决方案是启用透明的商店批处理,如下所示:

  1. 创建一个 RecordListener 并在启用批处理时将其作为默认值添加到 Configuration
  2. RecordListener#storeStart 中,将查询添加到当前事务的批处理中(例如在 ThreadLocal<List> 中)
  3. AbstractRecord 有一个 changed 标志,在存储之前会检查(org.jooq.impl.UpdatableRecordImpl#store0org.jooq.impl.TableRecordImpl#addChangedValues)。重置它(并将其保存以备后用)会使存储操作成为空操作。
  4. 最后,方法调用成功但在提交之前:

据我所知,这种方法在理论上应该有效。显然,它非常 hacky 并且容易崩溃,因为如果代码依赖反射工作,库内部可能随时更改。

有谁知道更好的方法,只使用 public jOOQ API?

jOOQ 3.14解决方案

您已经发现了相关的feature request #3419,它将在JDBC 级别从jOOQ 3.14 开始解决这个问题。你可以直接使用 BatchedConnection,包装你自己的连接来实现下面的,或者使用这个 API:

ctx.batched(c -> {

    // Make sure all records are attached to c, not ctx, e.g. by fetching from c.dsl()
    records.forEach(record -> {
        record.setX(...);
        // ...
        record.store();
    }
});

jOOQ 3.13及之前解决方案

目前,在实施#3419 之前(在 jOOQ 3.14 中将实施),您可以自己实施此作为解决方法。您必须代理 JDBC ConnectionPreparedStatement 以及 ...

...截取全部:

  • 调用Connection.prepareStatement(String),如果SQL字符串与最后一个准备好的语句相同,则返回一个缓存的代理语句,或者批量执行最后一个准备好的语句并创建一个新的。
  • 调用 PreparedStatement.executeUpdate()execute(),并通过调用 PreparedStatement.addBatch()
  • 替换它们

...委托全部:

  • 呼叫其他 API,例如Connection.createStatement(),它应该刷新上面的缓冲批次,然后调用委托 API。

我不建议绕过 jOOQ 的 RecordListener 和其他 SPI,我认为这是缓冲数据库交互的错误抽象级别。此外,您还需要批处理其他语句类型。

请注意,默认情况下,jOOQ 的 UpdatableRecord 会尝试获取生成的标识值(请参阅 Settings.returnIdentityOnUpdatableRecord),这会阻止批处理。此类 store() 调用必须立即执行,因为您可能希望标识值可用。