使用线程修改对象

Using threads to modify an object

我是线程的新手。我想让两个线程将一个整数递增到某个值。因为 int 类型是不可变的,所以我切换到原子整数。我还尝试将一个 int 包装到一个 class 中,但这也没有用。我也试过 static/volatile int 但没有用。我也尝试使用公平政策。主要问题是 "counterObj" 没有正确增加并且仍然设置为 0,即使它被注入到两个线程。

我预期的 运行 行为:

thread       value
thread 0     0
thread 1     1
thread 0     2
...

到目前为止我写了什么:

import java.util.concurrent.atomic.AtomicInteger;

public class Application {
    public static void main(String[] args) {
        Application app = new Application();
        try {
            app.launch();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void launch() throws InterruptedException {
        int increments = 100;
        AtomicInteger counterObj = new AtomicInteger(0);
        CounterThread th1 = new CounterThread("1", counterObj, increments);
        CounterThread th2 = new CounterThread("2", counterObj, increments);
        th1.start();
        th2.start();


        System.out.println(counterObj.get());

    }


}

import java.util.concurrent.atomic.AtomicInteger;

public class CounterThread implements Runnable {
    private final String threadID;
    private AtomicInteger counterObj;
    private int bound;

    public CounterThread(String threadID, AtomicInteger counter, int bound) {
        this.threadID = threadID;
        this.counterObj = counter;
        this.bound = bound;
    }

    @Override
    public synchronized void run() {
        while (counterObj.get() < bound) {
            synchronized (this) {
                counterObj.incrementAndGet();
            }
        }
        System.out.println("Thread " + threadID + " finished");
    }


    public void start() throws InterruptedException {
        Thread thread = new Thread(this, threadID);
        thread.join();
        thread.start();
    }
}

干杯!

我认为你的程序在你的线程有机会做任何事情之前就已经退出了(可能是由于你的启动和连接的顺序。我会把你的线程启动逻辑移到你的主要(或启动)方法中。就像以下。

Thread thread1 = new Thread(new MyCounterRunnable("1", counterObj, increments));
Thread thread2 = new Thread(new MyCounterRunnable("2", counterObj, increments));

然后,在您的 main 中,您需要在 启动线程后调用 join ...如下所示:

thread1.start(); // starts first thread.
thread2.start(); // starts second thread.

thread1.join(); // don't let main exit until thread 1 is done.
thread2.join(); // don't let main exit until thread 2 is done.

您真正想要的是一次只有一个线程递增一个整数。 int 变量是您想要在同步块中使用的资源,因此不同的线程可以一次递增一个。 这可以单独使用 syncrhonize 来完成。 免责声明:我没有 运行 代码,因此它可能有一些拼写错误或异常要从应用程序中删除 class。

public class Application {

  private int theVar = 0;
  private int increments = 100;

  public static void main(String[] args) {
    Application app = new Application();
    try {
        app.launch();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
  }

  public synchronized addOne(){
    this.theVar++;
  }

  private void launch() throws InterruptedException {
    Runnable counter1 = new Counter(this, increments), counter2 = new Counter(this, increments);
    Thread t1 = new Thread(counter1);
    Thread t2 = new Thread(counter2);
    t1.start();
    t2.start();
  }
}

一个计数器class

public class Counter implements Runnable{
  private Application app;
  int rounds = -1;

  public Counter(Application app, rounds){
    this.app = app;
    this.rounds = rounds;
  }
  public void run(){
    while(int i=0; i<rounds; i++){
        this.app.addOne();
    }
  }
}

我已经对 AtomicInteger 进行了双重检查,这似乎是您一直试图完成的。

import java.util.concurrent.atomic.AtomicInteger;

public class DualCounters{

    public static void main(String[] args) throws Exception{
        AtomicInteger i = new AtomicInteger(0);
        int bounds = 3;

        Thread a = new Thread(()->{
            int last = 0;
            while(i.get()<bounds){
                synchronized(i){
                    if(i.get()<bounds){
                        last = i.getAndIncrement();
                    }
                }
            }
            System.out.println("a last " + last);
        });

        Thread b = new Thread(()->{
            int last = 0;
            while(i.get()<bounds){
                synchronized(i){
                    if(i.get()<bounds){
                        last = i.getAndIncrement();
                    }
                }
            }
            System.out.println("b last " + last);
        });

        a.start();
        b.start();

        a.join();
        b.join();

        System.out.println(i.get() + " afterwards");

    }
}

双重检查在 java 中是一个错误的概念,AtomicInteger 提供了无需任何同步即可完成此操作的工具。

int a;
while((a = i.getAndIncrement())<bounds){
    ...
}

现在,在 while 循环 中,a 永远不会大于边界 。循环完成后,ia 的值可能大于边界。

如果这是一个问题,总有其他方法getAndUpdate

while((a = i.getAndUpdate(i->i<bounds?i+1:i)<bounds){
    ...
} 

AtomicInteger 本身负责原子性,因此您不需要使用 synchronized —— 但前提是您遵守规则,并在一次调用中执行原子操作。

你没有这样做,因为你调用 counterObj.get() 然后根据结果 counterObj.incrementAndGet()。您需要避免这种情况,因为您希望检查和更新成为同一原子工作块的一部分。

您可以近距离接触:

while(counterObj.incrementAndGet() < bound) {} ;

但这总是至少递增一次,一次可能太多了。

稍微复杂一点:

IntUnaryOperator incrementWithLimit = x -> 
   ( x < bound ? x + 1 : x );

while(counterObj.updateAndGet(incrementWithLimit) < bound) {};

也就是说,我们创建了一个函数,该函数仅在小于 bound 时才递增一个数字,并且我们告诉 AtomicInteger 应用它。

There are a couple of issues with your code:

Thread.join method works only if the thread has started, else it does nothing. So you must reorder your code, but if you just move the join method after start, when starting the first thread by calling CounterThread.start, the main thread will wait until the started thread has finished, blocked in the Thread.join method, and only then will continue to starting第二个。 A solution is to make an additional method in the CounterThread class, that will be called after both threads have been started:

public void waitFinish() throws InterruptedException {
    thread.join();
}

synchronized (this) is synchronizing on the CounterThread instance that has been created when you called new CounterThread(...), but you have two instances so each will be synchronizing on a different object. For synchronized to work, you need to use a common instance of an object, in this case you can use the shared counterObj.

Only the AtomicInteger methods are guaranteed to be thread safe, so after you check if the bound has been reached outside the synchronized block, when entering the synchronized block the value can already be changed by another thread. So you need to do a recheck inside the synchronized block OR to first synchronize on the shared lock(counterObj) before the check and increment.

        while (true) {
            synchronized (counterObj) {
                if (counterObj.get() < bound)
                    counterObj.incrementAndGet();
                else break;
            }
        }

Note that the AtomicInteger class synchronized methods aren't helping now, but because it is a mutable object, it helps to use it as a shared lock. If you used an Integer instead, being immutable, a new instance will have been created when you incremented it. So now, it's only function is a wrapper holding the integer result.

Putting it all together:

public class Application {
    public static void main(String[] args) {
        Application app = new Application();
        try {
            app.launch();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void launch() throws InterruptedException {
        int increments = 100;
        AtomicInteger counterObj = new AtomicInteger(0);
        CounterThread th1 = new CounterThread("1", counterObj, increments);
        CounterThread th2 = new CounterThread("2", counterObj, increments);
        th1.start();
        th2.start();
        th1.waitFinish();
        th2.waitFinish();

        System.out.println(counterObj.get());
    }
}

public class CounterThread implements Runnable {
    private final String threadID;
    private AtomicInteger counterObj;
    private int bound;
    private Thread thread;

    public CounterThread(String threadID, AtomicInteger counter, int bound) {
        this.threadID = threadID;
        this.counterObj = counter;
        this.bound = bound;
    }

    @Override
    public void run() {
        while (true) {
            synchronized (counterObj) {
                if (counterObj.get() < bound)
                    counterObj.incrementAndGet();
                else break;
            }
        }
        System.out.println("Thread " + threadID + " finished");
    }


    public void start() throws InterruptedException {
        thread = new Thread(this, threadID);
        thread.start();
    }

    public void waitFinish() throws InterruptedException {
        thread.join();
    }
}