完成所有工作后终止现有池
Terminate existing pool when all work is done
好的,b运行d 是 gpars 的新手所以如果这有明显的答案请原谅我。
这是我的场景。我们目前有一段代码包装在 Thread.start {} 块中。它这样做是为了在后台将消息发送到消息队列,而不是阻止用户请求。我们最近 运行 遇到的一个问题是针对大型工作块,用户可能会执行另一个操作,这会导致该块再次执行。由于它是线程化的,因此第二批消息可能会在第一批消息之前发送,从而导致数据损坏。
我想将此流程更改为使用 gpars 的队列流。我见过创建池的示例,例如
def pool = GParsPool.createPool()
或
def pool = new ForkJoinPool()
然后将池用作
GParsPool.withExistingPool(pool) {
...
}
这似乎可以解释如果用户再次执行操作的情况,我可以重用创建的池并且操作不会乱序执行,前提是我的池大小为 1。
我的问题是,这是使用 gpars 执行此操作的最佳方法吗?此外,我怎么知道池何时完成所有工作?当所有工作完成时它会终止吗?如果是这样,是否有一种方法可用于检查池是否 finished/terminated 以了解我需要一个新池?
如有任何帮助,我们将不胜感激。
不,显式创建的池不会自行终止。您必须明确地对它们调用 shutdown()。
但是,使用 withPool() {} 命令将保证在代码块完成后销毁池。
这是我们目前对问题的解决方案。需要注意的是,由于我们的要求,我们选择了这条路线
- 作品按某些上下文分组
- 给定上下文中的工作是有序的
- 给定上下文中的工作是同步的
- 上下文的其他工作应在前面的工作之后执行
- 工作不应阻止用户请求
- 上下文之间是异步的
- 上下文工作完成后,上下文应该自行清理
鉴于上述情况,我们实施了以下措施:
class AsyncService {
def queueContexts
def AsyncService() {
queueContexts = new QueueContexts()
}
def queue(contextString, closure) {
queueContexts.retrieveContextWithWork(contextString, true).send(closure)
}
class QueueContexts {
def contextMap = [:]
def synchronized retrieveContextWithWork(contextString, incrementWork) {
def context = contextMap[contextString]
if (context) {
if (!context.hasWork(incrementWork)) {
contextMap.remove(contextString)
context.terminate()
}
} else {
def queueContexts = this
contextMap[contextString] = new QueueContext({->
queueContexts.retrieveContextWithWork(contextString, false)
})
}
contextMap[contextString]
}
class QueueContext {
def workCount
def actor
def QueueContext(finishClosure) {
workCount = 1
actor = Actors.actor {
loop {
react { closure ->
try {
closure()
} catch (Throwable th) {
log.error("Uncaught exception in async queue context", th)
}
finishClosure()
}
}
}
}
def send(closure) {
actor.send(closure)
}
def terminate(){
actor.terminate()
}
def hasWork(incrementWork) {
workCount += (incrementWork ? 1 : -1)
workCount > 0
}
}
}
}
好的,b运行d 是 gpars 的新手所以如果这有明显的答案请原谅我。
这是我的场景。我们目前有一段代码包装在 Thread.start {} 块中。它这样做是为了在后台将消息发送到消息队列,而不是阻止用户请求。我们最近 运行 遇到的一个问题是针对大型工作块,用户可能会执行另一个操作,这会导致该块再次执行。由于它是线程化的,因此第二批消息可能会在第一批消息之前发送,从而导致数据损坏。
我想将此流程更改为使用 gpars 的队列流。我见过创建池的示例,例如
def pool = GParsPool.createPool()
或
def pool = new ForkJoinPool()
然后将池用作
GParsPool.withExistingPool(pool) {
...
}
这似乎可以解释如果用户再次执行操作的情况,我可以重用创建的池并且操作不会乱序执行,前提是我的池大小为 1。
我的问题是,这是使用 gpars 执行此操作的最佳方法吗?此外,我怎么知道池何时完成所有工作?当所有工作完成时它会终止吗?如果是这样,是否有一种方法可用于检查池是否 finished/terminated 以了解我需要一个新池?
如有任何帮助,我们将不胜感激。
不,显式创建的池不会自行终止。您必须明确地对它们调用 shutdown()。
但是,使用 withPool() {} 命令将保证在代码块完成后销毁池。
这是我们目前对问题的解决方案。需要注意的是,由于我们的要求,我们选择了这条路线
- 作品按某些上下文分组
- 给定上下文中的工作是有序的
- 给定上下文中的工作是同步的
- 上下文的其他工作应在前面的工作之后执行
- 工作不应阻止用户请求
- 上下文之间是异步的
- 上下文工作完成后,上下文应该自行清理
鉴于上述情况,我们实施了以下措施:
class AsyncService {
def queueContexts
def AsyncService() {
queueContexts = new QueueContexts()
}
def queue(contextString, closure) {
queueContexts.retrieveContextWithWork(contextString, true).send(closure)
}
class QueueContexts {
def contextMap = [:]
def synchronized retrieveContextWithWork(contextString, incrementWork) {
def context = contextMap[contextString]
if (context) {
if (!context.hasWork(incrementWork)) {
contextMap.remove(contextString)
context.terminate()
}
} else {
def queueContexts = this
contextMap[contextString] = new QueueContext({->
queueContexts.retrieveContextWithWork(contextString, false)
})
}
contextMap[contextString]
}
class QueueContext {
def workCount
def actor
def QueueContext(finishClosure) {
workCount = 1
actor = Actors.actor {
loop {
react { closure ->
try {
closure()
} catch (Throwable th) {
log.error("Uncaught exception in async queue context", th)
}
finishClosure()
}
}
}
}
def send(closure) {
actor.send(closure)
}
def terminate(){
actor.terminate()
}
def hasWork(incrementWork) {
workCount += (incrementWork ? 1 : -1)
workCount > 0
}
}
}
}