Grails 异步 Promise / for 循环错误 - 如何将变量传递给任务关闭

Grails asynchronous Promise / for loop error - how to pass variable to task closure

我有很多文件需要解析,所以我在多个线程中执行此操作。

int fileCount = 16
def promiseList = []
for (int i = 1; i <= fileCount; i++) {
    println i
    def p = task {
        println "${new Date()} Starting parse of schedules (${i})..."
        // do some parsing here, where I need access to the value i     
    }
    p.onError {Throwable t ->
        println "Serious error when loading schedule ${i}, ${t.getMessage()}"
        t.printStackTrace()
    }
    promiseList << p
}
waitAll(promiseList)

这里的想法是创建多个承诺,将它们全部设置为异步 运行 然后让它们全部完成。

Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (1)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (3)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (4)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (5)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (6)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (7)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (8)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (9)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (10)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (11)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (13)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (12)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (14)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (15)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (16)...
Wed Feb 25 16:36:36 GMT 2015 Starting parse of schedules (17)...

问题:2 号任务究竟发生了什么?如果我输入:

println i

进入 for 循环,然后一切都符合预期(即我得到 1..16)。我认为变量正在递增,然后在稍后的某个时候闭包查看变量并且它已经递增,因此值不正确。如何将值 i 传递给闭包以便获得正确的值?

编辑:为了它的价值,我可以破解它,我真的不喜欢这样做,通过插入:

// for ()
    int taskNumber = i
    // create task here, refer to local variable taskNumber instead of i
    Thread.currentThread().sleep((long)(1000))

在 for 循环的末尾,以便在下一个循环发生之前正确实例化任务。然而,这是一个非常讨厌的 hack,我敢肯定必须有更好的方法将变量传递给线程?

进一步编辑:

这是我使用以下方法实现 for 循环时的输出:

for (i in 1..fileCount)

Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (1)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (3)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (4)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (5)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (6)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (8)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (7)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (9)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (11)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (13)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (13)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (13)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (14)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (15)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (16)...
Wed Feb 25 16:57:54 GMT 2015 Starting parse of schedules (16)...

我有一个似乎有效的解决方案 ,但我不确定这是否出于偶然

编辑:如果我在任务中添加一个 sleep(),正如@cfrick 在问题评论中所建议的那样,如下所示,那么所有项目都会被处理(尽管顺序是随机的)。

for 循环应替换为 collect:

(1..fileCount).collect { i->
    def p = task {
        // this sleep() call is added to check if all tasks are run with the correct parameters
        Thread.currentThread().sleep((long)(1000))
        int taskNumber = i
        println "${new Date()} Starting parse of schedules (${taskNumber})..."
    }
    p.onError {Throwable t ->
        int taskNumber = i
        println "Serious error when loading schedule ${taskNumber}, ${t.getMessage()}"
        t.printStackTrace()
    }
    promiseList << p
}
waitAll(promiseList)

所以,按照@cfrick 的建议,代码可以修改得更简单:

promiseList = (1..fileCount).collect { i->
    def p = task {
        println "${new Date()} Starting parse of schedules (${i})..."
        // do work here
    }
    p.onError {Throwable t ->
        println "Serious error when loading schedule ${i}, ${t.getMessage()}"
        t.printStackTrace()
    }
    return p
}
waitAll(promiseList)

i 目前在您的所有 closures/tasks 之间共享。在闭包创建期间,您需要将 i 的副本从当前 context/current 线程传递到闭包本身。您可以使用柯里化来实现,例如:

def p = task({int locali -> 
    println "${new Date()} Starting parse of schedules (${locali})..."
    // do some parsing here, where I need access to the value i     
}.curry(i))
p.onError({int locali, Throwable t ->
    println "Serious error when loading schedule ${locali}, ${t.getMessage()}"
    t.printStackTrace()
}.curry(i))

另一种方法是从不同的上下文创建闭包(更多 Java 方式,基本上这就是您对 collect hack 所做的):

Closure createTask(final int i) {
  return {
    println "${new Date()} Starting parse of schedules (${i})..."
    // do some parsing here, where I need access to the value i     
  }
}

然后:

def p = task(createTask(i))