Java 中的生产者-消费者多线程 FIFO
Producer-Consumer multithreading FIFO in Java
社区,我试图用 10 个线程解决这个 Producer/Consumer 问题,但我在实现它时遇到了困难。
问题是这样的:
Problem Scheme
程序本身应该有一个循环,同时插入包含(id,超时)的消息,按id升序排列(1,2,3,4 ...)并且应该简单地打印消息的id出来的顺序和进来的顺序一样,像一个队列。
例如上图中,3条消息Message(1,200)、Message(2, 1000)和Message(3,20)是生产者将产生的前3条消息。
虽然分配了 Message(3,20) 的线程应该首先打印(因为它具有最低的超时(20)),但我希望它等待具有 200ms 超时的第一条消息打印,然后再次等待message2 需要 1000 毫秒才能打印,然后自行打印。所以全部按递增顺序排列(也许使用 id 作为订购编号?)。
到目前为止我已经实现了这个:
public class Main {
private static BlockingQueue<Message> queue = new ArrayBlockingQueue<>(5);
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
producer();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
consumer();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
public static void producer() throws InterruptedException {
while (true) {
queue.put(new Message());
}
}
public static void consumer() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.submit(queue.take());
}
executorService.shutdown();
}
}
我的留言 class 在这里:
public class Message implements Runnable {
public static int totalIds = 0;
public int id;
public int timeout;
public Random random = new Random();
public Message() {
this.id = totalIds;
totalIds++;
this.timeout = random.nextInt(5000);
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", timeout=" + timeout +
'}';
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "[RECEIVED] Message = " + toString());
try {
Thread.sleep(timeout);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "[DONE] Message = " + toString() + "\n");
}
}
到目前为止,除了线程应该等待具有优先级 ID 的线程的部分外,一切正常...这里是输出的第一部分:
All tasks submitted
pool-1-thread-9[RECEIVED] Message = Message{id=13, timeout=1361}
pool-1-thread-10[RECEIVED] Message = Message{id=14, timeout=92}
pool-1-thread-3[RECEIVED] Message = Message{id=7, timeout=3155}
pool-1-thread-5[RECEIVED] Message = Message{id=9, timeout=562}
pool-1-thread-2[RECEIVED] Message = Message{id=6, timeout=4249}
pool-1-thread-1[RECEIVED] Message = Message{id=0, timeout=1909}
pool-1-thread-7[RECEIVED] Message = Message{id=11, timeout=2468}
pool-1-thread-4[RECEIVED] Message = Message{id=8, timeout=593}
pool-1-thread-8[RECEIVED] Message = Message{id=12, timeout=3701}
pool-1-thread-6[RECEIVED] Message = Message{id=10, timeout=806}
pool-1-thread-10[DONE] Message = Message{id=14, timeout=92}
pool-1-thread-10[RECEIVED] Message = Message{id=15, timeout=846}
pool-1-thread-5[DONE] Message = Message{id=9, timeout=562}
pool-1-thread-5[RECEIVED] Message = Message{id=16, timeout=81}
pool-1-thread-4[DONE] Message = Message{id=8, timeout=593}
pool-1-thread-4[RECEIVED] Message = Message{id=17, timeout=4481}
pool-1-thread-5[DONE] Message = Message{id=16, timeout=81}
pool-1-thread-5[RECEIVED] Message = Message{id=18, timeout=2434}
pool-1-thread-6[DONE] Message = Message{id=10, timeout=806}
pool-1-thread-6[RECEIVED] Message = Message{id=19, timeout=10}
pool-1-thread-6[DONE] Message = Message{id=19, timeout=10}
pool-1-thread-6[RECEIVED] Message = Message{id=20, timeout=3776}
pool-1-thread-10[DONE] Message = Message{id=15, timeout=846}
pool-1-thread-10[RECEIVED] Message = Message{id=21, timeout=2988}
pool-1-thread-9[DONE] Message = Message{id=13, timeout=1361}
pool-1-thread-9[RECEIVED] Message = Message{id=22, timeout=462}
pool-1-thread-9[DONE] Message = Message{id=22, timeout=462}
pool-1-thread-9[RECEIVED] Message = Message{id=23, timeout=3074}
pool-1-thread-1[DONE] Message = Message{id=0, timeout=1909}
pool-1-thread-1[RECEIVED] Message = Message{id=24, timeout=725}
pool-1-thread-7[DONE] Message = Message{id=11, timeout=2468}
我的一个朋友告诉我应该用信号量来完成(从来没有用过它们)但我真的不知道如何实现信号量以便它们做我想做的事。
感谢解决此问题的任何线索!
据我了解,您需要两件事:
- 一起启动所有生产者的工作线程并让它们运行并行,但是...
- 等待线程以 FIFO 顺序完成(根据它们的创建 ID)。
因此,您可以一个接一个地启动线程,让它们运行并行,但也维护一个FIFO队列,它们的顺序是按id升序排列的,只是join
序列中的每个线程都被添加到该队列中。
这是一个演示代码,说明您如何做到这一点:
import java.util.LinkedList;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class Main {
private static class Message implements Runnable {
private final TimeUnit sleepUnit;
private final long sleepAmount;
private final int id;
public Message(final int id,
final TimeUnit sleepUnit,
final long sleepAmount) {
this.sleepUnit = Objects.requireNonNull(sleepUnit);
this.sleepAmount = sleepAmount;
this.id = id;
}
@Override
public void run() {
try {
System.out.println(toString() + " started and waiting...");
sleepUnit.sleep(sleepAmount);
}
catch (final InterruptedException ix) {
System.out.println(toString() + " interrupted: " + ix);
}
}
@Override
public String toString() {
return "Message{" + id + ", " + sleepUnit + "(" + sleepAmount + ")}";
}
}
private static class Producer {
private final int parallelism;
private final Consumer<? super Producer> consumer;
public Producer(final int parallelism,
final Consumer<? super Producer> consumer) {
this.parallelism = parallelism;
this.consumer = Objects.requireNonNull(consumer);
}
public void produceWithExecutor() {
System.out.println("Producing with Executor...");
final Random rand = new Random();
final ExecutorService service = Executors.newFixedThreadPool(parallelism);
final LinkedList<Future> q = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
final Message msg = new Message(i, TimeUnit.MILLISECONDS, 500 + rand.nextInt(3000));
q.addLast(service.submit(msg, msg));
}
service.shutdown();
while (!q.isEmpty())
try {
System.out.println(q.removeFirst().get().toString() + " joined."); //Will wait for completion of each submitted task (in FIFO sequence).
}
catch (final InterruptedException ix) {
System.out.println("Interrupted: " + ix);
}
catch (final ExecutionException xx) {
System.out.println("Execution failed: " + xx);
}
consumer.accept(this);
}
public void produceWithPlainThreads() throws InterruptedException {
System.out.println("Producing with Threads...");
final Random rand = new Random();
final LinkedList<Thread> q = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
final Message msg = new Message(i, TimeUnit.MILLISECONDS, 500 + rand.nextInt(3000));
final Thread t = new Thread(msg, msg.toString());
t.start();
q.add(t);
}
while (!q.isEmpty()) {
final Thread t = q.removeFirst();
t.join(); //Will wait for completion of each submitted task (in FIFO sequence).
System.out.println(t.getName() + " joined.");
}
consumer.accept(this);
}
}
public static void main(final String[] args) throws InterruptedException {
final Consumer<Producer> consumer = producer -> System.out.println("Consuming.");
final int parallelism = 10;
new Producer(parallelism, consumer).produceWithExecutor();
new Producer(parallelism, consumer).produceWithPlainThreads();
}
}
如您所见,这里有两种生产实现:一种是 ExecutorService
运行 连接所有提交的线程,另一种是(几乎)同时启动的普通线程时间。
这会产生如下输出:
Producing with Executor...
Message{1, MILLISECONDS(692)} started and waiting...
Message{2, MILLISECONDS(1126)} started and waiting...
Message{0, MILLISECONDS(3403)} started and waiting...
Message{3, MILLISECONDS(1017)} started and waiting...
Message{4, MILLISECONDS(2861)} started and waiting...
Message{5, MILLISECONDS(2735)} started and waiting...
Message{6, MILLISECONDS(2068)} started and waiting...
Message{7, MILLISECONDS(947)} started and waiting...
Message{8, MILLISECONDS(1091)} started and waiting...
Message{9, MILLISECONDS(1599)} started and waiting...
Message{0, MILLISECONDS(3403)} joined.
Message{1, MILLISECONDS(692)} joined.
Message{2, MILLISECONDS(1126)} joined.
Message{3, MILLISECONDS(1017)} joined.
Message{4, MILLISECONDS(2861)} joined.
Message{5, MILLISECONDS(2735)} joined.
Message{6, MILLISECONDS(2068)} joined.
Message{7, MILLISECONDS(947)} joined.
Message{8, MILLISECONDS(1091)} joined.
Message{9, MILLISECONDS(1599)} joined.
Consuming.
Producing with Threads...
Message{0, MILLISECONDS(3182)} started and waiting...
Message{1, MILLISECONDS(2271)} started and waiting...
Message{2, MILLISECONDS(2861)} started and waiting...
Message{3, MILLISECONDS(2942)} started and waiting...
Message{4, MILLISECONDS(2714)} started and waiting...
Message{5, MILLISECONDS(1228)} started and waiting...
Message{6, MILLISECONDS(2000)} started and waiting...
Message{7, MILLISECONDS(2372)} started and waiting...
Message{8, MILLISECONDS(764)} started and waiting...
Message{9, MILLISECONDS(587)} started and waiting...
Message{0, MILLISECONDS(3182)} joined.
Message{1, MILLISECONDS(2271)} joined.
Message{2, MILLISECONDS(2861)} joined.
Message{3, MILLISECONDS(2942)} joined.
Message{4, MILLISECONDS(2714)} joined.
Message{5, MILLISECONDS(1228)} joined.
Message{6, MILLISECONDS(2000)} joined.
Message{7, MILLISECONDS(2372)} joined.
Message{8, MILLISECONDS(764)} joined.
Message{9, MILLISECONDS(587)} joined.
Consuming.
您可以在输出中看到,在这两种情况下,线程都是(几乎)通过循环一起启动的,但是以 FIFO 有序方式连接。在第一种情况下,您可以看到线程可能以不同的顺序启动,这是启动线程本身的副作用。在使用普通线程的第二种情况下,碰巧所有线程都按照创建和启动的顺序调用它们的 run
方法,因为这种情况发生的时间非常短。但是每个线程的加入总是会按照这段代码id升序排列。如果您多次 运行 此代码,您可能会实现线程 eg 2 在线程 eg 1 之前在其 运行 方法中打印这两种情况,但是我们等待线程完成的顺序 Producer
的方法总是以id升序结束。
所有线程都应该 exit/finish 它们的 run
方法以升序睡眠顺序而不是升序 id 顺序。但是由于我们遍历队列并以有序的方式等待它们 join
的方式,输出将始终按 ID 升序排列。
所以如果你想获得每个 Thread
的结果以升序排列,那么相应的代码必须在你的 Producer
的生产方法中(在你 join
每个线程)和 not 在每个 Message
的 run
方法的末尾(为了避免额外的同步和线程间通信)。
社区,我试图用 10 个线程解决这个 Producer/Consumer 问题,但我在实现它时遇到了困难。
问题是这样的:
Problem Scheme
程序本身应该有一个循环,同时插入包含(id,超时)的消息,按id升序排列(1,2,3,4 ...)并且应该简单地打印消息的id出来的顺序和进来的顺序一样,像一个队列。
例如上图中,3条消息Message(1,200)、Message(2, 1000)和Message(3,20)是生产者将产生的前3条消息。 虽然分配了 Message(3,20) 的线程应该首先打印(因为它具有最低的超时(20)),但我希望它等待具有 200ms 超时的第一条消息打印,然后再次等待message2 需要 1000 毫秒才能打印,然后自行打印。所以全部按递增顺序排列(也许使用 id 作为订购编号?)。
到目前为止我已经实现了这个:
public class Main {
private static BlockingQueue<Message> queue = new ArrayBlockingQueue<>(5);
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
try {
producer();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
});
Thread t2 = new Thread(() -> {
try {
consumer();
} catch (InterruptedException exception) {
exception.printStackTrace();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
}
public static void producer() throws InterruptedException {
while (true) {
queue.put(new Message());
}
}
public static void consumer() throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.submit(queue.take());
}
executorService.shutdown();
}
}
我的留言 class 在这里:
public class Message implements Runnable {
public static int totalIds = 0;
public int id;
public int timeout;
public Random random = new Random();
public Message() {
this.id = totalIds;
totalIds++;
this.timeout = random.nextInt(5000);
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", timeout=" + timeout +
'}';
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "[RECEIVED] Message = " + toString());
try {
Thread.sleep(timeout);
} catch (InterruptedException exception) {
exception.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "[DONE] Message = " + toString() + "\n");
}
}
到目前为止,除了线程应该等待具有优先级 ID 的线程的部分外,一切正常...这里是输出的第一部分:
All tasks submitted
pool-1-thread-9[RECEIVED] Message = Message{id=13, timeout=1361}
pool-1-thread-10[RECEIVED] Message = Message{id=14, timeout=92}
pool-1-thread-3[RECEIVED] Message = Message{id=7, timeout=3155}
pool-1-thread-5[RECEIVED] Message = Message{id=9, timeout=562}
pool-1-thread-2[RECEIVED] Message = Message{id=6, timeout=4249}
pool-1-thread-1[RECEIVED] Message = Message{id=0, timeout=1909}
pool-1-thread-7[RECEIVED] Message = Message{id=11, timeout=2468}
pool-1-thread-4[RECEIVED] Message = Message{id=8, timeout=593}
pool-1-thread-8[RECEIVED] Message = Message{id=12, timeout=3701}
pool-1-thread-6[RECEIVED] Message = Message{id=10, timeout=806}
pool-1-thread-10[DONE] Message = Message{id=14, timeout=92}
pool-1-thread-10[RECEIVED] Message = Message{id=15, timeout=846}
pool-1-thread-5[DONE] Message = Message{id=9, timeout=562}
pool-1-thread-5[RECEIVED] Message = Message{id=16, timeout=81}
pool-1-thread-4[DONE] Message = Message{id=8, timeout=593}
pool-1-thread-4[RECEIVED] Message = Message{id=17, timeout=4481}
pool-1-thread-5[DONE] Message = Message{id=16, timeout=81}
pool-1-thread-5[RECEIVED] Message = Message{id=18, timeout=2434}
pool-1-thread-6[DONE] Message = Message{id=10, timeout=806}
pool-1-thread-6[RECEIVED] Message = Message{id=19, timeout=10}
pool-1-thread-6[DONE] Message = Message{id=19, timeout=10}
pool-1-thread-6[RECEIVED] Message = Message{id=20, timeout=3776}
pool-1-thread-10[DONE] Message = Message{id=15, timeout=846}
pool-1-thread-10[RECEIVED] Message = Message{id=21, timeout=2988}
pool-1-thread-9[DONE] Message = Message{id=13, timeout=1361}
pool-1-thread-9[RECEIVED] Message = Message{id=22, timeout=462}
pool-1-thread-9[DONE] Message = Message{id=22, timeout=462}
pool-1-thread-9[RECEIVED] Message = Message{id=23, timeout=3074}
pool-1-thread-1[DONE] Message = Message{id=0, timeout=1909}
pool-1-thread-1[RECEIVED] Message = Message{id=24, timeout=725}
pool-1-thread-7[DONE] Message = Message{id=11, timeout=2468}
我的一个朋友告诉我应该用信号量来完成(从来没有用过它们)但我真的不知道如何实现信号量以便它们做我想做的事。
感谢解决此问题的任何线索!
据我了解,您需要两件事:
- 一起启动所有生产者的工作线程并让它们运行并行,但是...
- 等待线程以 FIFO 顺序完成(根据它们的创建 ID)。
因此,您可以一个接一个地启动线程,让它们运行并行,但也维护一个FIFO队列,它们的顺序是按id升序排列的,只是join
序列中的每个线程都被添加到该队列中。
这是一个演示代码,说明您如何做到这一点:
import java.util.LinkedList;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
public class Main {
private static class Message implements Runnable {
private final TimeUnit sleepUnit;
private final long sleepAmount;
private final int id;
public Message(final int id,
final TimeUnit sleepUnit,
final long sleepAmount) {
this.sleepUnit = Objects.requireNonNull(sleepUnit);
this.sleepAmount = sleepAmount;
this.id = id;
}
@Override
public void run() {
try {
System.out.println(toString() + " started and waiting...");
sleepUnit.sleep(sleepAmount);
}
catch (final InterruptedException ix) {
System.out.println(toString() + " interrupted: " + ix);
}
}
@Override
public String toString() {
return "Message{" + id + ", " + sleepUnit + "(" + sleepAmount + ")}";
}
}
private static class Producer {
private final int parallelism;
private final Consumer<? super Producer> consumer;
public Producer(final int parallelism,
final Consumer<? super Producer> consumer) {
this.parallelism = parallelism;
this.consumer = Objects.requireNonNull(consumer);
}
public void produceWithExecutor() {
System.out.println("Producing with Executor...");
final Random rand = new Random();
final ExecutorService service = Executors.newFixedThreadPool(parallelism);
final LinkedList<Future> q = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
final Message msg = new Message(i, TimeUnit.MILLISECONDS, 500 + rand.nextInt(3000));
q.addLast(service.submit(msg, msg));
}
service.shutdown();
while (!q.isEmpty())
try {
System.out.println(q.removeFirst().get().toString() + " joined."); //Will wait for completion of each submitted task (in FIFO sequence).
}
catch (final InterruptedException ix) {
System.out.println("Interrupted: " + ix);
}
catch (final ExecutionException xx) {
System.out.println("Execution failed: " + xx);
}
consumer.accept(this);
}
public void produceWithPlainThreads() throws InterruptedException {
System.out.println("Producing with Threads...");
final Random rand = new Random();
final LinkedList<Thread> q = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
final Message msg = new Message(i, TimeUnit.MILLISECONDS, 500 + rand.nextInt(3000));
final Thread t = new Thread(msg, msg.toString());
t.start();
q.add(t);
}
while (!q.isEmpty()) {
final Thread t = q.removeFirst();
t.join(); //Will wait for completion of each submitted task (in FIFO sequence).
System.out.println(t.getName() + " joined.");
}
consumer.accept(this);
}
}
public static void main(final String[] args) throws InterruptedException {
final Consumer<Producer> consumer = producer -> System.out.println("Consuming.");
final int parallelism = 10;
new Producer(parallelism, consumer).produceWithExecutor();
new Producer(parallelism, consumer).produceWithPlainThreads();
}
}
如您所见,这里有两种生产实现:一种是 ExecutorService
运行 连接所有提交的线程,另一种是(几乎)同时启动的普通线程时间。
这会产生如下输出:
Producing with Executor...
Message{1, MILLISECONDS(692)} started and waiting...
Message{2, MILLISECONDS(1126)} started and waiting...
Message{0, MILLISECONDS(3403)} started and waiting...
Message{3, MILLISECONDS(1017)} started and waiting...
Message{4, MILLISECONDS(2861)} started and waiting...
Message{5, MILLISECONDS(2735)} started and waiting...
Message{6, MILLISECONDS(2068)} started and waiting...
Message{7, MILLISECONDS(947)} started and waiting...
Message{8, MILLISECONDS(1091)} started and waiting...
Message{9, MILLISECONDS(1599)} started and waiting...
Message{0, MILLISECONDS(3403)} joined.
Message{1, MILLISECONDS(692)} joined.
Message{2, MILLISECONDS(1126)} joined.
Message{3, MILLISECONDS(1017)} joined.
Message{4, MILLISECONDS(2861)} joined.
Message{5, MILLISECONDS(2735)} joined.
Message{6, MILLISECONDS(2068)} joined.
Message{7, MILLISECONDS(947)} joined.
Message{8, MILLISECONDS(1091)} joined.
Message{9, MILLISECONDS(1599)} joined.
Consuming.
Producing with Threads...
Message{0, MILLISECONDS(3182)} started and waiting...
Message{1, MILLISECONDS(2271)} started and waiting...
Message{2, MILLISECONDS(2861)} started and waiting...
Message{3, MILLISECONDS(2942)} started and waiting...
Message{4, MILLISECONDS(2714)} started and waiting...
Message{5, MILLISECONDS(1228)} started and waiting...
Message{6, MILLISECONDS(2000)} started and waiting...
Message{7, MILLISECONDS(2372)} started and waiting...
Message{8, MILLISECONDS(764)} started and waiting...
Message{9, MILLISECONDS(587)} started and waiting...
Message{0, MILLISECONDS(3182)} joined.
Message{1, MILLISECONDS(2271)} joined.
Message{2, MILLISECONDS(2861)} joined.
Message{3, MILLISECONDS(2942)} joined.
Message{4, MILLISECONDS(2714)} joined.
Message{5, MILLISECONDS(1228)} joined.
Message{6, MILLISECONDS(2000)} joined.
Message{7, MILLISECONDS(2372)} joined.
Message{8, MILLISECONDS(764)} joined.
Message{9, MILLISECONDS(587)} joined.
Consuming.
您可以在输出中看到,在这两种情况下,线程都是(几乎)通过循环一起启动的,但是以 FIFO 有序方式连接。在第一种情况下,您可以看到线程可能以不同的顺序启动,这是启动线程本身的副作用。在使用普通线程的第二种情况下,碰巧所有线程都按照创建和启动的顺序调用它们的 run
方法,因为这种情况发生的时间非常短。但是每个线程的加入总是会按照这段代码id升序排列。如果您多次 运行 此代码,您可能会实现线程 eg 2 在线程 eg 1 之前在其 运行 方法中打印这两种情况,但是我们等待线程完成的顺序 Producer
的方法总是以id升序结束。
所有线程都应该 exit/finish 它们的 run
方法以升序睡眠顺序而不是升序 id 顺序。但是由于我们遍历队列并以有序的方式等待它们 join
的方式,输出将始终按 ID 升序排列。
所以如果你想获得每个 Thread
的结果以升序排列,那么相应的代码必须在你的 Producer
的生产方法中(在你 join
每个线程)和 not 在每个 Message
的 run
方法的末尾(为了避免额外的同步和线程间通信)。