Java 中优化多线程队列处理代码
Optimizing multithreaded queue processing code in Java
我有一个代码可以创建 class 的 10 个对象,实现可运行。
每个对象都保存在 hashmap 中以备后用。
每个对象在一个单独的线程上 运行。每个对象都有一个 public 方法,可以将项目添加到队列中。
对象以无限循环处理队列。
我想知道这个解决方案是否可行,或者是否有一些完全wrong/useless/missing(尤其是使用 volatile 和 syncronized 关键字)?
MultithreadingTest.class
package multithreadingtest;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Multithreading example.
*
* @author lkallas
*/
public class MultithreadingTest {
private static final int NUM_OF_THREADS = 10;
private static String name;
private static final Map<Integer, ThreadWorker> objectMap = new HashMap<>(); //Map or storing Threadworker objects
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS);
//Creating threads
for (int i = 0; i < NUM_OF_THREADS; i++) {
name = "ThreadWorker" + String.valueOf(i);
ThreadWorker thread = new ThreadWorker(name);
objectMap.put(i, thread); //Add objects to map
executor.execute(thread);
}
for (int i = 0; i < 10; i++) {
ThreadWorker worker = objectMap.get(i);
for (int j = 0; j < 10; j++) {
worker.addToQueue("Test1");
}
}
}
}
ThreadWorker.class
package multithreadingtest;
import java.util.LinkedList;
import java.util.Queue;
/**
* Worker class that performs operations in another thread.
*
* @author lkallas
*/
public class ThreadWorker implements Runnable {
private final String threadName;
private volatile Queue workQueue; //Does this have to volatile??
/**
* Class constructor.
*
* @param threadName Name of the thread for identifying.
*
*/
public ThreadWorker(String threadName) {
this.threadName = threadName;
this.workQueue = new LinkedList();
System.out.println(String.format("Thread %s started!", threadName));
}
/**
* Adds items to the queue.
*
* @param object Object to be added to the queue.
*/
public synchronized void addToQueue(String object) {
workQueue.add(object); //Does it have to be syncronized void
}
@Override
public void run() {
while (true) {
if (!workQueue.isEmpty()) {
System.out.println("Queue size: " + workQueue.size());
String item = (String) workQueue.peek();
//Process item
System.out.println(threadName + " just processed " + item);
workQueue.remove();
}
}
}
}
非常感谢任何帮助和建议!
workQueue
是线程本地的,不需要是 volatile
(它是私有的,没有 public setter 方法)
- 使
workQueue
变成 BlockingQueue - 这个队列是线程安全的,所以你不需要同步 addToQueue
。此外,您不需要在 run
内部旋转 - 而是在队列上调用 take()
,并且线程会阻塞,直到有一个项目可用。
- 您似乎在
MultithreadingTest
中做了太多工作 - 与其将项目添加到单独的队列,不如让所有工作人员共享相同的 BlockingQueue
,然后 main
只需要向那个 BlockingQueue
添加项目,工作人员将自己负责负载平衡。请注意,即使 BlockingQueue
是共享的,它仍然不需要是 volatile
因为一旦工作程序初始化,对 BlockingQueue
的引用就不会改变(使字段 private final BlockingQueue<String> workQueue
- final
字段永远不需要 volatile
).
我有一个代码可以创建 class 的 10 个对象,实现可运行。 每个对象都保存在 hashmap 中以备后用。 每个对象在一个单独的线程上 运行。每个对象都有一个 public 方法,可以将项目添加到队列中。 对象以无限循环处理队列。
我想知道这个解决方案是否可行,或者是否有一些完全wrong/useless/missing(尤其是使用 volatile 和 syncronized 关键字)?
MultithreadingTest.class
package multithreadingtest;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Multithreading example.
*
* @author lkallas
*/
public class MultithreadingTest {
private static final int NUM_OF_THREADS = 10;
private static String name;
private static final Map<Integer, ThreadWorker> objectMap = new HashMap<>(); //Map or storing Threadworker objects
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(NUM_OF_THREADS);
//Creating threads
for (int i = 0; i < NUM_OF_THREADS; i++) {
name = "ThreadWorker" + String.valueOf(i);
ThreadWorker thread = new ThreadWorker(name);
objectMap.put(i, thread); //Add objects to map
executor.execute(thread);
}
for (int i = 0; i < 10; i++) {
ThreadWorker worker = objectMap.get(i);
for (int j = 0; j < 10; j++) {
worker.addToQueue("Test1");
}
}
}
}
ThreadWorker.class
package multithreadingtest;
import java.util.LinkedList;
import java.util.Queue;
/**
* Worker class that performs operations in another thread.
*
* @author lkallas
*/
public class ThreadWorker implements Runnable {
private final String threadName;
private volatile Queue workQueue; //Does this have to volatile??
/**
* Class constructor.
*
* @param threadName Name of the thread for identifying.
*
*/
public ThreadWorker(String threadName) {
this.threadName = threadName;
this.workQueue = new LinkedList();
System.out.println(String.format("Thread %s started!", threadName));
}
/**
* Adds items to the queue.
*
* @param object Object to be added to the queue.
*/
public synchronized void addToQueue(String object) {
workQueue.add(object); //Does it have to be syncronized void
}
@Override
public void run() {
while (true) {
if (!workQueue.isEmpty()) {
System.out.println("Queue size: " + workQueue.size());
String item = (String) workQueue.peek();
//Process item
System.out.println(threadName + " just processed " + item);
workQueue.remove();
}
}
}
}
非常感谢任何帮助和建议!
workQueue
是线程本地的,不需要是volatile
(它是私有的,没有 public setter 方法)- 使
workQueue
变成 BlockingQueue - 这个队列是线程安全的,所以你不需要同步addToQueue
。此外,您不需要在run
内部旋转 - 而是在队列上调用take()
,并且线程会阻塞,直到有一个项目可用。 - 您似乎在
MultithreadingTest
中做了太多工作 - 与其将项目添加到单独的队列,不如让所有工作人员共享相同的BlockingQueue
,然后main
只需要向那个BlockingQueue
添加项目,工作人员将自己负责负载平衡。请注意,即使BlockingQueue
是共享的,它仍然不需要是volatile
因为一旦工作程序初始化,对BlockingQueue
的引用就不会改变(使字段private final BlockingQueue<String> workQueue
-final
字段永远不需要volatile
).