如何在 yarn-client 中处理任务 运行 太长(与工作中的其他人相比)?

How to deal with tasks running too long (comparing to others in job) in yarn-client?

我们使用一个Spark集群作为yarn-client来计算多个业务,但有时我们有一个任务运行时间太长:

我们没有设置超时,但我认为这里的 spark 任务的默认超时不会太长 (1.7h)。

谁能给我解决这个问题的理想方案???

如果花费的时间太长,spark 将无法终止其任务。

但我想出了一个方法来处理这个问题,使用 speculation

This means if one or more tasks are running slowly in a stage, they will be re-launched.

spark.speculation                  true
spark.speculation.multiplier       2
spark.speculation.quantile         0

注意:spark.speculation.quantile 表示 "speculation" 将从您的第一个任务开始。所以请谨慎使用。我正在使用它,因为随着时间的推移,某些作业会因 GC 而变慢。所以我认为你应该知道什么时候使用它 - 它不是灵丹妙药。

一些相关链接:http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-always-wait-for-stragglers-to-finish-running-td14298.html and http://mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%3CCAPmMX=rOVQf7JtDu0uwnp1xNYNyz4xPgXYayKex42AZ_9Pvjug@mail.gmail.com%3E

更新

我找到了解决我的问题的方法(可能不适用于所有人)。每个任务我有一堆模拟 运行ning,所以我在 运行 周围添加了超时。如果模拟花费的时间更长(由于特定 运行 的数据倾斜),它将超时。

ExecutorService executor = Executors.newCachedThreadPool();
Callable<SimResult> task = () -> simulator.run();

Future<SimResult> future = executor.submit(task);
try {
    result = future.get(1, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
    future.cancel(true);
    SPARKLOG.info("Task timed out");
}

确保在 simulator 的主循环中处理中断,例如:

if(Thread.currentThread().isInterrupted()){
    throw new InterruptedException();
} 

这里的技巧是直接登录工作节点并终止进程。通常您可以通过 toppsgrep 的组合找到有问题的进程。然后做一个 kill pid.