从 producer/consumer 模型中的文件读取

Reading from file in producer/consumer model

我正在尝试从文件中读取字符串,使用该字符串执行 HTTP 请求,如果请求 returns 为 200,则使用它执行另一个 HTTP 请求。

我认为生产者消费者模型是一个很好的模型,但出于某种原因我完全被卡住了。由于某种原因整个过程只是在某个点停止,我不知道为什么。

public static void main(String[] args) throws InterruptedException, IOException {

    ArrayBlockingQueue<String> subQueue = new ArrayBlockingQueue<>(3000000);

    ThreadPoolExecutor consumers = new ThreadPoolExecutor(100, 100, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10));
    ThreadPoolExecutor producers = new ThreadPoolExecutor(100, 100, 10000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(10000000));
    consumers.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    String fileName = "test";
    try (BufferedReader br = new BufferedReader(new FileReader(fileName))) {
        String line;
        while ((line = br.readLine()) != null) {
            String address = new JSONObject(line).getString("Address");
            producers.submit(new Thread(() -> {
                if (requestReturn200(address)) {
                    try {
                        subQueue.put(address);
                    } catch (InterruptedException e) {
                        System.out.println("Error producing.");
                    }
                }
            }));
        }
        producers.shutdown();
    }

    while (subQueue.size() != 0 || !producers.isShutdown()) {
        String address = subQueue.poll(1, TimeUnit.SECONDS);
        if (address != null) {
            consumers.submit(new Thread(() -> {
                try {
                    System.out.println("Doing..." + address);
                    doOtherHTTPReqeust(address);
                } catch (Exception e) {
                    System.out.println("Fatal error consuming);
                }
            }));

        } else {
            System.out.println("Null");
        }
    }

    consumers.shutdown();
}

我们将不胜感激任何帮助。

 while (subQueue.size() != 0 || !producers.isShutdown()) {

首先 !producers.isShutdown() 将始终 return !true 因为它是在 producers.shutdown() 之后检查的。 isShutdown不是说pool中的任务是否还在运行,而是说pool是否已经关闭,导致无法接受新的任务。在您的情况下,这将始终是 false

其次,subQueue.size() != 0 虽然您的消费者创建循环并且消费者从队列中获取的数据比生产者可以提供的数据快得多,但在 "producing" 过程的中间,消费者可能已经清除了导致条件的队列subQueue.size!= 是假的。如您所知,这会打破循环并禁止生产者提交。

您应该停止使用 queue.size(),而是使用 BlockingQueue 的阻塞属性。 queue.take() 将阻塞直到有新元素可用。

所以整体流程应该是这样的。

  1. 启动一些生产者任务池,就像您现在正在做的那样。
  2. 让生产者将数据放入阻塞队列 - 是的,你在这里
  3. 开始一些(我会说是固定的)消费者
  4. 让消费者queue.take() 队列中的数据。这将迫使消费者 "autowait" 获取新数据并在新数据可用时使用。

我将不提创建 200 个线程是疯狂的并且错过了多线程 consumers/producers/task 池的全部目的,至少在您的情况下恕我直言。这个想法是使用少量的线程,因为它们是重量级的,可以执行大量排队的任务。但那是不同时间的讨论。