需要在 GridGain 中未来完成时异步通知的解决方案

Solution for asynchronous notification upon future completion in GridGain needed

我们目前正在评估 Grid Gain 6.5.5 作为在网格上分配计算作业的潜在解决方案。 我们目前面临的问题是缺乏合适的异步通知机制,在作业完成(或未来完成)时异步通知发送方。

原型架构相对简单,核心问题在下面的伪代码中呈现(由于NDA,无法发布完整代码)。 *** 重要 - 代码仅代表 "problem",有问题的可能解决方案在底部的文本中与问题一起描述。

//will be used as an entry point to the grid for each client that will submit jobs to the grid
public class GridClient{

//client node for submission that will be reused 
private static Grid gNode = GridGain.start("config xml file goes here");


//provides the functionality of submitting multiple jobs to the grid for calculation
public int sendJobs2Grid(GridJob[] jobs){

    Collection<GridCallable<GridJobOutput>> calls = new ArrayList<>();

    for (final GridJob job : jobs) {

                calls.add(new GridCallable<GridJobOutput>() {
                    @Override public GridJobOutput call() throws Exception {
                        GridJobOutput  result = job.process();
                        return result; 
                    }
                });
        }

        GridFuture<Collection<GridJobOutput>> fut =  this.gNode.compute().call(calls);

        fut.listenAsync(new GridInClosure<GridFuture<Collection<GridJobOutput>>>(){

            @Override public void  apply(GridFuture<Collection<GridJobOutput>> jobsOutputCollection) {

                Collection<GridJobOutput> jobsOutput;

                try {
                    jobsOutput = jobsOutputCollection.get();

                    for(GridJobOutput currResult: jobsOutput){
                        //do something with the current job output BUT CANNOT call jobFinished(GridJobOutput out) method 
                        //of sendJobs2Grid class here
                    }
                } catch (GridException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        });

        return calls.size();
    }


//This function should be invoked asynchronously when the GridFuture is 
//will invoke some processing/aggregation of the result for each submitted job
public void jobFinished(GridJobOutput out) {}

    }

}

//represents a job type that is to be submitted to the grid
public class GridJob{

public GridJobOutput process(){}
}

描述:

想法是,GridClient 实例将用于向网格提交 list/array 个作业,通知发送者提交了多少作业以及作业何时完成(异步)是将对结果进行一些处理。对于结果处理部分,应该调用 "GridClient.jobFinished(GridJobOutput out)" 方法。

现在开始提问,我们知道可以与 "GridFuture.listenAsync(GridInClosure> lsnr)" 一起使用的 GridInClosure 接口 为了注册一个未来的听众。 问题(如果我的理解是正确的)是,如果未来的结果是给定 GridInClosure 范围内的代码 "processed",这是一个很好且非常直接的解决方案。在我们的例子中,我们需要使用超出范围的 "GridClient.jobFinished(GridJobOutput out)"。
由于 GridInClosure 有一个参数 R 并且它必须与 GridFuture 结果的类型相同,因此似乎不可能以直接的方式使用这种方法。

如果我到现在为止都正确,那么为了使用 "GridFuture.listenAsync(..)" 方法,必须完成以下操作:

所以,如果到现在为止我都没有问题,它似乎有点笨拙,所以我希望我错过了一些东西,并且有更好的方法来处理这种相对简单的异步回调情况。

期待任何有用的反馈,非常感谢。

您的代码看起来是正确的,我没有发现从未来的侦听器闭包中调用 jobFinished 方法有任何问题。您将其声明为匿名 class,它始终引用外部 class(在您的情况下为 GridClient),因此您可以访问 GridClient 的所有变量和方法实例.