[Concurrency in practice 7.2.5]中提到的竞争条件的时间顺序是什么
What's the chronology for a race condition mentioned in [Concurrency in practice 7.2.5]
正如 Brian Goetz 所说:“TrackingExecutor 存在一个不可避免的竞争条件,可能会产生误报:任务被识别为已取消,但实际上已经完成。出现这种情况是因为线程池可能恰好在任务执行完最后一条指令与线程池将该任务记录为已完成这两个时刻之间被关闭。”
跟踪执行器:
/**
 * TrackingExecutor
 * <p/>
 * ExecutorService that keeps track of cancelled tasks after shutdown.
 * <p/>
 * Every submitted task is wrapped so that, when it returns while the executor
 * is shut down and the worker thread's interrupt flag is set, the original
 * task is recorded as "cancelled at shutdown". Tasks must therefore preserve
 * the interrupt status when they return for the tracking to notice them.
 * <p/>
 * The check is inherently racy: the pool may be shut down after a task has
 * executed its last instruction but before the wrapper performs the check, so
 * a task that actually completed can still be reported as cancelled.
 *
 * @author Brian Goetz and Tim Peierls
 */
public class TrackingExecutor extends AbstractExecutorService {
    /** Executor that actually runs the tasks; all lifecycle calls delegate to it. */
    private final ExecutorService exec;

    /** Original (unwrapped) tasks that were found interrupted after shutdown. */
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    /**
     * Creates a tracking wrapper around the given executor.
     *
     * @param exec the executor to delegate to
     * @throws NullPointerException if {@code exec} is null
     */
    public TrackingExecutor(ExecutorService exec) {
        if (exec == null) {
            throw new NullPointerException("exec");
        }
        this.exec = exec;
    }

    /** Delegates an orderly shutdown to the wrapped executor. */
    @Override
    public void shutdown() {
        exec.shutdown();
    }

    /**
     * Shuts the wrapped executor down immediately.
     *
     * @return the tasks that never started, as the objects originally passed
     *         to {@link #execute(Runnable)} rather than the internal wrappers,
     *         so callers can cast them back to their own task type
     */
    @Override
    public List<Runnable> shutdownNow() {
        List<Runnable> neverStarted = exec.shutdownNow();
        List<Runnable> unwrapped = new ArrayList<Runnable>(neverStarted.size());
        for (Runnable queued : neverStarted) {
            if (queued instanceof TrackedTask) {
                unwrapped.add(((TrackedTask) queued).runnable);
            } else {
                // Not one of our wrappers; hand it back untouched.
                unwrapped.add(queued);
            }
        }
        return unwrapped;
    }

    /** @return whether the wrapped executor has been shut down */
    @Override
    public boolean isShutdown() {
        return exec.isShutdown();
    }

    /** @return whether the wrapped executor has terminated */
    @Override
    public boolean isTerminated() {
        return exec.isTerminated();
    }

    /**
     * Waits for the wrapped executor to terminate.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the result of the wrapped executor's {@code awaitTermination}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    /**
     * Returns a snapshot of the tasks recorded as cancelled at shutdown.
     *
     * @return a new list holding the recorded tasks
     * @throws IllegalStateException if the wrapped executor has not terminated
     */
    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated()) {
            throw new IllegalStateException("executor has not terminated yet");
        }
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    /**
     * Submits the task, wrapped so that its cancellation can be tracked.
     *
     * @param runnable the task to run
     * @throws NullPointerException if {@code runnable} is null
     */
    @Override
    public void execute(final Runnable runnable) {
        if (runnable == null) {
            // Fail in the caller rather than later inside a worker thread.
            throw new NullPointerException("runnable");
        }
        exec.execute(new TrackedTask(runnable));
    }

    /** Wrapper that runs a task and records it if it ends interrupted after shutdown. */
    private final class TrackedTask implements Runnable {
        private final Runnable runnable;

        TrackedTask(Runnable runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            try {
                runnable.run();
            } finally {
                // Evaluated even when the task throws. This is the racy check:
                // shutdown may have happened after the task's last instruction.
                if (isShutdown() && Thread.currentThread().isInterrupted()) {
                    tasksCancelledAtShutdown.add(runnable);
                }
            }
        }
    }
}
然后他创建了使用 TrackingExecutor 的 WebCrawler。
抓取工具(WebCrawler):
/**
 * WebCrawler
 * <p/>
 * Using TrackingExecutorService to save unfinished tasks for later execution.
 * <p/>
 * {@link #start()} submits one crawl task per pending URL; {@link #stop()}
 * shuts the executor down and puts the URLs of tasks that never started, or
 * that were recorded as cancelled, back into the pending set.
 *
 * @author Brian Goetz and Tim Peierls
 */
