如何使用 Apache Ignite/GridGain 在任何节点上重试失败的作业
How to retry failed job on any node with Apache Ignite/GridGain
我正在 Apache Ignite 中试验 fault tolerance。
我想不通的是如何在任何节点上重试失败的作业。我有一个用例,我的工作将通过进程构建器调用第三方工具作为系统进程来进行一些计算。在某些情况下,该工具可能会失败,但在大多数情况下,可以在任何节点上重试作业 - 包括之前失败的节点。
目前 Ignite 似乎将作业重新路由到之前没有此作业的另一个节点。所以,过了一会儿,所有节点都消失了,任务失败了。
我正在寻找的是如何在任何节点上重试作业。
这里有一个测试来证明我的问题。
这是我随机失败的工作:
public static class RandomlyFailingComputeJob implements ComputeJob {
private static final long serialVersionUID = -8351095134107406874L;
private final String data;
public RandomlyFailingComputeJob(String data) {
Validate.notNull(data);
this.data = data;
}
public void cancel() {
}
public Object execute() throws IgniteException {
final double random = Math.random();
if (random > 0.5) {
throw new IgniteException();
} else {
return StringUtils.reverse(data);
}
}
}
下面是任务:
public static class RandomlyFailingComputeTask extends
ComputeTaskSplitAdapter<String, String> {
private static final long serialVersionUID = 6756691331287458885L;
@Override
public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws IgniteException {
if (res.getException() != null) {
return ComputeJobResultPolicy.FAILOVER;
}
return ComputeJobResultPolicy.WAIT;
}
public String reduce(List<ComputeJobResult> results)
throws IgniteException {
final Collection<String> reducedResults = new ArrayList<String>(
results.size());
for (ComputeJobResult result : results) {
reducedResults.add(result.<String> getData());
}
return StringUtils.join(reducedResults, ' ');
}
@Override
protected Collection<? extends ComputeJob> split(int gridSize,
String arg) throws IgniteException {
final String[] args = StringUtils.split(arg, ' ');
final Collection<ComputeJob> computeJobs = new ArrayList<ComputeJob>(
args.length);
for (String data : args) {
computeJobs.add(new RandomlyFailingComputeJob(data));
}
return computeJobs;
}
}
测试代码:
final Ignite ignite = Ignition.start();
final String original = "The quick brown fox jumps over the lazy dog";
final String reversed = StringUtils.join(
ignite.compute().execute(new RandomlyFailingComputeTask(),
original), ' ');
如您所见,应始终进行故障转移。由于失败概率 != 1,我希望任务在某个时候成功终止。
概率阈值为 0.5,总共有 3 个节点,这几乎不会发生。我遇到了 class org.apache.ignite.cluster.ClusterTopologyException: Failed to failover a job to another node (failover SPI returned null)
这样的异常。经过一些调试后,我发现这是因为我最终 运行 超出了节点。都没有了。
我知道我可以自己写 FailoverSpi
来处理这个问题。
但这就是感觉不对。
首先,这样做似乎有些矫枉过正。
但是 SPI 是一种全球性的东西。我想决定每个作业是否应该重试或故障转移。例如,这可能取决于我正在调用的第三方工具的退出代码。因此通过全局 SPI 配置故障转移是不正确的。
如果 AlwaysFailoverSpi 的当前实现(默认实现)已经为特定作业尝试了所有节点,则不会进行故障转移。我相信它可以是一个配置选项,但现在你必须实现你自己的故障转移 SPI(它应该很简单 - 每次作业尝试故障转移时只需从拓扑中选择一个随机节点)。
至于 SPI 的全局性质,你是对的,但它的 failover() 采用 FailoverContext,它包含有关失败作业的信息(任务名称、属性、异常等),因此你可以根据决策做出决策关于此信息。
我正在 Apache Ignite 中试验 fault tolerance。
我想不通的是如何在任何节点上重试失败的作业。我有一个用例,我的工作将通过进程构建器调用第三方工具作为系统进程来进行一些计算。在某些情况下,该工具可能会失败,但在大多数情况下,可以在任何节点上重试作业 - 包括之前失败的节点。
目前 Ignite 似乎将作业重新路由到之前没有此作业的另一个节点。所以,过了一会儿,所有节点都消失了,任务失败了。
我正在寻找的是如何在任何节点上重试作业。
这里有一个测试来证明我的问题。
这是我随机失败的工作:
public static class RandomlyFailingComputeJob implements ComputeJob {
private static final long serialVersionUID = -8351095134107406874L;
private final String data;
public RandomlyFailingComputeJob(String data) {
Validate.notNull(data);
this.data = data;
}
public void cancel() {
}
public Object execute() throws IgniteException {
final double random = Math.random();
if (random > 0.5) {
throw new IgniteException();
} else {
return StringUtils.reverse(data);
}
}
}
下面是任务:
public static class RandomlyFailingComputeTask extends
ComputeTaskSplitAdapter<String, String> {
private static final long serialVersionUID = 6756691331287458885L;
@Override
public ComputeJobResultPolicy result(ComputeJobResult res,
List<ComputeJobResult> rcvd) throws IgniteException {
if (res.getException() != null) {
return ComputeJobResultPolicy.FAILOVER;
}
return ComputeJobResultPolicy.WAIT;
}
public String reduce(List<ComputeJobResult> results)
throws IgniteException {
final Collection<String> reducedResults = new ArrayList<String>(
results.size());
for (ComputeJobResult result : results) {
reducedResults.add(result.<String> getData());
}
return StringUtils.join(reducedResults, ' ');
}
@Override
protected Collection<? extends ComputeJob> split(int gridSize,
String arg) throws IgniteException {
final String[] args = StringUtils.split(arg, ' ');
final Collection<ComputeJob> computeJobs = new ArrayList<ComputeJob>(
args.length);
for (String data : args) {
computeJobs.add(new RandomlyFailingComputeJob(data));
}
return computeJobs;
}
}
测试代码:
final Ignite ignite = Ignition.start();
final String original = "The quick brown fox jumps over the lazy dog";
final String reversed = StringUtils.join(
ignite.compute().execute(new RandomlyFailingComputeTask(),
original), ' ');
如您所见,应始终进行故障转移。由于失败概率 != 1,我希望任务在某个时候成功终止。
概率阈值为 0.5,总共有 3 个节点,这几乎不会发生。我遇到了 class org.apache.ignite.cluster.ClusterTopologyException: Failed to failover a job to another node (failover SPI returned null)
这样的异常。经过一些调试后,我发现这是因为我最终 运行 超出了节点。都没有了。
我知道我可以自己写 FailoverSpi
来处理这个问题。
但这就是感觉不对。
首先,这样做似乎有些矫枉过正。
但是 SPI 是一种全球性的东西。我想决定每个作业是否应该重试或故障转移。例如,这可能取决于我正在调用的第三方工具的退出代码。因此通过全局 SPI 配置故障转移是不正确的。
如果 AlwaysFailoverSpi 的当前实现(默认实现)已经为特定作业尝试了所有节点,则不会进行故障转移。我相信它可以是一个配置选项,但现在你必须实现你自己的故障转移 SPI(它应该很简单 - 每次作业尝试故障转移时只需从拓扑中选择一个随机节点)。
至于 SPI 的全局性质,你是对的,但它的 failover() 采用 FailoverContext,它包含有关失败作业的信息(任务名称、属性、异常等),因此你可以根据决策做出决策关于此信息。