具有线程消费者和生产者的循环缓冲区:它会卡住一些执行

Circular Buffer with Threads Consumer and Producer: it get stucks some executions

我正在开发一个带有两个线程的循环缓冲区:消费者和生产者。 我正在使用 Thread.yield 的主动等待。 我知道可以用信号量来做到这一点,但我想要没有信号量的缓冲区。

两者都有一个共享变量:bufferCircular

趁着buffer还没有填满有用的信息,producer在array的p位置写入数据,趁着还有一些有用的信息consumerconsumer的位置读取数据c 个数组。 BufferCircular中的变量nElem是尚未读取的数值数据的个数。

该程序运行 9/10 次时效果很好。然后,有时,它会在屏幕上显示最后一个元素(循环的第 500 次)之前陷入无限循环,或者只是不显示任何元素。

我觉得可能是liveLock,但是找不到错误。

共享变量:

public class BufferCircular {
    volatile int[] array;
    volatile int p;
    volatile int c;
    volatile int nElem;

    public BufferCircular(int[] array) {
       this.array = array;
       this.p = 0;
       this.c = 0;
       this.nElem = 0;
    }

    public void writeData (int data) {
       this.array[p] = data;
       this.p = (p + 1) % array.length;
       this.nElem++;
    }

    public int readData() {
       int data = array[c];
       this.c = (c + 1) % array.length;
       this.nElem--;
       return data;
    }

}

生产者线程:

public class Producer extends Thread {
    BufferCircular buffer;
    int bufferTam;
    int contData;

    public Productor(BufferCircular buff) {
       this.buffer = buff;
       this.bufferTam = buffer.array.length;
       this.contData = 0;

    }

    public void produceData() {
       this.contData++;
       this.buffer.writeData(contData);
    }

    public void run() {
       for (int i = 0; i < 500; i++) {
          while (this.buffer.nElem == this.bufferTam) {
          Thread.yield();
        }
          this.produceData();
       }
    }
}

消费者线程:

    public class Consumer extends Thread {
        BufferCircular buffer;
        int cont;

        public Consumer(BufferCircular buff) {
           this.buffer = buff;
           this.cont = 0;
        }

        public void consumeData() {
           int data = buffer.readData();
           cont++;
           System.out.println("data  " + cont + ": " + data);
        }

        public void run() {
           for (int i = 0; i < 500; i++) {
              while (this.buffer.nElem == 0) {
                 Thread.yield();
              }
               this.consumeData();
           }
        }
   }

主要:

public class Main {

    public static void main(String[] args) {
       Random ran = new Random();
       int tamArray = ran.nextInt(21) + 1;
       int[] array = new int[tamArray];

       BufferCircular buffer = new BufferCircular(array);

       Producer producer = new Producer (buffer);
       Consumer consumer = new Consumer (buffer);

       producer.start();
       consumer.start();

       try {
           producer.join();
           consumer.join();
       } catch (InterruptedException e) {
           System.err.println("Error with Threads");
           e.printStackTrace();
    }

    }

}

欢迎任何帮助。

你的问题是你的 BufferCircular 方法对竞争条件很敏感。以 writeData() 为例。它分 3 个步骤执行,其中一些步骤也不是原子的:

this.array[p] = data;             // 1
this.p = (p + 1) % array.length;  // 2  not atomic
this.nElem++;                     // 3  not atomic

假设2个线程同时进入writeData()。在第 1 步,它们都具有相同的 p 值,并且都重写了 array[p] 值。现在,array[p] 被重写了两次,第一个线程必须写入的数据丢失了,因为第二个线程之后写入了同一个索引。然后他们执行第 2 步——结果不可预测,因为 p 可以递增 1 或 2(p = (p + 1) % array.length 由 3 个操作组成,其中线程可以交互)。然后,第 3 步。 ++ 运算符也不是原子的:它在幕后使用 2 个操作。所以 nElem 也会增加 1 或 2。

所以我们得到了完全不可预测的结果。这会导致您的程序执行不力。

最简单的解决方案是使 readData()writeData() 方法序列化。为此,声明它们 synchronized:

public synchronized void writeData (int data) { //...
public synchronized void readData () { //...

如果您只有一个生产者线程和一个消费者线程,则涉及 nElem 的操作可能会出现竞争条件。解决方案是使用 AtomicInteger 而不是 int:

final AtomicInteger nElem = new AtomicInteger();

并使用其 incrementAndGet()decrementAndGet() 方法。