具有线程消费者和生产者的循环缓冲区:它会卡住一些执行
Circular Buffer with Threads Consumer and Producer: it get stucks some executions
我正在开发一个带有两个线程的循环缓冲区:消费者和生产者。
我正在使用 Thread.yield
的主动等待。
我知道可以用信号量来做到这一点,但我想要没有信号量的缓冲区。
两者都有一个共享变量:bufferCircular
。
趁着buffer还没有填满有用的信息,producer
在array的p
位置写入数据,趁着还有一些有用的信息consumer
在consumer
的位置读取数据c
个数组。 BufferCircular
中的变量nElem
是尚未读取的数值数据的个数。
该程序运行 9/10 次时效果很好。然后,有时,它会在屏幕上显示最后一个元素(循环的第 500 次)之前陷入无限循环,或者只是不显示任何元素。
我觉得可能是liveLock,但是找不到错误。
共享变量:
public class BufferCircular {
volatile int[] array;
volatile int p;
volatile int c;
volatile int nElem;
public BufferCircular(int[] array) {
this.array = array;
this.p = 0;
this.c = 0;
this.nElem = 0;
}
public void writeData (int data) {
this.array[p] = data;
this.p = (p + 1) % array.length;
this.nElem++;
}
public int readData() {
int data = array[c];
this.c = (c + 1) % array.length;
this.nElem--;
return data;
}
}
生产者线程:
public class Producer extends Thread {
BufferCircular buffer;
int bufferTam;
int contData;
public Productor(BufferCircular buff) {
this.buffer = buff;
this.bufferTam = buffer.array.length;
this.contData = 0;
}
public void produceData() {
this.contData++;
this.buffer.writeData(contData);
}
public void run() {
for (int i = 0; i < 500; i++) {
while (this.buffer.nElem == this.bufferTam) {
Thread.yield();
}
this.produceData();
}
}
}
消费者线程:
public class Consumer extends Thread {
BufferCircular buffer;
int cont;
public Consumer(BufferCircular buff) {
this.buffer = buff;
this.cont = 0;
}
public void consumeData() {
int data = buffer.readData();
cont++;
System.out.println("data " + cont + ": " + data);
}
public void run() {
for (int i = 0; i < 500; i++) {
while (this.buffer.nElem == 0) {
Thread.yield();
}
this.consumeData();
}
}
}
主要:
public class Main {
public static void main(String[] args) {
Random ran = new Random();
int tamArray = ran.nextInt(21) + 1;
int[] array = new int[tamArray];
BufferCircular buffer = new BufferCircular(array);
Producer producer = new Producer (buffer);
Consumer consumer = new Consumer (buffer);
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
System.err.println("Error with Threads");
e.printStackTrace();
}
}
}
欢迎任何帮助。
你的问题是你的 BufferCircular
方法对竞争条件很敏感。以 writeData()
为例。它分 3 个步骤执行,其中一些步骤也不是原子的:
this.array[p] = data; // 1
this.p = (p + 1) % array.length; // 2 not atomic
this.nElem++; // 3 not atomic
假设2个线程同时进入writeData()
。在第 1 步,它们都具有相同的 p
值,并且都重写了 array[p]
值。现在,array[p]
被重写了两次,第一个线程必须写入的数据丢失了,因为第二个线程之后写入了同一个索引。然后他们执行第 2 步——结果不可预测,因为 p 可以递增 1 或 2(p = (p + 1) % array.length
由 3 个操作组成,其中线程可以交互)。然后,第 3 步。 ++
运算符也不是原子的:它在幕后使用 2 个操作。所以 nElem
也会增加 1 或 2。
所以我们得到了完全不可预测的结果。这会导致您的程序执行不力。
最简单的解决方案是使 readData()
和 writeData()
方法序列化。为此,声明它们 synchronized
:
public synchronized void writeData (int data) { //...
public synchronized void readData () { //...
如果您只有一个生产者线程和一个消费者线程,则涉及 nElem
的操作可能会出现竞争条件。解决方案是使用 AtomicInteger
而不是 int
:
final AtomicInteger nElem = new AtomicInteger();
并使用其 incrementAndGet()
和 decrementAndGet()
方法。
我正在开发一个带有两个线程的循环缓冲区:消费者和生产者。
我正在使用 Thread.yield
的主动等待。
我知道可以用信号量来做到这一点,但我想要没有信号量的缓冲区。
两者都有一个共享变量:bufferCircular
。
趁着buffer还没有填满有用的信息,producer
在array的p
位置写入数据,趁着还有一些有用的信息consumer
在consumer
的位置读取数据c
个数组。 BufferCircular
中的变量nElem
是尚未读取的数值数据的个数。
该程序运行 9/10 次时效果很好。然后,有时,它会在屏幕上显示最后一个元素(循环的第 500 次)之前陷入无限循环,或者只是不显示任何元素。
我觉得可能是liveLock,但是找不到错误。
共享变量:
public class BufferCircular {
volatile int[] array;
volatile int p;
volatile int c;
volatile int nElem;
public BufferCircular(int[] array) {
this.array = array;
this.p = 0;
this.c = 0;
this.nElem = 0;
}
public void writeData (int data) {
this.array[p] = data;
this.p = (p + 1) % array.length;
this.nElem++;
}
public int readData() {
int data = array[c];
this.c = (c + 1) % array.length;
this.nElem--;
return data;
}
}
生产者线程:
public class Producer extends Thread {
BufferCircular buffer;
int bufferTam;
int contData;
public Productor(BufferCircular buff) {
this.buffer = buff;
this.bufferTam = buffer.array.length;
this.contData = 0;
}
public void produceData() {
this.contData++;
this.buffer.writeData(contData);
}
public void run() {
for (int i = 0; i < 500; i++) {
while (this.buffer.nElem == this.bufferTam) {
Thread.yield();
}
this.produceData();
}
}
}
消费者线程:
public class Consumer extends Thread {
BufferCircular buffer;
int cont;
public Consumer(BufferCircular buff) {
this.buffer = buff;
this.cont = 0;
}
public void consumeData() {
int data = buffer.readData();
cont++;
System.out.println("data " + cont + ": " + data);
}
public void run() {
for (int i = 0; i < 500; i++) {
while (this.buffer.nElem == 0) {
Thread.yield();
}
this.consumeData();
}
}
}
主要:
public class Main {
public static void main(String[] args) {
Random ran = new Random();
int tamArray = ran.nextInt(21) + 1;
int[] array = new int[tamArray];
BufferCircular buffer = new BufferCircular(array);
Producer producer = new Producer (buffer);
Consumer consumer = new Consumer (buffer);
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
System.err.println("Error with Threads");
e.printStackTrace();
}
}
}
欢迎任何帮助。
你的问题是你的 BufferCircular
方法对竞争条件很敏感。以 writeData()
为例。它分 3 个步骤执行,其中一些步骤也不是原子的:
this.array[p] = data; // 1
this.p = (p + 1) % array.length; // 2 not atomic
this.nElem++; // 3 not atomic
假设2个线程同时进入writeData()
。在第 1 步,它们都具有相同的 p
值,并且都重写了 array[p]
值。现在,array[p]
被重写了两次,第一个线程必须写入的数据丢失了,因为第二个线程之后写入了同一个索引。然后他们执行第 2 步——结果不可预测,因为 p 可以递增 1 或 2(p = (p + 1) % array.length
由 3 个操作组成,其中线程可以交互)。然后,第 3 步。 ++
运算符也不是原子的:它在幕后使用 2 个操作。所以 nElem
也会增加 1 或 2。
所以我们得到了完全不可预测的结果。这会导致您的程序执行不力。
最简单的解决方案是使 readData()
和 writeData()
方法序列化。为此,声明它们 synchronized
:
public synchronized void writeData (int data) { //...
public synchronized void readData () { //...
如果您只有一个生产者线程和一个消费者线程,则涉及 nElem
的操作可能会出现竞争条件。解决方案是使用 AtomicInteger
而不是 int
:
final AtomicInteger nElem = new AtomicInteger();
并使用其 incrementAndGet()
和 decrementAndGet()
方法。