public abstract class WebCrawler {
    /** Executor of the current crawl; null when no crawl is running. */
    private volatile TrackingExecutor exec;

    /** URLs waiting to be crawled on the next {@link #start()}. */
    @GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();

    /** Used only by {@code CrawlTask.alreadyCrawled()} and {@code markUncrawled()}. */
    private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();

    /** How long {@link #stop()} waits for running tasks to finish. */
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = MILLISECONDS;

    /**
     * @param startUrl first URL to crawl once {@link #start()} is called
     */
    public WebCrawler(URL startUrl) {
        urlsToCrawl.add(startUrl);
    }

    /** Creates a fresh executor and submits a crawl task for every pending URL. */
    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) {
            submitCrawlTask(url);
        }
        urlsToCrawl.clear();
    }

    /**
     * Stops the current crawl and saves the URLs of unfinished tasks.
     * Does nothing when no crawl is running (never started, or already
     * stopped), instead of failing on the null executor.
     *
     * @throws InterruptedException if interrupted while awaiting termination
     */
    public synchronized void stop() throws InterruptedException {
        final TrackingExecutor running = exec;
        if (running == null) {
            return;
        }
        try {
            saveUncrawled(running.shutdownNow());
            // Cancelled tasks can only be queried once the executor terminated.
            if (running.awaitTermination(TIMEOUT, UNIT)) {
                saveUncrawled(running.getCancelledTasks());
            }
        } finally {
            exec = null;
        }
    }

    /**
     * Processes one page.
     *
     * @param url the page to process
     * @return URLs that will each be submitted as a new crawl task
     *         (presumably the links found on the page)
     */
    protected abstract List<URL> processPage(URL url);

    /**
     * Adds the page of every given task back to the pending set.
     * The cast assumes each element is a {@code CrawlTask} as submitted by
     * {@link #submitCrawlTask(URL)}.
     */
    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable task : uncrawled) {
            urlsToCrawl.add(((CrawlTask) task).getPage());
        }
    }

    /** Submits a crawl task for the given URL to the current executor. */
    private void submitCrawlTask(URL u) {
        exec.execute(new CrawlTask(u));
    }

    /** Task that processes one page and submits a task for each returned link. */
    private class CrawlTask implements Runnable {
        private final URL url;

        CrawlTask(URL url) {
            this.url = url;
        }

        // Not referenced anywhere in this class as shown.
        private int count = 1;

        /** @return true if this URL was already present in {@code seen} */
        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        /** Removes this URL from {@code seen} and logs it. */
        void markUncrawled() {
            seen.remove(url);
            System.out.printf("marking %s uncrawled%n", url);
        }

        public void run() {
            for (URL link : processPage(url)) {
                // Stop submitting as soon as this thread has been interrupted;
                // isInterrupted() leaves the flag set for the executor to see.
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
                submitCrawlTask(link);
            }
        }

        /** @return the URL this task crawls */
        public URL getPage() {
            return url;
        }
    }
}
但我不明白:runnable.run()、exec.shutdownNow()、exec.awaitTermination(...)、exec.getCancelledTasks() 的调用、tasksCancelledAtShutdown.add(runnable)、runnable 的完成以及线程的交错,究竟按怎样的确切时间顺序发生,才会导致这个竞争条件。
我是这样理解的:例如,TrackingExecutor 在 CrawlTask 的 run() 方法退出之前被关闭,这个任务也可能被记录到 tasksCancelledAtShutdown 中,因为 TrackingExecutor#execute 中的 if (isShutdown() && Thread.currentThread().isInterrupted()) 可能为真,但事实上这个任务已经完成了。
private class CrawlTask implements Runnable {
    /** Submits a crawl task for each link of the page until the thread is interrupted. */
    public void run() {
        for (URL discovered : processPage(url)) {
            if (!Thread.currentThread().isInterrupted()) {
                submitCrawlTask(discovered);
            } else {
                return;
            }
        }
        // The TrackingExecutor may be shutting down right here:
        // the task's work is already complete, but run() has not returned yet.
    }
}
/**
 * Runs the task on the wrapped executor and records it as cancelled when it
 * ends after shutdown with the worker thread's interrupt flag set.
 */
