如何在 yarn-client 中处理任务 运行 太长(与工作中的其他人相比)?
How to deal with tasks running too long (comparing to others in job) in yarn-client?
我们使用一个Spark集群作为yarn-client
来计算多个业务,但有时我们有一个任务运行时间太长:
我们没有设置超时,但我认为这里的 spark 任务的默认超时不会太长 (1.7h)。
谁能给我解决这个问题的理想方案???
如果花费的时间太长,spark 将无法终止其任务。
但我想出了一个方法来处理这个问题,使用 speculation、
This means if one or more tasks are running slowly in a stage, they
will be re-launched.
spark.speculation true
spark.speculation.multiplier 2
spark.speculation.quantile 0
注意:spark.speculation.quantile
表示 "speculation" 将从您的第一个任务开始。所以请谨慎使用。我正在使用它,因为随着时间的推移,某些作业会因 GC 而变慢。所以我认为你应该知道什么时候使用它 - 它不是灵丹妙药。
一些相关链接:http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-always-wait-for-stragglers-to-finish-running-td14298.html and http://mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%3CCAPmMX=rOVQf7JtDu0uwnp1xNYNyz4xPgXYayKex42AZ_9Pvjug@mail.gmail.com%3E
更新
我找到了解决我的问题的方法(可能不适用于所有人)。每个任务我有一堆模拟 运行ning,所以我在 运行 周围添加了超时。如果模拟花费的时间更长(由于特定 运行 的数据倾斜),它将超时。
ExecutorService executor = Executors.newCachedThreadPool();
Callable<SimResult> task = () -> simulator.run();
Future<SimResult> future = executor.submit(task);
try {
result = future.get(1, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
future.cancel(true);
SPARKLOG.info("Task timed out");
}
确保在 simulator
的主循环中处理中断,例如:
if(Thread.currentThread().isInterrupted()){
throw new InterruptedException();
}
这里的技巧是直接登录工作节点并终止进程。通常您可以通过 top
、ps
和 grep
的组合找到有问题的进程。然后做一个 kill pid
.
我们使用一个Spark集群作为yarn-client
来计算多个业务,但有时我们有一个任务运行时间太长:
我们没有设置超时,但我认为这里的 spark 任务的默认超时不会太长 (1.7h)。
谁能给我解决这个问题的理想方案???
如果花费的时间太长,spark 将无法终止其任务。
但我想出了一个方法来处理这个问题,使用 speculation、
This means if one or more tasks are running slowly in a stage, they will be re-launched.
spark.speculation true
spark.speculation.multiplier 2
spark.speculation.quantile 0
注意:spark.speculation.quantile
表示 "speculation" 将从您的第一个任务开始。所以请谨慎使用。我正在使用它,因为随着时间的推移,某些作业会因 GC 而变慢。所以我认为你应该知道什么时候使用它 - 它不是灵丹妙药。
一些相关链接:http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-always-wait-for-stragglers-to-finish-running-td14298.html and http://mail-archives.us.apache.org/mod_mbox/spark-user/201506.mbox/%3CCAPmMX=rOVQf7JtDu0uwnp1xNYNyz4xPgXYayKex42AZ_9Pvjug@mail.gmail.com%3E
更新
我找到了解决我的问题的方法(可能不适用于所有人)。每个任务我有一堆模拟 运行ning,所以我在 运行 周围添加了超时。如果模拟花费的时间更长(由于特定 运行 的数据倾斜),它将超时。
ExecutorService executor = Executors.newCachedThreadPool();
Callable<SimResult> task = () -> simulator.run();
Future<SimResult> future = executor.submit(task);
try {
result = future.get(1, TimeUnit.MINUTES);
} catch (TimeoutException ex) {
future.cancel(true);
SPARKLOG.info("Task timed out");
}
确保在 simulator
的主循环中处理中断,例如:
if(Thread.currentThread().isInterrupted()){
throw new InterruptedException();
}
这里的技巧是直接登录工作节点并终止进程。通常您可以通过 top
、ps
和 grep
的组合找到有问题的进程。然后做一个 kill pid
.