GParsExecutorsPool.withPool 没有建立异步连接
GParsExecutorsPool.withPool is not making Async connections
我正在使用 groovy 和 Gpars 建立异步连接。作为对我的 API 的请求,我收到了一个巨大的 JSON,我正在使用 JSON 路径拆分 JSON。
$.${jsonobjstpath}..[i][j],其中 i 和 j 是 0:20 范围内的值,并对其进行循环。我能够得到正确的拆分 json。
我使用 GParsExecutorsPool.withPool 将这些 JSON 批次发送到我的 API。但是 gpar 正在等待响应。
假设如果一个请求的处理 API 需要 10 秒,gpar 正在等待 10 秒发送控制器到循环。我的代码如下。
import groovyx.gpars.GParsExecutorsPool;
import groovyx.gpars.GParsPool;
import jsr166y.ForkJoinPool;
import jsr166y.RecursiveTask;
def invoke(mega) {
def size=mega.get("size"); //get size of actual JSON objects
def body=mega.get("content.body"); // Load JSON body
int counter=Math.ceil(size/20); //Get Number of loops to run
def Path="/AsyncInmp"; // Path to call function
def Name="SplitJsonObject"; //Name of Function
int i=0;
int j=19;
while(j<=size) {
msg.put("i",i); //Send i value to function
msg.put("j",j); // Send j value to function
callPolicy(Path,Name,body); //Call function json path to get split json, receiving JSON with i and j values
def split_body=resp.body;//response from split json
def Path2="/AsyncInmp"; //path to connection function
def Name2="connect"; //name of connection function
GParsExecutorsPool.withPool {
(0..<1).eachParallel { k ->
callPolicy(Path2, Name2,split_body) //Call function to connect using gpars, This is not working
}
}
j=j+20;
i=i+20;
}
return true;
}
- So how can i make async call using gpar as soon as my split json request is ready
- how can i collect response from all async call
您在 while
循环中调用 withPool
并在 eachParallel
中使用大小为 1 的范围,我猜想这些东西结合起来本质上使您的代码表现在单线程方式。
将它改成这样:
import java.util.concurrent.CopyOnWriteArrayList
def futures = [] as CopyOnWriteArrayList
GParsExecutorsPool.withPool {
while(...) {
...
futures << {
callPolicy(Path2, Name2,split_body)
}.async().call()
}
}
// wait for all requests to complete
def results = futures*.get() // or futures.collect { it.get() } if this breaks
// results is now a list of return values from callPolicy
我没有测试或 运行 这段代码,但它应该让您了解如何继续前进。
<-- 评论后编辑 -->
一个工作示例:
@Grab('org.codehaus.gpars:gpars:1.0.0')
import groovyx.gpars.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import static groovyx.gpars.dataflow.Dataflow.task
random = new Random()
sequence = new AtomicInteger(-1)
def promises = [] as CopyOnWriteArrayList
GParsPool.withPool(25) { pool ->
10.times { index ->
promises << task {
callPolicy(index)
}
}
}
def results = promises*.get()
results.each { map ->
println map
}
def callPolicy(index) {
Thread.sleep(random.nextInt(100) % 100)
[index: index, sequence: sequence.incrementAndGet(), time: System.currentTimeMillis()]
}
产生以下类型的输出:
~> groovy solution.groovy
[index:0, sequence:9, time:1558893973348]
[index:1, sequence:1, time:1558893973305]
[index:2, sequence:8, time:1558893973337]
[index:3, sequence:5, time:1558893973322]
[index:4, sequence:7, time:1558893973337]
[index:5, sequence:4, time:1558893973320]
[index:6, sequence:3, time:1558893973308]
[index:7, sequence:6, time:1558893973332]
[index:8, sequence:0, time:1558893973282]
[index:9, sequence:2, time:1558893973308]
~>
我们可以看到返回的结果以及调用是以 multi-threaded 方式进行的,因为 sequence
和 index
值并不都是按顺序排列的。
我正在使用 groovy 和 Gpars 建立异步连接。作为对我的 API 的请求,我收到了一个巨大的 JSON,我正在使用 JSON 路径拆分 JSON。 $.${jsonobjstpath}..[i][j],其中 i 和 j 是 0:20 范围内的值,并对其进行循环。我能够得到正确的拆分 json。 我使用 GParsExecutorsPool.withPool 将这些 JSON 批次发送到我的 API。但是 gpar 正在等待响应。 假设如果一个请求的处理 API 需要 10 秒,gpar 正在等待 10 秒发送控制器到循环。我的代码如下。
import groovyx.gpars.GParsExecutorsPool;
import groovyx.gpars.GParsPool;
import jsr166y.ForkJoinPool;
import jsr166y.RecursiveTask;
def invoke(mega) {
def size=mega.get("size"); //get size of actual JSON objects
def body=mega.get("content.body"); // Load JSON body
int counter=Math.ceil(size/20); //Get Number of loops to run
def Path="/AsyncInmp"; // Path to call function
def Name="SplitJsonObject"; //Name of Function
int i=0;
int j=19;
while(j<=size) {
msg.put("i",i); //Send i value to function
msg.put("j",j); // Send j value to function
callPolicy(Path,Name,body); //Call function json path to get split json, receiving JSON with i and j values
def split_body=resp.body;//response from split json
def Path2="/AsyncInmp"; //path to connection function
def Name2="connect"; //name of connection function
GParsExecutorsPool.withPool {
(0..<1).eachParallel { k ->
callPolicy(Path2, Name2,split_body) //Call function to connect using gpars, This is not working
}
}
j=j+20;
i=i+20;
}
return true;
}
- So how can i make async call using gpar as soon as my split json request is ready
- how can i collect response from all async call
您在 while
循环中调用 withPool
并在 eachParallel
中使用大小为 1 的范围,我猜想这些东西结合起来本质上使您的代码表现在单线程方式。
将它改成这样:
import java.util.concurrent.CopyOnWriteArrayList
def futures = [] as CopyOnWriteArrayList
GParsExecutorsPool.withPool {
while(...) {
...
futures << {
callPolicy(Path2, Name2,split_body)
}.async().call()
}
}
// wait for all requests to complete
def results = futures*.get() // or futures.collect { it.get() } if this breaks
// results is now a list of return values from callPolicy
我没有测试或 运行 这段代码,但它应该让您了解如何继续前进。
<-- 评论后编辑 -->
一个工作示例:
@Grab('org.codehaus.gpars:gpars:1.0.0')
import groovyx.gpars.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import static groovyx.gpars.dataflow.Dataflow.task
random = new Random()
sequence = new AtomicInteger(-1)
def promises = [] as CopyOnWriteArrayList
GParsPool.withPool(25) { pool ->
10.times { index ->
promises << task {
callPolicy(index)
}
}
}
def results = promises*.get()
results.each { map ->
println map
}
def callPolicy(index) {
Thread.sleep(random.nextInt(100) % 100)
[index: index, sequence: sequence.incrementAndGet(), time: System.currentTimeMillis()]
}
产生以下类型的输出:
~> groovy solution.groovy
[index:0, sequence:9, time:1558893973348]
[index:1, sequence:1, time:1558893973305]
[index:2, sequence:8, time:1558893973337]
[index:3, sequence:5, time:1558893973322]
[index:4, sequence:7, time:1558893973337]
[index:5, sequence:4, time:1558893973320]
[index:6, sequence:3, time:1558893973308]
[index:7, sequence:6, time:1558893973332]
[index:8, sequence:0, time:1558893973282]
[index:9, sequence:2, time:1558893973308]
~>
我们可以看到返回的结果以及调用是以 multi-threaded 方式进行的,因为 sequence
和 index
值并不都是按顺序排列的。