为什么 LogWriter 中的竞争条件会导致生产者阻塞? [实践中的并发]

Why can race condition in LogWriter cause the producer to block? [Concurrency in practice]

首先,为了防止那些不喜欢读到最后的人将问题标记为重复,我已经阅读了 Producer-Consumer Logging service with Unreliable way to shutdown 问题。但是没有完全回答问题,答案与书上的文字相矛盾。

书中提供了以下代码:

public class LogWriter {
    private final BlockingQueue<String> queue;
    private final LoggerThread logger;
    private static final int CAPACITY = 1000;

    public LogWriter(Writer writer) {
        this.queue = new LinkedBlockingQueue<String>(CAPACITY);
        this.logger = new LoggerThread(writer);
    }

    public void start() {
        logger.start();
    }

    public void log(String msg) throws InterruptedException {
        queue.put(msg);
    }

    private class LoggerThread extends Thread {
        private final PrintWriter writer;

        public LoggerThread(Writer writer) {
            this.writer = new PrintWriter(writer, true); // autoflush
        }

        public void run() {
            try {
                while (true)
                    writer.println(queue.take());
            } catch (InterruptedException ignored) {
            } finally {
                writer.close();
            }
        }
    }
}

现在我们应该了解如何停止这个进程了。我们应该停止记录但不应该跳过已经提交的消息。

作者研究方法:

public void log(String msg) throws InterruptedException {
     if(!shutdownRequested)
           queue.put(msg);
     else
           throw new IllegalArgumentException("logger is shut down");
 }

然后这样评论:

Another approach to shutting down LogWriter would be to set a “shutdown requested” flag to prevent further messages from being submitted, as shown in Listing 7.14. The con- sumer could then drain the queue upon being notified that shutdown has been requested, writing out any pending messages and unblocking any producers blocked in log . However, this approach has race conditions that make it unreliable. The implementation of log is a check-then-act sequence: producers could observe that the service has not yet been shut down but still queue messages after the shutdown, again with the risk that the producer might get blocked in log and never become unblocked. There are tricks that reduce the likelihood of this (like having the consumer wait several seconds before declaring the queue drained), but these do not change the fundamental problem, merely the likelihood that it will cause a fail- ure.

build enough hard for me.

这句话

我明白

 if(!shutdownRequested)
           queue.put(msg);

不是原子的,消息可以在关闭后添加到队列中。是的,它不是很准确,但我看不出问题。队列将被排空,当队列为空时我们可以停止 LoggerThread。尤其是我不明白为什么生产者可以被屏蔽

作者没有提供完整的代码,所以我无法理解所有的细节。我相信这本书被社区大多数人读过,这个例子有详细的解释。

请用完整的代码示例进行解释。

首先要了解的是,当请求关闭时,生产者需要停止接受任何更多请求,而消费者(LoggerThread 在这种情况下)需要清空队列。您在问题中提供的代码仅展示了故事的一方面;当 shutdownRequestedtrue 时,生产者拒绝任何进一步的请求。在这个例子之后,作者继续说:

The consumer could then drain the queue upon being notified that shutdown has been requested, writing out any pending messages and unblocking any producers blocked in log

首先,如您的问题所示,LoggerThread 中的 queue.take 将无限阻塞队列中可用的新消息;但是,如果我们想(优雅地)关闭 LoggerThread,我们需要确保 LoggerThread 中的关闭代码在 shutdownRequested 为真时有机会执行,而不是无限地被阻塞通过 queue.take.

当作者说消费者可以 drain 队列时,他的意思是 LogWritter 可以检查 shutdownRequested 是否为真,它可以调用非阻塞 drainTo 方法来在单独的集合中排出队列的当前内容,而不是调用 queue.take (或调用类似的非阻塞方法)。或者,如果 shutdownRequested 为假,LogWriter 可以像往常一样继续调用 queue.take

这种方法的真正问题在于 log 方法(由生产者调用)的实现方式。由于它不是原子的,因此多个线程可能会错过将 shutdownRequested 设置为 true 的情况。如果错过此更新的线程数大于 queueCAPACITY,会发生什么情况。我们再来看看log方法。 (为了便于解释,添加了花括号):

public void log(String msg) throws InterruptedException {
     if(!shutdownRequested) {//A. 1001 threads see shutdownRequested as false and pass the if condition.

           //B. At this point, shutdownRequested is set to true by client code
           //C. Meanwhile, the LoggerThread which is the consumer sees that shutdownRequested is true and calls 
           //queue.drainTo to drain all existing messages in the queue instead of `queue.take`. 
           //D. Producers insert the new message into the queue.    
           queue.put(msg);//Step E
     } else
           throw new IllegalArgumentException("logger is shut down");
     }
}

步骤 E 所示,当 LoggerThread 完成清空队列并退出 w 时,多个生产者线程可以调用 put .在 第 1000 个 线程调用 put 之前应该没有问题。真正的问题是当 1001th 线程调用 put 时。它会阻塞,因为队列容量只有 1000,并且 LoggerThread 可能不再存在或订阅了 queue.take 方法。