public void execute(final Runnable runnable) {
    final Runnable tracked = new Runnable() {
        public void run() {
            try {
                runnable.run();
            } finally {
                // Both conditions may hold even though the task has in fact completed.
                if (isShutdown()) {
                    if (Thread.currentThread().isInterrupted()) {
                        tasksCancelledAtShutdown.add(runnable);
                    }
                }
            }
        }
    };
    exec.execute(tracked);
}
正如 Brian Goetz 所说:“TrackingExecutor 存在一个不可避免的竞争条件,可能会产生误报:任务被识别为已取消,但实际上已经完成。出现这种情况是因为线程池可能恰好在任务执行完最后一条指令与线程池将该任务记录为已完成这两个时刻之间被关闭。”
跟踪执行器:
/**
 * TrackingExecutor
 * <p/>
 * ExecutorService that keeps track of cancelled tasks after shutdown.
 * <p/>
 * Every submitted task is wrapped so that, when it returns while the executor
 * is shut down and the worker thread's interrupt flag is set, the original
 * task is recorded as "cancelled at shutdown". Tasks must therefore preserve
 * the interrupt status when they return for the tracking to notice them.
 * <p/>
 * The check is inherently racy: the pool may be shut down after a task has
 * executed its last instruction but before the wrapper performs the check, so
 * a task that actually completed can still be reported as cancelled.
 *
 * @author Brian Goetz and Tim Peierls
 */
public class TrackingExecutor extends AbstractExecutorService {
    /** Executor that actually runs the tasks; all lifecycle calls delegate to it. */
    private final ExecutorService exec;

    /** Original (unwrapped) tasks that were found interrupted after shutdown. */
    private final Set<Runnable> tasksCancelledAtShutdown =
            Collections.synchronizedSet(new HashSet<Runnable>());

    /**
     * Creates a tracking wrapper around the given executor.
     *
     * @param exec the executor to delegate to
     * @throws NullPointerException if {@code exec} is null
     */
    public TrackingExecutor(ExecutorService exec) {
        if (exec == null) {
            throw new NullPointerException("exec");
        }
        this.exec = exec;
    }

    /** Delegates an orderly shutdown to the wrapped executor. */
    @Override
    public void shutdown() {
        exec.shutdown();
    }

    /**
     * Shuts the wrapped executor down immediately.
     *
     * @return the tasks that never started, as the objects originally passed
     *         to {@link #execute(Runnable)} rather than the internal wrappers,
     *         so callers can cast them back to their own task type
     */
    @Override
    public List<Runnable> shutdownNow() {
        List<Runnable> neverStarted = exec.shutdownNow();
        List<Runnable> unwrapped = new ArrayList<Runnable>(neverStarted.size());
        for (Runnable queued : neverStarted) {
            if (queued instanceof TrackedTask) {
                unwrapped.add(((TrackedTask) queued).runnable);
            } else {
                // Not one of our wrappers; hand it back untouched.
                unwrapped.add(queued);
            }
        }
        return unwrapped;
    }

