Java - 具有明显空闲线程的 ExecutorService
Java - ExecutorService with apparently idle thread
我正在观察一个我无法用 ExecutorService 解释的行为。我有一个应用程序在内存中加载了大约 800 个人资料。考虑以下
ExecutorService es = Executors.newFixedThreadPool(8); // 8 cores machine
Runnable enricherService = new Runnable() {
@Override
public void run() {
doEnrichment(conn, tmpCachePerson);
}
};
// tmpCachePerson is a ConcurrentLinkedQueue<Person>
while (tmpCachePerson.isEmpty() == false) {
es.execute(enricherService);
}
es.shutdown();
try {
while (!es.awaitTermination(24L, TimeUnit.HOURS)) {
System.out.println("Waiting for termination");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
与 1 个线程的池相比,这块非常慢。我在代码中放置了一些 println,我可以看到每隔一段时间(~ 4 秒),所有线程都会停止,在那里停留长达 16 秒,然后重新开始,几乎就像一个批处理,但在迭代之间有一个睡眠。需要 50s 才能完成。然后我尝试了下面的实现:
Runnable enricher2Thread = new Runnable() {
@Override
public void run() {
while (tmpCachePerson.isEmpty() == false) {
doEnrichment(conn, tmpCachePerson);
}
}
};
Thread t = new Thread(enricher2Thread);
t.start();
try {
t.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
另一方面,这篇文章非常快,只使用一个线程,从不停止在控制台中打印,并在 3 秒内完成。
如果我将第一部分中的固定池替换为缓存池,任务将在生成 800 个线程后在 3 秒内完成。如果我在固定池中放置 800 个线程,速度相同。任何人都明白为什么固定池经常暂停,因此它不比 1 个线程快。下面是我看到的 8 个线程的摘录。如果您查看线程 1,它会在一个简单的 getter 上暂停 5s 。在日志的其他部分,整个任务耗时约 250 毫秒。
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting executing SQL
2016-11-15 15:54:04.212 - pool-1-thread-1 - Done executing SQL in 0 ms
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting adding
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting Getting Val
2016-11-15 15:54:04.212 - pool-1-thread-1 - Done Getting Val in 0 ms
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting Getting Root
2016-11-15 15:54:04.212 - pool-1-thread-1 - Done Getting Root in 0 ms
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting Getting Path
2016-11-15 15:54:04.212 - pool-1-thread-1 - Done Getting Path in 0 ms
2016-11-15 15:54:04.212 - pool-1-thread-6 - Starting
2016-11-15 15:54:04.212 - pool-1-thread-8 - Starting <-------------- All threads stop
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting executing SQL
2016-11-15 15:54:09.533 - pool-1-thread-6 - Starting executing SQL
2016-11-15 15:54:09.533 - pool-1-thread-8 - Done executing SQL in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Starting Getting Full Path
2016-11-15 15:54:09.533 - pool-1-thread-6 - Done executing SQL in 5320 ms
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting adding
2016-11-15 15:54:09.533 - pool-1-thread-6 - Starting adding
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting Getting Val
2016-11-15 15:54:09.533 - pool-1-thread-6 - Starting Getting Val
2016-11-15 15:54:09.533 - pool-1-thread-8 - Done Getting Val in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting Getting Root
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done Getting Full Path in 5320 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Starting Adding Image
2016-11-15 15:54:09.533 - pool-1-thread-8 - Done Getting Root in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done Adding Image in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting Getting Path
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done adding in 5321 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Starting setting
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done setting in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done in 5321 ms
知道如何改进这段代码以及为什么它会暂停吗?如果有帮助,我可以 post doEnrichment() 的代码
编辑:这里是:
private void doEnrichment(Connection conn, ConcurrentLinkedQueue<Person> tmpCachePerson) {
Person person = tmpCachePerson.poll();
if (person != null) {
ImageCollection personImageCollection;
String query = "SELECT epi.value, i.path FROM Image i "
+ "INNER JOIN EntityImageRelationship eir ON eir.id_image = i.id "
+ "INNER JOIN EntityType et ON eir.id_entity_type = et.id "
+ "INNER JOIN EntityPrimaryImage epi ON epi.type_to_entity_uid = eir.type_to_entity_uid "
+ "WHERE et.id = ? AND eir.id_entity_id = ? ORDER BY i.id ASC";
String tagQuery = "SELECT id, value FROM Tag t INNER JOIN EntityTagRelationship etr ON etr.id_tag = t.id WHERE etr.id_entity = ? AND etr.id_entity_type = ?";
try (PreparedStatement stmnt = conn.prepareStatement(query);
PreparedStatement tagStmnt = conn.prepareStatement(tagQuery)) {
personImageCollection = getEntityImages(conn, stmnt, person.getId(), person.getMovieEntityType());
person.setImageCollection(personImageCollection);
person.getImageCollection().setPrimaryImageIcon();
Set<String> tags = getEntityTags(conn, tagStmnt, person.getId(), person.getMovieEntityType()).keySet();
person.setTags(tags);
personCache.add(person);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这可能不是原因,但您在这里遇到了竞争条件:
while (tmpCachePerson.isEmpty() == false) {
es.execute(enricherService);
}
绝对不能保证会发生上下文切换;即使是这样,它也可能比您预期的要慢;在工人启动之前,该循环可能 运行 一百万次。在你解决这个问题之前不要再看下去了;这是大量的旋转和内存开销。
更好的模式是将轮询放在 worker 中:
Runnable enricherService = new Runnable() {
@Override
public void run() {
while (!tmpCachePerson.isEmpty()) {
doEnrichment(conn, tmpCachePerson);
// TODO: error handling? Should a failure in doEnrichment kill the worker?
}
}
};
然后用
启动工人
for (int i = 0; i < 8; ++i) {
es.execute(enricherService);
}
我正在观察一个我无法用 ExecutorService 解释的行为。我有一个应用程序在内存中加载了大约 800 个人资料。考虑以下
ExecutorService es = Executors.newFixedThreadPool(8); // 8 cores machine
Runnable enricherService = new Runnable() {
@Override
public void run() {
doEnrichment(conn, tmpCachePerson);
}
};
// tmpCachePerson is a ConcurrentLinkedQueue<Person>
while (tmpCachePerson.isEmpty() == false) {
es.execute(enricherService);
}
es.shutdown();
try {
while (!es.awaitTermination(24L, TimeUnit.HOURS)) {
System.out.println("Waiting for termination");
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
与 1 个线程的池相比,这块非常慢。我在代码中放置了一些 println,我可以看到每隔一段时间(~ 4 秒),所有线程都会停止,在那里停留长达 16 秒,然后重新开始,几乎就像一个批处理,但在迭代之间有一个睡眠。需要 50s 才能完成。然后我尝试了下面的实现:
Runnable enricher2Thread = new Runnable() {
@Override
public void run() {
while (tmpCachePerson.isEmpty() == false) {
doEnrichment(conn, tmpCachePerson);
}
}
};
Thread t = new Thread(enricher2Thread);
t.start();
try {
t.join();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
另一方面,这篇文章非常快,只使用一个线程,从不停止在控制台中打印,并在 3 秒内完成。
如果我将第一部分中的固定池替换为缓存池,任务将在生成 800 个线程后在 3 秒内完成。如果我在固定池中放置 800 个线程,速度相同。任何人都明白为什么固定池经常暂停,因此它不比 1 个线程快。下面是我看到的 8 个线程的摘录。如果您查看线程 1,它会在一个简单的 getter 上暂停 5s 。在日志的其他部分,整个任务耗时约 250 毫秒。
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting executing SQL
2016-11-15 15:54:04.212 - pool-1-thread-1 - Done executing SQL in 0 ms
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting adding
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting Getting Val
2016-11-15 15:54:04.212 - pool-1-thread-1 - Done Getting Val in 0 ms
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting Getting Root
2016-11-15 15:54:04.212 - pool-1-thread-1 - Done Getting Root in 0 ms
2016-11-15 15:54:04.212 - pool-1-thread-1 - Starting Getting Path
2016-11-15 15:54:04.212 - pool-1-thread-1 - Done Getting Path in 0 ms
2016-11-15 15:54:04.212 - pool-1-thread-6 - Starting
2016-11-15 15:54:04.212 - pool-1-thread-8 - Starting <-------------- All threads stop
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting executing SQL
2016-11-15 15:54:09.533 - pool-1-thread-6 - Starting executing SQL
2016-11-15 15:54:09.533 - pool-1-thread-8 - Done executing SQL in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Starting Getting Full Path
2016-11-15 15:54:09.533 - pool-1-thread-6 - Done executing SQL in 5320 ms
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting adding
2016-11-15 15:54:09.533 - pool-1-thread-6 - Starting adding
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting Getting Val
2016-11-15 15:54:09.533 - pool-1-thread-6 - Starting Getting Val
2016-11-15 15:54:09.533 - pool-1-thread-8 - Done Getting Val in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting Getting Root
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done Getting Full Path in 5320 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Starting Adding Image
2016-11-15 15:54:09.533 - pool-1-thread-8 - Done Getting Root in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done Adding Image in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-8 - Starting Getting Path
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done adding in 5321 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Starting setting
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done setting in 0 ms
2016-11-15 15:54:09.533 - pool-1-thread-1 - Done in 5321 ms
知道如何改进这段代码以及为什么它会暂停吗?如果有帮助,我可以 post doEnrichment() 的代码
编辑:这里是:
private void doEnrichment(Connection conn, ConcurrentLinkedQueue<Person> tmpCachePerson) {
Person person = tmpCachePerson.poll();
if (person != null) {
ImageCollection personImageCollection;
String query = "SELECT epi.value, i.path FROM Image i "
+ "INNER JOIN EntityImageRelationship eir ON eir.id_image = i.id "
+ "INNER JOIN EntityType et ON eir.id_entity_type = et.id "
+ "INNER JOIN EntityPrimaryImage epi ON epi.type_to_entity_uid = eir.type_to_entity_uid "
+ "WHERE et.id = ? AND eir.id_entity_id = ? ORDER BY i.id ASC";
String tagQuery = "SELECT id, value FROM Tag t INNER JOIN EntityTagRelationship etr ON etr.id_tag = t.id WHERE etr.id_entity = ? AND etr.id_entity_type = ?";
try (PreparedStatement stmnt = conn.prepareStatement(query);
PreparedStatement tagStmnt = conn.prepareStatement(tagQuery)) {
personImageCollection = getEntityImages(conn, stmnt, person.getId(), person.getMovieEntityType());
person.setImageCollection(personImageCollection);
person.getImageCollection().setPrimaryImageIcon();
Set<String> tags = getEntityTags(conn, tagStmnt, person.getId(), person.getMovieEntityType()).keySet();
person.setTags(tags);
personCache.add(person);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这可能不是原因,但您在这里遇到了竞争条件:
while (tmpCachePerson.isEmpty() == false) {
es.execute(enricherService);
}
绝对不能保证会发生上下文切换;即使是这样,它也可能比您预期的要慢;在工人启动之前,该循环可能 运行 一百万次。在你解决这个问题之前不要再看下去了;这是大量的旋转和内存开销。
更好的模式是将轮询放在 worker 中:
Runnable enricherService = new Runnable() {
@Override
public void run() {
while (!tmpCachePerson.isEmpty()) {
doEnrichment(conn, tmpCachePerson);
// TODO: error handling? Should a failure in doEnrichment kill the worker?
}
}
};
然后用
启动工人for (int i = 0; i < 8; ++i) {
es.execute(enricherService);
}