如何避免 Java ExecutorService 中的上下文切换
How to avoid context switching in Java ExecutorService
我使用软件 (AnyLogic) 导出 运行nable jar 文件,这些文件本身会重复 运行 一组具有不同参数的模拟(所谓的参数变化实验)。我正在 运行ning 进行的模拟非常占用 RAM,因此我必须限制 jar 文件可用的核心数。在 AnyLogic 中,可用核心的数量很容易设置,但是从服务器上的 Linux 命令行,我知道如何做到这一点的唯一方法是使用 taskset
命令手动指定可用的内核(使用 CPU 亲和力“掩码”)。到目前为止,这种方法运行良好,但由于您必须指定要使用的各个内核,我了解到根据您使用的内核 select,性能可能会有相当大的差异。例如,您可能希望最大限度地利用 CPU 缓存级别,因此如果您选择共享太多缓存的核心,您的性能会大大降低。
由于 AnyLogic 是用 Java 编写的,我可以使用 Java 代码来指定 运行 模拟。我正在考虑使用 Java ExecutorService 来构建一个单独的 运行s 池,这样我就可以指定池的大小为与我机器的 RAM 匹配的任何内核数正在使用。我认为这会带来很多好处,最重要的是,也许计算机的调度程序可以更好地 select 处理内核以最大限度地减少 运行 时间。
在我的测试中,我构建了一个小型 AnyLogic 模型,它需要大约 10 秒才能 运行(它只是在 2 个状态图状态之间反复切换)。然后我用这个简单的代码创建了一个自定义实验。
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i=0; i<10; i++)
{
Simulation experiment = new Simulation();
experiment.variable = i;
service.execute( () -> experiment.run() );
}
我希望看到的是一次只有 2 个 Simulation
对象启动,因为这是线程池的大小。但是我看到所有 10 个启动并且 运行 在 2 个线程上并行运行。这让我觉得上下文切换正在发生,我认为这是非常低效的。
当我没有调用 AnyLogic Simulation
,而是在 service.execute
函数中调用自定义 Java class(如下)时,它似乎工作正常, 一次只显示 2 Tasks
运行ning。
public class Task implements Runnable, Serializable {
public void run() {
traceln("Starting task on thread " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
traceln("Ending task on thread " + Thread.currentThread().getName());
}
}
有谁知道为什么 AnyLogic 函数似乎同时设置了所有模拟?
我猜 Simulation
从 ExperimentParamVariation 延伸而来。实现您想要的目标的关键是确定实验何时结束。
文档显示了一些有趣的方法,例如 getProgress() and getState(), but you would have to poll those methods until the progress is 1
or the state is FINISHED
or ERROR
. There are also the methods onAfterExperiment() and onError() that should be called by the engine to indicate that the experiment has ended or there was an error. I think you could use these last two methods with a Semaphore 来控制一次实验的数量 运行:
import java.util.concurrent.Semaphore;
import com.anylogic.engine.ExperimentParamVariation;
public class Simulation extends ExperimentParamVariation</* Agent */> {
private final Semaphore semaphore;
public Simulation(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void onAfterExperiment() {
this.semaphore.release();
super.onAfterExperiment();
}
public void onError(Throwable error) {
this.semaphore.release();
super.onError(error);
}
// run() cannot be overriden because it is final
// You could create another run method or acquire a permit from the semaphore elsewhere
public void runWithSemaphore() throws InterruptedException {
// This acquire() will block until a permit is available or the thread is interrupted
this.semaphore.acquire();
this.run();
}
}
然后您必须配置一个具有所需许可数量的信号量并将其传递给 Simulation
个实例:
import java.util.concurrent.Semaphore;
// ...
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++)
{
Simulation experiment = new Simulation(semaphore);
// ...
// Handle the InterruptedException thrown here
experiment.runWithSemaphore();
/* Alternative to runWithSemaphore(): acquire the permit and call run().
semaphore.acquire();
experiment.run();
*/
}
首先,我认为这是对 AnyLogic 功能的一个相对较新的补充,这整个问题已被取消。您可以指定一个带有指定数量的“并行工作者”的 ini 文件。
但在找到这个(更好的)选项之前,我设法找到了一个可行的解决方案。 Hernan 的回答几乎 就够了。我认为它受到了 AnyLogic 引擎的一些变幻莫测的阻碍(正如我在评论中详述的那样)。
我能找到的最好的版本是使用 ExecuterService
。在自定义实验中,我输入了这段代码:
ExecutorService service = Executors.newFixedThreadPool(2);
List<Callable<Integer>> tasks = new ArrayList<>();
for (int i=0; i<10; i++)
{
int t = i;
tasks.add( () -> simulate(t) );
}
try{
traceln("starting setting up service");
List<Future<Integer>> futureResults = service.invokeAll(tasks);
traceln("finished setting up service");
List<Integer> res = futureResults.stream().parallel().map(
f -> {
try {
return f.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList());
System.out.println("----- Future Results are ready -------");
System.out.println("----- Finished -------");
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
这里的关键是使用Java Future
。此外,为了使用 invokeAll
函数,我在附加 class 代码块中创建了一个函数:
public int simulate(int variable){
// Create Engine, initialize random number generator:
Engine engine = createEngine();
// Set stop time
engine.setStopTime( 100000 );
// Create new root object:
Main root = new Main( engine, null, null );
root.parameter = variable;
// Prepare Engine for simulation:
engine.start( root );
// Start simulation in fast mode:
//traceln("attempting to acquire 1 permit on run "+variable);
//s.acquireUninterruptibly(1);
traceln("starting run "+variable);
engine.runFast();
traceln("ending run "+variable);
//s.release();
// Destroy the model:
engine.stop();
traceln( "Finished, run "+variable);
return 1;
}
我能看到这种方法的唯一限制是我没有等待循环来每隔几分钟输出一次进度。但是我没有找到解决方案,而是必须放弃这项工作以获得更好的设置文件解决方案 link up top.
我使用软件 (AnyLogic) 导出 运行nable jar 文件,这些文件本身会重复 运行 一组具有不同参数的模拟(所谓的参数变化实验)。我正在 运行ning 进行的模拟非常占用 RAM,因此我必须限制 jar 文件可用的核心数。在 AnyLogic 中,可用核心的数量很容易设置,但是从服务器上的 Linux 命令行,我知道如何做到这一点的唯一方法是使用 taskset
命令手动指定可用的内核(使用 CPU 亲和力“掩码”)。到目前为止,这种方法运行良好,但由于您必须指定要使用的各个内核,我了解到根据您使用的内核 select,性能可能会有相当大的差异。例如,您可能希望最大限度地利用 CPU 缓存级别,因此如果您选择共享太多缓存的核心,您的性能会大大降低。
由于 AnyLogic 是用 Java 编写的,我可以使用 Java 代码来指定 运行 模拟。我正在考虑使用 Java ExecutorService 来构建一个单独的 运行s 池,这样我就可以指定池的大小为与我机器的 RAM 匹配的任何内核数正在使用。我认为这会带来很多好处,最重要的是,也许计算机的调度程序可以更好地 select 处理内核以最大限度地减少 运行 时间。
在我的测试中,我构建了一个小型 AnyLogic 模型,它需要大约 10 秒才能 运行(它只是在 2 个状态图状态之间反复切换)。然后我用这个简单的代码创建了一个自定义实验。
ExecutorService service = Executors.newFixedThreadPool(2);
for (int i=0; i<10; i++)
{
Simulation experiment = new Simulation();
experiment.variable = i;
service.execute( () -> experiment.run() );
}
我希望看到的是一次只有 2 个 Simulation
对象启动,因为这是线程池的大小。但是我看到所有 10 个启动并且 运行 在 2 个线程上并行运行。这让我觉得上下文切换正在发生,我认为这是非常低效的。
当我没有调用 AnyLogic Simulation
,而是在 service.execute
函数中调用自定义 Java class(如下)时,它似乎工作正常, 一次只显示 2 Tasks
运行ning。
public class Task implements Runnable, Serializable {
public void run() {
traceln("Starting task on thread " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
traceln("Ending task on thread " + Thread.currentThread().getName());
}
}
有谁知道为什么 AnyLogic 函数似乎同时设置了所有模拟?
我猜 Simulation
从 ExperimentParamVariation 延伸而来。实现您想要的目标的关键是确定实验何时结束。
文档显示了一些有趣的方法,例如 getProgress() and getState(), but you would have to poll those methods until the progress is 1
or the state is FINISHED
or ERROR
. There are also the methods onAfterExperiment() and onError() that should be called by the engine to indicate that the experiment has ended or there was an error. I think you could use these last two methods with a Semaphore 来控制一次实验的数量 运行:
import java.util.concurrent.Semaphore;
import com.anylogic.engine.ExperimentParamVariation;
public class Simulation extends ExperimentParamVariation</* Agent */> {
private final Semaphore semaphore;
public Simulation(Semaphore semaphore) {
this.semaphore = semaphore;
}
public void onAfterExperiment() {
this.semaphore.release();
super.onAfterExperiment();
}
public void onError(Throwable error) {
this.semaphore.release();
super.onError(error);
}
// run() cannot be overriden because it is final
// You could create another run method or acquire a permit from the semaphore elsewhere
public void runWithSemaphore() throws InterruptedException {
// This acquire() will block until a permit is available or the thread is interrupted
this.semaphore.acquire();
this.run();
}
}
然后您必须配置一个具有所需许可数量的信号量并将其传递给 Simulation
个实例:
import java.util.concurrent.Semaphore;
// ...
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++)
{
Simulation experiment = new Simulation(semaphore);
// ...
// Handle the InterruptedException thrown here
experiment.runWithSemaphore();
/* Alternative to runWithSemaphore(): acquire the permit and call run().
semaphore.acquire();
experiment.run();
*/
}
首先,我认为这是对 AnyLogic 功能的一个相对较新的补充,这整个问题已被取消。您可以指定一个带有指定数量的“并行工作者”的 ini 文件。
但在找到这个(更好的)选项之前,我设法找到了一个可行的解决方案。 Hernan 的回答几乎 就够了。我认为它受到了 AnyLogic 引擎的一些变幻莫测的阻碍(正如我在评论中详述的那样)。
我能找到的最好的版本是使用 ExecuterService
。在自定义实验中,我输入了这段代码:
ExecutorService service = Executors.newFixedThreadPool(2);
List<Callable<Integer>> tasks = new ArrayList<>();
for (int i=0; i<10; i++)
{
int t = i;
tasks.add( () -> simulate(t) );
}
try{
traceln("starting setting up service");
List<Future<Integer>> futureResults = service.invokeAll(tasks);
traceln("finished setting up service");
List<Integer> res = futureResults.stream().parallel().map(
f -> {
try {
return f.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}).collect(Collectors.toList());
System.out.println("----- Future Results are ready -------");
System.out.println("----- Finished -------");
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
这里的关键是使用Java Future
。此外,为了使用 invokeAll
函数,我在附加 class 代码块中创建了一个函数:
public int simulate(int variable){
// Create Engine, initialize random number generator:
Engine engine = createEngine();
// Set stop time
engine.setStopTime( 100000 );
// Create new root object:
Main root = new Main( engine, null, null );
root.parameter = variable;
// Prepare Engine for simulation:
engine.start( root );
// Start simulation in fast mode:
//traceln("attempting to acquire 1 permit on run "+variable);
//s.acquireUninterruptibly(1);
traceln("starting run "+variable);
engine.runFast();
traceln("ending run "+variable);
//s.release();
// Destroy the model:
engine.stop();
traceln( "Finished, run "+variable);
return 1;
}
我能看到这种方法的唯一限制是我没有等待循环来每隔几分钟输出一次进度。但是我没有找到解决方案,而是必须放弃这项工作以获得更好的设置文件解决方案 link up top.