    /** @return whether the wrapped executor has been shut down */
    @Override
    public boolean isShutdown() {
        return exec.isShutdown();
    }

    /** @return whether the wrapped executor has terminated */
    @Override
    public boolean isTerminated() {
        return exec.isTerminated();
    }

    /**
     * Waits for the wrapped executor to terminate.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the result of the wrapped executor's {@code awaitTermination}
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        return exec.awaitTermination(timeout, unit);
    }

    /**
     * Returns a snapshot of the tasks recorded as cancelled at shutdown.
     *
     * @return a new list holding the recorded tasks
     * @throws IllegalStateException if the wrapped executor has not terminated
     */
    public List<Runnable> getCancelledTasks() {
        if (!exec.isTerminated()) {
            throw new IllegalStateException("executor has not terminated yet");
        }
        return new ArrayList<Runnable>(tasksCancelledAtShutdown);
    }

    /**
     * Submits the task, wrapped so that its cancellation can be tracked.
     *
     * @param runnable the task to run
     * @throws NullPointerException if {@code runnable} is null
     */
    @Override
    public void execute(final Runnable runnable) {
        if (runnable == null) {
            // Fail in the caller rather than later inside a worker thread.
            throw new NullPointerException("runnable");
        }
        exec.execute(new TrackedTask(runnable));
    }

    /** Wrapper that runs a task and records it if it ends interrupted after shutdown. */
    private final class TrackedTask implements Runnable {
        private final Runnable runnable;

        TrackedTask(Runnable runnable) {
            this.runnable = runnable;
        }

        @Override
        public void run() {
            try {
                runnable.run();
            } finally {
                // Evaluated even when the task throws. This is the racy check:
                // shutdown may have happened after the task's last instruction.
                if (isShutdown() && Thread.currentThread().isInterrupted()) {
                    tasksCancelledAtShutdown.add(runnable);
                }
            }
        }
    }
}
然后他创建了使用 TrackingExecutor 的 WebCrawler。
抓取工具(WebCrawler):
/**
 * WebCrawler
 * <p/>
 * Using TrackingExecutorService to save unfinished tasks for later execution.
 * <p/>
 * {@link #start()} submits one crawl task per pending URL; {@link #stop()}
 * shuts the executor down and puts the URLs of tasks that never started, or
 * that were recorded as cancelled, back into the pending set.
 *
 * @author Brian Goetz and Tim Peierls
 */
public abstract class WebCrawler {
    /** Executor of the current crawl; null when no crawl is running. */
    private volatile TrackingExecutor exec;

    /** URLs waiting to be crawled on the next {@link #start()}. */
    @GuardedBy("this") private final Set<URL> urlsToCrawl = new HashSet<URL>();

    /** Used only by {@code CrawlTask.alreadyCrawled()} and {@code markUncrawled()}. */
    private final ConcurrentMap<URL, Boolean> seen = new ConcurrentHashMap<URL, Boolean>();

    /** How long {@link #stop()} waits for running tasks to finish. */
    private static final long TIMEOUT = 500;
    private static final TimeUnit UNIT = MILLISECONDS;

    /**
     * @param startUrl first URL to crawl once {@link #start()} is called
     */
    public WebCrawler(URL startUrl) {
        urlsToCrawl.add(startUrl);
    }

    /** Creates a fresh executor and submits a crawl task for every pending URL. */
    public synchronized void start() {
        exec = new TrackingExecutor(Executors.newCachedThreadPool());
        for (URL url : urlsToCrawl) {
            submitCrawlTask(url);
        }
        urlsToCrawl.clear();
    }

