gpars dataflowQueues 处理或管道似乎仅在 df.val 请求时触发

gpars dataflowQueues processing or pipelines only seems to be triggered on df.val request

需要一些帮助。看着 Gpars dataflows/pipelines 但我不明白

如果您查看下面的示例(我已经使用运算符、管道、chainWith 完成了此操作并遇到了同样的问题)。

在这个例子中,我使用了任务,但没有任务也很容易,同样的问题也出现了。在此示例中,我设置了两个 DataflowQueues,一个用于初始条件,一个用于根据谓词进行评估的结果。然后我布置一个管道,根据谓词(甚至测试)评估输入和输入并将结果存储在输出结果队列

已经设置了管道并将一些条目发布到第一个队列中,我相信这些条目将在数据可用时被处理(这对操作员版本也不起作用),如您所见,我测试了 resultQ 的大小在我将条目写入 sessionQ 之后,它为零(如果我删除了仍然正确的任务)。所以写入数据不会 'trigger' 处理。

第一个任务将一些条目保存到队列中。

import groovyx.gpars.dataflow.Dataflow
import groovyx.gpars.dataflow.DataflowQueue
import groovyx.gpars.dataflow.DataflowVariable
import groovyx.gpars.dataflow.Promise

/**
 * Created by will on 13/01/2017.
 */

def iValues = [1,2,3,4,5]

DataflowQueue sessionQ = new DataflowQueue()
DataflowQueue resultQ = new DataflowQueue()

Dataflow.task {
    println "setup task: set initial conditions list for rule predicate "
    iValues.each {sessionQ << it}
}

Closure evenPredicate = {it %2 == 0}

//layout pipeline 
sessionQ | evenPredicate   | resultQ

assert resultQ.iterator().size() == 0

Promise ans =  Dataflow.task {
    println "result task : get three values from result q "
    def outlist = []
    3.times {
        def res = resultQ.val
        println "got result $res"
        outlist << res
    }
    assert sessionQ.iterator().size() == 0
    assert resultQ.iterator().size() == 2
    outlist
}

println "ans list is $ans.val"
assert resultQ.iterator().size() == 2

它仅在第二个 task/chainWith 等中 - 您在引擎启动的第二个队列上调用 .val (或 get() )运行 并且所有条目都从中处理第一个队列和结果绑定到 resultQ。

您可以从断言中看到这一点,因为一旦第一次触发器 (.val) 同步调用被引擎 运行 处理并处理起始 sessionQ 中的所有绑定条目。

这是个问题,因为直到您 运行 第一个 .val 调用 - 如果您执行 poll() 或 resultQ.interator.size(),例如它是空的且未绑定,大小 ( )=0。所以你不能写

for (dfRes in resultQ) {//do something with dfRes} 

因为它始终是空的,直到您使用 sessionQ 中的第一项。我不明白为什么?在条目绑定到第一个 dataflowQueue 之后,我认为这些项目将在它们变得可用(绑定)时被消耗 - 但它们不是。

现在这很棘手,因为您无法通过条目、检查结果的大小、对 resultQ 执行 poll(),因为在读取来自 sessionQ 的第一个 DF 之前它将失败。

我最终不得不使用初始值数组的大小(告诉我保存到队列中的条目)因为唯一的方法是从 resultQ 中读取相同的数字以清空它(在上面我只使用了 resultQ 中的 3 条记录,断言显示 resultQ 中还剩下 2 条记录(但只有在第一次调用 .val 之后,如果你注释掉所有断言开始失败)

我用 Dataflow.operator、管道等尝试了这个,但遇到了同样的问题。为什么工作没有得到处理,因为每个输入都绑定到 SessionQ?

最后,对于 Pipeline,有一个 .complete() 方法,如果您在管道中处理一个闭包 {},它会保持打开状态 (!complete()),但是当您 运行 .binaryChoice () 之类的方法将管道标记为已完成,并且无法添加进一步的操作。为什么要这样做?

当然,我不明白那个状态在说什么(不会再进行处理),如果您尝试在这样的方法之后再执行一个步骤,将会抛出异常。

无论哪种方式 - 我试过这样的管道线

Pipeline pipeLine = new Pipeline(Q)
pipeLine.tap(log).binaryChoice(evenPathQ, oddPathQ) {println "$it %2 is ${it%2 ==0}"; (it%2 == 0) }

然而,当您将值绑定到 Q 时,什么也不会发生 - 直到您使用

这样的输出
odd.val

当管道突然 'runs' 并处理存储在 Q 中的所有 DF 项目时。

除了第一个 .val 消耗

可以解释这是为什么吗,我必须忽略这里的要点,但是这个 'do nothing' 直到第一个条目被读取并不是我所期望的并且使任何大小评估无效 (.iterator.size()、poll() 等)对 DataflowWriteChannel 目标的类型调用。

我将不胜感激 - 我已经为此苦苦挣扎了两天,但一无所获。我也查看了所有 Gpars 测试,它们调用 .val 的次数与输入绑定的次数相同 - 所以不要显示我描述的问题。

Vaclav Pech,或任何其他观看问题的 Gpars 大师,我将不胜感激任何对此的帮助见解,以帮助我度过难关

提前问候

在您断言大小为 0 之前的一个小修改(添加延迟)将表明计算是由写入的数据触发的:

//layout pipeline
sessionQ | evenPredicate   | resultQ
sleep 5000
assert resultQ.iterator().size() == 0