在 Async Promise 中批量加载时的 Grails DuplicateKeyException / NonUniqueObjectException

Grails DuplicateKeyException / NonUniqueObjectException when batch loading within Async Promise

我有大量数据想使用 GORM 加载到数据库中。

class DbLoadingService {

    static transactional = false    
    // these are used to expedite the batch loading process
    def sessionFactory
    def propertyInstanceMap = org.codehaus.groovy.grails.plugins.DomainClassGrailsPlugin.PROPERTY_INSTANCE_MAP

    // these are example services that will assist in the parsing of the input data    
    def auxLoadingServiceA
    def auxLoadingServiceB

    def handleInputFile(String filename) {
        def inputFile = new File(filename)
        // parse each line and process according to record type
        inputFile.eachLine { line, lineNumber ->
            this.handleLine(line, lineNumber)
        }
    }


    @Transactional
    def handleLine(String line, int lineNumber) {
        // do some further parsing of the line, based on its content
        // example here is based on 1st 2 chars of line
        switch (line[0..1]) {
            case 'AA':
                auxLoadingServiceA.doSomethingWithLine(line)
                break;

            case 'BB':
                auxLoadingServiceB.doSomethingElseWithLine(line)
                break;

            default:
                break;

        }
        if (lineNumber % 100 == 0) cleanUpGorm()
    }

    def cleanUpGorm() {
        def session = sessionFactory.getCurrentSession()
        session.flush()
        session.clear()
        propertyInstanceMap.get().clear()
    }

}

class AuxLoadingServiceA {
    static transactional = false

    doSomethingWithLine(String line) {
        // do something here
    }
}

class AuxLoadingServiceB {
    static transactional = false

    doSomethingElseWithLine(String line) {
        // do something else here
    }
}

我特意让顶层服务只针对每条线路的负载进行事务处理。顶层下实际上有很多级别的服务,而不仅仅是显示的单个 Aux A & B 服务层。因此我不想有多层交易的开销:我想我应该只需要 1.

正在加载到数据库中的数据模型包括几个具有 hasMany / belongsTo 关系的域对象。这种与域对象的交互是在子层内完成的,并且没有显示在我的代码中,以便使示例的大小易于管理。

似乎导致问题的域对象与此类似:

class Parent {
    static hasMany = [children: Child]
    static mapping = {
        children lazy: false
        cache true
    }
}

class Child {
    String someValue
    // also contains some other sub-objects

    static belongsTo = [parent : Parent]

    static mapping = {
        parent index: 'parent_idx'
        cache true
    }
}

需要显示的 cleanupGorm() 方法,否则服务会在大量行后完全停止。

当我启动数据库加载时,一切都完全按预期工作:

// Called from with a service / controller
dbLoadingService.handleInputFile("someFile.txt")

但是,一旦我将负载移动到异步进程中,就像这样:

def promise = task {
    dbLoadingService.handleInputFile("someFile.txt")
}

我得到一个 DuplicateKeyException / NonUniqueObjectException:

error details: org.springframework.dao.DuplicateKeyException: A different object with the same identifier value was already associated with the session : [com.example.SampleDomainObject#1]; nested exception is org.hibernate.NonUniqueObjectException: A different object with the same identifier value was already associated with the session : [com.example.SampleDomainObject#1]

所以,我的问题是,将大量数据异步加载到 Grails 数据库中的最佳实践是什么?关于刷新/清除会话以确保内存中的对象在会话中是一致的,是否需要做些什么?缓存对象时有什么需要做的吗?

解决方案是按照@JoshuaMoore 的建议进行操作并使用新会话。此外,有一个域对象的引用是从一个事务外部引用的,然后在新会话中没有调用 merge(),从而导致错误。

def obj = DomainObject.findBySomeProperty('xyz')

// now start new session

obj.someProperty // causes exception
obj = obj.merge()
obj.someProperty // doesn't cause an exception

Joshua 的评论促使我深入研究了 Hibernate 的文档 (https://docs.jboss.org/hibernate/orm/3.6/reference/en-US/html/transactions.html)

具体来说,来自第 13 章:

A SessionFactory is an expensive-to-create, threadsafe object, intended to be shared by all application threads. It is created once, usually on application startup, from a Configuration instance.

A Session is an inexpensive, non-threadsafe object that should be used once and then discarded for: a single request, a conversation or a single unit of work. A Session will not obtain a JDBC Connection, or a Datasource, unless it is needed. It will not consume any resources until used.

其他人可能感兴趣的是,我一直看到随着被解析对象数量的增加,批处理加载的性能逐渐下降,即使采用了 Burt Beckwith here: and explained in further detail by Ted Naleid here 建议的性能优化。

因此,使用文档中的提示,性能问题的答案不是尝试将会话用于所有处理 - 而是使用它进行少量处理,然后将其丢弃并创建一个新的。

当我删除问题中的 cleanupGorm() 方法并将其替换为以下方法时,我的性能提高了 6 倍,而加载时间完全没有增加具有批量大小,即使在解析了数百万条记录之后也是如此:

// somewhere in the service method that is doing the batch parse
def currentSession = sessionFactory.openSession()

// start some form of batch parse, perhaps in a loop

    // do work here
    // periodically, perhaps in the %N way shown above
    currentSession.flush()
    currentSession.close()
    currentSession = sessionFactory.openSession()

// end of loop

我需要在跨服务的事务中打包内容,我执行了以下操作:

currentSession = sessionFactory.openSession()
currentSession.beginTransaction()

// start loop
// do work

// when we want to commit
def tx = currentSession?.getTransaction()
if (tx?.isActive()) tx.commit()
currentSession?.close()

// if we're in a loop and we need a new transaction
currentSession = sessionFactory.openSession()
currentSession.beginTransaction()

虽然我接受使用 Spring Batch 之类的东西可能更好,但它会涉及丢弃大量代码,否则这些代码可以按预期工作。下次我需要这样做时,我会研究一下,但与此同时,希望这可能对需要使用 Grails 进行大规模批处理的其他人有用,并且发现批量大小会降低性能.

Joshua 的注意事项:非常感谢您的帮助,非常感谢!