    /**
     * Stops the current crawl and saves the URLs of unfinished tasks.
     * Does nothing when no crawl is running (never started, or already
     * stopped), instead of failing on the null executor.
     *
     * @throws InterruptedException if interrupted while awaiting termination
     */
    public synchronized void stop() throws InterruptedException {
        final TrackingExecutor running = exec;
        if (running == null) {
            return;
        }
        try {
            saveUncrawled(running.shutdownNow());
            // Cancelled tasks can only be queried once the executor terminated.
            if (running.awaitTermination(TIMEOUT, UNIT)) {
                saveUncrawled(running.getCancelledTasks());
            }
        } finally {
            exec = null;
        }
    }

    /**
     * Processes one page.
     *
     * @param url the page to process
     * @return URLs that will each be submitted as a new crawl task
     *         (presumably the links found on the page)
     */
    protected abstract List<URL> processPage(URL url);

    /**
     * Adds the page of every given task back to the pending set.
     * The cast assumes each element is a {@code CrawlTask} as submitted by
     * {@link #submitCrawlTask(URL)}.
     */
    private void saveUncrawled(List<Runnable> uncrawled) {
        for (Runnable task : uncrawled) {
            urlsToCrawl.add(((CrawlTask) task).getPage());
        }
    }

    /** Submits a crawl task for the given URL to the current executor. */
    private void submitCrawlTask(URL u) {
        exec.execute(new CrawlTask(u));
    }

    /** Task that processes one page and submits a task for each returned link. */
    private class CrawlTask implements Runnable {
        private final URL url;

        CrawlTask(URL url) {
            this.url = url;
        }

        // Not referenced anywhere in this class as shown.
        private int count = 1;

        /** @return true if this URL was already present in {@code seen} */
        boolean alreadyCrawled() {
            return seen.putIfAbsent(url, true) != null;
        }

        /** Removes this URL from {@code seen} and logs it. */
        void markUncrawled() {
            seen.remove(url);
            System.out.printf("marking %s uncrawled%n", url);
        }

        public void run() {
            for (URL link : processPage(url)) {
                // Stop submitting as soon as this thread has been interrupted;
                // isInterrupted() leaves the flag set for the executor to see.
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
                submitCrawlTask(link);
            }
        }

        /** @return the URL this task crawls */
        public URL getPage() {
            return url;
        }
    }
}
但我不明白:runnable.run()、exec.shutdownNow()、exec.awaitTermination(...)、exec.getCancelledTasks() 的调用、tasksCancelledAtShutdown.add(runnable)、runnable 的完成以及线程的交错,究竟按怎样的确切时间顺序发生,才会导致这个竞争条件。
我是这样理解的:例如,TrackingExecutor 在 CrawlTask 的 run() 方法退出之前被关闭,这个任务也可能被记录到 tasksCancelledAtShutdown 中,因为 TrackingExecutor#execute 中的 if (isShutdown() && Thread.currentThread().isInterrupted()) 可能为真,但事实上这个任务已经完成了。
private class CrawlTask implements Runnable {
    /** Submits a crawl task for each link of the page until the thread is interrupted. */
    public void run() {
        for (URL discovered : processPage(url)) {
            if (!Thread.currentThread().isInterrupted()) {
                submitCrawlTask(discovered);
            } else {
                return;
            }
        }
        // The TrackingExecutor may be shutting down right here:
        // the task's work is already complete, but run() has not returned yet.
    }
}
/**
 * Runs the task on the wrapped executor and records it as cancelled when it
 * ends after shutdown with the worker thread's interrupt flag set.
 */
public void execute(final Runnable runnable) {
    final Runnable tracked = new Runnable() {
        public void run() {
            try {
                runnable.run();
            } finally {
                // Both conditions may hold even though the task has in fact completed.
                if (isShutdown()) {
                    if (Thread.currentThread().isInterrupted()) {
                        tasksCancelledAtShutdown.add(runnable);
                    }
                }
            }
        }
    };
    exec.execute(tracked);
}