完成所有工作后终止现有池

Terminate existing pool when all work is done

好的,b运行d 是 gpars 的新手所以如果这有明显的答案请原谅我。

这是我的场景。我们目前有一段代码包装在 Thread.start {} 块中。它这样做是为了在后台将消息发送到消息队列,而不是阻止用户请求。我们最近 运行 遇到的一个问题是针对大型工作块,用户可能会执行另一个操作,这会导致该块再次执行。由于它是线程化的,因此第二批消息可能会在第一批消息之前发送,从而导致数据损坏。

我想将此流程更改为使用 gpars 的队列流。我见过创建池的示例,例如

def pool = GParsPool.createPool()

def pool = new ForkJoinPool()

然后将池用作

GParsPool.withExistingPool(pool) {
    ...
}

这似乎可以解释如果用户再次执行操作的情况,我可以重用创建的池并且操作不会乱序执行,前提是我的池大小为 1。

我的问题是,这是使用 gpars 执行此操作的最佳方法吗?此外,我怎么知道池何时完成所有工作?当所有工作完成时它会终止吗?如果是这样,是否有一种方法可用于检查池是否 finished/terminated 以了解我需要一个新池?

如有任何帮助,我们将不胜感激。

不,显式创建的池不会自行终止。您必须明确地对它们调用 shutdown()。

但是,使用 withPool() {} 命令将保证在代码块完成后销毁池。

这是我们目前对问题的解决方案。需要注意的是,由于我们的要求,我们选择了这条路线

  • 作品按某些上下文分组
  • 给定上下文中的工作是有序的
  • 给定上下文中的工作是同步的
  • 上下文的其他工作应在前面的工作之后执行
  • 工作不应阻止用户请求
  • 上下文之间是异步的
  • 上下文工作完成后,上下文应该自行清理

鉴于上述情况,我们实施了以下措施:

class AsyncService {
    def queueContexts


    def AsyncService() {
        queueContexts = new QueueContexts()
    }

    def queue(contextString, closure) {
        queueContexts.retrieveContextWithWork(contextString, true).send(closure)
    }

    class QueueContexts {
        def contextMap = [:]

        def synchronized retrieveContextWithWork(contextString, incrementWork) {
            def context = contextMap[contextString]

            if (context) {
                if (!context.hasWork(incrementWork)) {
                    contextMap.remove(contextString)
                    context.terminate()
                }
            } else {
                def queueContexts = this
                contextMap[contextString] = new QueueContext({->
                    queueContexts.retrieveContextWithWork(contextString, false)
                })
            }

            contextMap[contextString]
        }

        class QueueContext {
            def workCount
            def actor

            def QueueContext(finishClosure) {
                workCount = 1
                actor = Actors.actor {
                    loop {
                        react { closure ->
                            try {
                                closure()
                            } catch (Throwable th) {
                                log.error("Uncaught exception in async queue context", th)
                            }

                            finishClosure()
                        }
                    }
                }
            }

            def send(closure) {
                actor.send(closure)
            }

            def terminate(){
                actor.terminate()
            }

            def hasWork(incrementWork) {
                workCount += (incrementWork ? 1 : -1)
                workCount > 0
            }
        }
    }
}