透明地批处理存储
Batching stores transparently
我们使用以下框架和版本:
jOOQ 3.11.1
Spring Boot 2.3.1.RELEASE
Spring 5.2.7.RELEASE
我有一个问题,我们的一些业务逻辑被划分为逻辑单元,如下所示:
- 收到包含用户交易的请求
- 此请求包含各种信息,例如交易类型、本次交易包含哪些产品、进行了何种付款等。
- 然后将这些属性单独存储在数据库中。
在代码中,大致如下所示:
TransactionRecord transaction = transactionRepository.create();
transaction.create(creationCommand);`
在 Transaction#create
(以事务方式运行)中,会发生如下情况:
storeTransaction();
storePayments();
storeProducts();
// ... other relevant information
一个给定的交易可以有许多不同类型的产品和属性,所有这些都被存储。其中许多属性会导致 UPDATE
语句,而有些属性可能会导致 INSERT
语句 - 事先很难完全知道。
例如,storeProducts
方法大致如下所示:
products.forEach(product -> {
ProductRecord record = productRepository.findProductByX(...);
if (record == null) {
record = productRepository.create();
record.setX(...);
record.store();
} else {
// do something else
}
});
如果产品是新的,则 INSERT
ed。否则,可能会发生其他计算。根据事务的大小,此单个用户事务显然可能导致多达 O(n)
数据库 calls/roundtrips,甚至更多,具体取决于存在的其他属性。在存在大量属性的事务中,这可能会导致针对单个请求 (!) 的数百次数据库调用。我想尽可能将其降低到 O(1)
,以便我们的数据库负载更可预测。
自然地,batch 和 bulk inserts/updates 会想到这里。我想做的是使用 jOOQ
将所有这些语句批处理到一个批处理中,并在提交之前成功调用方法后执行。我发现了几个 (, jOOQ API, jOOQ GitHub Feature Request) posts where this topic is implicitly mentioned, and one user groups post 似乎与我的问题明确相关。
由于我将 Spring
与 jOOQ
一起使用,我相信我理想的解决方案(最好是声明式的)如下所示:
@Batched(100) // batch size as parameter, potentially
@Transactional
public void createTransaction(CreationCommand creationCommand) {
// all inserts/updates above are added to a batch and executed on successful invocation
}
为了让它工作,我想我需要管理一个范围(ThreadLocal
/Transactional
/Session
范围)资源,它可以跟踪当前批次,例如那:
- 如果方法是
@Batched
、 ,则在进入方法之前创建一个空批次
- 通过 DI 提供的自定义
DSLContext
(可能扩展 DefaultDSLContext
)有一个 ThreadLocal
标志,用于跟踪是否应批处理任何当前语句,并且如果是的话
- 拦截调用并将它们添加到当前批处理而不是立即执行它们。
但是,第 3 步需要重写我们的大部分代码(IMO)相对可读:
records.forEach(record -> {
record.setX(...);
// ...
record.store();
}
至:
userObjects.forEach(userObject -> {
dslContext.insertInto(...).values(userObject.getX(), ...).execute();
}
这首先会破坏这种抽象的目的,因为第二种形式也可以使用 DSLContext#batchStore
或 DSLContext#batchInsert
重写。然而,IMO,批处理和批量插入不应该由单个开发人员决定,应该能够在更高级别透明地处理(例如由框架)。
我发现 jOOQ
API 的可读性是使用它的一个惊人的好处,但是它似乎不适合(据我所知)interception/extension 非常适合此类情况。使用 jOOQ 3.11.1
(甚至当前)API,是否有可能通过透明 batch/bulk 处理获得类似于前者的行为?这意味着什么?
编辑:
我想到的一个可能但非常棘手的解决方案是启用透明的商店批处理,如下所示:
- 创建一个
RecordListener
并在启用批处理时将其作为默认值添加到 Configuration
。
- 在
RecordListener#storeStart
中,将查询添加到当前事务的批处理中(例如在 ThreadLocal<List>
中)
AbstractRecord
有一个 changed
标志,在存储之前会检查(org.jooq.impl.UpdatableRecordImpl#store0
、org.jooq.impl.TableRecordImpl#addChangedValues
)。重置它(并将其保存以备后用)会使存储操作成为空操作。
- 最后,方法调用成功但在提交之前:
- 将各个记录的
changes
标志重置为正确的值
- 调用
org.jooq.UpdatableRecord#store
,这次不使用 RecordListener
或跳过 storeStart
方法(可能使用另一个 ThreadLocal
标志来检查是否已执行批处理) .
据我所知,这种方法在理论上应该有效。显然,它非常 hacky 并且容易崩溃,因为如果代码依赖反射工作,库内部可能随时更改。
有谁知道更好的方法,只使用 public jOOQ
API?
jOOQ 3.14解决方案
您已经发现了相关的feature request #3419,它将在JDBC 级别从jOOQ 3.14 开始解决这个问题。你可以直接使用 BatchedConnection
,包装你自己的连接来实现下面的,或者使用这个 API:
ctx.batched(c -> {
// Make sure all records are attached to c, not ctx, e.g. by fetching from c.dsl()
records.forEach(record -> {
record.setX(...);
// ...
record.store();
}
});
jOOQ 3.13及之前解决方案
目前,在实施#3419 之前(在 jOOQ 3.14 中将实施),您可以自己实施此作为解决方法。您必须代理 JDBC Connection
和 PreparedStatement
以及 ...
...截取全部:
- 调用
Connection.prepareStatement(String)
,如果SQL字符串与最后一个准备好的语句相同,则返回一个缓存的代理语句,或者批量执行最后一个准备好的语句并创建一个新的。
- 调用
PreparedStatement.executeUpdate()
和 execute()
,并通过调用 PreparedStatement.addBatch()
替换它们
...委托全部:
- 呼叫其他 API,例如
Connection.createStatement()
,它应该刷新上面的缓冲批次,然后调用委托 API。
我不建议绕过 jOOQ 的 RecordListener
和其他 SPI,我认为这是缓冲数据库交互的错误抽象级别。此外,您还需要批处理其他语句类型。
请注意,默认情况下,jOOQ 的 UpdatableRecord
会尝试获取生成的标识值(请参阅 Settings.returnIdentityOnUpdatableRecord
),这会阻止批处理。此类 store()
调用必须立即执行,因为您可能希望标识值可用。
我们使用以下框架和版本:
jOOQ 3.11.1
Spring Boot 2.3.1.RELEASE
Spring 5.2.7.RELEASE
我有一个问题,我们的一些业务逻辑被划分为逻辑单元,如下所示:
- 收到包含用户交易的请求
- 此请求包含各种信息,例如交易类型、本次交易包含哪些产品、进行了何种付款等。
- 然后将这些属性单独存储在数据库中。
在代码中,大致如下所示:
TransactionRecord transaction = transactionRepository.create();
transaction.create(creationCommand);`
在 Transaction#create
(以事务方式运行)中,会发生如下情况:
storeTransaction();
storePayments();
storeProducts();
// ... other relevant information
一个给定的交易可以有许多不同类型的产品和属性,所有这些都被存储。其中许多属性会导致 UPDATE
语句,而有些属性可能会导致 INSERT
语句 - 事先很难完全知道。
例如,storeProducts
方法大致如下所示:
products.forEach(product -> {
ProductRecord record = productRepository.findProductByX(...);
if (record == null) {
record = productRepository.create();
record.setX(...);
record.store();
} else {
// do something else
}
});
如果产品是新的,则 INSERT
ed。否则,可能会发生其他计算。根据事务的大小,此单个用户事务显然可能导致多达 O(n)
数据库 calls/roundtrips,甚至更多,具体取决于存在的其他属性。在存在大量属性的事务中,这可能会导致针对单个请求 (!) 的数百次数据库调用。我想尽可能将其降低到 O(1)
,以便我们的数据库负载更可预测。
自然地,batch 和 bulk inserts/updates 会想到这里。我想做的是使用 jOOQ
将所有这些语句批处理到一个批处理中,并在提交之前成功调用方法后执行。我发现了几个 (
由于我将 Spring
与 jOOQ
一起使用,我相信我理想的解决方案(最好是声明式的)如下所示:
@Batched(100) // batch size as parameter, potentially
@Transactional
public void createTransaction(CreationCommand creationCommand) {
// all inserts/updates above are added to a batch and executed on successful invocation
}
为了让它工作,我想我需要管理一个范围(ThreadLocal
/Transactional
/Session
范围)资源,它可以跟踪当前批次,例如那:
- 如果方法是
@Batched
、 ,则在进入方法之前创建一个空批次
- 通过 DI 提供的自定义
DSLContext
(可能扩展DefaultDSLContext
)有一个ThreadLocal
标志,用于跟踪是否应批处理任何当前语句,并且如果是的话 - 拦截调用并将它们添加到当前批处理而不是立即执行它们。
但是,第 3 步需要重写我们的大部分代码(IMO)相对可读:
records.forEach(record -> {
record.setX(...);
// ...
record.store();
}
至:
userObjects.forEach(userObject -> {
dslContext.insertInto(...).values(userObject.getX(), ...).execute();
}
这首先会破坏这种抽象的目的,因为第二种形式也可以使用 DSLContext#batchStore
或 DSLContext#batchInsert
重写。然而,IMO,批处理和批量插入不应该由单个开发人员决定,应该能够在更高级别透明地处理(例如由框架)。
我发现 jOOQ
API 的可读性是使用它的一个惊人的好处,但是它似乎不适合(据我所知)interception/extension 非常适合此类情况。使用 jOOQ 3.11.1
(甚至当前)API,是否有可能通过透明 batch/bulk 处理获得类似于前者的行为?这意味着什么?
编辑:
我想到的一个可能但非常棘手的解决方案是启用透明的商店批处理,如下所示:
- 创建一个
RecordListener
并在启用批处理时将其作为默认值添加到Configuration
。 - 在
RecordListener#storeStart
中,将查询添加到当前事务的批处理中(例如在ThreadLocal<List>
中) AbstractRecord
有一个changed
标志,在存储之前会检查(org.jooq.impl.UpdatableRecordImpl#store0
、org.jooq.impl.TableRecordImpl#addChangedValues
)。重置它(并将其保存以备后用)会使存储操作成为空操作。- 最后,方法调用成功但在提交之前:
- 将各个记录的
changes
标志重置为正确的值 - 调用
org.jooq.UpdatableRecord#store
,这次不使用RecordListener
或跳过storeStart
方法(可能使用另一个ThreadLocal
标志来检查是否已执行批处理) .
据我所知,这种方法在理论上应该有效。显然,它非常 hacky 并且容易崩溃,因为如果代码依赖反射工作,库内部可能随时更改。
有谁知道更好的方法,只使用 public jOOQ
API?
jOOQ 3.14解决方案
您已经发现了相关的feature request #3419,它将在JDBC 级别从jOOQ 3.14 开始解决这个问题。你可以直接使用 BatchedConnection
,包装你自己的连接来实现下面的,或者使用这个 API:
ctx.batched(c -> {
// Make sure all records are attached to c, not ctx, e.g. by fetching from c.dsl()
records.forEach(record -> {
record.setX(...);
// ...
record.store();
}
});
jOOQ 3.13及之前解决方案
目前,在实施#3419 之前(在 jOOQ 3.14 中将实施),您可以自己实施此作为解决方法。您必须代理 JDBC Connection
和 PreparedStatement
以及 ...
...截取全部:
- 调用
Connection.prepareStatement(String)
,如果SQL字符串与最后一个准备好的语句相同,则返回一个缓存的代理语句,或者批量执行最后一个准备好的语句并创建一个新的。 - 调用
PreparedStatement.executeUpdate()
和execute()
,并通过调用PreparedStatement.addBatch()
替换它们
...委托全部:
- 呼叫其他 API,例如
Connection.createStatement()
,它应该刷新上面的缓冲批次,然后调用委托 API。
我不建议绕过 jOOQ 的 RecordListener
和其他 SPI,我认为这是缓冲数据库交互的错误抽象级别。此外,您还需要批处理其他语句类型。
请注意,默认情况下,jOOQ 的 UpdatableRecord
会尝试获取生成的标识值(请参阅 Settings.returnIdentityOnUpdatableRecord
),这会阻止批处理。此类 store()
调用必须立即执行,因为您可能希望标识值可用。