Grails DuplicateKeyException / NonUniqueObjectException when batch loading within Async Promise
I have a large amount of data that I want to load into the database using GORM.
class DbLoadingService {
    static transactional = false

    // these are used to expedite the batch loading process
    def sessionFactory
    def propertyInstanceMap = org.codehaus.groovy.grails.plugins.DomainClassGrailsPlugin.PROPERTY_INSTANCE_MAP

    // these are example services that will assist in the parsing of the input data
    def auxLoadingServiceA
    def auxLoadingServiceB

    def handleInputFile(String filename) {
        def inputFile = new File(filename)
        // parse each line and process according to record type
        inputFile.eachLine { line, lineNumber ->
            this.handleLine(line, lineNumber)
        }
    }

    @Transactional
    def handleLine(String line, int lineNumber) {
        // do some further parsing of the line, based on its content
        // example here is based on the first 2 chars of the line
        switch (line[0..1]) {
            case 'AA':
                auxLoadingServiceA.doSomethingWithLine(line)
                break
            case 'BB':
                auxLoadingServiceB.doSomethingElseWithLine(line)
                break
            default:
                break
        }
        if (lineNumber % 100 == 0) cleanUpGorm()
    }

    def cleanUpGorm() {
        def session = sessionFactory.getCurrentSession()
        session.flush()
        session.clear()
        propertyInstanceMap.get().clear()
    }
}

class AuxLoadingServiceA {
    static transactional = false

    def doSomethingWithLine(String line) {
        // do something here
    }
}

class AuxLoadingServiceB {
    static transactional = false

    def doSomethingElseWithLine(String line) {
        // do something else here
    }
}
I have deliberately made the top-level service transactional only around each line's load. There are actually many levels of services below the top level, not just the single Aux A & B service layer shown. I therefore didn't want the overhead of multiple layers of transactions: I figured I should only need one.
The data model being loaded into the database includes several domain objects with hasMany / belongsTo relationships. The interaction with those domain objects happens inside the sub-layers and is not shown in my code, to keep the example a manageable size.
The domain objects that seem to be causing the problem look similar to this:
class Parent {
    static hasMany = [children: Child]
    static mapping = {
        children lazy: false
        cache true
    }
}

class Child {
    String someValue
    // also contains some other sub-objects
    static belongsTo = [parent: Parent]
    static mapping = {
        parent index: 'parent_idx'
        cache true
    }
}
The cleanUpGorm() method shown is needed; without it, the service grinds to a complete halt after a large number of lines.
When I kick off the database load directly, everything works exactly as expected:
// Called from within a service / controller
dbLoadingService.handleInputFile("someFile.txt")
However, as soon as I move the load into an async process, like this:
def promise = task {
    dbLoadingService.handleInputFile("someFile.txt")
}
I get a DuplicateKeyException / NonUniqueObjectException:
error details: org.springframework.dao.DuplicateKeyException: A different object with the same identifier value was already associated with the session : [com.example.SampleDomainObject#1]; nested exception is org.hibernate.NonUniqueObjectException: A different object with the same identifier value was already associated with the session : [com.example.SampleDomainObject#1]
So, my question is: what is the best practice for asynchronously loading large amounts of data into the database with Grails? Is there anything that needs to be done around flushing/clearing the session to keep the in-memory objects consistent with the session? Is there anything that needs to be done around cached objects?
The solution was to do as @JoshuaMoore suggested and use a new session. In addition, there was a reference to a domain object that had been obtained outside the transaction and was then used in the new session without calling merge(), which caused the error.
i.e.
def obj = DomainObject.findBySomeProperty('xyz')
// now start new session
obj.someProperty // causes exception
obj = obj.merge()
obj.someProperty // doesn't cause an exception
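For completeness, a minimal sketch of how the two fixes combine (the domain class, property, and file names here are hypothetical): give the background task its own session via GORM's withNewSession, and merge() any instance that was loaded before the task started:

```groovy
import static grails.async.Promises.task

// loaded on the caller's thread, so it belongs to the caller's session
def obj = DomainObject.findBySomeProperty('xyz')

def promise = task {
    DomainObject.withNewSession {
        // reattach the detached instance to this thread's new session
        def attached = obj.merge()
        attached.someProperty   // safe to touch now
        dbLoadingService.handleInputFile("someFile.txt")
    }
}
```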
Joshua's comment prompted me to dig into the Hibernate documentation (https://docs.jboss.org/hibernate/orm/3.6/reference/en-US/html/transactions.html).
Specifically, from Chapter 13:
A SessionFactory is an expensive-to-create, threadsafe object,
intended to be shared by all application threads. It is created once,
usually on application startup, from a Configuration instance.
A Session is an inexpensive, non-threadsafe object that should be used
once and then discarded for: a single request, a conversation or a
single unit of work. A Session will not obtain a JDBC Connection, or a
Datasource, unless it is needed. It will not consume any resources
until used.
Something that may interest others: I was consistently seeing a gradual degradation in batch-loading performance as the number of parsed objects grew, even with the performance optimisations suggested by Burt Beckwith and explained in further detail by Ted Naleid.
So, taking the hint from the documentation, the answer to the performance problem was not to try to use one session for all the processing, but to use a session for a small amount of processing, then discard it and create a new one.
When I removed the cleanUpGorm() method from the question and replaced it with the following, I got a 6x performance improvement, and load time no longer grew with batch size at all, even after millions of records had been parsed:
// somewhere in the service method that is doing the batch parse
def currentSession = sessionFactory.openSession()
// start some form of batch parse, perhaps in a loop
// do work here
// periodically, perhaps in the %N way shown above
currentSession.flush()
currentSession.close()
currentSession = sessionFactory.openSession()
// end of loop
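Assembled into a loop, the pattern might look like this (a sketch only; the chunk size of 100 mirrors the cleanUpGorm() interval above, and handleLine is the per-line worker from the question):

```groovy
def loadFile(String filename) {
    def session = sessionFactory.openSession()
    new File(filename).eachLine { line, lineNumber ->
        handleLine(line, lineNumber)
        // periodically discard the session and start a fresh one
        if (lineNumber % 100 == 0) {
            session.flush()
            session.close()
            session = sessionFactory.openSession()
        }
    }
    // flush and close whatever session is current at the end
    session.flush()
    session.close()
}
```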
Where I needed to wrap things in a transaction spanning services, I did the following:
currentSession = sessionFactory.openSession()
currentSession.beginTransaction()
// start loop
// do work
// when we want to commit
def tx = currentSession?.getTransaction()
if (tx?.isActive()) tx.commit()
currentSession?.close()
// if we're in a loop and we need a new transaction
currentSession = sessionFactory.openSession()
currentSession.beginTransaction()
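As a sketch, committing in chunks inside the loop might look like this (the chunk size and the `lines` collection are illustrative):

```groovy
def session = sessionFactory.openSession()
def tx = session.beginTransaction()
lines.eachWithIndex { line, i ->
    // do work against the current session here
    if ((i + 1) % 100 == 0) {
        // commit this chunk, then start a fresh session and transaction
        if (tx.isActive()) tx.commit()
        session.close()
        session = sessionFactory.openSession()
        tx = session.beginTransaction()
    }
}
// commit whatever remains in the final partial chunk
if (tx.isActive()) tx.commit()
session.close()
```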
While I accept that using something like Spring Batch might be better, it would have meant throwing away a lot of code that otherwise works as intended. Next time I need to do this I'll look into it, but in the meantime I hope this may be useful to others who need to do large-scale batch processing with Grails and find that performance degrades as batch size grows.
Note to Joshua: thanks very much for your help, greatly appreciated!