如何使用 LinkedBlockingQueue 执行 运行 个任务

How to use LinkedBlockingQueue to run tasks

我目前正在尝试使用 simpleTesting 中的代码让我的代码打印 simpleTask 方法中的内容 20 次。这个想法是 simpleTesting 将 20 个 simpleTask 实例添加到队列中,然后在 simplePoolThread 中从队列中取出它们。应该发生的是,它打印出测试消息 20 次,然后继续 运行,同时从队列中寻找更多的东西(但有 none)。相反,它目前只是不打印任何内容,并且持续 运行ning。这是我的代码(很多是接口,我相信问题出在 simpleThreadPool 代码上):

package simpleThreadPool;

/**
 *   <<-- Pool Thread -->> 
 *
 *   It will be running continuously. It will try to retrieve new tasks when it is idle. 
 */
public interface ISimplePoolThread extends Runnable {
    /**
     *   Use an infinite loop to retrieve and perform tasks.
     */
    @Override
    public void run();
}

.

package simpleThreadPool;

/**
 *   <<-- Simple Task -->> 
 *
 *   ISimpleTask is to be performed by PoolThread. 
 */
public interface ISimpleTask{
    /**
     *   #1. Create a class to implement ISimpleTask, put content of the task to method run().
     */
    public void run();
}

.

    package simpleThreadPool;


    /**
     *   <<-- Thread Pool -->> 
     *   It manages a queue of tasks, starts some pool threads.
     *      
     *   #1. Create a task queue by using queue data structures, or designing your own data structure. 
     */
    public interface ISimpleThreadPool {

        /**
         *   #1. Initialize your queue (or do so in somewhere)
         *   #2. Starts some ISimplePoolThreads.
         */
        public void start();

        /**
         *   #1. Stops everything
         */
        public void stop();

        /**
         *   #1. Add a task to your queue.
         */
        public void addTask(ISimpleTask task);
    }

.

    package simpleThreadPool;

public class SimpleTask implements ISimpleTask {

    @Override
    public void run() {
        System.out.println("testing testing 1 2 3");
    }
}

。 我认为问题出在这段代码中,任务从队列中取出:

package simpleThreadPool;

import java.util.concurrent.LinkedBlockingQueue;

public class SimplePoolThread implements ISimplePoolThread, Runnable {


    private LinkedBlockingQueue<ISimpleTask> queue = new LinkedBlockingQueue<>();

    @Override
    public void run() {

        while(true) {
            System.out.println("Inserting Element: ");
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


}

.

package simpleThreadPool;

import java.util.concurrent.LinkedBlockingQueue;

public class SimpleThreadPool implements ISimpleThreadPool {

    private LinkedBlockingQueue<ISimpleTask> queue = new LinkedBlockingQueue<>();

    @Override
    public void start() {
        (new Thread(new SimplePoolThread())).start();
    }

    @Override
    public void stop() {
        try {
            queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void addTask(ISimpleTask task) { 

        try {
            queue.put(task);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

.

测试文件:

package simpleThreadPool;

public class SimpleTesting implements ISimpleTask{

    private int i;

    public SimpleTesting(int i){
        this.i = i;
    }

    @Override
    public void run() {
        System.out.println(i);
    }

    public static void main(String args[]){
        // Initialize thread pool
        SimpleThreadPool pool = new SimpleThreadPool();
        pool.start();
        // Create 20 tasks
        for(int i = 1; i<=20; i++){
            pool.addTask(new SimpleTesting(i));
        }
    }
}

您的SimplePoolThread 中的任务队列是一个阻塞队列。一旦启动,它就会执行 queue.take()。 Take 是一个阻塞操作。线程永远坐在那里等待,直到其他东西将任务添加到队列中。

您对问题位置的预感非常接近。问题是 SimplePoolThread 中的队列和 SimpleThreadPool 中的队列不一样;你有两个独立的队列。因此,当 SimpleTesting 添加任务时,它们会进入 Pool 的队列而不是 Thread 的队列。所以线程将永远坐在那里等待任何事情。您实际上还忘记了 运行 您在 SimplePoolThread 中的任务。

试试下面的方法。

public class SimpleThreadPool implements ISimpleThreadPool {

private LinkedBlockingQueue<ISimpleTask> queue = new LinkedBlockingQueue<>();

@Override
public void start() {
    (new Thread(new SimplePoolThread(queue))).start();
}

请注意,池中的队列已传递到线程中。然后该线程保留对该队列的引用。在线程的 运行() 期间,它现在实际上也是 运行 任务。

public class SimplePoolThread implements ISimplePoolThread, Runnable {


private LinkedBlockingQueue<ISimpleTask> queue;

public SimplePoolThread(LinkedBlockingQueue<ISimpleTask> queue)
{
    this.queue = queue;
}

@Override
public void run() {

    while(true) {
        System.out.println("Inserting Element: ");
        try {
            ISimpleTask task = queue.take();
            task.run();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

这是输出:

Inserting Element: 
1
Inserting Element: 
2
Inserting Element: 
3

..等..

我假设这是作业,否则我会告诉你不要重新发明轮子并去使用 Java 的内置池服务。

http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/package-summary.html