需要在 GridGain 中未来完成时异步通知的解决方案
Solution for asynchronous notification upon future completion in GridGain needed
我们目前正在评估 Grid Gain 6.5.5 作为在网格上分配计算作业的潜在解决方案。
我们目前面临的问题是缺乏合适的异步通知机制,在作业完成(或未来完成)时异步通知发送方。
原型架构相对简单,核心问题在下面的伪代码中呈现(由于NDA,无法发布完整代码)。 *** 重要 - 代码仅代表 "problem",有问题的可能解决方案在底部的文本中与问题一起描述。
//will be used as an entry point to the grid for each client that will submit jobs to the grid
public class GridClient{
//client node for submission that will be reused
private static Grid gNode = GridGain.start("config xml file goes here");
//provides the functionality of submitting multiple jobs to the grid for calculation
public int sendJobs2Grid(GridJob[] jobs){
Collection<GridCallable<GridJobOutput>> calls = new ArrayList<>();
for (final GridJob job : jobs) {
calls.add(new GridCallable<GridJobOutput>() {
@Override public GridJobOutput call() throws Exception {
GridJobOutput result = job.process();
return result;
}
});
}
GridFuture<Collection<GridJobOutput>> fut = this.gNode.compute().call(calls);
fut.listenAsync(new GridInClosure<GridFuture<Collection<GridJobOutput>>>(){
@Override public void apply(GridFuture<Collection<GridJobOutput>> jobsOutputCollection) {
Collection<GridJobOutput> jobsOutput;
try {
jobsOutput = jobsOutputCollection.get();
for(GridJobOutput currResult: jobsOutput){
//do something with the current job output BUT CANNOT call jobFinished(GridJobOutput out) method
//of sendJobs2Grid class here
}
} catch (GridException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
return calls.size();
}
//This function should be invoked asynchronously when the GridFuture is
//will invoke some processing/aggregation of the result for each submitted job
public void jobFinished(GridJobOutput out) {}
}
}
//represents a job type that is to be submitted to the grid
public class GridJob{
public GridJobOutput process(){}
}
描述:
想法是,GridClient 实例将用于向网格提交 list/array 个作业,通知发送者提交了多少作业以及作业何时完成(异步)是将对结果进行一些处理。对于结果处理部分,应该调用 "GridClient.jobFinished(GridJobOutput out)" 方法。
现在开始提问,我们知道可以与 "GridFuture.listenAsync(GridInClosure> lsnr)" 一起使用的 GridInClosure 接口
为了注册一个未来的听众。
问题(如果我的理解是正确的)是,如果未来的结果是给定 GridInClosure 范围内的代码 "processed",这是一个很好且非常直接的解决方案。在我们的例子中,我们需要使用超出范围的 "GridClient.jobFinished(GridJobOutput out)"。
由于 GridInClosure 有一个参数 R 并且它必须与 GridFuture 结果的类型相同,因此似乎不可能以直接的方式使用这种方法。
如果我到现在为止都正确,那么为了使用 "GridFuture.listenAsync(..)" 方法,必须完成以下操作:
- GridClient 将必须实现一个接口,以授予对 "jobFinished(..)" 方法的访问权,我们将其命名为 GridJobFinishedListener。
- GridJob 在新 class 中必须是 "wrapped" 才能有一个额外的 属性 类型的 GridJobFinishedListener。
- GridJobOutput 在新的 class 中必须是 "wrapped" 才能有一个额外的 属性 GridJobFinishedListener 类型。
- 当 GridJob 完成时,除了 "standard" 结果外,GridJobOutput 将包含相应的 GridJobFinishedListener 引用。
- 鉴于上述修改,现在可以使用 GridInClosure,并且在 apply(GridJobOutput) 方法中可以通过 GridJobFinishedListener 接口调用 GridClient.jobFinished(GridJobOutput out) 方法。
所以,如果到现在为止我都没有问题,它似乎有点笨拙,所以我希望我错过了一些东西,并且有更好的方法来处理这种相对简单的异步回调情况。
期待任何有用的反馈,非常感谢。
您的代码看起来是正确的,我没有发现从未来的侦听器闭包中调用 jobFinished
方法有任何问题。您将其声明为匿名 class,它始终引用外部 class(在您的情况下为 GridClient
),因此您可以访问 GridClient
的所有变量和方法实例.
我们目前正在评估 Grid Gain 6.5.5 作为在网格上分配计算作业的潜在解决方案。 我们目前面临的问题是缺乏合适的异步通知机制,在作业完成(或未来完成)时异步通知发送方。
原型架构相对简单,核心问题在下面的伪代码中呈现(由于NDA,无法发布完整代码)。 *** 重要 - 代码仅代表 "problem",有问题的可能解决方案在底部的文本中与问题一起描述。
//will be used as an entry point to the grid for each client that will submit jobs to the grid
public class GridClient{
//client node for submission that will be reused
private static Grid gNode = GridGain.start("config xml file goes here");
//provides the functionality of submitting multiple jobs to the grid for calculation
public int sendJobs2Grid(GridJob[] jobs){
Collection<GridCallable<GridJobOutput>> calls = new ArrayList<>();
for (final GridJob job : jobs) {
calls.add(new GridCallable<GridJobOutput>() {
@Override public GridJobOutput call() throws Exception {
GridJobOutput result = job.process();
return result;
}
});
}
GridFuture<Collection<GridJobOutput>> fut = this.gNode.compute().call(calls);
fut.listenAsync(new GridInClosure<GridFuture<Collection<GridJobOutput>>>(){
@Override public void apply(GridFuture<Collection<GridJobOutput>> jobsOutputCollection) {
Collection<GridJobOutput> jobsOutput;
try {
jobsOutput = jobsOutputCollection.get();
for(GridJobOutput currResult: jobsOutput){
//do something with the current job output BUT CANNOT call jobFinished(GridJobOutput out) method
//of sendJobs2Grid class here
}
} catch (GridException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
return calls.size();
}
//This function should be invoked asynchronously when the GridFuture is
//will invoke some processing/aggregation of the result for each submitted job
public void jobFinished(GridJobOutput out) {}
}
}
//represents a job type that is to be submitted to the grid
public class GridJob{
public GridJobOutput process(){}
}
描述:
想法是,GridClient 实例将用于向网格提交 list/array 个作业,通知发送者提交了多少作业以及作业何时完成(异步)是将对结果进行一些处理。对于结果处理部分,应该调用 "GridClient.jobFinished(GridJobOutput out)" 方法。
现在开始提问,我们知道可以与 "GridFuture.listenAsync(GridInClosure> lsnr)" 一起使用的 GridInClosure 接口
为了注册一个未来的听众。
问题(如果我的理解是正确的)是,如果未来的结果是给定 GridInClosure 范围内的代码 "processed",这是一个很好且非常直接的解决方案。在我们的例子中,我们需要使用超出范围的 "GridClient.jobFinished(GridJobOutput out)"。
由于 GridInClosure 有一个参数 R 并且它必须与 GridFuture 结果的类型相同,因此似乎不可能以直接的方式使用这种方法。
如果我到现在为止都正确,那么为了使用 "GridFuture.listenAsync(..)" 方法,必须完成以下操作:
- GridClient 将必须实现一个接口,以授予对 "jobFinished(..)" 方法的访问权,我们将其命名为 GridJobFinishedListener。
- GridJob 在新 class 中必须是 "wrapped" 才能有一个额外的 属性 类型的 GridJobFinishedListener。
- GridJobOutput 在新的 class 中必须是 "wrapped" 才能有一个额外的 属性 GridJobFinishedListener 类型。
- 当 GridJob 完成时,除了 "standard" 结果外,GridJobOutput 将包含相应的 GridJobFinishedListener 引用。
- 鉴于上述修改,现在可以使用 GridInClosure,并且在 apply(GridJobOutput) 方法中可以通过 GridJobFinishedListener 接口调用 GridClient.jobFinished(GridJobOutput out) 方法。
所以,如果到现在为止我都没有问题,它似乎有点笨拙,所以我希望我错过了一些东西,并且有更好的方法来处理这种相对简单的异步回调情况。
期待任何有用的反馈,非常感谢。
您的代码看起来是正确的,我没有发现从未来的侦听器闭包中调用 jobFinished
方法有任何问题。您将其声明为匿名 class,它始终引用外部 class(在您的情况下为 GridClient
),因此您可以访问 GridClient
的所有变量和方法实例.