将 GPars 循环收集到地图

Collecting a GPars loop to a Map

我需要迭代一个 List 并为每个项目 运行 一个耗时的操作,然后将其结果收集到地图中,如下所示:

List<String> strings = ['foo', 'bar', 'baz']
Map<String, Object> result = strings.collectEntries { key ->
    [key, expensiveOperation(key)]
}

所以我的结果是这样的

[foo: <an object>, bar: <another object>, baz: <another object>]

因为我需要做的操作很长而且不相互依赖,所以我一直愿意研究使用 GPars 来 运行 并行循环。

但是,GPars 有一个 collectParallel 方法,它并行循环遍历一个集合并收集到一个列表,而不是一个 collectEntriesParallel 收集到一个地图:正确的方法是什么GPars?

没有 collectEntriesParallel 因为它必须产生与以下相同的结果:

collectParallel {}.collectEntries {}

。除了将结果并行收集到列表中并最终以顺序方式收集到映射条目之外,很难以确定性方式减少要映射(或任何其他可变容器)的值列表。考虑以下顺序示例:

static def expensiveOperation(String key) {
    Thread.sleep(1000)
    return key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']


GParsPool.withPool {
    def result = strings.inject([:]) { seed, key ->
        println "[${Thread.currentThread().name}] (${System.currentTimeMillis()}) seed = ${seed}, key = ${key}"
        seed + [(key): expensiveOperation(key.toString())]
    }

    println result
}

在此示例中,我们使用 Collection.inject(initialValue, closure),它等效于旧的 "fold left" 操作 - 它从初始值 [:] 开始,遍历所有值并将它们添加为键和初始地图的价值。这种情况下的顺序执行大约需要 3 秒(每个 expensiveOperation() 休眠 1 秒)。

控制台输出:

[main] (1519925046610) seed = [:], key = foo
[main] (1519925047773) seed = [foo:oof], key = bar
[main] (1519925048774) seed = [foo:oof, bar:rab], key = baz
[foo:oof, bar:rab, baz:zab]

这基本上就是 collectEntries() 所做的 - 它是一种缩减操作,其中初始值为空映射。

现在让我们看看如果我们尝试并行化它会发生什么 - 我们将使用 injectParallel 方法而不是 inject 方法:

GParsPool.withPool {
    def result = strings.injectParallel([:]) { seed, key ->
        println "[${Thread.currentThread().name}] (${System.currentTimeMillis()}) seed = ${seed}, key = ${key}"
        seed + [(key): expensiveOperation(key.toString())]
    }

    println result
}

让我们看看结果是什么:

[ForkJoinPool-1-worker-1] (1519925323803) seed = foo, key = bar
[ForkJoinPool-1-worker-2] (1519925323811) seed = baz, key = [:]
[ForkJoinPool-1-worker-1] (1519925324822) seed = foo[bar:rab], key = baz[[:]:]:[]
foo[bar:rab][baz[[:]:]:[]:][:]:]:[[zab]

如您所见,inject 的并行版本不关心顺序(这是预期的),例如第一个线程收到 foo 作为 seed 变量和 bar 作为键。如果在没有特定顺序的情况下并行执行对映射(或任何可变对象)的缩减,就会发生这种情况。

解决方案

有两种并行处理的方法:

1。 collectParallel + collectEntries 组合

正如 Tim Yates 在评论中提到的,您可以并行执行昂贵的操作并最终按顺序将结果收集到地图中:

static def expensiveOperation(String key) {
    Thread.sleep(1000)
    return key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']

GParsPool.withPool {
    def result = strings.collectParallel { [it, expensiveOperation(it)] }.collectEntries { [(it[0]): it[1]] }

    println result
}

此示例大约在 1 秒内执行并产生以下输出:

[foo:oof, bar:rab, baz:zab]

2。 Java的并行流

或者,您可以使用 Java 的并行流和 Collectors.toMap() reducer 函数:

static def expensiveOperation(String key) {
    Thread.sleep(1000)
    return key.reverse()
}

List<String> strings = ['foo', 'bar', 'baz']

def result = strings.parallelStream()
        .collect(Collectors.toMap(Function.identity(), { str -> expensiveOperation(str)}))

println result 

此示例也将在大约 1 秒内执行并产生如下输出:

[bar:rab, foo:oof, baz:zab]

希望对您有所帮助。